diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index b057741fe7..c6d8d19371 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -29,6 +29,7 @@ import com.automq.stream.s3.failover.ForceCloseStorageFailureHandler; import com.automq.stream.s3.failover.HaltStorageFailureHandler; import com.automq.stream.s3.failover.StorageFailureHandlerChain; +import com.automq.stream.s3.failover.StorageHandlerChain; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.objects.ObjectManager; @@ -125,12 +126,12 @@ public void start() { this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory); this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage); this.writeAheadLog = buildWAL(); - StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain(); - this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler); + StorageHandlerChain storageHandlerChain = new StorageFailureHandlerChain(); + this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageHandlerChain); // stream object compactions share the same object storage with stream set object compactions this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionobjectStorage, this.config, networkInboundLimiter, networkOutboundLimiter); - storageFailureHandler.addHandler(new ForceCloseStorageFailureHandler(streamClient)); - storageFailureHandler.addHandler(new HaltStorageFailureHandler()); + storageHandlerChain.addHandler(new ForceCloseStorageFailureHandler(streamClient)); + storageHandlerChain.addHandler(new HaltStorageFailureHandler()); this.streamClient.registerStreamLifeCycleListener(localIndexCache); this.kvClient = new ControllerKVClient(this.requestSender); this.failover = failover(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 76373b37dd..fa76583cd4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -19,7 +19,7 @@ import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; import com.automq.stream.s3.failover.Failover; -import com.automq.stream.s3.failover.StorageFailureHandler; +import com.automq.stream.s3.failover.StorageHandlerChain; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; @@ -112,7 +112,7 @@ public class S3Storage implements Storage { private final ObjectManager objectManager; private final ObjectStorage objectStorage; private final S3BlockCache blockCache; - private final StorageFailureHandler storageFailureHandler; + private final StorageHandlerChain storageHandlerChain; /** * Stream callback locks. Used to ensure the stream callbacks will not be called concurrently. * @@ -128,7 +128,7 @@ public class S3Storage implements Storage { @SuppressWarnings("this-escape") public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager, - S3BlockCache blockCache, ObjectStorage objectStorage, StorageFailureHandler storageFailureHandler) { + S3BlockCache blockCache, ObjectStorage objectStorage, StorageHandlerChain storageHandlerChain) { this.config = config; this.maxDeltaWALCacheSize = config.walCacheSize(); this.deltaWAL = deltaWAL; @@ -137,7 +137,7 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana this.streamManager = streamManager; this.objectManager = objectManager; this.objectStorage = objectStorage; - this.storageFailureHandler = storageFailureHandler; + this.storageHandlerChain = storageHandlerChain; this.drainBackoffTask = this.backgroundExecutor.scheduleWithFixedDelay(this::tryDrainBackoffRecords, 100, 100, TimeUnit.MILLISECONDS); S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size); S3StreamMetricsManager.registerDeltaWalPendingUploadBytesSupplier(this.pendingUploadBytes::get); @@ -468,7 +468,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f appendResult.future().whenComplete((nil, ex) -> { if (ex != null) { LOGGER.error("append WAL fail, request {}", request, ex); - storageFailureHandler.handle(ex); + storageHandlerChain.handle(ex); return; } handleAppendCallback(request); diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java index 1c1798cbf4..e712517255 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java +++ b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java @@ -16,7 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StorageFailureHandlerChain implements StorageFailureHandler { +public class StorageFailureHandlerChain implements StorageHandlerChain { private static final Logger LOGGER = LoggerFactory.getLogger(StorageFailureHandlerChain.class); private final List handlers = new ArrayList<>(); @@ -31,6 +31,7 @@ public void handle(Throwable ex) { } } + @Override public void addHandler(StorageFailureHandler handler) { handlers.add(handler); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java new file mode 100644 index 0000000000..1a2f4bf697 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java @@ -0,0 +1,16 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.failover; + +public interface StorageHandlerChain extends StorageFailureHandler { + void addHandler(StorageFailureHandler handler); +} diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java index b3ae931517..c00d6eac6f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java @@ -15,7 +15,7 @@ import com.automq.stream.s3.cache.ReadDataBlock; import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory; import com.automq.stream.s3.cache.blockcache.StreamReaders; -import com.automq.stream.s3.failover.StorageFailureHandler; +import com.automq.stream.s3.failover.StorageHandlerChain; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamState; import com.automq.stream.s3.model.StreamRecordBatch; @@ -81,7 +81,7 @@ public void setup() { objectStorage = new MemoryObjectStorage(); storage = new S3Storage(config, wal, streamManager, objectManager, new StreamReaders(config.blockCacheSize(), objectManager, objectStorage, - new DefaultObjectReaderFactory(objectStorage)), objectStorage, mock(StorageFailureHandler.class)); + new DefaultObjectReaderFactory(objectStorage)), objectStorage, mock(StorageHandlerChain.class)); } @Test