From e4a97d54867af0addafd425fd075b46bd0053b13 Mon Sep 17 00:00:00 2001
From: Ruoming Pang
Date: Sun, 8 Dec 2024 01:56:41 +0000
Subject: [PATCH] Fixes bastion.py and cleaner.py.

---
 axlearn/cloud/common/bastion.py | 103 ++++++++++++++++----------------
 axlearn/cloud/common/cleaner.py |   2 +-
 2 files changed, 54 insertions(+), 51 deletions(-)

diff --git a/axlearn/cloud/common/bastion.py b/axlearn/cloud/common/bastion.py
index 6c8bdd28f..e47fd1cbd 100644
--- a/axlearn/cloud/common/bastion.py
+++ b/axlearn/cloud/common/bastion.py
@@ -750,7 +750,11 @@ def _append_to_project_history(
     ):
         now = datetime.now(timezone.utc)
         for project_id, limits in schedule_results.project_limits.items():
-            job_verdicts = schedule_results.job_verdicts.get(project_id, {})
+            job_verdicts = {
+                job_id: verdict
+                for job_id, verdict in schedule_results.job_verdicts.items()
+                if jobs[job_id].project_id == project_id
+            }
             verdicts = []
             for job_id, verdict in job_verdicts.items():
                 verdicts.append((job_id, verdict.should_run(), verdict.metadata))
@@ -1076,56 +1080,55 @@ def _update_jobs(self):
                 verbosity=schedule_options["verbosity"],
             )
             self._append_to_project_history(schedulable_jobs, schedule_results)
-            for verdicts in schedule_results.job_verdicts.values():
-                for job_name, verdict in verdicts.items():
-                    job = self._active_jobs[job_name]
-                    assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}
-
-                    if verdict:
-                        old_tier = job.state.metadata.get("tier")
-                        new_tier = verdict.metadata.get("tier")
-                        changed_tiers = old_tier != new_tier
-
-                        jobspec_changed = job.state.metadata.get("updated")
-
-                        # Jobspec changed, trigger a restart of the runner.
-                        if jobspec_changed:
-                            self._append_to_job_history(
-                                job,
-                                msg="UPDATING: Detected updated jobspec. Will restart the runner "
-                                "by sending to PENDING state",
-                                state=JobLifecycleState.UPDATING,
-                            )
-                            job.state.status = JobStatus.PENDING
-                        elif job.state.status == JobStatus.PENDING or not changed_tiers:
-                            # Resume if not running, or keep running if scheduling tier did not change.
-                            job.state.status = JobStatus.ACTIVE
-                        else:
-                            # Job changed scheduling tiers, and must be restarted on the new tier.
-                            # NOTE: this can possibly lead to thrashing of jobs that frequently switch
-                            # tiers. One option is track per-job tier changes and hold off on promoting
-                            # low priority to high priority if it was demoted recently.
-                            # TODO(markblee): Add instrumentation to track frequency of tier changes to
-                            # see whether this is necessary.
-                            assert job.state.status == JobStatus.ACTIVE and changed_tiers
-                            self._append_to_job_history(
-                                job,
-                                msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
-                                state=JobLifecycleState.RESCHEDULING,
-                            )
-                            job.state.status = JobStatus.PENDING
+            for job_name, verdict in schedule_results.job_verdicts.items():
+                job = self._active_jobs[job_name]
+                assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}
+
+                if verdict:
+                    old_tier = job.state.metadata.get("tier")
+                    new_tier = verdict.metadata.get("tier")
+                    changed_tiers = old_tier != new_tier
+
+                    jobspec_changed = job.state.metadata.get("updated")
+
+                    # Jobspec changed, trigger a restart of the runner.
+                    if jobspec_changed:
+                        self._append_to_job_history(
+                            job,
+                            msg="UPDATING: Detected updated jobspec. Will restart the runner "
+                            "by sending to PENDING state",
+                            state=JobLifecycleState.UPDATING,
+                        )
+                        job.state.status = JobStatus.PENDING
+                    elif job.state.status == JobStatus.PENDING or not changed_tiers:
+                        # Resume if not running, or keep running if scheduling tier did not change.
+                        job.state.status = JobStatus.ACTIVE
                     else:
-                        # Pre-empt/stay queued.
-                        if job.command_proc is not None and _is_proc_complete(job.command_proc):
-                            # As a slight optimization, we avoid pre-empting ACTIVE jobs that are
-                            # complete, since we can directly transition to CLEANING.
-                            job.state.status = JobStatus.ACTIVE
-                        else:
-                            job.state.status = JobStatus.PENDING
-                        # Pending jobs which are not rescheduled should have no tier information.
-                        verdict.metadata.pop("tier", None)
-
-                    job.state.metadata = verdict.metadata
+                        # Job changed scheduling tiers, and must be restarted on the new tier.
+                        # NOTE: this can possibly lead to thrashing of jobs that frequently switch
+                        # tiers. One option is track per-job tier changes and hold off on promoting
+                        # low priority to high priority if it was demoted recently.
+                        # TODO(markblee): Add instrumentation to track frequency of tier changes to
+                        # see whether this is necessary.
+                        assert job.state.status == JobStatus.ACTIVE and changed_tiers
+                        self._append_to_job_history(
+                            job,
+                            msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
+                            state=JobLifecycleState.RESCHEDULING,
+                        )
+                        job.state.status = JobStatus.PENDING
+                else:
+                    # Pre-empt/stay queued.
+                    if job.command_proc is not None and _is_proc_complete(job.command_proc):
+                        # As a slight optimization, we avoid pre-empting ACTIVE jobs that are
+                        # complete, since we can directly transition to CLEANING.
+                        job.state.status = JobStatus.ACTIVE
+                    else:
+                        job.state.status = JobStatus.PENDING
+                    # Pending jobs which are not rescheduled should have no tier information.
+                    verdict.metadata.pop("tier", None)
+
+                job.state.metadata = verdict.metadata

             # TODO(markblee): Parallelize this.
             for job_name, job in self._active_jobs.items():
diff --git a/axlearn/cloud/common/cleaner.py b/axlearn/cloud/common/cleaner.py
index f697889e0..1cfbc530e 100644
--- a/axlearn/cloud/common/cleaner.py
+++ b/axlearn/cloud/common/cleaner.py
@@ -89,6 +89,6 @@ def sweep(self, jobs: dict[str, JobSpec]) -> Sequence[str]:
             schedule_result = scheduler.schedule(
                 dict(my_job=job_spec.metadata),
             )
-            if schedule_result.job_verdicts[job_spec.metadata.project_id]["my_job"].over_limits:
+            if schedule_result.job_verdicts["my_job"].over_limits:
                 result.append(job_name)
         return result
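
Note: this patch changes ScheduleResults.job_verdicts from a nested mapping keyed by
project id to a flat mapping keyed by job name; _append_to_project_history rebuilds the
per-project view with a dict comprehension, and callers such as cleaner.py index verdicts
directly by job name. A minimal sketch of the access-pattern change, using toy stand-in
values (illustration only, not code from this patch):

    # Toy stand-ins for JobVerdict objects.
    verdict_a, verdict_b = object(), object()

    # Before: verdicts were nested under the project id.
    nested = {"project-x": {"job-1": verdict_a, "job-2": verdict_b}}
    v_old = nested["project-x"]["job-1"]

    # After: verdicts are keyed directly by job name.
    flat = {"job-1": verdict_a, "job-2": verdict_b}
    v_new = flat["job-1"]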