diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java index fc0f012e..8ae57982 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java @@ -15,26 +15,29 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; -import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.util.Backoff; /** @@ -64,11 +67,13 @@ protected enum State { Stopped, Starting, Started, Stopping } - private static final int defaultReadMaxSizeBytes = 5 * 1024 * 1024; private int routeQueueSize = 200; private volatile int pendingQueueSize = 0; private static final AtomicIntegerFieldUpdater PENDING_SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AmqpExchangeReplicator.class, "pendingQueueSize"); + private int readBatchSize; + private final int readMaxSizeBytes; + private final int readMaxBatchSize; private static final int FALSE = 0; private static final int TRUE = 1; @@ -89,6 +94,10 @@ protected AmqpExchangeReplicator(PersistentExchange persistentExchange, Executor this.scheduledExecutorService = topic.getBrokerService().executor(); this.initMaxRouteQueueSize(routeQueueSize); this.routeExecutor = routeExecutor; + this.readMaxBatchSize = Math.min(this.routeQueueSize, + topic.getBrokerService().getPulsar().getConfig().getDispatcherMaxReadBatchSize()); + this.readBatchSize = this.readMaxBatchSize; + this.readMaxSizeBytes = topic.getBrokerService().getPulsar().getConfig().getDispatcherMaxReadSizeBytes(); STATE_UPDATER.set(this, AmqpExchangeReplicator.State.Stopped); this.name = "[AMQP Replicator for " + topic.getName() + " ]"; } @@ -175,11 +184,16 @@ private void readMoreEntries() { } int availablePermits = getAvailablePermits(); if (availablePermits > 0) { + int messagesToRead = Math.min(availablePermits, readBatchSize); + // avoid messageToRead is 0 + messagesToRead = Math.max(messagesToRead, 1); + if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { + log.info("{} Schedule read of {} messages.", name, messagesToRead); if (log.isDebugEnabled()) { - log.debug("{} Schedule read of {} messages.", name, availablePermits); + log.debug("{} Schedule read of {} messages.", name, messagesToRead); } - cursor.asyncReadEntriesOrWait(availablePermits, defaultReadMaxSizeBytes, this, null, null); + cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, null); } else { if (log.isDebugEnabled()) { log.debug("{} Not schedule read due to pending read. Messages to read {}.", @@ -201,6 +215,23 @@ private int getAvailablePermits() { } return 0; } + + if (topic.getDispatchRateLimiter().isPresent() + && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) { + long availableOnByte = topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(); + long availableOnMsg = topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(); + if (availableOnByte == 0 || availableOnMsg == 0) { + if (log.isDebugEnabled()) { + log.debug("{} Dispatch rate limit is reached, availableOnByte: {}, availableOnMsg: {}.", + name, availableOnByte, availableOnMsg); + } + return -1; + } + if (availableOnMsg > 0) { + availablePermits = Math.min(availablePermits, (int) availableOnMsg); + } + } + return availablePermits; } @@ -210,26 +241,66 @@ public void readEntriesComplete(List list, Object o) { log.debug("{} Read entries complete. Entries size: {}", name, list.size()); } HAVE_PENDING_READ_UPDATER.set(this, FALSE); - if (list == null || list.isEmpty()) { + if (CollectionUtils.isEmpty(list)) { long delay = readFailureBackoff.next(); log.warn("{} The read entry list is empty, will retry in {} ms. ReadPosition: {}, LAC: {}.", name, delay, cursor.getReadPosition(), topic.getManagedLedger().getLastConfirmedEntry()); scheduledExecutorService.schedule(this::readMoreEntries, delay, TimeUnit.MILLISECONDS); return; } + + if (readBatchSize < readMaxBatchSize) { + int newReadBatchSize = Math.min(readBatchSize * 2, readMaxBatchSize); + if (log.isDebugEnabled()) { + log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, + newReadBatchSize); + } + + readBatchSize = newReadBatchSize; + } + readFailureBackoff.reduceToHalf(); - List> bufList = new ArrayList<>(list.size()); + routeExecutor.execute(() -> this.handleEntries(list)); + } + + private void handleEntries(List list) { + PENDING_SIZE_UPDATER.addAndGet(this, list.size()); + + List>> propsList = new ArrayList<>(); + boolean encounterError = false; for (Entry entry : list) { - bufList.add( - Pair.of(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()), entry.getDataBuffer())); + if (encounterError) { + entry.release(); + continue; + } + + Map props; + try { + MessageImpl message = MessageImpl.deserialize(entry.getDataBuffer()); + props = message.getMessageBuilder().getPropertiesList().stream() + .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); + } catch (Exception e) { + log.error("Failed to deserialize entry dataBuffer for topic: {}, rewind cursor.", name, e); + encounterError = true; + propsList.clear(); + continue; + } + + propsList.add(Pair.of(entry.getPosition(), props)); + topic.getDispatchRateLimiter().ifPresent( + limiter -> limiter.consumeDispatchQuota(1, entry.getLength())); + entry.release(); + } + if (encounterError) { + cursor.rewind(); + PENDING_SIZE_UPDATER.set(this, 0); + this.readMoreEntries(); + return; } - routeExecutor.execute(() -> this.readComplete(bufList)); - } - private void readComplete(List> list) { - for (Pair entry : list) { - PENDING_SIZE_UPDATER.incrementAndGet(this); - readProcess(entry.getRight(), entry.getLeft()).whenCompleteAsync((ignored, exception) -> { + for (var posAndProps : propsList) { + final Position position = posAndProps.getLeft(); + routeIndex(posAndProps.getRight(), position).whenCompleteAsync((ignored, exception) -> { if (exception != null) { log.error("{} Error producing messages", name, exception); this.cursor.rewind(); @@ -237,19 +308,16 @@ private void readComplete(List> list) { if (log.isDebugEnabled()) { log.debug("{} Route message successfully.", name); } - AmqpExchangeReplicator.this.cursor - .asyncDelete(entry.getLeft(), this, entry.getLeft()); + AmqpExchangeReplicator.this.cursor.asyncDelete(position, this, position); } - if (PENDING_SIZE_UPDATER.decrementAndGet(this) < routeQueueSize * 0.5 - && HAVE_PENDING_READ_UPDATER.get(this) == FALSE) { + if (PENDING_SIZE_UPDATER.decrementAndGet(this) <= 0) { this.readMoreEntries(); } }, routeExecutor); - entry.getRight().release(); } } - public abstract CompletableFuture readProcess(ByteBuf data, Position position); + public abstract CompletableFuture routeIndex(Map props, Position position); @Override public void readEntriesFailed(ManagedLedgerException exception, Object o) { @@ -266,6 +334,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object o) { log.debug("{} Read entries from bookie failed, retrying in {} s", name, waitTimeMs / 1000, exception); } HAVE_PENDING_READ_UPDATER.set(this, FALSE); + readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize(); scheduledExecutorService.schedule(this::readMoreEntries, waitTimeMs, TimeUnit.MILLISECONDS); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java index 3ffcf40a..5ce1e931 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Sets; -import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; import io.streamnative.pulsar.handlers.amqp.AmqpEntryWriter; import io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator; @@ -35,7 +34,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -51,9 +49,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -126,16 +122,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis if (messageReplicator == null) { messageReplicator = new AmqpExchangeReplicator(this, routeExecutor, routeQueueSize) { @Override - public CompletableFuture readProcess(ByteBuf data, Position position) { - Map props; - try { - MessageImpl message = MessageImpl.deserialize(data); - props = message.getMessageBuilder().getPropertiesList().stream() - .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); - } catch (Exception e) { - log.error("Failed to deserialize entry dataBuffer. exchangeName: {}", exchangeName, e); - return FutureUtil.failedFuture(e); - } + public CompletableFuture routeIndex(Map props, Position position) { List> routeFutureList = new ArrayList<>(); if (exchangeType == Type.Direct) { diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java index e6294be2..3c288798 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java @@ -148,6 +148,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }); when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); + when(pulsarService.getConfig()).thenReturn(serviceConfiguration); when(pulsarService.getOrderedExecutor()).thenReturn( OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build()); when(serviceConfiguration.getNumIOThreads()).thenReturn(2 * Runtime.getRuntime().availableProcessors()); diff --git a/pom.xml b/pom.xml index b68ee51a..409e6f41 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ ${maven.compiler.target} - 3.3.0.1 + 3.3.0-SNAPSHOT 8.0.0 5.8.0 diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 417cc3f4..7fe2d035 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -102,6 +102,8 @@ public abstract class AmqpProtocolHandlerTestBase { @Getter private List aopAdminPortList = new ArrayList<>(); + private final Integer inFlightSizeInMB = 5; + public AmqpProtocolHandlerTestBase() { resetConfig(); } @@ -128,6 +130,7 @@ protected void resetConfig() { amqpConfig.setBrokerShutdownTimeoutMs(0L); amqpConfig.setDefaultNumPartitions(1); amqpConfig.setTransactionCoordinatorEnabled(true); + amqpConfig.setManagedLedgerMaxReadsInFlightSizeInMB(inFlightSizeInMB); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java index 650220ca..69147c4c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.amqp.rabbitmq; +import static org.junit.Assert.assertTrue; + import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; @@ -33,9 +35,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; @@ -512,4 +516,38 @@ public void handleDelivery(String consumerTag, channel.close(); conn.close(); } + + @Test(timeOut = 15_000) + public void testConsumeMoreThanInFlightSize() throws Exception { + Connection conn = getConnection("vhost1", true); + Channel channel = conn.createChannel(); + + String qu = randQuName(); + channel.queueDeclare(qu, true, false, false, null); + + long totalContentSize = conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024 * 1024 * 2; + AtomicLong receiveContentSize = new AtomicLong(0); + + CountDownLatch latch = new CountDownLatch(1); + channel.basicConsume(qu, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + if (receiveContentSize.addAndGet(body.length) >= totalContentSize) { + latch.countDown(); + } + } + }); + + AtomicLong sendContentSize = new AtomicLong(0); + byte[] content = RandomUtils.nextBytes(1024 * 100); + do { + channel.basicPublish("", qu, null, content); + } while (sendContentSize.addAndGet(content.length) < totalContentSize); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + channel.close(); + conn.close(); + } + }