diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java index f21e4061faa..121d2a3dfed 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java @@ -19,58 +19,25 @@ package org.apache.james.task.eventsourcing.distributed; -import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; -import static org.apache.james.backends.rabbitmq.Constants.DURABLE; -import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete; -import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; -import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; - import jakarta.inject.Inject; -import org.apache.james.backends.rabbitmq.QueueArguments; -import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; public class TerminationReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(TerminationReconnectionHandler.class); - - private final TerminationQueueName queueName; private final RabbitMQTerminationSubscriber terminationSubscriber; - private final RabbitMQConfiguration configuration; @Inject - public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) { - this.queueName = queueName; + public TerminationReconnectionHandler(RabbitMQTerminationSubscriber terminationSubscriber) { this.terminationSubscriber = terminationSubscriber; - this.configuration = configuration; } @Override public Publisher handleReconnection(Connection connection) { - return Mono.fromRunnable(() -> createTerminationQueue(connection)) - .then(Mono.fromRunnable(terminationSubscriber::restart)); - } - - private void createTerminationQueue(Connection connection) { - try (Channel channel = connection.createChannel()) { - QueueArguments.Builder builder = QueueArguments.builder(); - configuration.getQueueTTL().ifPresent(builder::queueTTL); - channel.queueDeclare(queueName.asString(), - evaluateDurable(!DURABLE, configuration.isQuorumQueuesUsed()), - evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed()), - evaluateAutoDelete(!AUTO_DELETE, configuration.isQuorumQueuesUsed()), - builder.build()); - } catch (Exception e) { - LOGGER.error("Error recovering connection", e); - } + return Mono.fromRunnable(terminationSubscriber::restart); } }