Skip to content

Commit

Permalink
JAMES-3605 TerminationReconnectionHandler should not create the termi…
Browse files Browse the repository at this point in the history
…nation queue upon reconnection

Otherwise error:
```
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-version' for queue 'terminationSubscriber291d56fc-1e2d-4fb3-a455-bad297263f43' in vhost '/': received none but current is the value '2' of type 'signedint', class-id=50, method-id=10)
```

And cause the remaining reconnection handlers to fail.
  • Loading branch information
quantranhong1999 authored and Arsnael committed Dec 5, 2024
1 parent 8b94aec commit d4aadcb
Showing 1 changed file with 2 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,25 @@

package org.apache.james.task.eventsourcing.distributed;

import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;

import jakarta.inject.Inject;

import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import reactor.core.publisher.Mono;

public class TerminationReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TerminationReconnectionHandler.class);

private final TerminationQueueName queueName;
private final RabbitMQTerminationSubscriber terminationSubscriber;
private final RabbitMQConfiguration configuration;

@Inject
public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) {
this.queueName = queueName;
public TerminationReconnectionHandler(RabbitMQTerminationSubscriber terminationSubscriber) {
this.terminationSubscriber = terminationSubscriber;
this.configuration = configuration;
}

@Override
public Publisher<Void> handleReconnection(Connection connection) {
return Mono.fromRunnable(() -> createTerminationQueue(connection))
.then(Mono.fromRunnable(terminationSubscriber::restart));
}

private void createTerminationQueue(Connection connection) {
try (Channel channel = connection.createChannel()) {
QueueArguments.Builder builder = QueueArguments.builder();
configuration.getQueueTTL().ifPresent(builder::queueTTL);
channel.queueDeclare(queueName.asString(),
evaluateDurable(!DURABLE, configuration.isQuorumQueuesUsed()),
evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed()),
evaluateAutoDelete(!AUTO_DELETE, configuration.isQuorumQueuesUsed()),
builder.build());
} catch (Exception e) {
LOGGER.error("Error recovering connection", e);
}
return Mono.fromRunnable(terminationSubscriber::restart);
}
}

0 comments on commit d4aadcb

Please sign in to comment.