diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 854f17a939..931ebbb653 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -6,6 +6,7 @@ from artiq.experiment import * from artiq.master.scheduler import Scheduler +from sipyco.sync_struct import process_mod class EmptyExperiment(EnvExperiment): @@ -50,27 +51,17 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, flush=False): - return [ - {"action": "setitem", "key": rid, "value": - {"pipeline": "main", "status": "pending", "priority": priority, - "expid": expid, "due_date": None, "flush": flush, - "repo_msg": None}, - "path": []}, - {"action": "setitem", "key": "status", "value": "preparing", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "prepare_done", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "running", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "run_done", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "analyzing", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "deleting", - "path": [rid]}, - {"action": "delitem", "key": rid, "path": []} - ] +def _make_status_events(rids): + return { + rid: { + key: asyncio.Event() + for key in ( + "pending", "preparing", "prepare_done", + "running", "run_done", "analyzing", + "deleting", "paused", "flushing" + ) + } for rid in rids + } class _RIDCounter: @@ -87,50 +78,82 @@ class SchedulerCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) + self.experiments = {} + self.status_events = {} + self.handlers = {} + self.scheduler = Scheduler(_RIDCounter(0), self.handlers, None, None) + self.scheduler.notifier.publish = self._notify + + def configure(self, rids): + self.status_events = _make_status_events(rids) + + def _notify(self, mod): + process_mod(self.experiments, mod) + for rid, events in self.status_events.items(): + if (mod["action"] == "setitem" and + mod["key"] == rid and + mod["value"]["status"] == "pending"): + events["pending"].set() + if (mod["action"] == "setitem" and + mod["key"] == "status" and + mod["path"] == [rid] and + mod["value"] in events): + events[mod["value"]].set() + + def assertStatusEqual(self, rid, status): + actual = self.experiments[rid]["status"] + if status != actual: + raise AssertionError(f"RID {rid} should have status {status}, " + f"got {actual}") + + def assertArriveStatus(self, rid, status, timeout=5): + try: + self.loop.run_until_complete( + asyncio.wait_for( + self.status_events[rid][status].wait(), timeout)) + except asyncio.TimeoutError: + raise AssertionError(f"Rid {rid} did not arrive " + f"{status} within {timeout}s") + + def assertFirstArriveStatus(self, first_rid, rids, status): + futures = { + rid: asyncio.ensure_future(self.status_events[rid][status].wait()) + for rid in rids + } + done, pending = self.loop.run_until_complete(asyncio.wait( + futures.values(), + return_when=asyncio.FIRST_COMPLETED)) + for task in pending: + task.cancel() + if futures[first_rid] not in done: + raise AssertionError( + f"RID {first_rid} did not arrive {status} first" + ) def test_steps(self): - loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None, None) + self.configure([0, 1]) expid = _get_expid("EmptyExperiment") - expect = _get_basic_steps(1, expid) - done = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() - scheduler.notifier.publish = notify - - scheduler.start() + self.scheduler.start() # Verify that a timed experiment far in the future does not # get run, even if it has high priority. late = time() + 100000 - expect.insert(0, - {"action": "setitem", "key": 0, "value": - {"pipeline": "main", "status": "pending", "priority": 99, - "expid": expid, "due_date": late, "flush": False, - "repo_msg": None}, - "path": []}) - scheduler.submit("main", expid, 99, late, False) + self.scheduler.submit("main", expid, 99, late, False) # This one (RID 1) gets run instead. - scheduler.submit("main", expid, 0, None, False) + self.scheduler.submit("main", expid, 0, None, False) + + self.assertArriveStatus(1, "deleting") + self.assertStatusEqual(0, "pending") - loop.run_until_complete(done.wait()) - scheduler.notifier.publish = None - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.scheduler.stop()) def test_pending_priority(self): """Check due dates take precedence over priorities when waiting to prepare.""" - loop = self.loop - handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) - handlers["scheduler_check_pause"] = scheduler.check_pause + self.configure([0, 1, 2]) + self.handlers["scheduler_check_pause"] = self.scheduler.check_pause expid_empty = _get_expid("EmptyExperiment") @@ -144,145 +167,20 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - expect = [ - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": low_priority, - "pipeline": "main", - "due_date": None, - "status": "pending", - "expid": expid_bg, - "flush": False - }, - "key": 0 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": high_priority, - "pipeline": "main", - "due_date": late, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 1 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": middle_priority, - "pipeline": "main", - "due_date": early, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 2 - }, - { - "path": [0], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "paused", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "run_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "analyzing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "deleting", - "key": "status" - }, - { - "path": [], - "action": "delitem", - "key": 2 - }, - ] - done = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() - scheduler.notifier.publish = notify - - scheduler.start() - - scheduler.submit("main", expid_bg, low_priority) - scheduler.submit("main", expid_empty, high_priority, late) - scheduler.submit("main", expid_empty, middle_priority, early) - - loop.run_until_complete(done.wait()) - scheduler.notifier.publish = None - loop.run_until_complete(scheduler.stop()) + self.scheduler.start() + self.scheduler.submit("main", expid_bg, low_priority) + self.scheduler.submit("main", expid_empty, high_priority, late) + self.scheduler.submit("main", expid_empty, middle_priority, early) - def test_pause(self): - loop = self.loop + self.assertArriveStatus(0, "paused") + self.assertFirstArriveStatus(2, [1, 2], "running") + + self.loop.run_until_complete(self.scheduler.stop()) + def test_pause(self): + self.configure([0, 1]) termination_ok = False + def check_termination(mod): nonlocal termination_ok self.assertEqual( @@ -290,139 +188,62 @@ def check_termination(mod): {"action": "setitem", "key": "termination_ok", "value": (False, True), "path": []}) termination_ok = True - handlers = { - "update_dataset": check_termination - } - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) + self.handlers["update_dataset"] = check_termination expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - expect = _get_basic_steps(1, expid) - background_running = asyncio.Event() - empty_ready = asyncio.Event() - empty_completed = asyncio.Event() - background_completed = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - empty_completed.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) - self.assertFalse(scheduler.check_pause(0)) - scheduler.submit("main", expid, 0, None, False) - self.assertFalse(scheduler.check_pause(0)) - loop.run_until_complete(empty_ready.wait()) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(empty_completed.wait()) - self.assertFalse(scheduler.check_pause(0)) - + self.scheduler.start() + # check_pause is True when rid with higher priority is prepare_done + self.scheduler.submit("main", expid_bg, -99, None, False) + self.assertArriveStatus(0, "running") + self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.submit("main", expid, 0, None, False) + self.assertFalse(self.scheduler.check_pause(0)) + self.assertArriveStatus(1, "prepare_done") + self.assertTrue(self.scheduler.check_pause(0)) + self.assertArriveStatus(1, "deleting") + self.assertFalse(self.scheduler.check_pause(0)) + + # check_pause is True when request_termination is called self.assertFalse(termination_ok) - scheduler.request_termination(0) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(background_completed.wait()) + self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.request_termination(0) + self.assertTrue(self.scheduler.check_pause(0)) + self.assertArriveStatus(0, "deleting") self.assertTrue(termination_ok) - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.scheduler.stop()) def test_close_with_active_runs(self): """Check scheduler exits with experiments still running""" - loop = self.loop - - scheduler = Scheduler(_RIDCounter(0), {}, None, None) - + self.configure([0, 1]) expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL expid = _get_expid("EmptyExperiment") - background_running = asyncio.Event() - empty_ready = asyncio.Event() - background_completed = asyncio.Event() - def notify(mod): - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) - - scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(empty_ready.wait()) + self.scheduler.start() + self.scheduler.submit("main", expid_bg, -99, None, False) + self.assertArriveStatus(0, "running") + + self.scheduler.submit("main", expid, 0, None, False) + self.assertArriveStatus(1, "prepare_done") # At this point, (at least) BackgroundExperiment is still running; make # sure we can stop the scheduler without hanging. - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.scheduler.stop()) def test_flush(self): - loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None, None) + self.configure([0, 1]) expid = _get_expid("EmptyExperiment") - expect = _get_basic_steps(1, expid, 1, True) - expect.insert(1, {"key": "status", - "path": [1], - "value": "flushing", - "action": "setitem"}) - first_preparing = asyncio.Event() - done = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "preparing", - "key": "status", - "action": "setitem"}: - first_preparing.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(first_preparing.wait()) - scheduler.submit("main", expid, 1, None, True) - loop.run_until_complete(done.wait()) - loop.run_until_complete(scheduler.stop()) + self.scheduler.start() + self.scheduler.submit("main", expid, 0, None, False) + self.assertArriveStatus(0, "preparing") + self.scheduler.submit("main", expid, 1, None, True) + self.assertArriveStatus(1, "flushing") + self.loop.run_until_complete(self.scheduler.stop()) def tearDown(self): self.loop.close()