Skip to content

Commit

Permalink
Fixes bastion.py and cleaner.py.
Browse files Browse the repository at this point in the history
  • Loading branch information
ruomingp committed Dec 8, 2024
1 parent 177c934 commit e4a97d5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 51 deletions.
103 changes: 53 additions & 50 deletions axlearn/cloud/common/bastion.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,11 @@ def _append_to_project_history(
):
now = datetime.now(timezone.utc)
for project_id, limits in schedule_results.project_limits.items():
job_verdicts = schedule_results.job_verdicts.get(project_id, {})
job_verdicts = {
job_id: verdict
for job_id, verdict in schedule_results.job_verdicts.items()
if jobs[job_id].project_id == project_id
}
verdicts = []
for job_id, verdict in job_verdicts.items():
verdicts.append((job_id, verdict.should_run(), verdict.metadata))
Expand Down Expand Up @@ -1076,56 +1080,55 @@ def _update_jobs(self):
verbosity=schedule_options["verbosity"],
)
self._append_to_project_history(schedulable_jobs, schedule_results)
for verdicts in schedule_results.job_verdicts.values():
for job_name, verdict in verdicts.items():
job = self._active_jobs[job_name]
assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}

if verdict:
old_tier = job.state.metadata.get("tier")
new_tier = verdict.metadata.get("tier")
changed_tiers = old_tier != new_tier

jobspec_changed = job.state.metadata.get("updated")

# Jobspec changed, trigger a restart of the runner.
if jobspec_changed:
self._append_to_job_history(
job,
msg="UPDATING: Detected updated jobspec. Will restart the runner "
"by sending to PENDING state",
state=JobLifecycleState.UPDATING,
)
job.state.status = JobStatus.PENDING
elif job.state.status == JobStatus.PENDING or not changed_tiers:
# Resume if not running, or keep running if scheduling tier did not change.
job.state.status = JobStatus.ACTIVE
else:
# Job changed scheduling tiers, and must be restarted on the new tier.
# NOTE: this can possibly lead to thrashing of jobs that frequently switch
# tiers. One option is track per-job tier changes and hold off on promoting
# low priority to high priority if it was demoted recently.
# TODO(markblee): Add instrumentation to track frequency of tier changes to
# see whether this is necessary.
assert job.state.status == JobStatus.ACTIVE and changed_tiers
self._append_to_job_history(
job,
msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
state=JobLifecycleState.RESCHEDULING,
)
job.state.status = JobStatus.PENDING
for job_name, verdict in schedule_results.job_verdicts.items():
job = self._active_jobs[job_name]
assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}

if verdict:
old_tier = job.state.metadata.get("tier")
new_tier = verdict.metadata.get("tier")
changed_tiers = old_tier != new_tier

jobspec_changed = job.state.metadata.get("updated")

# Jobspec changed, trigger a restart of the runner.
if jobspec_changed:
self._append_to_job_history(
job,
msg="UPDATING: Detected updated jobspec. Will restart the runner "
"by sending to PENDING state",
state=JobLifecycleState.UPDATING,
)
job.state.status = JobStatus.PENDING
elif job.state.status == JobStatus.PENDING or not changed_tiers:
# Resume if not running, or keep running if scheduling tier did not change.
job.state.status = JobStatus.ACTIVE
else:
# Pre-empt/stay queued.
if job.command_proc is not None and _is_proc_complete(job.command_proc):
# As a slight optimization, we avoid pre-empting ACTIVE jobs that are
# complete, since we can directly transition to CLEANING.
job.state.status = JobStatus.ACTIVE
else:
job.state.status = JobStatus.PENDING
# Pending jobs which are not rescheduled should have no tier information.
verdict.metadata.pop("tier", None)

job.state.metadata = verdict.metadata
# Job changed scheduling tiers, and must be restarted on the new tier.
# NOTE: this can possibly lead to thrashing of jobs that frequently switch
# tiers. One option is track per-job tier changes and hold off on promoting
# low priority to high priority if it was demoted recently.
# TODO(markblee): Add instrumentation to track frequency of tier changes to
# see whether this is necessary.
assert job.state.status == JobStatus.ACTIVE and changed_tiers
self._append_to_job_history(
job,
msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
state=JobLifecycleState.RESCHEDULING,
)
job.state.status = JobStatus.PENDING
else:
# Pre-empt/stay queued.
if job.command_proc is not None and _is_proc_complete(job.command_proc):
# As a slight optimization, we avoid pre-empting ACTIVE jobs that are
# complete, since we can directly transition to CLEANING.
job.state.status = JobStatus.ACTIVE
else:
job.state.status = JobStatus.PENDING
# Pending jobs which are not rescheduled should have no tier information.
verdict.metadata.pop("tier", None)

job.state.metadata = verdict.metadata

# TODO(markblee): Parallelize this.
for job_name, job in self._active_jobs.items():
Expand Down
2 changes: 1 addition & 1 deletion axlearn/cloud/common/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ def sweep(self, jobs: dict[str, JobSpec]) -> Sequence[str]:
schedule_result = scheduler.schedule(
dict(my_job=job_spec.metadata),
)
if schedule_result.job_verdicts[job_spec.metadata.project_id]["my_job"].over_limits:
if schedule_result.job_verdicts["my_job"].over_limits:
result.append(job_name)
return result

0 comments on commit e4a97d5

Please sign in to comment.