Skip to content

Commit

Permalink
perf(server): speed up delayed fetch (#633)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Jan 4, 2024
1 parent 0396ffd commit 3fcd307
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 122 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class DelayedFetch(
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
replicaManager: ReplicaManager,
quota: ReplicaQuota,
// AutoMQ for Kafka inject start
limiter: Limiter = NoopLimiter.INSTANCE,
// AutoMQ for Kafka inject end
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
) extends DelayedOperation(params.maxWaitMs) {

Expand Down Expand Up @@ -168,7 +171,8 @@ class DelayedFetch(
params,
fetchInfos,
quota,
readFromPurgatory = true
readFromPurgatory = true,
limiter = limiter,
)
ReadHint.clear()
// AutoMQ for Kafka inject end
Expand All @@ -189,4 +193,3 @@ object DelayedFetchMetrics extends KafkaMetricsGroup {
val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower"))
val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer"))
}

91 changes: 91 additions & 0 deletions core/src/main/scala/kafka/server/FairLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* A fair limiter whose {@link #acquire} method is fair, i.e. the waiting threads are served in the order of arrival.
*/
public class FairLimiter implements Limiter {
private final int maxPermits;
/**
* The lock used to protect @{link #acquireLocked}
*/
private final Lock lock = new ReentrantLock(true);
private final Semaphore permits;

public FairLimiter(int size) {
maxPermits = size;
permits = new Semaphore(size);
}

@Override
public Handler acquire(int permit) throws InterruptedException {
lock.lock();
try {
permits.acquire(permit);
return new FairHandler(permit);
} finally {
lock.unlock();
}
}

@Override
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
long start = System.nanoTime();
if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
// calculate the time left for {@code acquireLocked}
long elapsed = System.nanoTime() - start;
long left = TimeUnit.MILLISECONDS.toNanos(timeoutMs) - elapsed;
// note: {@code left} may be negative here, but it's OK for acquireLocked
return acquireLocked(permit, left);
} finally {
lock.unlock();
}
} else {
// tryLock timeout, return null
return null;
}
}

private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException {
if (permit > maxPermits) {
permit = maxPermits;
}
boolean acquired = permits.tryAcquire(permit, timeoutNs, TimeUnit.NANOSECONDS);
return acquired ? new FairHandler(permit) : null;
}

public class FairHandler implements Handler {
private final int permit;

public FairHandler(int permit) {
this.permit = permit;
}

@Override
public void close() {
permits.release(permit);
}
}
}
49 changes: 49 additions & 0 deletions core/src/main/scala/kafka/server/Limiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

/**
* A limiter that limits the number of permits that can be acquired at a time.
*/
public interface Limiter {

/**
* Acquire permits, if not enough, block until enough.
*
* @param permit the number of permits to acquire, should not be negative
* @return a handler to release the permits, never null. The handler should be closed after use.
* @throws InterruptedException if interrupted while waiting
*/
Handler acquire(int permit) throws InterruptedException;

/**
* Acquire permits, if not enough, block until enough or timeout.
*
* @param permit the number of permits to acquire, should not be negative
* @param timeoutMs the maximum time to wait for the permits, in milliseconds. A non-positive value means not to wait.
* @return a handler to release the permits or null if timeout. If not null, the handler should be closed after use.
* @throws InterruptedException if interrupted while waiting
*/
Handler acquire(int permit, long timeoutMs) throws InterruptedException;

/**
* A handler to release acquired permits.
*/
interface Handler extends AutoCloseable {
}
}
58 changes: 0 additions & 58 deletions core/src/main/scala/kafka/server/MemoryLimiter.java

This file was deleted.

42 changes: 42 additions & 0 deletions core/src/main/scala/kafka/server/NoopLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

/**
* A limiter that does nothing.
*/
public class NoopLimiter implements Limiter {

public static final NoopLimiter INSTANCE = new NoopLimiter();

@Override
public Handler acquire(int permit) throws InterruptedException {
return new NoopHandler();
}

@Override
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
return new NoopHandler();
}

public static class NoopHandler implements Handler {
@Override
public void close() {
}
}
}
Loading

0 comments on commit 3fcd307

Please sign in to comment.