Skip to content

Commit

Permalink
fix(core): release all records before delayed fetch (#2009)
Browse files Browse the repository at this point in the history
* fix(core): release all records before delayed fetch

Signed-off-by: SSpirits <[email protected]>

* fix(core): release all records before delayed fetch

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Sep 10, 2024
1 parent 2046e12 commit d5956cf
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 1 deletion.
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/FairLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
}
}

@Override
public int maxPermits() {
return maxPermits;
}

@Override
public int availablePermits() {
return permits.availablePermits();
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/Limiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface Limiter {
*/
Handler acquire(int permit, long timeoutMs) throws InterruptedException;

/**
* Return the maximum number of permits that can be acquired at a time.
*
* @return the maximum number of permits that can be acquired at a time
*/
int maxPermits();

/**
* Return the number of permits available.
*
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/NoopLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
return new NoopHandler();
}

@Override
public int maxPermits() {
return Integer.MAX_VALUE;
}

@Override
public int availablePermits() {
return Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,15 @@ class ElasticReplicaManager(
responseCallback(partitionToFetchPartitionData)
}
} else {
// release records before delay fetch
logReadResults.foreach { case (_, logReadResult) =>
logReadResult.info.records match {
case r: PooledResource =>
r.release()
case _ =>
}
}

// If there is not enough data to respond and there is no remote data, we will let the fetch request
// wait for new data.
val delayedFetch = new DelayedFetch(
Expand Down Expand Up @@ -536,7 +545,10 @@ class ElasticReplicaManager(
def bytesNeed(): Int = {
// sum the sizes of topics to fetch from fetchInfos
val bytesNeed = readPartitionInfo.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes }
if (bytesNeed <= 0) params.maxBytes else math.min(bytesNeed, params.maxBytes)
val bytesNeedFromParam = if (bytesNeed <= 0) params.maxBytes else math.min(bytesNeed, params.maxBytes)

// limit the bytes need to half of the maximum permits
math.min(bytesNeedFromParam, limiter.maxPermits())
}

val handler: Handler = timeoutMs match {
Expand Down

0 comments on commit d5956cf

Please sign in to comment.