diff --git a/toolcraft/__init__.py b/toolcraft/__init__.py
index 8ab6674..901b36a 100644
--- a/toolcraft/__init__.py
+++ b/toolcraft/__init__.py
@@ -44,21 +44,41 @@
 __version__ = "0.1.4a23"

+# todo: make sure that dearpygui works and opens in windowed mode on ibm lsf
+# todo: improve load time ... have settings module overridden properly
+# todo: for each subproject have a prompt to override settings and save in config file only when updated
+#       while also allowing overrides in the settings module of each subproject
+#       add to the respective config file in the project only when the settings are overridden via the UI
+
+import time
+_now = time.time()
 from . import settings
+print("settings", time.time() - _now)
 from . import logger
+print("logger", time.time() - _now)
 from . import error
+print("error", time.time() - _now)
 from . import util
+print("util", time.time() - _now)
 from . import marshalling
+print("marshalling", time.time() - _now)
 from . import parallel
+print("parallel", time.time() - _now)
 from . import storage
+print("storage", time.time() - _now)
 from . import server
+print("server", time.time() - _now)
 from . import job
+print("job", time.time() - _now)
 from . import richy
+print("richy", time.time() - _now)
 from . import texipy
+print("texipy", time.time() - _now)

 try:
     import dearpygui.dearpygui as _dpg
     from . import gui
+    print("gui", time.time() - _now)
 except ImportError:
     ...
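Note that the prints above report time elapsed since `_now`, so the cost of each submodule has to be read as the difference between consecutive lines. A minimal sketch of an alternative that reports per-module cost and stays silent unless a hypothetical `TOOLCRAFT_PROFILE_IMPORTS` environment variable is set (illustration only, not part of the patch):

# Illustrative sketch: per-module import timing, gated behind an assumed env var.
import importlib
import os
import time

def _timed_import(name: str, package: str = "toolcraft"):
    _start = time.perf_counter()
    _module = importlib.import_module(f".{name}", package=package)
    if os.environ.get("TOOLCRAFT_PROFILE_IMPORTS"):
        # prints the cost of this one import, not the cumulative elapsed time
        print(f"{name}: {time.perf_counter() - _start:.3f}s")
    return _module

# settings = _timed_import("settings")
# logger = _timed_import("logger")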
diff --git a/toolcraft/job/__base__.py b/toolcraft/job/__base__.py
index e76b7b8..aa32727 100644
--- a/toolcraft/job/__base__.py
+++ b/toolcraft/job/__base__.py
@@ -3,21 +3,17 @@
 """
 import abc
 import datetime
-import enum
 import os
 import inspect
 import pathlib
-import shutil
 import typing as t
 import dataclasses
 import subprocess
 import itertools
-import rich
 import yaml
 import sys
 import asyncio
-import hashlib
 import types

 from .. import logger
@@ -27,13 +23,15 @@
 from .. import storage as s
 from .. import richy
 from .. import settings
+from ..settings import Settings

 _now = datetime.datetime.now

-try:
+
+if Settings.USE_NP_TF_KE_PA_MARSHALLING:
     import tensorflow as tf
     from tensorflow.python.training.tracking import util as tf_util
-except ImportError:
+else:
     tf = None
     tf_util = None

@@ -572,7 +570,7 @@ def path(self) -> s.Path:
         todo: integrate this with storage with partition_columns ...
           (not important do only if necessary)
         """
-        _ret = self.runner.cwd
+        _ret = self.runner.results_dir
         _ret /= self.method.__func__.__name__
         if bool(self.experiment):
             for _ in self.experiment.group_by:
@@ -1068,7 +1066,7 @@ class Monitor:
     @property
     @util.CacheResult
     def path(self) -> s.Path:
-        _ret = self.runner.cwd / _MONITOR_FOLDER
+        _ret = self.runner.results_dir / _MONITOR_FOLDER
         if not _ret.exists():
             _ret.mkdir(create_parents=True)
         return _ret
@@ -1396,9 +1394,9 @@ def init(self):
 @dataclasses.dataclass(frozen=True)
 @m.RuleChecker(
     things_to_be_cached=[
-        'cwd', 'flow', 'monitor', 'registered_experiments',
+        'cwd', 'results_dir', 'flow', 'monitor', 'registered_experiments',
     ],
-    things_not_to_be_overridden=['cwd', 'py_script', 'monitor'],
+    things_not_to_be_overridden=['cwd', 'results_dir', 'py_script', 'monitor'],
     # we do not want any fields for Runner class
     restrict_dataclass_fields_to=[],
 )
@@ -1459,6 +1457,20 @@ def copy_src_dst(self) -> t.Tuple[str, str]:
             msgs=["Cannot use copy cli command", "Please implement property copy_src_dst to use copy cli command"]
         )

+    @property
+    @util.CacheResult
+    def results_dir(self) -> s.Path:
+        """
+        results dir where results will be stored for this runner
+        """
+        _py_script = self.py_script
+        _folder_name = _py_script.name.replace(".py", "")
+        _folder_name += f"_{self.hex_hash[:5]}"
+        _ret = s.Path(suffix_path=_folder_name, fs_name='RESULTS')
+        if not _ret.exists():
+            _ret.mkdir(create_parents=True)
+        return _ret
+
     @property
     @util.CacheResult
     def cwd(self) -> s.Path:
@@ -1466,11 +1478,10 @@ def cwd(self) -> s.Path:
         todo: adapt code so that the cwd can be on any other file system instead of CWD
         """
         _py_script = self.py_script
-        _folder_name = _py_script.name.replace(".py", "")
-        _ret = s.Path(suffix_path=_folder_name, fs_name='CWD')
+        _ret = s.Path(suffix_path=".", fs_name='CWD')
         e.code.AssertError(
             value1=_ret.local_path.absolute().as_posix(),
-            value2=(_py_script.parent / _folder_name).absolute().as_posix(),
+            value2=_py_script.parent.absolute().as_posix(),
             msgs=[
                 f"This is unexpected ... ",
                 f"The cwd for job runner is {_ret.local_path.absolute().as_posix()}",
@@ -1499,15 +1510,14 @@ def init(self):
         # setup logger
         import logging
         # note that this should always be local ... dont use `self.cwd`
-        _log_file = self.py_script.parent / self.py_script.name.replace(".py", "") / "runner.log"
-        _log_file.parent.mkdir(parents=True, exist_ok=True)
+        _log_file = self.results_dir / "runner.log"
         logger.setup_logging(
             propagate=False,
             level=logging.NOTSET,
             handlers=[
                 # logger.get_rich_handler(),
                 # logger.get_stream_handler(),
-                logger.get_file_handler(_log_file),
+                logger.get_file_handler(_log_file.local_path),
             ],
         )
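For orientation: `cwd` now resolves to the script's own folder, while results move to a `results_dir` named `<script-stem>_<first 5 chars of hex_hash>` on the 'RESULTS' file system. A plain-pathlib sketch of the naming only (toolcraft's `s.Path` and `hex_hash` are stand-ins here; the script path and hash are made up):

# Illustrative only: mirrors the folder-name construction used by Runner.results_dir.
import pathlib

py_script = pathlib.Path("experiments/train_model.py")  # hypothetical runner script
hex_hash = "a1b2c3d4e5f6"                                # hypothetical Runner.hex_hash

folder_name = py_script.name.replace(".py", "") + f"_{hex_hash[:5]}"
results_dir = pathlib.Path("/path/to/RESULTS") / folder_name  # the fs_name='RESULTS' mount, assumed
log_file = results_dir / "runner.log"

print(folder_name)  # train_model_a1b2c
print(log_file)     # /path/to/RESULTS/train_model_a1b2c/runner.log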
diff --git a/toolcraft/job/cli.py b/toolcraft/job/cli.py
index 3596a85..e97cae7 100644
--- a/toolcraft/job/cli.py
+++ b/toolcraft/job/cli.py
@@ -1,11 +1,15 @@
 import enum
 import os
 import pathlib
+import zipfile
 import time
 import psutil
 import typer
+from typing_extensions import Annotated
 import sys
+import os
+import shutil
 import dataclasses
 import typing as t
 import subprocess
@@ -72,7 +76,13 @@ def nxdi():

 @_APP.command(help="Run the job")
 def run(
-    job: str = typer.Argument(..., help="Job ID in format or ", show_default=False, ),
+    job: Annotated[
+        str,
+        typer.Argument(
+            help="Job ID in format or ",
+            show_default=False,
+        )
+    ],
 ):
     """
     Run a job in runner.
@@ -261,6 +271,113 @@ def _j_view(_j: Job) -> gui.widget.Widget:
     gui.Engine.run(_dashboard)


+@_APP.command(help="Archive/partition/upload the results folder")
+def archive(
+    part_size: Annotated[int, typer.Option(help="Maximum part size in MB used to split the resulting archive file.")] = None,
+    transmft: Annotated[bool, typer.Option(help="Upload the resulting files to the cloud drive and generate a script to download them.")] = False,
+):
+
+    # -------------------------------------------------------------- 01
+    # start
+    _rp = _RUNNER.richy_panel
+    # validation
+    if transmft:
+        if part_size is not None:
+            raise e.validation.NotAllowed(
+                msgs=["When using transmft do not supply part_size as we hardcode it to 399MB"]
+            )
+        part_size = 399
+
+    # -------------------------------------------------------------- 02
+    # make archive
+    _rp.update(
+        f"archiving results dir {_RUNNER.results_dir.local_path.as_posix()} "
+        f"{'' if part_size is None else 'and making parts '} ..."
+    )
+    _zip_base_name = _RUNNER.results_dir.name
+    _cwd = _RUNNER.cwd.local_path.resolve().absolute()
+    _archive_folder = _RUNNER.results_dir.local_path.parent / f"{_zip_base_name}_archive"
+    _archive_folder.mkdir()
+    _big_zip_file = _archive_folder / f"{_zip_base_name}.zip"
+    _src_dir = _RUNNER.results_dir.local_path.expanduser().resolve(strict=True)
+    _files_and_folders_to_compress = 0
+    for _file in _src_dir.rglob('*'):
+        _files_and_folders_to_compress += 1
+    _rp.update(f"zipping {_files_and_folders_to_compress} items")
+    _zipping_track = _rp.add_task(task_name="zipping", total=_files_and_folders_to_compress)
+    with zipfile.ZipFile(_big_zip_file, 'w', zipfile.ZIP_DEFLATED) as _zf:
+        for _file in _src_dir.rglob('*'):
+            _zipping_track.update(advance=1)
+            __file = _file.relative_to(_src_dir.parent)
+            _rp.update(f"zipping {__file} ...")
+            _zf.write(_file, __file)
+    _chapters = 1
+    if part_size is not None:
+        _BUF = 10 * 1024 * 1024 * 1024  # 10GB - max memory buffer size to use for read
+        _part_size_in_bytes = part_size * 1024 * 1024
+        _ugly_buf = b''
+        with open(_big_zip_file, 'rb') as _src:
+            while True:
+                _rp.update(f"splitting large zip into part {_chapters}")
+                _part_file = _big_zip_file.parent / f"{_big_zip_file.name}.{_chapters:03d}"
+                with open(_part_file, 'wb') as _tgt:
+                    _written = 0
+                    while _written < _part_size_in_bytes:
+                        if len(_ugly_buf) > 0:
+                            _tgt.write(_ugly_buf)
+                        _tgt.write(_src.read(min(_BUF, _part_size_in_bytes - _written)))
+                        _written += min(_BUF, _part_size_in_bytes - _written)
+                        _ugly_buf = _src.read(1)
+                        if len(_ugly_buf) == 0:
+                            break
+                if len(_ugly_buf) == 0:
+                    if _chapters == 1:
+                        _part_file.unlink()
+                    break
+                _chapters += 1
+        if _chapters > 1:
+            _rp.update(f"removing large zip file")
+            _big_zip_file.unlink()
+
+    # -------------------------------------------------------------- 03
+    # look for archives and upload them
+    if transmft:
+        _rp.update(f"performing uploads to transcend")
+        _rp.stop()
+        if _chapters == 1:
+            print(f"Uploading file part {_big_zip_file.as_posix()}")
+            _cmd_tokens = [
+                "transmft", "-p", f"{_big_zip_file.as_posix()}",
+            ]
+            subprocess.run(_cmd_tokens, shell=False)
+        elif _chapters > 1:
+            for _f in _archive_folder.glob(f"{_zip_base_name}.zip.*"):
+                print(f"Uploading file part {_f.as_posix()}")
+                _cmd_tokens = [
+                    "transmft", "-p", f"{_f.as_posix()}",
+                ]
+                subprocess.run(_cmd_tokens, shell=False)
+        else:
+            raise e.code.ShouldNeverHappen(msgs=[f"unknown value -- {_chapters}"])
+        _trans_log_file = _archive_folder / f"trans.log"
+        shutil.move(_cwd / "trans.log", _trans_log_file)
+        _trans_file_keys = [
+            _.split(" ")[0] for _ in _trans_log_file.read_text().split("\n") if
+            _ != ""
+        ]
+        _ps1_script_file = _archive_folder / f"get.ps1"
+        _ps1_script_file.write_text(
+            "\n".join(
+                [f"transmft -g {_}" for _ in _trans_file_keys]
+            )
+        )
+        print("*"*30)
+        print(_ps1_script_file.read_text())
+        print("*"*30)
+        _rp.start()
+        _rp.set_final_message(_ps1_script_file.read_text())
+
+
 @_APP.command(help="Copies from server to cwd.")
 def copy():
     """
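When `part_size` is given, `archive` splits the zip into raw byte slices named `<name>.zip.001`, `<name>.zip.002`, and so on; the generated `get.ps1` only downloads the parts via `transmft -g <key>`, it does not put them back together. A minimal sketch of the missing reverse step, assuming the part naming above and that all parts sit in one folder (illustration only, not part of the patch):

# Illustrative only: stitch <name>.zip.NNN parts back into a single zip.
import pathlib

def reassemble(archive_folder: pathlib.Path, zip_base_name: str) -> pathlib.Path:
    out_file = archive_folder / f"{zip_base_name}.zip"
    # three-digit zero-padded suffixes sort correctly as plain strings
    parts = sorted(archive_folder.glob(f"{zip_base_name}.zip.*"))
    with open(out_file, "wb") as tgt:
        for part in parts:
            # parts are consecutive raw byte slices, so plain concatenation suffices
            tgt.write(part.read_bytes())
    return out_file

# reassemble(pathlib.Path("train_model_a1b2c_archive"), "train_model_a1b2c")  # hypothetical names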
_.split(" ")[0] for _ in _trans_log_file.read_text().split("\n") if + _ != "" + ] + _ps1_script_file = _archive_folder / f"get.ps1" + _ps1_script_file.write_text( + "\n".join( + [f"transmft -g {_}" for _ in _trans_file_keys] + ) + ) + print("*"*30) + print(_ps1_script_file.read_text()) + print("*"*30) + _rp.start() + _rp.set_final_message(_ps1_script_file.read_text()) + + @_APP.command(help="Copies from server to cwd.") def copy(): """ diff --git a/toolcraft/job/cli_launch.py b/toolcraft/job/cli_launch.py index fd53097..6e06fae 100644 --- a/toolcraft/job/cli_launch.py +++ b/toolcraft/job/cli_launch.py @@ -17,6 +17,7 @@ import time import psutil import typer +from typing_extensions import Annotated import typing as t import subprocess @@ -40,9 +41,9 @@ @_APP.command(help="Launches all the jobs in runner on lsf infrastructure.") def lsf( - email: bool = typer.Option(default=False, help="Set this if you want to receive email after lsf job completion."), - cpus: int = typer.Option(default=None, help="Number of processors to use for lsf job."), - memory: int = typer.Option(default=None, help="Amount of memory to reserve for lsf job."), + email: Annotated[bool, typer.Option(help="Set this if you want to receive email after lsf job completion.")] = False, + cpus: Annotated[int, typer.Option(help="Number of processors to use for lsf job.")] = None, + memory: Annotated[int, typer.Option(help="Amount of memory to reserve for lsf job.")] = None, ): """ @@ -115,7 +116,7 @@ def lsf( @_APP.command(help="Launches all the jobs in runner on local machine.") def local( - single_cpu: bool = typer.Option(default=False, help="Launches on single CPU in sequence (good for debugging)") + single_cpu: Annotated[bool, typer.Option(help="Launches on single CPU in sequence (good for debugging)")] = False ): """ todo: remote linux instances via wsl via ssh https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/WSL.html diff --git a/toolcraft/marshalling.py b/toolcraft/marshalling.py index 198a494..3b5c425 100644 --- a/toolcraft/marshalling.py +++ b/toolcraft/marshalling.py @@ -2,26 +2,17 @@ import dataclasses import datetime import hashlib -import sys import inspect -import time import types -import pathlib -import contextvars import enum -import traceback import typing as t import rich -import asyncio -from rich import progress -from rich import spinner -from rich import columns -import numpy as np -import pyarrow as pa import yaml +import os from . import error as e from . 
diff --git a/toolcraft/marshalling.py b/toolcraft/marshalling.py
index 198a494..3b5c425 100644
--- a/toolcraft/marshalling.py
+++ b/toolcraft/marshalling.py
@@ -2,26 +2,17 @@
 import dataclasses
 import datetime
 import hashlib
-import sys
 import inspect
-import time
 import types
-import pathlib
-import contextvars
 import enum
-import traceback
 import typing as t
 import rich
-import asyncio
-from rich import progress
-from rich import spinner
-from rich import columns
-import numpy as np
-import pyarrow as pa
 import yaml
+import os

 from . import error as e
 from . import logger, settings, util
+from .settings import Settings


 if settings.DPG_WORKS:
@@ -72,67 +63,74 @@ def __call__(self, fn: t.Callable):
 # HASH_Suffix.CONFIG = ".hashconfig"
 # META_Suffix.INFO = ".metainfo"

-if settings.TF_KERAS_WORKS:
-    # Handle serialization for keras loss, optimizer and layer
-    # noinspection PyUnresolvedReferences
+
+def _tf_serialize(_data):
+    """
+    Handle serialization for keras loss, optimizer and layer
+    """
+    import keras as ke
     import tensorflow as tf
+    if isinstance(_data, dict):
+        _r = {}
+        for _k, _v in _data.items():
+            _r[_k] = _tf_serialize(_v)
+        return _r
+    elif isinstance(_data, ke.losses.Loss):
+        _data = ke.losses.serialize(_data)
+        _data['__keras_instance__'] = "loss"
+        return _data
+    elif isinstance(_data, ke.layers.Layer):
+        _data = ke.layers.serialize(_data)
+        _data['__keras_instance__'] = "layer"
+        return _data
+    elif isinstance(_data, ke.optimizers.Optimizer):
+        _data = ke.optimizers.serialize(_data)
+        _data['__keras_instance__'] = "optimizer"
+        return _data
+    elif isinstance(_data, tf.TensorSpec):
+        _data = {
+            "__keras_instance__": "tf_spec",
+            "shape": _data.shape.as_list(),
+            "dtype": _data.dtype.name,
+            "name": _data.name,
+        }
+        return _data
+    else:
+        return _data
+
+
+def _tf_deserialize(_data):
+    """
+    Handle deserialization for keras loss, optimizer and layer
+    """
-    def _tf_serialize(_data):
-        if isinstance(_data, dict):
+    import keras as ke
+    import tensorflow as tf
+    if isinstance(_data, dict):
+        if '__keras_instance__' in _data.keys():
+            _keras_instance_type = _data['__keras_instance__']
+            del _data['__keras_instance__']
+            if _keras_instance_type == "loss":
+                return ke.losses.deserialize(_data, custom_objects=CUSTOM_KERAS_CLASSES_MAP['loss'])
+            elif _keras_instance_type == "layer":
+                return ke.layers.deserialize(_data)
+            elif _keras_instance_type == "optimizer":
+                return ke.optimizers.deserialize(_data)
+            elif _keras_instance_type == "tf_spec":
+                return tf.TensorSpec(**_data)
+            else:
+                raise e.code.CodingError(
+                    msgs=[
+                        f"Unknown keras instance type {_keras_instance_type!r}"
+                    ]
+                )
+        else:
             _r = {}
             for _k, _v in _data.items():
-                _r[_k] = _tf_serialize(_v)
+                _r[_k] = _tf_deserialize(_v)
             return _r
-        elif isinstance(_data, ke.losses.Loss):
-            _data = ke.losses.serialize(_data)
-            _data['__keras_instance__'] = "loss"
-            return _data
-        elif isinstance(_data, ke.layers.Layer):
-            _data = ke.layers.serialize(_data)
-            _data['__keras_instance__'] = "layer"
-            return _data
-        elif isinstance(_data, ke.optimizers.Optimizer):
-            _data = ke.optimizers.serialize(_data)
-            _data['__keras_instance__'] = "optimizer"
-            return _data
-        elif isinstance(_data, tf.TensorSpec):
-            _data = {
-                "__keras_instance__": "tf_spec",
-                "shape": _data.shape.as_list(),
-                "dtype": _data.dtype.name,
-                "name": _data.name,
-            }
-            return _data
-        else:
-            return _data
-
-    def _tf_deserialize(_data):
-        if isinstance(_data, dict):
-            if '__keras_instance__' in _data.keys():
-                _keras_instance_type = _data['__keras_instance__']
-                del _data['__keras_instance__']
-                if _keras_instance_type == "loss":
-                    return ke.losses.deserialize(_data, custom_objects=CUSTOM_KERAS_CLASSES_MAP['loss'])
-                elif _keras_instance_type == "layer":
-                    return ke.layers.deserialize(_data)
-                elif _keras_instance_type == "optimizer":
-                    return ke.optimizers.deserialize(_data)
-                elif _keras_instance_type == "tf_spec":
-                    return tf.TensorSpec(**_data)
-                else:
-                    raise e.code.CodingError(
-                        msgs=[
-                            f"Unknown keras instance type {_keras_instance_type!r}"
-                        ]
-                    )
-            else:
-                _r = {}
-                for _k, _v in _data.items():
-                    _r[_k] = _tf_deserialize(_v)
-                return _r
-        else:
-            return _data
+    else:
+        return _data


 class _ReadOnlyClass(type):
@@ -1420,7 +1418,11 @@ def yaml_tag(cls) -> str:
         # this is needed as with multiprocessing the main module gets extra __mp_
         _module = cls.__module__
         if _module in ['__main__', '__mp_main__']:
-            _module = "_main_"
+            try:
+                _script_path = inspect.getfile(cls).split(os.sep)
+                _module = f"{_script_path[-2]}/{_script_path[-1]}"
+            except OSError:
+                _module = "_main_"
         # return
         return f"!{_module}:{cls.__name__}"
@@ -1937,7 +1939,7 @@ def __post_init__(self):
         # when converted to dataclass cannot clall __init__ you need to override __post_init__ to call
         # Tracker.__init__ ...
         # Temporary workaround is to create fake HashableClass instance once all modules are loaded in your library
-        if settings.DO_RULE_CHECK:
+        if Settings.DO_RULE_CHECK:
             from . import richy
             _rc_keys = list(_RULE_CHECKERS_TO_BE_CHECKED.keys())
             # _modules = [_.decorated_class for _ in _RULE_CHECKERS_TO_BE_CHECKED.values()]
@@ -1989,7 +1991,7 @@ def __setstate__(self, state: t.Dict[str, "SUPPORTED_HASHABLE_OBJECTS_TYPE"]):
         Similar to ... used by pickle deserialization
         >>> self.from_dict
         """
-        if settings.TF_KERAS_WORKS:
+        if Settings.USE_NP_TF_KE_PA_MARSHALLING:
             # Handle deserialization for keras loss, optimizer and layer
             # noinspection PyUnresolvedReferences,PyProtectedMember,PyShadowingNames
             import keras as ke
@@ -2071,7 +2073,7 @@ def as_dict(self) -> t.Dict[str, "SUPPORTED_HASHABLE_OBJECTS_TYPE"]:
         """
         _ret = {}
         _field_names = self.dataclass_field_names
-        if settings.TF_KERAS_WORKS:
+        if Settings.USE_NP_TF_KE_PA_MARSHALLING:
             for f_name in _field_names:
                 _ret[f_name] = _tf_serialize(getattr(self, f_name))
         else:
@@ -2084,7 +2086,7 @@ def as_dict(self) -> t.Dict[str, "SUPPORTED_HASHABLE_OBJECTS_TYPE"]:
     def from_dict(
         cls, yaml_state: t.Dict[str, "SUPPORTED_HASHABLE_OBJECTS_TYPE"], **kwargs
     ) -> "HashableClass":
-        if settings.TF_KERAS_WORKS:
+        if Settings.USE_NP_TF_KE_PA_MARSHALLING:
             for _n in list(yaml_state.keys()):
                 yaml_state[_n] = _tf_deserialize(yaml_state[_n])
         # noinspection PyTypeChecker
@@ -2166,18 +2168,20 @@ def check_for_storage_hashable(self, field_key: str = ""):
                 _v.check_for_storage_hashable(field_key=f"{field_key}.{_f}")


-if settings.TF_KERAS_WORKS:
+if Settings.USE_NP_TF_KE_PA_MARSHALLING:
     # noinspection PyUnresolvedReferences,PyProtectedMember
     import keras as ke
     import tensorflow as tf
+    import numpy as np
+    import pyarrow as pa
     # noinspection PyUnresolvedReferences
     # from keras.optimizers.optimizer_experimental import \
     #     optimizer as optimizer_experimental

     SUPPORTED_HASHABLE_OBJECTS_TYPE = t.Union[
         int, float, str, slice, list, dict, tuple,
-        np.float32, np.int64, np.int32, datetime.datetime, None, FrozenEnum,
-        HashableClass, pa.Schema,
+        HashableClass,
+        np.float32, np.int64, np.int32, pa.Schema,
         ke.losses.Loss, ke.layers.Layer, ke.optimizers.Optimizer,
         # optimizer_experimental.Optimizer,
@@ -2185,8 +2189,7 @@ def check_for_storage_hashable(self, field_key: str = ""):
     ]
 else:
     SUPPORTED_HASHABLE_OBJECTS_TYPE = t.Union[int, float, str, slice, list, dict, tuple,
-                                              np.float32, np.int64, np.int32, datetime.datetime, None, FrozenEnum,
-                                              HashableClass, pa.Schema, ]
+                                              HashableClass, ]

 # noinspection PyUnresolvedReferences
 SUPPORTED_HASHABLE_OBJECTS = SUPPORTED_HASHABLE_OBJECTS_TYPE.__args__
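The module-level `_tf_serialize` / `_tf_deserialize` pair tags keras objects with a `__keras_instance__` key so they can round-trip through plain dict/YAML state. A minimal round-trip sketch of the same tagging idea, using only keras' public serialize/deserialize helpers (keras 2.x-style API assumed; the patch additionally passes toolcraft's CUSTOM_KERAS_CLASSES_MAP for custom losses, which is omitted here):

# Illustrative only: the tagging scheme used by _tf_serialize/_tf_deserialize.
import keras as ke

loss = ke.losses.MeanSquaredError()

# serialize and tag, mirroring _tf_serialize
state = ke.losses.serialize(loss)
state["__keras_instance__"] = "loss"

# later, dispatch on the tag and deserialize, mirroring _tf_deserialize
tag = state.pop("__keras_instance__")
if tag == "loss":
    restored = ke.losses.deserialize(state)
    print(type(restored).__name__)  # MeanSquaredError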
diff --git a/toolcraft/settings.py b/toolcraft/settings.py
index 42a9070..835f17b 100644
--- a/toolcraft/settings.py
+++ b/toolcraft/settings.py
@@ -12,7 +12,6 @@
 # noinspection PyUnresolvedReferences,PyCompatibility
 import __main__ as main

-DO_RULE_CHECK = True

 ENV_DIR = pathlib.Path(sys.exec_prefix)

@@ -40,14 +39,6 @@
 # detect if in interactive mode
 INTERACTIVE = not hasattr(main, '__file__')

-LOGGER_USE_FILE_HANDLER = False
-
-try:
-    import keras as ke
-    import tensorflow as tf
-    TF_KERAS_WORKS = True
-except ImportError:
-    TF_KERAS_WORKS = False


 # make config
@@ -60,7 +51,7 @@
 TC_CONFIG = toml.load(TC_CONFIG_FILE.as_posix())


-class FileHash:
+class Settings:
     # time interval between to check hashes on disk
     # note that this is a list ... any one of the values in list will be picked
     # for determining if to do hash check or not ... this distributes the hash
@@ -74,3 +65,8 @@ class FileHash:
     # consecutive runs
     DEBUG_HASHABLE_STATE = False
+
+    DO_RULE_CHECK = False
+    LOGGER_USE_FILE_HANDLER = False
+    USE_NP_TF_KE_PA_MARSHALLING = False
+
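For call sites the consolidation amounts to a rename: everything that used to live at module level (or on `FileHash`) is now a class attribute of `settings.Settings`, as the storage call sites below show. Summarised as a sketch (not part of the patch):

# Old flag / class                         -> new location (per this diff)
# settings.DO_RULE_CHECK                   -> settings.Settings.DO_RULE_CHECK (default is now False)
# settings.LOGGER_USE_FILE_HANDLER         -> settings.Settings.LOGGER_USE_FILE_HANDLER
# settings.TF_KERAS_WORKS (import probe)   -> settings.Settings.USE_NP_TF_KE_PA_MARSHALLING (explicit opt-in)
# settings.FileHash.CHECK_INTERVALS_IN_SEC -> settings.Settings.CHECK_INTERVALS_IN_SEC
# settings.FileHash.DEBUG_HASHABLE_STATE   -> settings.Settings.DEBUG_HASHABLE_STATE

from toolcraft import settings

if settings.Settings.USE_NP_TF_KE_PA_MARSHALLING:
    ...  # tensorflow/keras/numpy/pyarrow-aware marshalling paths are taken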
diff --git a/toolcraft/storage/file_group.py b/toolcraft/storage/file_group.py
index 229f18c..36dd916 100644
--- a/toolcraft/storage/file_group.py
+++ b/toolcraft/storage/file_group.py
@@ -101,7 +101,7 @@ def check_interval_choice(self) -> int:
         in different results ... this ensures that for instance the randomness
         is consistent ...
         """
-        return random.choice(settings.FileHash.CHECK_INTERVALS_IN_SEC)
+        return random.choice(settings.Settings.CHECK_INTERVALS_IN_SEC)

     @property
     def periodic_check_needed(self) -> bool:
@@ -595,7 +595,7 @@ def on_enter(self):
         # if config.DEBUG_HASHABLE_STATE we will create files two times
         # to confirm if states are consistent and hence it will help us to
         # debug DEBUG_HASHABLE_STATE
-        if settings.FileHash.DEBUG_HASHABLE_STATE:
+        if settings.Settings.DEBUG_HASHABLE_STATE:
             _info_backup_path = self.info.backup_path
             _config_backup_path = self.config.backup_path
             _info_backup_exists = _info_backup_path.exists()
@@ -1128,7 +1128,7 @@ def delete(self, *, force: bool = False) -> t.Any:
         # ---------------------------------------------------------------01
         _rp = self.richy_panel

-        if settings.FileHash.DEBUG_HASHABLE_STATE:
+        if settings.Settings.DEBUG_HASHABLE_STATE:
             # if config.DEBUG_HASHABLE_STATE we know what we are doing ... we
             # are debugging and there will be one time programmatically delete
             # so set the response automatically for FileGroup
diff --git a/toolcraft/storage/state.py b/toolcraft/storage/state.py
index 4e5feb4..2e9f8dc 100644
--- a/toolcraft/storage/state.py
+++ b/toolcraft/storage/state.py
@@ -68,7 +68,7 @@ def is_available(self) -> bool:
     @util.CacheResult
     def backup_path(self) -> Path:
         e.code.AssertError(
-            value1=settings.FileHash.DEBUG_HASHABLE_STATE, value2=True,
+            value1=settings.Settings.DEBUG_HASHABLE_STATE, value2=True,
             msgs=[
                 f"This property can be used only when you have configured "
                 f"`config.DEBUG_HASHABLE_STATE=True`"