You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Since a lot of users might be coming here from a Celery background where it's common to submit a task and poll it's results, it'd be good to demonstrate this with the following, somewhere in the docs.
importasyncioimporttimeimportdramatiqfromdramatiq.brokers.redisimportRedisBrokerfromdramatiq.middlewareimportAsyncIOfromdramatiq.resultsimportResultsfromdramatiq.results.backendimportDEFAULT_TIMEOUTfromdramatiq.results.backends.redisimportRedisBackendfromdramatiq.results.errorsimportResultMissing, ResultTimeoutfromquartimportQuart, jsonifyclassRedisBackendExt(RedisBackend):
defget_result_from_message_key(self, message_key, *, block=False, timeout=None):
iftimeoutisNone:
timeout=DEFAULT_TIMEOUTifblock:
timeout=int(timeout/1000)
iftimeout==0:
data=self.client.rpoplpush(message_key, message_key)
else:
data=self.client.brpoplpush(message_key, message_key, timeout)
ifdataisNone:
raiseResultTimeout(message_key)
else:
data=self.client.lindex(message_key, 0)
ifdataisNone:
raiseResultMissing(message_key)
returnself.unwrap_result(self.encoder.decode(data))
backend=RedisBackendExt()
broker=RedisBroker(host='localhost', port=6379)
broker.add_middleware(Results(backend=backend))
broker.add_middleware(AsyncIO())
dramatiq.set_broker(broker)
app=Quart(__name__) # totally decoupled from dramatiq@dramatiq.actor(store_results=True)asyncdefcut_granite_with_toothpick(enqueue_time: float):
print(f'{enqueue_time=}')
awaitasyncio.sleep(4)
return'finally, the granite is cut...'@app.get('/message/<string:message_key>')asyncdefget_result(message_key):
result=backend.get_result_from_message_key(message_key)
returnjsonify({'result': result}), 200@app.get('/')asyncdefindex():
enqueue_time=time.time()
message=cut_granite_with_toothpick.send(enqueue_time)
message_key=backend.build_message_key(message)
print(f'http://127.0.0.1:8000/message/{message_key}')
returnjsonify({'message_key': message_key}), 202if__name__=='__main__':
app.run(host='127.0.0.1', port=8000, debug=True)
# run this with python3.13 test.py# also run dramatiq --processes 2 --threads 2 test
The text was updated successfully, but these errors were encountered:
Since a lot of users might be coming here from a Celery background where it's common to submit a task and poll it's results, it'd be good to demonstrate this with the following, somewhere in the docs.
The text was updated successfully, but these errors were encountered: