Skip to content

Commit

Permalink
Use inheritance to re-use code.
Browse files Browse the repository at this point in the history
  • Loading branch information
koparasy committed Dec 18, 2023
1 parent af888e8 commit aa35f04
Showing 1 changed file with 17 additions and 23 deletions.
40 changes: 17 additions & 23 deletions src/AMSWorkflow/ams_wf/ams_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,17 @@ def name(self):
return self._name


class SequentialExecutor:
class AMSJobScheduler:
def __init__(self, stager_job_generator, config):
self._flux_handle = flux.Flux()
logger.debug("Preparing user app job specification")
self._user_app = JobSpec("user_app", config["user_app"], exclusive=True)
self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True)
self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True)
self._stager = JobSpec("ams_stager", stager_job_generator(config), exclusive=True)


class AMSSequentialJobScheduler(AMSJobScheduler):
def __init__(self, config):
def create_fs_stager_job_descr(user_descr):
config = dict()
Expand All @@ -97,7 +107,7 @@ def create_fs_stager_job_descr(user_descr):
"--class",
user_descr["stager"]["pruner_class"],
"--load",
"./build_borax/examples/prune.py",
user_descr["stager"]["pruner_path"],
] + user_descr["stager"]["pruner_args"]

config["resources"] = {
Expand All @@ -110,17 +120,7 @@ def create_fs_stager_job_descr(user_descr):

return config

self._flux_handle = flux.Flux()
logger.debug("Preparing user app job specification")
self._user_app = JobSpec("user_app", config["user_app"], exclusive=True)
self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True)
self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True)
self._stager = JobSpec("ams_stager", create_fs_stager_job_descr(config), exclusive=True)

# logger.debug("Preparing ml sub selection specification")
# self._ml_subselect = JobSpec("ml_subselect", config["ml_subselect"])
# Build the pruning module
# TODO Add pruner stage here
super().__init__(config, create_fs_stager_job_descr)

def execute(self):
def execute_and_wait(job_descr, handle):
Expand All @@ -134,16 +134,10 @@ def execute_and_wait(job_descr, handle):
return False
return True

if not execute_and_wait(self._user_app, self._flux_handle):
return False

if not execute_and_wait(self._stager, self._flux_handle):
return False
if not execute_and_wait(self._ml_pruner, self._flux_handle):
return False
for step in [self._user_app, self._stager, self._ml_pruner, self._ml_train]:
if not execute_and_wait(step, self._flux_handle):
return False

if not execute_and_wait(self._ml_train, self._flux_handle):
return False
return True


Expand All @@ -161,7 +155,7 @@ def deploy(config):
# TODO Launch concurrent execution
pass
elif config["execution_mode"] == "sequential":
executor = SequentialExecutor(config)
executor = AMSSequentialJobScheduler(config)
return executor.execute()


Expand Down

0 comments on commit aa35f04

Please sign in to comment.