From b6ed512b41fae241e30b5691f875c38a19e78195 Mon Sep 17 00:00:00 2001 From: Quan Tran Date: Wed, 4 Dec 2024 14:57:16 +0700 Subject: [PATCH] JAMES-3605 Implement DeletedMessageVaultWorkQueueReconnectionHandler So the consumer for deleted message vault queue could reconnect if needed. --- ...sageVaultWorkQueueReconnectionHandler.java | 43 +++++++++++++++++++ ...edDeletedMessageVaultDeletionCallback.java | 24 ++++++++--- .../DistributedDeletedMessageVaultModule.java | 4 ++ 3 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java new file mode 100644 index 00000000000..0893a66d345 --- /dev/null +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java @@ -0,0 +1,43 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.modules.mailbox; + +import jakarta.inject.Inject; + +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.reactivestreams.Publisher; + +import com.rabbitmq.client.Connection; + +import reactor.core.publisher.Mono; + +public class DeletedMessageVaultWorkQueueReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { + private final DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback; + + @Inject + public DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback) { + this.distributedDeletedMessageVaultDeletionCallback = distributedDeletedMessageVaultDeletionCallback; + } + + @Override + public Publisher handleReconnection(Connection connection) { + return Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart); + } +} diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java index 6be72463398..6bed7178c55 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java @@ -35,6 +35,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; +import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.blob.api.BlobId; import org.apache.james.core.Username; import org.apache.james.lifecycle.api.Startable; @@ -168,7 +169,7 @@ DeleteMessageListener.DeletedMessageCopyCommand asPojo(MailboxId.Factory mailbox private final MailboxId.Factory mailboxIdFactory; private final MessageId.Factory messageIdFactory; private final BlobId.Factory blobIdFactory; - private Receiver receiver; + private final ReceiverProvider receiverProvider; private Disposable disposable; @Inject @@ -178,7 +179,8 @@ public DistributedDeletedMessageVaultDeletionCallback(Sender sender, DeletedMessageVaultDeletionCallback callback, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, - BlobId.Factory blobIdFactory) { + BlobId.Factory blobIdFactory, + ReceiverProvider receiverProvider) { this.sender = sender; this.rabbitMQConfiguration = rabbitMQConfiguration; this.callback = callback; @@ -187,6 +189,7 @@ public DistributedDeletedMessageVaultDeletionCallback(Sender sender, this.blobIdFactory = blobIdFactory; this.objectMapper = new ObjectMapper(); this.channelPool = channelPool; + this.receiverProvider = receiverProvider; } public void init() { @@ -215,17 +218,28 @@ public void init() { .then() .block(); - receiver = channelPool.createReceiver(); - disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)) + disposable = consumeDeletedMessageVaultWorkQueue(); + } + + private Disposable consumeDeletedMessageVaultWorkQueue() { + return Flux.using( + receiverProvider::createReceiver, + receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)), + Receiver::close) .flatMap(this::handleMessage) .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } + public void restart() { + Disposable previousConsumer = disposable; + disposable = consumeDeletedMessageVaultWorkQueue(); + previousConsumer.dispose(); + } + @PreDestroy public void stop() { Optional.ofNullable(disposable).ifPresent(Disposable::dispose); - Optional.ofNullable(receiver).ifPresent(Receiver::close); } private Mono handleMessage(AcknowledgableDelivery delivery) { diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java index 68c1744ea97..2a2078ed491 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java @@ -20,6 +20,7 @@ package org.apache.james.modules.mailbox; import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.mailbox.cassandra.DeleteMessageListener; import org.apache.james.modules.vault.DeletedMessageVaultModule; import org.apache.james.utils.InitializationOperation; @@ -68,6 +69,9 @@ protected void configure() { .addBinding() .to(DistributedDeletedMessageVaultDeletionCallback.class); bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON); + + Multibinder reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class); + reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class); } @ProvidesIntoSet