diff --git a/examples/redis/main.py b/examples/redis/main.py index 43928d2c..52064e83 100644 --- a/examples/redis/main.py +++ b/examples/redis/main.py @@ -62,9 +62,10 @@ async def exec_event2(log: ContractLog): # Just in case you need to release some resources or something @app.on_worker_shutdown() def worker_shutdown(state): + block_count = state.block_count if hasattr(state, "block_count") else 0 return { - "message": f"Worker stopped after handling {state.block_count} blocks.", - "block_count": state.block_count, + "message": f"Worker stopped after handling {block_count} blocks.", + "block_count": block_count, } diff --git a/silverback/_cli.py b/silverback/_cli.py index 29a48007..a0a6c098 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -46,16 +46,20 @@ def _network_callback(ctx, param, val): async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): try: + tasks = [] with ThreadPoolExecutor(max_workers=worker_count) as pool: - receiver = Receiver( - broker=broker, - executor=pool, - validate_params=True, - max_async_tasks=1, - max_prefetch=0, - ) - broker.is_worker_process = True - await receiver.listen() + for _ in range(worker_count): + receiver = Receiver( + broker=broker, + executor=pool, + validate_params=True, + max_async_tasks=1, + max_prefetch=0, + ) + broker.is_worker_process = True + tasks.append(receiver.listen()) + + await asyncio.gather(*tasks) finally: await shutdown_broker(broker, shutdown_timeout)