Skip to content

Commit

Permalink
JAMES-3605 Implement DeletedMessageVaultWorkQueueReconnectionHandler
Browse files Browse the repository at this point in the history
So the consumer for deleted message vault queue could reconnect if needed.
  • Loading branch information
quantranhong1999 authored and Arsnael committed Dec 5, 2024
1 parent d4aadcb commit b6ed512
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.modules.mailbox;

import jakarta.inject.Inject;

import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.reactivestreams.Publisher;

import com.rabbitmq.client.Connection;

import reactor.core.publisher.Mono;

public class DeletedMessageVaultWorkQueueReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler {
private final DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback;

@Inject
public DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback) {
this.distributedDeletedMessageVaultDeletionCallback = distributedDeletedMessageVaultDeletionCallback;
}

@Override
public Publisher<Void> handleReconnection(Connection connection) {
return Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.blob.api.BlobId;
import org.apache.james.core.Username;
import org.apache.james.lifecycle.api.Startable;
Expand Down Expand Up @@ -168,7 +169,7 @@ DeleteMessageListener.DeletedMessageCopyCommand asPojo(MailboxId.Factory mailbox
private final MailboxId.Factory mailboxIdFactory;
private final MessageId.Factory messageIdFactory;
private final BlobId.Factory blobIdFactory;
private Receiver receiver;
private final ReceiverProvider receiverProvider;
private Disposable disposable;

@Inject
Expand All @@ -178,7 +179,8 @@ public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
DeletedMessageVaultDeletionCallback callback,
MailboxId.Factory mailboxIdFactory,
MessageId.Factory messageIdFactory,
BlobId.Factory blobIdFactory) {
BlobId.Factory blobIdFactory,
ReceiverProvider receiverProvider) {
this.sender = sender;
this.rabbitMQConfiguration = rabbitMQConfiguration;
this.callback = callback;
Expand All @@ -187,6 +189,7 @@ public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
this.blobIdFactory = blobIdFactory;
this.objectMapper = new ObjectMapper();
this.channelPool = channelPool;
this.receiverProvider = receiverProvider;
}

public void init() {
Expand Down Expand Up @@ -215,17 +218,28 @@ public void init() {
.then()
.block();

receiver = channelPool.createReceiver();
disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
disposable = consumeDeletedMessageVaultWorkQueue();
}

private Disposable consumeDeletedMessageVaultWorkQueue() {
return Flux.using(
receiverProvider::createReceiver,
receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)),
Receiver::close)
.flatMap(this::handleMessage)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}

public void restart() {
Disposable previousConsumer = disposable;
disposable = consumeDeletedMessageVaultWorkQueue();
previousConsumer.dispose();
}

@PreDestroy
public void stop() {
Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
Optional.ofNullable(receiver).ifPresent(Receiver::close);
}

private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.james.modules.mailbox;

import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.modules.vault.DeletedMessageVaultModule;
import org.apache.james.utils.InitializationOperation;
Expand Down Expand Up @@ -68,6 +69,9 @@ protected void configure() {
.addBinding()
.to(DistributedDeletedMessageVaultDeletionCallback.class);
bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);

Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
}

@ProvidesIntoSet
Expand Down

0 comments on commit b6ed512

Please sign in to comment.