Skip to content

Commit

Permalink
fix routing key
Browse files Browse the repository at this point in the history
  • Loading branch information
odesenfans committed Oct 26, 2023
1 parent 91a7319 commit 6e1134b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/aleph/chains/chain_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async def get_tx_messages(

async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExchange:
mq_conn = await aio_pika.connect_robust(
host=config.rabbitmq.host.value,
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
Expand Down
9 changes: 8 additions & 1 deletion src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def get_defaults():
],
"topics": ["ALIVE", "ALEPH-TEST"],
},
"storage": {"folder": "/var/lib/pyaleph", "store_files": True, "engine": "filesystem"},
"storage": {
"folder": "/var/lib/pyaleph",
"store_files": True,
"engine": "filesystem",
},
"nuls": {
"chain_id": 8964,
"enabled": False,
Expand Down Expand Up @@ -124,8 +128,11 @@ def get_defaults():
"password": "change-me!",
"pub_exchange": "p2p-publish",
"sub_exchange": "p2p-subscribe",
# Name of the RabbitMQ exchange used for processed messages (output of the message processor).
"message_exchange": "aleph-messages",
# Name of the RabbitMQ exchange used for pending messages (input of the message processor).
"pending_message_exchange": "aleph-pending-messages",
# Name of the RabbitMQ exchange used for sync/message events (input of the TX processor).
"pending_tx_exchange": "aleph-pending-txs",
},
"redis": {
Expand Down
7 changes: 2 additions & 5 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async def process_pending_txs(self) -> None:

async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue:
mq_conn = await aio_pika.connect_robust(
host=config.rabbitmq.host.value,
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
Expand All @@ -114,14 +114,11 @@ async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue:
pending_tx_queue = await channel.declare_queue(
name="pending-tx-queue", durable=True, auto_delete=False
)
await pending_tx_queue.bind(pending_tx_exchange, routing_key="*")
await pending_tx_queue.bind(pending_tx_exchange, routing_key="#")
return pending_tx_queue


async def handle_txs_task(config: Config):
max_concurrent_tasks = config.aleph.jobs.pending_txs.max_concurrency.value
await asyncio.sleep(4)

engine = make_engine(config=config, application_name="aleph-txs")
session_factory = make_session_factory(engine)

Expand Down

0 comments on commit 6e1134b

Please sign in to comment.