From a5b465016e11167deb01c7cc47f8a6d2e196ee2e Mon Sep 17 00:00:00 2001 From: koparasy Date: Mon, 18 Dec 2023 03:50:11 -0800 Subject: [PATCH] Use inheritance to re-use code. --- src/AMSWorkflow/ams_wf/ams_deploy.py | 40 ++++++++++++---------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/AMSWorkflow/ams_wf/ams_deploy.py b/src/AMSWorkflow/ams_wf/ams_deploy.py index 9ada1950..08ef4427 100644 --- a/src/AMSWorkflow/ams_wf/ams_deploy.py +++ b/src/AMSWorkflow/ams_wf/ams_deploy.py @@ -74,7 +74,17 @@ def name(self): return self._name -class SequentialExecutor: +class AMSJobScheduler: + def __init__(self, stager_job_generator, config): + self._flux_handle = flux.Flux() + logger.debug("Preparing user app job specification") + self._user_app = JobSpec("user_app", config["user_app"], exclusive=True) + self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True) + self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True) + self._stager = JobSpec("ams_stager", stager_job_generator(config), exclusive=True) + + +class AMSSequentialJobScheduler(AMSJobScheduler): def __init__(self, config): def create_fs_stager_job_descr(user_descr): config = dict() @@ -97,7 +107,7 @@ def create_fs_stager_job_descr(user_descr): "--class", user_descr["stager"]["pruner_class"], "--load", - "./build_borax/examples/prune.py", + user_descr["stager"]["pruner_path"], ] + user_descr["stager"]["pruner_args"] config["resources"] = { @@ -110,17 +120,7 @@ def create_fs_stager_job_descr(user_descr): return config - self._flux_handle = flux.Flux() - logger.debug("Preparing user app job specification") - self._user_app = JobSpec("user_app", config["user_app"], exclusive=True) - self._ml_train = JobSpec("ml_training", config["ml_training"], exclusive=True) - self._ml_pruner = JobSpec("ml_pruner", config["ml_pruner"], exclusive=True) - self._stager = JobSpec("ams_stager", create_fs_stager_job_descr(config), exclusive=True) - - # logger.debug("Preparing ml sub selection specification") - # self._ml_subselect = JobSpec("ml_subselect", config["ml_subselect"]) - # Build the pruning module - # TODO Add pruner stage here + super().__init__(config, create_fs_stager_job_descr) def execute(self): def execute_and_wait(job_descr, handle): @@ -134,16 +134,10 @@ def execute_and_wait(job_descr, handle): return False return True - if not execute_and_wait(self._user_app, self._flux_handle): - return False - - if not execute_and_wait(self._stager, self._flux_handle): - return False - if not execute_and_wait(self._ml_pruner, self._flux_handle): - return False + for step in [self._user_app, self._stager, self._ml_pruner, self._ml_train]: + if not execute_and_wait(step, self._flux_handle): + return False - if not execute_and_wait(self._ml_train, self._flux_handle): - return False return True @@ -161,7 +155,7 @@ def deploy(config): # TODO Launch concurrent execution pass elif config["execution_mode"] == "sequential": - executor = SequentialExecutor(config) + executor = AMSSequentialJobScheduler(config) return executor.execute()