Skip to content

Commit

Permalink
fix: run all requested workers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeshultz committed Nov 27, 2023
1 parent 9779c4b commit 0f1f7f2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
5 changes: 3 additions & 2 deletions examples/redis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ async def exec_event2(log: ContractLog):
# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
block_count = state.block_count if hasattr(state, "block_count") else 0
return {
"message": f"Worker stopped after handling {state.block_count} blocks.",
"block_count": state.block_count,
"message": f"Worker stopped after handling {block_count} blocks.",
"block_count": block_count,
}


Expand Down
22 changes: 13 additions & 9 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,20 @@ def _network_callback(ctx, param, val):

async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
try:
tasks = []
with ThreadPoolExecutor(max_workers=worker_count) as pool:
receiver = Receiver(
broker=broker,
executor=pool,
validate_params=True,
max_async_tasks=1,
max_prefetch=0,
)
broker.is_worker_process = True
await receiver.listen()
for _ in range(worker_count):
receiver = Receiver(
broker=broker,
executor=pool,
validate_params=True,
max_async_tasks=1,
max_prefetch=0,
)
broker.is_worker_process = True
tasks.append(receiver.listen())

await asyncio.gather(*tasks)
finally:
await shutdown_broker(broker, shutdown_timeout)

Expand Down

0 comments on commit 0f1f7f2

Please sign in to comment.