diff --git a/src/aleph/chains/chain_data_service.py b/src/aleph/chains/chain_data_service.py index bbc7fa741..6458f5923 100644 --- a/src/aleph/chains/chain_data_service.py +++ b/src/aleph/chains/chain_data_service.py @@ -222,7 +222,7 @@ async def get_tx_messages( async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExchange: mq_conn = await aio_pika.connect_robust( - host=config.rabbitmq.host.value, + host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, diff --git a/src/aleph/config.py b/src/aleph/config.py index e996c7130..ab717a535 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -48,7 +48,11 @@ def get_defaults(): ], "topics": ["ALIVE", "ALEPH-TEST"], }, - "storage": {"folder": "/var/lib/pyaleph", "store_files": True, "engine": "filesystem"}, + "storage": { + "folder": "/var/lib/pyaleph", + "store_files": True, + "engine": "filesystem", + }, "nuls": { "chain_id": 8964, "enabled": False, @@ -124,8 +128,11 @@ def get_defaults(): "password": "change-me!", "pub_exchange": "p2p-publish", "sub_exchange": "p2p-subscribe", + # Name of the RabbitMQ exchange used for processed messages (output of the message processor). "message_exchange": "aleph-messages", + # Name of the RabbitMQ exchange used for pending messages (input of the message processor). "pending_message_exchange": "aleph-pending-messages", + # Name of the RabbitMQ exchange used for sync/message events (input of the TX processor). "pending_tx_exchange": "aleph-pending-txs", }, "redis": { diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 5d0df4371..c3a280341 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -100,7 +100,7 @@ async def process_pending_txs(self) -> None: async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: mq_conn = await aio_pika.connect_robust( - host=config.rabbitmq.host.value, + host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, @@ -114,14 +114,11 @@ async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: pending_tx_queue = await channel.declare_queue( name="pending-tx-queue", durable=True, auto_delete=False ) - await pending_tx_queue.bind(pending_tx_exchange, routing_key="*") + await pending_tx_queue.bind(pending_tx_exchange, routing_key="#") return pending_tx_queue async def handle_txs_task(config: Config): - max_concurrent_tasks = config.aleph.jobs.pending_txs.max_concurrency.value - await asyncio.sleep(4) - engine = make_engine(config=config, application_name="aleph-txs") session_factory = make_session_factory(engine)