diff --git a/src/aleph/chains/chain_data_service.py b/src/aleph/chains/chain_data_service.py index bbc7fa741..6458f5923 100644 --- a/src/aleph/chains/chain_data_service.py +++ b/src/aleph/chains/chain_data_service.py @@ -222,7 +222,7 @@ async def get_tx_messages( async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExchange: mq_conn = await aio_pika.connect_robust( - host=config.rabbitmq.host.value, + host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, diff --git a/src/aleph/config.py b/src/aleph/config.py index 9cded5516..9e16362a8 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -174,7 +174,9 @@ def get_defaults(): "sub_exchange": "p2p-subscribe", # Name of the exchange used to publish processed messages (output of the message processor). "message_exchange": "aleph-messages", + # Name of the RabbitMQ exchange used for pending messages (input of the message processor). "pending_message_exchange": "aleph-pending-messages", + # Name of the RabbitMQ exchange used for sync/message events (input of the TX processor). "pending_tx_exchange": "aleph-pending-txs", }, "redis": { diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index d90837226..97df33dc9 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -99,7 +99,7 @@ async def process_pending_txs(self) -> None: async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: mq_conn = await aio_pika.connect_robust( - host=config.rabbitmq.host.value, + host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, @@ -113,14 +113,11 @@ async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: pending_tx_queue = await channel.declare_queue( name="pending-tx-queue", durable=True, auto_delete=False ) - await pending_tx_queue.bind(pending_tx_exchange, routing_key="*") + await pending_tx_queue.bind(pending_tx_exchange, routing_key="#") return pending_tx_queue async def handle_txs_task(config: Config): - max_concurrent_tasks = config.aleph.jobs.pending_txs.max_concurrency.value - await asyncio.sleep(4) - engine = make_engine(config=config, application_name="aleph-txs") session_factory = make_session_factory(engine)