From 3fcd307de8c6a3d22b57f7bffd09d216e95d8e3e Mon Sep 17 00:00:00 2001
From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com>
Date: Thu, 4 Jan 2024 09:46:08 +0800
Subject: [PATCH] perf(server): speed up delayed fetch (#633)

Signed-off-by: Ning Yu
---
 .../scala/kafka/server/DelayedFetch.scala     |   7 +-
 .../main/scala/kafka/server/FairLimiter.java  |  91 ++++++++++++
 core/src/main/scala/kafka/server/Limiter.java |  49 +++++++
 .../scala/kafka/server/MemoryLimiter.java     |  58 --------
 .../main/scala/kafka/server/NoopLimiter.java  |  42 ++++++
 .../scala/kafka/server/ReplicaManager.scala   | 132 ++++++++++--------
 6 files changed, 257 insertions(+), 122 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/FairLimiter.java
 create mode 100644 core/src/main/scala/kafka/server/Limiter.java
 delete mode 100644 core/src/main/scala/kafka/server/MemoryLimiter.java
 create mode 100644 core/src/main/scala/kafka/server/NoopLimiter.java

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index ac00975d20..53259ef70a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -46,6 +46,9 @@ class DelayedFetch(
   fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
   replicaManager: ReplicaManager,
   quota: ReplicaQuota,
+  // AutoMQ for Kafka inject start
+  limiter: Limiter = NoopLimiter.INSTANCE,
+  // AutoMQ for Kafka inject end
   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
 ) extends DelayedOperation(params.maxWaitMs) {
 
@@ -168,7 +171,8 @@ class DelayedFetch(
         params,
         fetchInfos,
         quota,
-        readFromPurgatory = true
+        readFromPurgatory = true,
+        limiter = limiter,
       )
       ReadHint.clear()
       // AutoMQ for Kafka inject end
@@ -189,4 +193,3 @@ object DelayedFetchMetrics extends KafkaMetricsGroup {
   val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower"))
   val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer"))
 }
-
diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java
new file mode 100644
index 0000000000..75ae74bf9e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FairLimiter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A fair limiter whose {@link #acquire} method is fair, i.e. the waiting threads are served in the order of arrival.
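+ * Fairness is achieved by guarding the (non-fair) semaphore with a fair {@link ReentrantLock}: acquirers queue
+ * on the lock first, so at most one thread waits on the semaphore and permits are granted in arrival order.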
+ */
+public class FairLimiter implements Limiter {
+    private final int maxPermits;
+    /**
+     * The lock used to protect {@link #acquireLocked}
+     */
+    private final Lock lock = new ReentrantLock(true);
+    private final Semaphore permits;
+
+    public FairLimiter(int size) {
+        maxPermits = size;
+        permits = new Semaphore(size);
+    }
+
+    @Override
+    public Handler acquire(int permit) throws InterruptedException {
+        lock.lock();
+        try {
+            permits.acquire(permit);
+            return new FairHandler(permit);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
+        long start = System.nanoTime();
+        if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
+            try {
+                // calculate the time left for {@code acquireLocked}
+                long elapsed = System.nanoTime() - start;
+                long left = TimeUnit.MILLISECONDS.toNanos(timeoutMs) - elapsed;
+                // note: {@code left} may be negative here, but it's OK for acquireLocked
+                return acquireLocked(permit, left);
+            } finally {
+                lock.unlock();
+            }
+        } else {
+            // tryLock timed out, return null
+            return null;
+        }
+    }
+
+    private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException {
+        if (permit > maxPermits) {
+            permit = maxPermits;
+        }
+        boolean acquired = permits.tryAcquire(permit, timeoutNs, TimeUnit.NANOSECONDS);
+        return acquired ? new FairHandler(permit) : null;
+    }
+
+    public class FairHandler implements Handler {
+        private final int permit;
+
+        public FairHandler(int permit) {
+            this.permit = permit;
+        }
+
+        @Override
+        public void close() {
+            permits.release(permit);
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java
new file mode 100644
index 0000000000..86e9c71f7d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/Limiter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+/**
+ * A limiter that limits the number of permits that can be held at a time.
+ */
+public interface Limiter {
+
+    /**
+     * Acquire permits, blocking until enough are available.
+     *
+     * @param permit the number of permits to acquire, should not be negative
+     * @return a handler to release the permits, never null. The handler should be closed after use.
+     * @throws InterruptedException if interrupted while waiting
+     */
+    Handler acquire(int permit) throws InterruptedException;
+
+    /**
+     * Acquire permits, blocking until enough are available or the timeout expires.
+     *
+     * @param permit the number of permits to acquire, should not be negative
+     * @param timeoutMs the maximum time to wait for the permits, in milliseconds. A non-positive value means not to wait.
+     * @return a handler to release the permits, or null on timeout. If not null, the handler should be closed after use.
+     * @throws InterruptedException if interrupted while waiting
+     */
+    Handler acquire(int permit, long timeoutMs) throws InterruptedException;
+
+    /**
+     * A handler to release acquired permits.
+     */
+    interface Handler extends AutoCloseable {
+    }
+}
diff --git a/core/src/main/scala/kafka/server/MemoryLimiter.java b/core/src/main/scala/kafka/server/MemoryLimiter.java
deleted file mode 100644
index 41ab954683..0000000000
--- a/core/src/main/scala/kafka/server/MemoryLimiter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import java.util.concurrent.Semaphore;
-
-public class MemoryLimiter {
-    private final int maxPermits;
-    private final Semaphore permits;
-
-    public MemoryLimiter(int size) {
-        maxPermits = size;
-        permits = new Semaphore(size);
-    }
-
-    /**
-     * Acquire permits, if not enough, block until enough.
-     * Note: the acquire is fair, the acquired will be permitted in the acquire order.
-     */
-    public synchronized Handler acquire(int permit) throws InterruptedException {
-        if (permit > maxPermits) {
-            permit = maxPermits;
-        }
-        boolean acquireRst = permits.tryAcquire(permit);
-        if (!acquireRst) {
-            permits.acquire(permit);
-        }
-        return new Handler(permit);
-    }
-
-    public class Handler implements AutoCloseable {
-        private final int permit;
-
-        public Handler(int permit) {
-            this.permit = permit;
-        }
-
-        @Override
-        public void close() {
-            permits.release(permit);
-        }
-    }
-}
diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java
new file mode 100644
index 0000000000..0cc8763783
--- /dev/null
+++ b/core/src/main/scala/kafka/server/NoopLimiter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+/**
+ * A limiter that does nothing.
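+ * Both {@code acquire} variants immediately return a handler whose {@code close} releases nothing,
+ * so it effectively disables limiting; it serves as the default limiter where no memory limit is wanted.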
+ */
+public class NoopLimiter implements Limiter {
+
+    public static final NoopLimiter INSTANCE = new NoopLimiter();
+
+    @Override
+    public Handler acquire(int permit) throws InterruptedException {
+        return new NoopHandler();
+    }
+
+    @Override
+    public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
+        return new NoopHandler();
+    }
+
+    public static class NoopHandler implements Handler {
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 510f707578..e75b233451 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -27,7 +27,9 @@ import kafka.log._
 import kafka.log.streamaspect.{ElasticLogManager, ReadHint}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.HostedPartition.Online
+import kafka.server.Limiter.Handler
 import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.ReplicaManager.createLogReadResult
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Implicits._
@@ -71,6 +73,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 
 /*
  * Result metadata of a log append operation on the log
@@ -187,6 +190,20 @@ object HostedPartition {
 
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
+
+  // AutoMQ for Kafka inject start
+  def createLogReadResult(e: Throwable): LogReadResult = {
+    LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+      divergingEpoch = None,
+      highWatermark = UnifiedLog.UnknownOffset,
+      leaderLogStartOffset = UnifiedLog.UnknownOffset,
+      leaderLogEndOffset = UnifiedLog.UnknownOffset,
+      followerLogStartOffset = UnifiedLog.UnknownOffset,
+      fetchTimeMs = -1L,
+      lastStableOffset = None,
+      exception = Option(e))
+  }
+  // AutoMQ for Kafka inject end
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -251,8 +268,8 @@ class ReplicaManager(val config: KafkaConfig,
 
   val fastFetchExecutor = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true))
   val slowFetchExecutor = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true))
-  val fastFetchLimiter = new MemoryLimiter(100 * 1024 * 1024)
-  val slowFetchLimiter = new MemoryLimiter(100 * 1024 * 1024)
+  val fastFetchLimiter = new FairLimiter(100 * 1024 * 1024) // 100MiB
+  val slowFetchLimiter = new FairLimiter(100 * 1024 * 1024) // 100MiB
 
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
     override def doWork(): Unit = {
@@ -1070,18 +1087,8 @@ class ReplicaManager(val config: KafkaConfig,
   ): Unit = {
 
     def responseEmpty(e: Throwable): Unit = {
-      val error = if (e == null) { Errors.NONE } else { Errors.forException(e) }
       val fetchPartitionData = fetchInfos.map { case (tp, _) =>
-        tp -> FetchPartitionData(
-          error = error,
-          highWatermark = -1L,
-          lastStableOffset = None,
-          logStartOffset = -1L,
-          abortedTransactions = None,
-          preferredReadReplica = None,
-          records = MemoryRecords.EMPTY,
-          isReassignmentFetch = false,
-          divergingEpoch = None)
+        tp -> createLogReadResult(e).toFetchPartitionData(false)
       }
       responseCallback(fetchPartitionData)
     }
@@ -1092,72 +1099,27 @@ class ReplicaManager(val config: KafkaConfig,
       responseEmpty(e)
     }
 
-    val start = System.nanoTime()
-
-    def checkMaxWaitMs(): Boolean = {
-      if (params.maxWaitMs <= 0) {
-        // If the max wait time is 0, then no need to check quota or linger.
-        true
-      } else {
-        val waitedTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
-        if (waitedTimeMs < params.maxWaitMs) {
-          return true
-        }
-        warn(s"Returning emtpy fetch response for fetch request ${fetchInfos} since the " +
-          s"wait time ${waitedTimeMs} exceed ${params.maxWaitMs} ms.")
-        responseEmpty(null)
-        false
-      }
-    }
-
-    // sum the sizes of topics to fetch from fetchInfos
-    var bytesNeed = fetchInfos.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes }
-    bytesNeed = math.min(bytesNeed, params.maxBytes)
-
     // The fetching is done is a separate thread pool to avoid blocking io thread.
     fastFetchExecutor.submit(new Runnable {
      override def run(): Unit = {
-        val fastFetchLimiterHandler = fastFetchLimiter.acquire(bytesNeed)
-        if (!checkMaxWaitMs()) {
-          fastFetchLimiterHandler.close()
-          return
-        }
         try {
           ReadHint.markReadAll()
           ReadHint.markFastRead()
-          fetchMessages0(params, fetchInfos, quota, response => {
-            try {
-              responseCallback.apply(response)
-            } finally {
-              fastFetchLimiterHandler.close()
-            }
-          })
+          // no timeout for fast read
+          fetchMessages0(params, fetchInfos, quota, fastFetchLimiter, 0, responseCallback)
           ReadHint.clear()
         } catch {
           case e: Throwable =>
-            fastFetchLimiterHandler.close()
            val ex = FutureUtil.cause(e)
             val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException]
             if (fastReadFailFast) {
               slowFetchExecutor.submit(new Runnable {
                 override def run(): Unit = {
-                  val slowFetchLimiterHandler = slowFetchLimiter.acquire(bytesNeed)
-                  if (!checkMaxWaitMs()) {
-                    slowFetchLimiterHandler.close()
-                    return
-                  }
                   try {
                     ReadHint.markReadAll()
-                    fetchMessages0(params, fetchInfos, quota, response => {
-                      try {
-                        responseCallback.apply(response)
-                      } finally {
-                        slowFetchLimiterHandler.close()
-                      }
-                    })
+                    fetchMessages0(params, fetchInfos, quota, slowFetchLimiter, params.maxWaitMs, responseCallback)
                   } catch {
                     case slowEx: Throwable =>
-                      slowFetchLimiterHandler.close()
                       handleError(slowEx)
                   }
                 }
@@ -1174,11 +1136,13 @@ class ReplicaManager(val config: KafkaConfig,
     params: FetchParams,
     fetchInfos: Seq[(TopicIdPartition, PartitionData)],
     quota: ReplicaQuota,
+    limiter: Limiter,
+    timeoutMs: Long,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
     // AutoMQ for Kafka inject start
-    val logReadResults = readFromLocalLogV2(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLocalLogV2(params, fetchInfos, quota, readFromPurgatory = false, limiter, timeoutMs)
     // AutoMQ for Kafka inject end
     var bytesReadable: Long = 0
     var errorReadingData = false
@@ -1227,6 +1191,8 @@ class ReplicaManager(val config: KafkaConfig,
         fetchPartitionStatus = fetchPartitionStatus,
         replicaManager = this,
         quota = quota,
+        // always use the fast fetch limiter in delayed fetch operations
+        limiter = fastFetchLimiter,
         responseCallback = responseCallback
       )
 
@@ -1240,6 +1206,48 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * A wrapper of [[readFromLocalLogV2]] which acquires memory permits from the limiter.
+   * With the default [[NoopLimiter]], it behaves the same as the wrapped [[readFromLocalLogV2]].
+   * A non-positive `timeoutMs` means no timeout.
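+   *
+   * @param limiter the limiter from which permits (an estimate of the bytes to be read) are acquired
+   * @param timeoutMs the maximum time in milliseconds to wait for permits; on timeout, empty results are returned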
+   */
+  def readFromLocalLogV2(
+    params: FetchParams,
+    readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
+    quota: ReplicaQuota,
+    readFromPurgatory: Boolean,
+    limiter: Limiter = NoopLimiter.INSTANCE,
+    timeoutMs: Long = 0): Seq[(TopicIdPartition, LogReadResult)] = {
+
+    def bytesNeed(): Int = {
+      // sum the sizes of topics to fetch from readPartitionInfo
+      val bytesNeed = readPartitionInfo.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes }
+      math.min(bytesNeed, params.maxBytes)
+    }
+
+    def emptyResult(): Seq[(TopicIdPartition, LogReadResult)] = {
+      readPartitionInfo.map { case (tp, _) =>
+        tp -> createLogReadResult(null)
+      }
+    }
+
+    val handler: Handler = timeoutMs match {
+      case t if t > 0 => limiter.acquire(bytesNeed(), t)
+      case _ => limiter.acquire(bytesNeed())
+    }
+
+    if (handler == null) {
+      // the handler may be null if the acquisition from the limiter timed out
+      // TODO: add metrics for this
+      // warn(s"Returning empty fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.")
+      emptyResult()
+    } else {
+      Using.resource(handler) { _ =>
+        readFromLocalLogV2(params, readPartitionInfo, quota, readFromPurgatory)
+      }
+    }
+  }
+
   /**
    * Parallel read from multiple topic partitions at the given offset up to maxSize bytes
    */
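
For illustration, a minimal sketch of the acquire/release pattern the new Limiter API expects
(the class name, the 10 MiB budget, and readRecords() are hypothetical stand-ins, not part of
this patch):

    package kafka.server;

    // Sketch: acquire permits for the estimated read size, read, then always release.
    public class LimiterUsageSketch {

        private static final Limiter LIMITER = new FairLimiter(10 * 1024 * 1024); // 10MiB of permits

        static byte[] limitedRead(int estimatedBytes, long timeoutMs) throws Exception {
            // acquire(permit, timeoutMs) returns null if the permits cannot be obtained in time;
            // try-with-resources skips close() for a null resource
            try (Limiter.Handler handler = LIMITER.acquire(estimatedBytes, timeoutMs)) {
                if (handler == null) {
                    return new byte[0]; // timed out: return an empty result, as readFromLocalLogV2 does
                }
                return readRecords(estimatedBytes);
            }
        }

        private static byte[] readRecords(int bytes) {
            return new byte[Math.min(bytes, 1024)]; // placeholder for the actual storage read
        }
    }

The same pattern appears on the Scala side as Using.resource(handler) in readFromLocalLogV2,
which guarantees the permits are released even if the wrapped read throws.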