diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..e6de5ec --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,24 @@ +Copyright © 2019, Christian O'Reilly +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..6fa6f9d --- /dev/null +++ b/README.rst @@ -0,0 +1,49 @@ +.. -*- mode: rst -*- + +SLURm Pipelines in PYthon (SLURPPY) +=================================== + +SLURPPY is a light-weight Python package to manage computationally-intensive processing +pipelines using SLURM through Python. 
+ +This software is not currently aimed for distribution because it is still unstable and +in designing phases. However, this code is Open-Source and can therefore be used and re-used +at your own convenience. + +Licensing +^^^^^^^^^ + +SLURPPY is **BSD-licenced** (3 clause): + + This software is OSI Certified Open Source Software. + OSI Certified is a certification mark of the Open Source Initiative. + + Copyright (c) 2011-2019, authors of MNE-Python. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the names of MNE-Python authors nor the names of any + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + **This software is provided by the copyright holders and contributors + "as is" and any express or implied warranties, including, but not + limited to, the implied warranties of merchantability and fitness for + a particular purpose are disclaimed. 
In no event shall the copyright + owner or contributors be liable for any direct, indirect, incidental, + special, exemplary, or consequential damages (including, but not + limited to, procurement of substitute goods or services; loss of use, + data, or profits; or business interruption) however caused and on any + theory of liability, whether in contract, strict liability, or tort + (including negligence or otherwise) arising in any way out of the use + of this software, even if advised of the possibility of such + damage.** diff --git a/configs/default.yaml b/configs/default.yaml new file mode 100644 index 0000000..3748653 --- /dev/null +++ b/configs/default.yaml @@ -0,0 +1,13 @@ +slurm: + default: + mem_per_cpu: "10G" + ntask: 1 + time: "6:00:00" + small: + mem_per_cpu: "1G" + ntask: 1 + time: "0:10:00" + +analysis: {} + +paths: {} diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..5aef279 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +description-file = README.rst diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5d40108 --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup, find_packages + +setup( + name='slurppy', + version='0.0.1', + url='https://github.com/christian-oreilly/slurppy.git', + author="Christian O'Reilly", + author_email='christian.oreilly@gmail.com', + description='Light-weight package to manage computationally-intensive processing pipelines using SLURM.', + packages=find_packages(), + install_requires=["matplotlib", "numpy", "pyyaml", "jinja2", "blockdiag", "pytest"], +) diff --git a/slurppy/__init__.py b/slurppy/__init__.py new file mode 100644 index 0000000..1672f71 --- /dev/null +++ b/slurppy/__init__.py @@ -0,0 +1,5 @@ +from .pipeline import Pipeline +from .processingstep import ProcessingStep +from .job import Job +from .campaign import Campaign +from .config import Config diff --git a/slurppy/campaign.py b/slurppy/campaign.py new file mode 100644 index 
import numpy as np
from collections import OrderedDict
import pickle
from pathlib import Path
import subprocess
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from .job import Job
from .config import Config


class Campaign:
    """A campaign of SLURM jobs generated from a Pipeline of processing steps.

    The campaign creates one Job per ProcessingStep of its pipeline, wires the
    inter-job SLURM dependencies, and provides helpers to launch, monitor,
    and persist (pickle) the whole set of jobs.
    """

    def __init__(self, small=False, resume=True, config=None, name="campaign",
                 pipeline=None, job_dependencies=None, verbose=True):
        """
        :param small: If True, use the "small" SLURM resource profiles.
        :param resume: Forwarded to the jobs' target functions.
        :param config: Config, dict, str/Path to a YAML file, or None (defaults).
        :param name: Campaign name; also used to build the save-file name.
        :param pipeline: Pipeline describing the processing steps.
        :param job_dependencies: dict mapping a job name to the list of job
               names it depends on.
        :param verbose: Default verbosity for the jobs.
        """
        self.small = small
        self.resume = resume
        self.jobs = OrderedDict()
        self.verbose = verbose
        self.name = name
        self._pipeline = pipeline

        self.job_dependencies = {} if job_dependencies is None else job_dependencies

        # Goes through the config setter below, which also propagates the
        # configuration to the pipeline, if any.
        self.config = config

        output_path = Path()
        if "paths" in self.config:
            if "output_root" in self.config["paths"]:
                output_path = Path(self.config["paths"]["output_root"])
        self.file_name = (output_path / self.name).with_suffix(".cpg")

    @property
    def config(self):
        return self._config

    @config.setter
    def config(self, config):
        # str/Path are loaded as YAML files; Config/dict/None start from the
        # default configuration, optionally updated with the given mapping.
        if isinstance(config, (str, Path)):
            config_paths = [Path(config)]
            load_default = False
        elif isinstance(config, (Config, dict)) or config is None:
            config_paths = []
            load_default = True
        else:
            raise TypeError("Received a config object of an unrecognized type ({}).".format(type(config)))

        self._config = Config.get_config(config_paths=config_paths, load_default=load_default)

        if isinstance(config, (Config, dict)):
            self._config.update(config)

        if self.pipeline is not None:
            self.pipeline.config = self._config

        assert self._config is not None

    @property
    def pipeline(self):
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline):
        self._pipeline = pipeline
        self._pipeline.config = self.config

    def set_workflow(self, pipeline=None, job_dependencies=None, verbose=None):
        """Create the campaign's jobs from a pipeline and its dependencies.

        :param pipeline: Pipeline to run; no-op when None.
        :param job_dependencies: dict {job_name: [names of jobs it depends on]}.
        :param verbose: Verbosity for the created jobs (default: self.verbose).
        :raises ValueError: If a job depends on a job that was not created yet.
        """
        if pipeline is None:
            return

        self.pipeline = pipeline
        self.pipeline.config = self.config

        if verbose is None:
            verbose = self.verbose

        if job_dependencies is not None:
            self.job_dependencies = job_dependencies

        for job_name, processing_step in self.pipeline.items():

            if job_name in self.job_dependencies:
                for job_dependency in self.job_dependencies[job_name]:
                    if job_dependency not in self.jobs:
                        raise ValueError(job_name + " depends on " + job_dependency +
                                         ", but no such job has been ran.")
                # One {job_key: slurm_id} dict per upstream job.
                processing_step.dep_after = [self.jobs[job_dependency].job_ids
                                             for job_dependency in self.job_dependencies[job_name]]

            self.jobs[job_name] = Job(job_name=job_name, small=self.small,
                                      resume=self.resume, config=self.config,
                                      verbose=verbose, processing_step=processing_step)

    def run(self, include=None, exclude=None, test=False, verbose=None):
        """Launch the campaign's jobs through sbatch.

        :param include: If given, only jobs named in this list are run.
        :param exclude: Job names to skip (ignored when include is given).
        :param test: If True, only print the sbatch commands.
        :param verbose: Per-job verbosity override.
        """
        # Bug fix: this previously read `self.step_names`, an attribute that
        # does not exist on Campaign (it belongs to Pipeline), so any call
        # with `include` raised AttributeError.
        if include is not None:
            exclude = [job_name for job_name in self.jobs if job_name not in include]
        elif exclude is None:
            exclude = []

        for job_name in self.jobs:
            if job_name not in exclude:
                self.jobs[job_name].run(verbose=verbose, test=test)

    def cancel(self):
        """Cancel (scancel) every job of the campaign."""
        for job in self.jobs.values():
            job.cancel()

    def get_status(self):
        """Return {job_name: set of SLURM states} for every job."""
        return {job.name: job.get_status() for job in self.jobs.values()}

    def print_status(self):
        """Pretty-print the status of every job, one banner per job."""
        for job in self.jobs.values():
            print("=" * int(np.ceil((100 - len(job.name) - 1) / 2.0)) + " " + job.name + " " +
                  "=" * int(np.floor((100 - len(job.name) - 1) / 2.0)))
            job.print_status()
            print(" ")

    def print_log(self, job_id, tail=None, head=None):
        """Print the log file of the SLURM job with ID job_id.

        :param tail: If given, print only the last `tail` lines.
        :param head: If given, print only the first `head` lines (wins over tail).
        """
        for job in self.jobs.values():
            if job_id in job.job_ids.values():
                keys = list(job.job_ids.keys())
                values = list(job.job_ids.values())
                with open(job.file_names_log[keys[values.index(job_id)]], "r") as log_file:
                    log_text = log_file.read()
                    if head is not None:
                        print("\n".join(log_text.split("\n")[:head]))
                    elif tail is not None:
                        print("\n".join(log_text.split("\n")[-tail:]))
                    else:
                        print(log_text)
                return

    def print_script(self, job_id):
        """Print the generated sbatch script of the SLURM job with ID job_id."""
        for job in self.jobs.values():
            if job_id in job.job_ids.values():
                keys = list(job.job_ids.keys())
                values = list(job.job_ids.values())
                with open(job.file_names_slurm[keys[values.index(job_id)]], "r") as slurm_file:
                    print(slurm_file.read())
                return

    def relaunch_job(self, job_id, dep_sup=None):
        """Re-submit the job with SLURM ID job_id, optionally adding dependencies."""
        for job in self.jobs.values():
            for job_key, id_ in job.job_ids.items():
                if job_id == id_:
                    new_id = job.run_a_job(job_key, dep_sup=dep_sup)
                    if new_id is not None:
                        print("Job {} launched.".format(new_id))

    def print_job_id_info(self, job_id):
        """Print name, key, ID, and dependencies of the job with SLURM ID job_id."""
        for job in self.jobs.values():
            for job_key, id_ in job.job_ids.items():
                if job_id == id_:
                    print("#" * 45 + " JOB INFO " + "#" * 45)
                    print("job name:", job.specific_names[job_key])
                    print("job key:", job_key)
                    print("job id:", job_id)
                    # Bug fix: get_dependency_ids lives on the processing step,
                    # not on the Job object itself.
                    print("job depends on ids:", job.processing_step.get_dependency_ids(job_key))
                    print("#" * 100)

    def _check_file_name_(self, file_name):
        # Update self.file_name when a file name is explicitly provided.
        if file_name is not None:
            if isinstance(file_name, str):
                file_name = Path(file_name)
            self.file_name = file_name

    def save(self, file_name=None):
        """Pickle the campaign to file_name (default: self.file_name)."""
        self._check_file_name_(file_name)

        with self.file_name.open("wb") as f:
            pickle.dump(self, f)
        print("Saving campaign as {}.".format(self.file_name))

    def load(self, file_name=None):
        """Load and return a pickled campaign.

        NOTE: this does not mutate the current instance; use the return value:
        ``campaign = campaign.load()``.
        """
        self._check_file_name_(file_name)
        with self.file_name.open("rb") as f:
            loaded = pickle.load(f)
        return loaded

    def load_or_run(self, rerun=False, file_name=None, **run_kwargs):
        """Load the campaign if its save file exists; otherwise run and save it.

        :param rerun: If True, rerun (and re-save) even if a save file exists.
        :param file_name: Path where to save the campaign to or load it from.
        :param run_kwargs: Arguments passed to the self.run() method.
        :return: None
        """
        if rerun:
            self.run(**run_kwargs)
            self.save(file_name)
            return

        try:
            self.load(file_name)
        except IOError:
            self.run(**run_kwargs)
            self.save(file_name)

    def show_workflow(self):
        """Render the job-dependency graph with blockdiag and display it."""
        block_text = "blockdiag {\n"
        for child in list(self.job_dependencies.keys()):
            for parent in self.job_dependencies[child]:
                block_text += "   {} -> {};\n".format(parent, child)
        block_text += "}\n"

        with open("diagram", "w") as f:
            f.write(block_text)

        subprocess.check_output(["blockdiag", "--size=2000x2000", "diagram"])

        fig, axes = plt.subplots(1, 1, figsize=(15, 15))
        img = mpimg.imread('diagram.png')
        axes.imshow(img)
        plt.axis('off')
from yaml import load, dump
from pathlib import Path
import collections

# Prefer the abstract base classes from collections.abc (required on
# Python 3.10+, where the aliases in collections were removed).
try:
    collectionsAbc = collections.abc
except ImportError:
    collectionsAbc = collections

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper


def update(d, u):
    """Recursively update mapping ``d`` with mapping ``u``.

    The built-in dict.update is not recursive, which causes a dict nested
    within a dict to be simply overwritten rather than merged.
    """
    for k, v in u.items():
        dv = d.get(k, {})
        if not isinstance(dv, collectionsAbc.Mapping):
            d[k] = v
        elif isinstance(v, collectionsAbc.Mapping):
            d[k] = update(dv, v)
        else:
            d[k] = v
    return d


def join(loader, tag_suffix, node):
    """YAML multi-constructor concatenating the items of a !join sequence."""
    seq = loader.construct_sequence(node)
    return ''.join([str(i) for i in seq])


class SlurppyLoader(Loader):
    """YAML loader extended with slurppy-specific tags (e.g., !join)."""
    pass


SlurppyLoader.add_multi_constructor("!join", join)


class Config(collectionsAbc.MutableMapping):
    """Dict-like configuration object with recursive update and YAML I/O.

    Bug fix: this class previously inherited from collections.MutableMapping,
    which no longer exists on Python 3.10+; the collectionsAbc alias defined
    above is now used, as originally intended.
    """

    def __init__(self, *args, **kwargs):
        # Backing dict holding the actual configuration tree.
        self.store = dict()
        self.update(dict(*args, **kwargs))

    def __getitem__(self, key):
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[key] = value

    def __delitem__(self, key):
        del self.store[key]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

    def __repr__(self):
        return self.store.__repr__()

    def __contains__(self, item):
        return item in self.store

    def update(self, u):
        # Recursive (merging) update; deliberately overrides the shallow
        # MutableMapping.update.
        update(self.store, u)

    @staticmethod
    def perso_default_path():
        """Path of the user's personal default configuration file."""
        return Path.home() / ".slurppy_config.yaml"

    @staticmethod
    def default_path():
        """Personal default config if it exists, else the packaged default."""
        perso_default = Config.perso_default_path()
        if perso_default.exists():
            return perso_default
        return Path(__file__).parent.parent / "configs" / "default.yaml"

    @staticmethod
    def get_config(config_paths=(), load_default=True):
        """Build a Config from the default file plus the given YAML files.

        :param config_paths: str or iterable of paths to YAML files, applied
               in order on top of the default configuration.
        :param load_default: If True, start from default_path().
        :return: A new Config instance.
        """
        self = Config()

        if load_default:
            self.path = Config.default_path()
            with self.path.open('r') as stream:
                self.update(load(stream, Loader=SlurppyLoader))

        if isinstance(config_paths, str):
            config_paths = [config_paths]
        for config_path in config_paths:
            with Path(config_path).open('r') as stream:
                config_supp = load(stream, Loader=SlurppyLoader)
                if config_supp is not None:
                    self.update(config_supp)

        return self

    def save(self, path=None):
        """Dump the configuration as YAML to ``path`` (default: default_path())."""
        if path is None:
            path = Config.default_path()
        if isinstance(path, str):
            path = Path(path)
        with path.open("w") as stream:
            dump(self.store, stream, Dumper=Dumper)
import subprocess
import sys
import numpy as np
from itertools import product
from collections import OrderedDict
from jinja2 import Template
from warnings import warn
from pathlib import Path

from .config import Config


def yn_choice(message, default='y'):
    """Ask a yes/no question on stdin; return True for a "yes" answer.

    :param default: 'y'/'yes' makes a bare Enter count as yes.
    """
    choices = 'Y/n' if default.lower() in ('y', 'yes') else 'y/N'
    choice = input("%s (%s) " % (message, choices))
    values = ('y', 'yes', '') if choices == 'Y/n' else ('y', 'yes')
    return choice.strip().lower() in values


class KeyDict(OrderedDict):
    """Hashable ordered dict identifying one combination of job properties."""

    def __repr__(self):
        return "{" + ", ".join(["{}: {}".format(key, val) for key, val in self.items()]) + "}"

    def __hash__(self):
        return hash(tuple(sorted(self.items())))

    def __contains__(self, item):
        # A KeyDict "contains" another mapping when every (key, value) pair
        # of that mapping is also a pair of this KeyDict.
        for (key, value) in zip(item.keys(), item.values()):
            if (key, value) not in list(zip(self.keys(), self.values())):
                return False
        return True


class Job:
    """One pipeline step, expanded into one sbatch script per property combination."""

    def __init__(self, job_name, processing_step, fct_kwargs=None, small=False, resume=True,
                 verbose=True, **kwargs):
        """
        :param job_name: Base name for the generated jobs.
        :param processing_step: ProcessingStep describing what to run.
        :param fct_kwargs: Extra keyword arguments passed to the step function.
        :param small: If True, use the "small" SLURM resource profile.
        :param resume: Forwarded to the step function.
        :param verbose: Print generated file names and sbatch commands.
        :param kwargs: Additional attributes set on the instance as-is.
        """
        self.name = job_name
        self.small = small
        self.resume = resume
        self.verbose = verbose
        self.processing_step = processing_step

        for key, val in kwargs.items():
            setattr(self, key, val)

        if fct_kwargs is None:
            self.fct_kwargs = {}
        else:
            self.fct_kwargs = fct_kwargs

        # All keyed by KeyDict of property values.
        self.job_ids = {}
        self.specific_names = {}
        self.file_names_slurm = {}
        self.file_names_log = {}
        self.properties = OrderedDict()

        self.generate_batch_scripts()

    @staticmethod
    def format_args(key, val):
        # Render one keyword argument for the generated `python -c` command.
        if isinstance(val, str):
            return "{}='{}'".format(key, val)
        else:
            return "{}={}".format(key, val)

    def _check_config_(self):
        # Ensure the SLURM-related config keys exist, prompting the user for
        # the ones that cannot be defaulted, and offer to save any change.
        config = self.processing_step.config

        config_changed = False
        if "email" not in config["slurm"]:
            config["slurm"]["email"] = ""
        if "send_emails" not in config["slurm"]:
            config["slurm"]["send_emails"] = False
        if "account" not in config["slurm"]:
            config["slurm"]["account"] = input("Enter a SLURM account: ")
            config_changed = True
        if "venv_path" not in config["paths"]:
            config["paths"]["venv_path"] = str(Path(sys.executable).parent)
            warn("Your configuration file do not contain the key ['paths']['venv_path']. Defaulting to {}"
                 .format(config["paths"]["venv_path"]))
            config_changed = True
        if config_changed:
            if yn_choice("You changed your configuration. Do you want to save the changes?", default='y'):
                if yn_choice("Save as the default configuration file (i.e., {})?".format(Config.perso_default_path()),
                             default='y'):
                    config.save(Config.perso_default_path())
                else:
                    path = input("Enter the path where to save this new configuration file:")
                    config.save(path)

    def generate_batch_scripts(self):
        """Write one sbatch script (and log path) per combination of loop properties."""
        self.properties = self.processing_step.get_loop_properties()

        for item in product(*self.properties.values()):
            job_key = KeyDict(((key, val) for key, val in zip(self.properties.keys(), item)))

            specific_job_name = "_".join(np.concatenate(([self.name], item)))
            self.specific_names[job_key] = specific_job_name

            # Formatting the python command executed by the batch script.
            kwargs = self.fct_kwargs.copy()
            kwargs["small"] = self.small
            kwargs["resume"] = self.resume
            if self.processing_step.config is not None:
                kwargs["config"] = self.processing_step.config
            kwargs.update(job_key)
            arg_str = [self.format_args(key, val) for key, val in kwargs.items()]
            python_code = 'from {module} import {fct}; {fct}({kwd})'.format(module=self.processing_step.import_module,
                                                                            fct=self.processing_step.fct_str,
                                                                            kwd=", ".join(arg_str))

            command = 'python -c "{}"'.format(python_code)

            self._check_config_()
            config = self.processing_step.config

            # Pick the SLURM resource profile: per-job entry, explicit key,
            # then the "default" entry, then nothing.
            if self.name in config["slurm"]:
                config_task_root = config["slurm"][self.name]
            elif hasattr(self, "slurm_config_key"):
                config_task_root = config["slurm"][self.slurm_config_key]
            elif "default" in config["slurm"]:
                config_task_root = config["slurm"]["default"]
                warn("Using config key ['slurm']['default'] for {} since not entry was found for this task."
                     .format(self.name))
            else:
                config_task_root = {}
                warn("Using no SLURM configuration dictionnary for {} because no entry was found for this task and" +
                     " there is no key ['slurm']['default'] currently defined in the configuration file used."
                     .format(self.name))

            if self.small:
                config_task_root = config_task_root["small"]

            if "slurm_dir" in config["paths"]:
                slurm_path = Path(config["paths"]["slurm_dir"])
            else:
                slurm_path = Path()
            slurm_path = (slurm_path / specific_job_name).with_suffix(".sh")

            if "log_dir" in config["paths"]:
                log_path = Path(config["paths"]["log_dir"])
            else:
                log_path = Path()
            log_path = (log_path / specific_job_name).with_suffix(".log")

            template_path = Path(__file__).parent.parent / "templates" / "slurm_template.jinja"
            with template_path.open("r") as file_jinja:
                jinja_template = file_jinja.read()

            kwargs = {"job_name": specific_job_name,
                      "email": config["slurm"]["email"],
                      "send_emails": config["slurm"]["send_emails"],
                      "file_name_log": str(log_path),
                      "account": config["slurm"]["account"],
                      "venv_path": config["paths"]["venv_path"],
                      "command": command}
            kwargs.update(config_task_root)

            slurm_script = Template(jinja_template).render(**kwargs)

            if self.verbose:
                print("Saving ", slurm_path)

            log_path.parent.mkdir(parents=True, exist_ok=True)
            slurm_path.parent.mkdir(parents=True, exist_ok=True)
            with slurm_path.open("w") as file_slurm:
                file_slurm.write(slurm_script)

            self.file_names_slurm[job_key] = slurm_path
            self.file_names_log[job_key] = log_path

    def run_a_job(self, job_key, verbose=None, test=False, dep_sup=None):
        """Submit the sbatch script associated with job_key.

        :param job_key: KeyDict identifying the property combination to launch.
        :param verbose: Print the sbatch command and its output.
        :param test: If True, do not submit; print the command and record a fake ID.
        :param dep_sup: Supplementary dependencies: a job ID (str/int), a list
               of job IDs (afterok), or a dict {dependency_type: job_id}.
        :return: The SLURM job ID, or None on submission failure.
        :raises TypeError: If dep_sup has an unsupported type.
        """
        # Bug fix: sbatch only honors the LAST --dependency option when the
        # flag is repeated, which silently dropped dependencies. All
        # dependency specifications are now merged into a single
        # comma-separated --dependency argument.
        dependency_specs = []

        dep_ids = self.processing_step.get_dependency_ids(job_key)
        if len(dep_ids):
            dependency_specs.append("afterok:" + ":".join(np.array(dep_ids, dtype=str)))

        if dep_sup is not None:
            if isinstance(dep_sup, (str, int)):
                dependency_specs.append("afterok:{}".format(dep_sup))
            elif isinstance(dep_sup, list):
                dependency_specs.append("afterok:" + ":".join(np.array(dep_sup, dtype=str)))
            elif isinstance(dep_sup, dict):
                for key, val in dep_sup.items():
                    dependency_specs.append("{}:{}".format(key, val))
            else:
                raise TypeError("dep_sup must be a str, int, list, or dict; got {}".format(type(dep_sup)))

        args = ["sbatch"]
        if dependency_specs:
            args.append("--dependency=" + ",".join(dependency_specs))
        args.append(str(self.file_names_slurm[job_key]))

        if verbose:
            print(" ".join(args))

        if test:
            print(" ".join(args))
            self.job_ids[job_key] = 99999999
        else:
            res = subprocess.check_output(args).strip()

            if verbose:
                print(res.decode(), file=sys.stdout)

            if not res.startswith(b"Submitted batch"):
                warn("There has been an error launching the job {}.".format(job_key))
                return

            self.job_ids[job_key] = int(res.split()[-1])

        return self.job_ids[job_key]

    def run(self, verbose=None, test=False):
        """Submit one job per combination of loop properties."""
        if verbose is None:
            verbose = self.verbose

        for item in product(*self.properties.values()):
            job_key = KeyDict(((key, val) for key, val in zip(self.properties.keys(), item)))
            self.run_a_job(job_key, verbose=verbose, test=test)

    def cancel(self):
        """Cancel (scancel) every submitted job."""
        for job_id in self.job_ids.values():
            subprocess.check_output(["scancel", str(job_id)])

    @staticmethod
    def _get_job_status(job_id):
        # Query sacct for the state of a single job; header occupies the
        # first two tokens of the output.
        command = ["sacct", "-j", str(job_id), "--state=BF,CA,CD,DL,F,NF,OOM,PD,PR,R,RQ,RS,RV,S,TO",
                   "-X", "--format=State"]
        result = subprocess.check_output(command).decode().split()
        if len(result) >= 3:
            return result[2]
        else:
            return "UNKNOWN"

    def get_status(self):
        """Return the set of SLURM states across all submitted jobs."""
        return {self._get_job_status(job_id) for job_id in self.job_ids.values()}

    def print_status(self):
        """Print one line (id, name, state) per submitted job."""
        for job_key in self.job_ids:
            print('{:<15}{:<70}{:<15}'.format(self.job_ids[job_key],
                                              self.specific_names[job_key],
                                              self._get_job_status(self.job_ids[job_key])))
self._get_job_status(self.job_ids[job_key]))) diff --git a/slurppy/pipeline.py b/slurppy/pipeline.py new file mode 100644 index 0000000..82f6818 --- /dev/null +++ b/slurppy/pipeline.py @@ -0,0 +1,59 @@ +from collections import OrderedDict +from pathlib import Path +import pickle + + +class Pipeline: + def __init__(self, name="slurppy_pipeline"): + self.processing_steps = OrderedDict() + self._config = {} + self.name = name + + self.file_name = None + + @property + def config(self): + return self._config + + @config.setter + def config(self, config): + if config is None: + return + + self._config = config + for step in self.processing_steps.values(): + step.config = config + + def add_step(self, processing_step): + self.processing_steps[processing_step.name] = processing_step + self.processing_steps[processing_step.name].config = self.config + + def items(self): + return self.processing_steps.items() + + @property + def step_names(self): + return list(self.processing_steps.keys()) + + def _check_file_name_(self, file_name): + if file_name is None: + if self.file_name is None: + self.file_name = (Path(self.config["paths"]["output_root"]) / self.name).with_suffix(".pln") + else: + if isinstance(file_name, str): + file_name = Path(file_name) + self.file_name = file_name + + def save(self, file_name=None): + self._check_file_name_(file_name) + + with self.file_name.open("wb") as f: + pickle.dump(self, f) + print("Saving campaign as {}.".format(self.file_name)) + + def load(self, file_name=None): + self._check_file_name_(file_name) + with self.file_name.open("rb") as f: + self = pickle.load(f) + + return self \ No newline at end of file diff --git a/slurppy/processingstep.py b/slurppy/processingstep.py new file mode 100644 index 0000000..e68f473 --- /dev/null +++ b/slurppy/processingstep.py @@ -0,0 +1,97 @@ +import numpy as np +from collections import OrderedDict + + +class ProcessingStep: + + def __init__(self, name, candidate_properties, fct_str, import_module, 
constraints=()): + self.name = name + self._config = {"analysis": {}} + + if candidate_properties is not None: + self.candidate_properties = sorted(candidate_properties) + else: + self.candidate_properties = None + + self.fct_str = fct_str + self.import_module = import_module + self.constraints = constraints + self.dep_after = [] + + @property + def config(self): + return self._config + + @config.setter + def config(self, config): + if config is None: + return + + self._config = config + if "analysis" not in self._config: + self.config["analysis"] = {} + if self._config["analysis"] is None: + self.config["analysis"] = {} + + if self.candidate_properties is None: + dep_after_properties = [] + if self.dep_after is not None: + for dependency in self.dep_after: + dep_after_properties.extend(list(list(dependency.keys())[0].keys())) + dep_after_properties = np.unique(dep_after_properties) + + self.candidate_properties = np.unique(np.concatenate((dep_after_properties, + self._config["analysis"]))) + + #@property + #def config_path(self): + # if self._config is None: + # return None + # return self._config.path + + def get_dependency_ids(self, job_key): + dep_job_ids = [] + if self.dep_after is not None: + for dependency in self.dep_after: + dep_job_ids.extend([dep_job_id for key_dep, dep_job_id in dependency.items() + if key_dep in job_key]) + return dep_job_ids + + def get_loop_properties(self): + + properties = OrderedDict() + + dep_after_key0 = [] + if self.dep_after is not None: + for dependency in self.dep_after: + if len(dependency): + dep_after_key0.extend(list(dependency.keys())[0]) + dep_after_key0 = np.unique(dep_after_key0).tolist() + + for property_name in self.candidate_properties: + + # If candidate properties present in the dep_after, take its values from there + if property_name in dep_after_key0: + property_values = [] + for dependency in self.dep_after: + for dep_after_key in dependency.keys(): + property_values.append(dep_after_key[property_name]) + if 
property_name in properties: + assert (np.all(properties[property_name] == np.unique(property_values))) + else: + properties[property_name] = np.unique(property_values) + + else: + if property_name in self.config["analysis"]: + # if len(self.config["analysis"][property_name]) > 1: + properties[property_name] = self.config["analysis"][property_name] + + for property_name in properties: + if property_name in self.constraints: + if isinstance(self.constraints[property_name], str): + self.constraints[property_name] = [self.constraints[property_name]] + filtered_property_values = [p for p in properties[property_name] + if p in self.constraints[property_name]] + properties[property_name] = filtered_property_values + + return properties diff --git a/slurppy/tests/__init__.py b/slurppy/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/slurppy/tests/test_campaign.py b/slurppy/tests/test_campaign.py new file mode 100644 index 0000000..84ca7bc --- /dev/null +++ b/slurppy/tests/test_campaign.py @@ -0,0 +1,23 @@ +from pathlib import Path +from slurppy import Pipeline, ProcessingStep, Campaign +import pytest +from shutil import which + + +#@pytest.mark.dependency(depends=['slurppy/slurppy/tests/test_pipeline.py::test_pipeline_add_step'], scope="session") +def test_pipeline_add_step(): + path = Path(__file__).parent / "test_artifacts" / "pipeline.pln" + pipeline = Pipeline().load(path) + + campaign = Campaign(name="test_campaign") + + # To avoid input asked to the used during tests + if "account" not in campaign.config["slurm"]: + campaign.config["slurm"]["account"] = "dummy_account" + if "venv_path" not in campaign.config["paths"]: + campaign.config["paths"]["venv_path"] = "dummy_venv" + + campaign.set_workflow(pipeline, job_dependencies={}) + campaign.show_workflow() + campaign.load_or_run(rerun=True, test=(which('sbatch') is None)) + #campaign.print_status() diff --git a/slurppy/tests/test_pipeline.py b/slurppy/tests/test_pipeline.py new file mode 
100644 index 0000000..4e4b746 --- /dev/null +++ b/slurppy/tests/test_pipeline.py @@ -0,0 +1,23 @@ +from pathlib import Path +from slurppy import Pipeline, ProcessingStep +import pytest + + +def dummy_func(dummy_arg): + print(dummy_arg) + + +@pytest.mark.dependency(scope="session") +def test_pipeline_add_step(): + pipeline = Pipeline() + + pipeline.add_step(ProcessingStep( + name="test_dummy_func", + candidate_properties=["dummy_arg"], + fct_str="dummy_func", + import_module="slurppy.tests.test_pipeline" + )) + + path = Path(__file__).parent / "test_artifacts" / "pipeline.pln" + path.parent.mkdir(parents=True, exist_ok=True) + pipeline.save(path) diff --git a/templates/slurm_template.jinja b/templates/slurm_template.jinja new file mode 100644 index 0000000..fd64ac3 --- /dev/null +++ b/templates/slurm_template.jinja @@ -0,0 +1,29 @@ +#!/cvmfs/soft.computecanada.ca/nix/var/nix/profiles/16.09/bin/sh +# +#SBATCH --job-name={{job_name}} +#SBATCH --output={{file_name_log}} +{%- if send_emails is sameas true -%} +#SBATCH --mail-type=END,FAIL # Mail events (NONE, BEGIN, END, FAIL, ALL) +#SBATCH --mail-user={{email}} +{%- endif -%} +{%- if ntask is defined -%} +#SBATCH --ntasks={{ntask}} +{%- else -%} +#SBATCH --ntasks=1 +{%- endif -%} +{%- if nodes is defined -%} +#SBATCH --nodes={{nodes}} +{%- else -%} +#SBATCH --nodes=1 +{%- endif -%} +{%- if cpus_per_task is defined -%} +#SBATCH --cpus-per-task={{cpus_per_task}} +{%- endif -%} +#SBATCH --time={{time}} +{%- if mem_per_cpu is defined -%} +#SBATCH --mem-per-cpu={{mem_per_cpu}} +{%- endif -%} +#SBATCH --account={{account}} + +source {{venv_path}} +{{command}}