From 597c1ba8a444ac77270fb29d4eedac71d6d11cfb Mon Sep 17 00:00:00 2001 From: Christopher Bartz Date: Fri, 12 Jul 2024 13:28:54 +0200 Subject: [PATCH] remove process.wait and determine quantity after spawning all runners --- src/reactive/runner_manager.py | 29 ++++++------ tests/unit/reactive/test_runner_manager.py | 51 ++++++++++++++++------ 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/src/reactive/runner_manager.py b/src/reactive/runner_manager.py index 97f418782..3e3a68135 100644 --- a/src/reactive/runner_manager.py +++ b/src/reactive/runner_manager.py @@ -73,17 +73,17 @@ def reconcile(quantity: int, config: ReactiveRunnerConfig) -> int: """ actual_quantity = _determine_current_quantity() logger.info("Actual quantity of reactive runner processes: %s", actual_quantity) - delta = quantity - actual_quantity - actual_delta = delta + actual_delta = delta = quantity - actual_quantity if delta > 0: logger.info("Will spawn %d new reactive runner processes", delta) _setup_logging() for _ in range(delta): try: _spawn_runner(config) - except ReactiveRunnerError: + except _SpawnError: logger.exception("Failed to spawn a new reactive runner process") - actual_delta -= 1 + actual_quantity_after_spawning = _determine_current_quantity() + actual_delta = actual_quantity_after_spawning - actual_quantity elif delta < 0: logger.info( "%d reactive runner processes are running. " @@ -125,6 +125,10 @@ def _setup_logging() -> None: shutil.chown(REACTIVE_RUNNER_LOG_DIR, user=UBUNTU_USER, group=UBUNTU_USER) +class _SpawnError(Exception): + """Raised when spawning a runner fails.""" + + def _spawn_runner(reactive_runner_config: ReactiveRunnerConfig) -> None: """Spawn a runner. @@ -132,7 +136,7 @@ def _spawn_runner(reactive_runner_config: ReactiveRunnerConfig) -> None: reactive_runner_config: The configuration for the reactive runner. Raises: - ReactiveRunnerError: If the runner fails to spawn. + _SpawnError: If the runner fails to spawn. """ env = { "PYTHONPATH": "src:lib:venv", @@ -164,14 +168,9 @@ def _spawn_runner(reactive_runner_config: ReactiveRunnerConfig) -> None: user=UBUNTU_USER, ) - try: - process.wait(0.001) - except subprocess.TimeoutExpired: - pass - else: - if process.returncode not in (0, None): - raise ReactiveRunnerError( - f"Failed to spawn a new reactive runner process with pid {process.pid}." - f" Return code: {process.returncode}" - ) + if process.returncode not in (0, None): + raise _SpawnError( + f"Failed to spawn a new reactive runner process with pid {process.pid}." + f" Return code: {process.returncode}" + ) logger.debug("Spawned a new reactive runner process with pid %s", process.pid) diff --git a/tests/unit/reactive/test_runner_manager.py b/tests/unit/reactive/test_runner_manager.py index 189d9b93c..c3db39521 100644 --- a/tests/unit/reactive/test_runner_manager.py +++ b/tests/unit/reactive/test_runner_manager.py @@ -49,12 +49,11 @@ def secure_run_subprocess_mock_fixture(monkeypatch: pytest.MonkeyPatch) -> Magic @pytest.fixture(name="subprocess_popen_mock") def subprocess_popen_mock_fixture(monkeypatch: pytest.MonkeyPatch) -> MagicMock: """Mock the subprocess.Popen function.""" - popen_result = MagicMock(spec=subprocess.Popen, pid=1234) + popen_result = MagicMock(spec=subprocess.Popen, pid=1234, returncode=0) subprocess_popen_mock = MagicMock( spec=subprocess.Popen, return_value=popen_result, ) - popen_result.wait.side_effect = subprocess.TimeoutExpired("cmd", 1) monkeypatch.setattr("subprocess.Popen", subprocess_popen_mock) return subprocess_popen_mock @@ -69,7 +68,9 @@ def test_reconcile_spawns_runners( """ queue_name = secrets.token_hex(16) reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name) - _arrange_reactive_processes(secure_run_subprocess_mock, count=2) + _arrange_reactive_processes( + secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=5 + ) delta = reconcile(5, reactive_config) @@ -88,7 +89,9 @@ def test_reconcile_does_not_spawn_runners( """ queue_name = secrets.token_hex(16) reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name) - _arrange_reactive_processes(secure_run_subprocess_mock, count=2) + _arrange_reactive_processes( + secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=2 + ) delta = reconcile(2, reactive_config) @@ -106,7 +109,9 @@ def test_reconcile_does_not_spawn_runners_for_too_many_processes( """ queue_name = secrets.token_hex(16) reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name) - _arrange_reactive_processes(secure_run_subprocess_mock, count=2) + _arrange_reactive_processes( + secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=2 + ) delta = reconcile(1, reactive_config) assert delta == 0 @@ -151,24 +156,42 @@ def test_reconcile_spawn_runner_failed( MagicMock(returncode=1), MagicMock(returncode=0), ] - _arrange_reactive_processes(secure_run_subprocess_mock, count=0) + _arrange_reactive_processes( + secure_run_subprocess_mock, count_before_spawn=0, count_after_spawn=2 + ) delta = reconcile(3, reactive_config) assert delta == 2 -def _arrange_reactive_processes(secure_run_subprocess_mock: MagicMock, count: int): +def _arrange_reactive_processes( + secure_run_subprocess_mock: MagicMock, count_before_spawn: int, count_after_spawn: int +): """Mock reactive runner processes are active. Args: secure_run_subprocess_mock: The mock to use for the ps command. - count: The number of processes + count_before_spawn: The number of processes before spawning new ones. + count_after_spawn: The number of processes after spawning new ones. """ - process_cmds = "\n".join([f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count)]) - secure_run_subprocess_mock.return_value = CompletedProcess( - args=PS_COMMAND_LINE_LIST, - returncode=0, - stdout=f"CMD\n{process_cmds}".encode("utf-8"), - stderr=b"", + process_cmds_before = "\n".join( + [f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count_before_spawn)] ) + process_cmds_after = "\n".join( + [f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count_after_spawn)] + ) + secure_run_subprocess_mock.side_effect = [ + CompletedProcess( + args=PS_COMMAND_LINE_LIST, + returncode=0, + stdout=f"CMD\n{process_cmds_before}".encode("utf-8"), + stderr=b"", + ), + CompletedProcess( + args=PS_COMMAND_LINE_LIST, + returncode=0, + stdout=f"CMD\n{process_cmds_after}".encode("utf-8"), + stderr=b"", + ), + ]