From 985d291868edbbabc53f57a4f09e7848473c3c79 Mon Sep 17 00:00:00 2001 From: ran Date: Wed, 12 Jun 2024 21:44:27 +0800 Subject: [PATCH] [branch-3.3] Bump pulsar 3.3.0.2 (#1261) --- .../pulsar/handlers/amqp/AmqpEntryWriter.java | 2 +- .../amqp/test/AopProtocolHandlerTestBase.java | 15 ++++++++------- pom.xml | 4 ++-- .../amqp/AmqpProtocolHandlerTestBase.java | 15 ++++++++------- .../amqp/AmqpProtocolHandlerTestBase.java | 15 ++++++++------- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpEntryWriter.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpEntryWriter.java index 44e1fbbb..3b755298 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpEntryWriter.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpEntryWriter.java @@ -97,7 +97,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { log.debug("[{}] Success to write entry with position {}.", topic.getName(), position); } topic.recordAddLatency(System.nanoTime() - context.startTimeNs, TimeUnit.MICROSECONDS); - topic.getTransactionBuffer().syncMaxReadPositionForNormalPublish((PositionImpl) position); + topic.getTransactionBuffer().syncMaxReadPositionForNormalPublish((PositionImpl) position, false); context.positionFuture.complete(position); context.recycle(); } diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AopProtocolHandlerTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AopProtocolHandlerTestBase.java index d4a0bec9..a915fba7 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AopProtocolHandlerTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AopProtocolHandlerTestBase.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -204,21 +205,21 @@ public void reallyShutdown() { private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional> ensemblePlacementPolicyClass, - Map properties) { + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map properties) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional> ensemblePlacementPolicyClass, Map properties, StatsLogger statsLogger) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override diff --git a/pom.xml b/pom.xml index 409e6f41..e931ca20 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ ${maven.compiler.target} - 3.3.0-SNAPSHOT + 3.3.0.2 8.0.0 5.8.0 @@ -386,7 +386,7 @@ ossrh - https://s01.oss.sonatype.org/service/local/repositories/iostreamnative-3095/content + https://s01.oss.sonatype.org/service/local/repositories/iostreamnative-3155/content diff --git a/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 8db75078..4592a2ce 100644 --- a/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -332,21 +333,21 @@ public void reallyShutdown() { private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional> ensemblePlacementPolicyClass, - Map properties) { + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map properties) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional> ensemblePlacementPolicyClass, Map properties, StatsLogger statsLogger) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 7fe2d035..f5fa1d85 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -359,21 +360,21 @@ public void reallyShutdown() { private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional> ensemblePlacementPolicyClass, - Map properties) { + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map properties) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional> ensemblePlacementPolicyClass, Map properties, StatsLogger statsLogger) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override