diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java index fdd869594f..d6ad1edc6c 100644 --- a/core/src/main/scala/kafka/server/FairLimiter.java +++ b/core/src/main/scala/kafka/server/FairLimiter.java @@ -62,6 +62,11 @@ public Handler acquire(int permit, long timeoutMs) throws InterruptedException { } } + @Override + public int maxPermits() { + return maxPermits; + } + @Override public int availablePermits() { return permits.availablePermits(); diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java index 055efd6739..3e3511de77 100644 --- a/core/src/main/scala/kafka/server/Limiter.java +++ b/core/src/main/scala/kafka/server/Limiter.java @@ -35,6 +35,13 @@ public interface Limiter { */ Handler acquire(int permit, long timeoutMs) throws InterruptedException; + /** + * Return the maximum number of permits that can be acquired at a time. + * + * @return the maximum number of permits that can be acquired at a time + */ + int maxPermits(); + /** * Return the number of permits available. * diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java index d850f5e7a7..1fab234492 100644 --- a/core/src/main/scala/kafka/server/NoopLimiter.java +++ b/core/src/main/scala/kafka/server/NoopLimiter.java @@ -28,6 +28,11 @@ public Handler acquire(int permit, long timeoutMs) throws InterruptedException { return new NoopHandler(); } + @Override + public int maxPermits() { + return Integer.MAX_VALUE; + } + @Override public int availablePermits() { return Integer.MAX_VALUE; diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index c8aaceb1b2..78e91bdaae 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -499,6 +499,15 @@ class ElasticReplicaManager( responseCallback(partitionToFetchPartitionData) } } else { + // release records before delay fetch + logReadResults.foreach { case (_, logReadResult) => + logReadResult.info.records match { + case r: PooledResource => + r.release() + case _ => + } + } + // If there is not enough data to respond and there is no remote data, we will let the fetch request // wait for new data. val delayedFetch = new DelayedFetch( @@ -536,7 +545,10 @@ class ElasticReplicaManager( def bytesNeed(): Int = { // sum the sizes of topics to fetch from fetchInfos val bytesNeed = readPartitionInfo.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes } - if (bytesNeed <= 0) params.maxBytes else math.min(bytesNeed, params.maxBytes) + val bytesNeedFromParam = if (bytesNeed <= 0) params.maxBytes else math.min(bytesNeed, params.maxBytes) + + // limit the bytes need to half of the maximum permits + math.min(bytesNeedFromParam, limiter.maxPermits()) } val handler: Handler = timeoutMs match {