diff --git a/remotecv/unique_queue.py b/remotecv/unique_queue.py index 6a539fa..b494683 100644 --- a/remotecv/unique_queue.py +++ b/remotecv/unique_queue.py @@ -2,7 +2,7 @@ from pyres.worker import Worker from remotecv.timing import get_time, get_interval -from remotecv.utils import context, logger +from remotecv.utils import config, context, logger class UniqueQueue(ResQ): @@ -84,3 +84,10 @@ def reserve(self, timeout=10): get_interval(start_time, get_time()), ) return job + + def register_worker(self): + super().register_worker() + if config.worker_ttl: + self.resq.redis.expire( + f"resque:worker:{str(self)}:started", config.worker_ttl + ) diff --git a/remotecv/worker.py b/remotecv/worker.py index 9f72b54..ad9f812 100644 --- a/remotecv/worker.py +++ b/remotecv/worker.py @@ -267,6 +267,14 @@ def import_modules(): type=click.INT, help="Timeout in seconds for image detection", ) +@optgroup.option( + "--worker-ttl", + envvar="WORKER_TTL", + show_envvar=True, + default=None, + type=click.INT, + help="TTL in seconds for worker", +) @optgroup.option( "--sentry-url", envvar="SENTRY_URL", @@ -314,6 +322,7 @@ def main(**params): config.polling_interval = params["polling_interval"] config.timeout = params["timeout"] + config.worker_ttl = params["worker_ttl"] config.server_port = params["server_port"] config.log_level = params["level"].upper() config.loader = import_module(params["loader"]) diff --git a/tests/test_unique_queue.py b/tests/test_unique_queue.py index 1ccb8f7..5bcc544 100644 --- a/tests/test_unique_queue.py +++ b/tests/test_unique_queue.py @@ -95,10 +95,10 @@ def test_should_send_metrics(self): class UniqueWorkerTestCase(TestCase): def setUp(self): - redis = redis_client() + self.redis = redis_client() self.unique_queue = UniqueWorker( queues=["Detect"], - server=redis, + server=self.redis, timeout=None, ) @@ -109,3 +109,19 @@ def test_should_send_queue_metrics(self): context.metrics.timing.assert_called_once_with( "worker.read_queue.time", mock.ANY ) + + def test_should_create_unique_worker_without_ttl(self): + config.worker_ttl = None + worker = UniqueWorker(server=self.redis, queues=["Detect"]) + worker.register_worker() + expect( + self.redis.ttl(f"resque:worker:{str(worker)}:started") + ).to_equal(-1) + + def test_should_create_unique_worker_with_ttl(self): + config.worker_ttl = 60 + worker = UniqueWorker(server=self.redis, queues=["Detect"]) + worker.register_worker() + expect( + self.redis.ttl(f"resque:worker:{str(worker)}:started") + ).Not.to_equal(-1)