-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Use StorageManager
in execute_DAG
#258
base: main
Are you sure you want to change the base?
Changes from 74 commits
ffdb901
4ad7fb1
eb19e0f
8e6a78c
3be13c0
dda02e2
472151e
b692c2f
7432ff4
319d0d0
5650609
ddbbd19
c5ce48a
236b263
b805aac
ed5e83c
6181039
1880f73
1e4ca2c
a6d26f3
aabbc33
b4d73b3
7af006e
58a58bc
8e429f5
b70df48
08e3ac2
ca7871b
2aa0616
7c03dcd
383075e
6365398
d35bd60
7cc10f9
ab025f1
cd70ab2
ac1b1d0
ea054be
90f2597
a2e05b2
80eccc4
e9ed7a8
b4e1d42
2b070ca
4fd4a66
5e56461
73d3a1e
524cc6e
dd1b6dc
a575dd3
24d3820
0c973b7
9260a2d
d97fca4
a01feaf
ba6fcff
11810eb
908329c
d8507d4
a659afa
32b1fe4
5d0df5f
1418aee
e057332
bdb37bb
e39a790
1cfe910
78e003b
265e786
006d787
ce12326
9afeb2f
b3fea47
2420f6c
fe64baa
288e6cc
bc1f36f
aaa2aab
cf60d1b
d0af5cf
4861b3e
c0a37b3
3d7ca7b
529b8a6
c412611
bf49c34
10a40eb
4b2510c
3390bb0
da9955e
b72e6f9
409ec06
b22445b
8534d71
01db5b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,106 @@ | ||
The GUFE Storage System | ||
======================= | ||
|
||
Storage lifetimes | ||
----------------- | ||
|
||
Storage docs. | ||
The storage system in GUFE is heavily tied to the GUFE protocol system. The | ||
key concept here is that the different levels of the GUFE protocol system; | ||
campaign, DAG, and unit; inherently imply different lifetimes for the data | ||
that is created. Those different lifetimes define the stages of the GUFE | ||
storage system. | ||
|
||
In an abstract sense, as used by protocol developers, these three levels | ||
correspond to three lifetimes of data: | ||
|
||
* ``scratch``: This is temporary data that is only needed for the lifetime | ||
of a :class:`.ProtocolUnit`. This data is not guaranteed to be available | ||
beyond the single :class:`.ProtocolUnit` where it is created, but may be | ||
reused within that :class:`.ProtocolUnit`. | ||
* ``shared``: This is data that is shared between different units in a | ||
:class:`.ProtocolDAG`. For example, a single equilibration stage might be | ||
shared between multiple production runs. The output snapshot of the | ||
equilibration would be suitable for as something to put in ``shared`` | ||
data. This data is guaranteed to be present from when it is created until | ||
the end of the :class:`.ProtocolDAG`, but is not guaranteed to exist after | ||
the :class:`.ProtocolDAG` terminates. | ||
* ``permanent``: This is the data that will be needed beyond the scope of a | ||
single rough estimate of the calculation. This could include anything that | ||
an extension of the simulation would require, or things that require | ||
network-scale analysis. Anything stored here will be usable after the | ||
calculation has completed. | ||
|
||
The ``scratch`` area is always a local directory. However, ``shared`` and | ||
``permanent`` can be external (remote) resources, using the | ||
:class:`.ExternalResource` API. | ||
|
||
As a practical matter, the GUFE storage system can be handled with a | ||
:class:`.StorageManager`. This automates some aspects of the transfer | ||
between stages of the GUFE storage system, and simplifies the API for | ||
protocol authors. In detail, this provides protocol authors with | ||
``PathLike`` objects for ``scratch``, ``shared``, and ``permanent``. All | ||
three of these objects actually point to special subdirectories of the | ||
local scratch space for a specific unit, but are managed by context | ||
managers at the executor level, which handle the process of moving objects | ||
from local staging directories to the actual ``shared`` and ``permanent`` | ||
locations, which can be external resources. | ||
|
||
|
||
External resource utilities | ||
--------------------------- | ||
|
||
For flexible data storage, GUFE defines the :class:`.ExternalResource` API, | ||
which allows data be stored/loaded in a way that is agnostic to the | ||
underlying data store, as long as the store can be represented as a | ||
key-value store. Withing GUFE, we provide several examples, including | ||
:class:`.FileStorage` and :class:`.MemoryStorage` (primarily useful for | ||
testing.) The specific ``shared`` and ``permanent`` resources, as provided | ||
to the executor, can be instances of an :class:`.ExternalResource`. | ||
|
||
.. note:: | ||
|
||
The ``shared`` space must be a resource where an uploaded object is | ||
instantaneously available, otherwise later protocol units may fail if the | ||
shared result is unavailable. This means that files or approaches based | ||
on ``scp`` or ``sftp`` are fine, but things like cloud storage, where the | ||
existence of a new document may take time to propagate through the | ||
network, are not recommended for ``shared``. | ||
|
||
|
||
Details: Manangement of the Storage Lifetime | ||
-------------------------------------------- | ||
|
||
The concepts of the storage lifetimes are important for protocol authors, | ||
but details of implementation are left to the specific executor. In order to | ||
facilitate correct treatment of the storage lifecycle, GUFE provides a few | ||
helpers. The information in this section is mostly of interest to authors of | ||
executors. The helpers are: | ||
|
||
* :class:`.StorageManager`: This is the overall façade interface for | ||
interacting with the rest of the storage lifecycle tools. It provides two | ||
methods to generate context managers; one for the :class:`.ProtocolDAG` | ||
level of the lifecycle, and one for the :class:`.ProtocoUnit` level of the | ||
lifecycle. This class is designed for the use case that the entire DAG is | ||
run in serial within a single process. Subclasses of this can be created | ||
for other execution architectures, where the main logic changes would be | ||
in the methods that return those context managers. | ||
* :class:`.StagingRegistry`: This handles the logic around staging paths | ||
within a :class:`.ProtocolUnit`. Think of this as an abstract | ||
representation of a local directory. Paths within it register with it, and | ||
it handles deletion of the temporary local files when not needed, as well | ||
as the download of remote files when necessary for reading. There are two | ||
important subclasses of this: :class:`.SharedStaging` for a ``shared`` | ||
resource, and :class:`.PermanentStaging` for a ``permanent`` resource. | ||
* :class:`.StagingPath`: This represents a file within the | ||
:class:`.StagingRegistry`. It contains both the key (label) used in the | ||
key-value store, as well as the actual local path to the file. When its | ||
``__fspath__`` method is called, it registers itself with its | ||
:class:`.StagingRegistry`, which handles managing it over its lifecycle. | ||
|
||
In practice, the executor uses the :class:`.StorageManager` to create a | ||
:class:`.DAGContextManager` at the level of a DAG, and then uses the | ||
:class:`.DAGContextManager` to create a context to run a unit. That context | ||
creates a :class:`.SharedStaging` and a :class:`.PermanentStaging` | ||
associated with the specific unit. Those staging directories, with the | ||
scratch directory, are provided to the :class:`.ProtocolUnit`, so that | ||
these are the only objects protocol authors need to interact with. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,13 @@ | |
ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context | ||
) | ||
|
||
from ..storage.storagemanager import StorageManager | ||
from ..storage.externalresource.filestorage import FileStorage | ||
from ..storage.externalresource.base import ExternalStorage | ||
|
||
import logging | ||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
class DAGMixin: | ||
_protocol_units: list[ProtocolUnit] | ||
|
@@ -345,9 +352,67 @@ def _from_dict(cls, dct: dict): | |
return cls(**dct) | ||
|
||
|
||
class ReproduceOldBehaviorStorageManager(StorageManager): | ||
# Default behavior has scratch at {dag_label}/scratch/{unit_label} and | ||
# shared at {dag_label}/{unit_label}. This little class makes changes | ||
# that get us back to the original behavior of this class: scratch at | ||
# {dag_label}/scratch_{unit_label} and shared at | ||
# {dag_label}/shared_{unit_label}. | ||
def _scratch_loc(self, dag_label, unit_label, attempt): | ||
return ( | ||
self.scratch_root | ||
/ f"{dag_label}/scratch_{unit_label}_attempt_{attempt}" | ||
) | ||
|
||
def make_label(self, dag_label, unit_label, attempt): | ||
return f"{dag_label}/shared_{unit_label}_attempt_{attempt}" | ||
|
||
@classmethod | ||
def from_old_args( | ||
cls, | ||
shared_basedir: PathLike, | ||
scratch_basedir: PathLike, *, | ||
keep_shared: bool = False, | ||
keep_scratch: bool = False, | ||
): | ||
""" | ||
Create an new storage manager based on the old execute_DAG args. | ||
|
||
Parameters | ||
---------- | ||
shared_basedir : Path | ||
Filesystem path to use for shared space that persists across whole DAG | ||
execution. Used by a `ProtocolUnit` to pass file contents to dependent | ||
class:``ProtocolUnit`` instances. | ||
scratch_basedir : Path | ||
Filesystem path to use for `ProtocolUnit` `scratch` space. | ||
keep_shared : bool | ||
If True, don't remove shared directories for `ProtocolUnit`s after | ||
the `ProtocolDAG` is executed. | ||
keep_scratch : bool | ||
If True, don't remove scratch directories for a `ProtocolUnit` after | ||
it is executed. | ||
""" | ||
# doing this here makes it easier to test than putting in | ||
# execute_DAG | ||
shared_basedir = Path(shared_basedir) | ||
shared = FileStorage(shared_basedir.parent) | ||
storage_manager = cls( | ||
scratch_root=scratch_basedir, | ||
shared_root=shared, | ||
permanent_root=shared, | ||
keep_scratch=keep_scratch, | ||
keep_shared=keep_shared, | ||
keep_staging=True, | ||
keep_empty_dirs=True, | ||
staging=Path(""), # use the actual directories as the staging | ||
) | ||
return storage_manager | ||
|
||
|
||
def execute_DAG(protocoldag: ProtocolDAG, *, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you keeping this for comparison for the PR? I don't think we need to keep the old behaviour around There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we told Joe that the API was stable enough that he could use it in workflows. So yeah, I spent a lot of time ensuring that there would be a code path that doesn't break the current API. |
||
shared_basedir: Path, | ||
scratch_basedir: Path, | ||
shared_basedir: PathLike, | ||
scratch_basedir: PathLike, | ||
keep_shared: bool = False, | ||
keep_scratch: bool = False, | ||
raise_error: bool = True, | ||
|
@@ -385,52 +450,92 @@ def execute_DAG(protocoldag: ProtocolDAG, *, | |
The result of executing the `ProtocolDAG`. | ||
|
||
""" | ||
# the directory given as shared_root is actually the directory for this | ||
# DAG; the "shared_root" for the storage manager is the parent. We'll | ||
# force permanent to be the same. | ||
storage_manager = ReproduceOldBehaviorStorageManager.from_old_args( | ||
shared_basedir=shared_basedir, | ||
scratch_basedir=scratch_basedir, | ||
keep_shared=keep_shared, | ||
keep_scratch=keep_scratch | ||
) | ||
dag_label = shared_basedir.name | ||
return new_execute_DAG(protocoldag, dag_label, storage_manager, | ||
raise_error, n_retries) | ||
|
||
|
||
def new_execute_DAG( # TODO: this is a terrible name | ||
protocoldag: ProtocolDAG, | ||
dag_label: str, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is a DAG label now required? and why can't the label just be taken off the input DAG? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaks current behavior, where the equivalent of the DAG label is given by the user as the |
||
storage_manager: StorageManager, | ||
raise_error: bool = False, | ||
n_retries: int = 0 | ||
) -> ProtocolDAGResult: | ||
""" | ||
Locally execute a full :class:`ProtocolDAG` in serial and in-process. | ||
|
||
Alternate input signature to generalize execute_DAG | ||
|
||
Parameters | ||
---------- | ||
protocoldag : ProtocolDAG | ||
The :class:``ProtocolDAG`` to execute. | ||
dag_label : str | ||
Label to use for the DAG | ||
storage_manager : StorageManager | ||
The :class:`.StorageManager` to handle storing files. | ||
raise_error : bool | ||
If True, raise an exception if a ProtocolUnit fails, default True | ||
if False, any exceptions will be stored as `ProtocolUnitFailure` | ||
objects inside the returned `ProtocolDAGResult` | ||
n_retries : int | ||
the number of times to attempt, default 0, i.e. try once and only once | ||
|
||
Returns | ||
------- | ||
ProtocolDAGResult | ||
The result of executing the `ProtocolDAG`. | ||
""" | ||
# this simplifies setup of execute_DAG by allowing you to directly | ||
# provide the storage_manager; the extra options in the old one just | ||
# configure the storage_manager | ||
if n_retries < 0: | ||
raise ValueError("Must give positive number of retries") | ||
|
||
# iterate in DAG order | ||
results: dict[GufeKey, ProtocolUnitResult] = {} | ||
all_results = [] # successes AND failures | ||
shared_paths = [] | ||
for unit in protocoldag.protocol_units: | ||
# translate each `ProtocolUnit` in input into corresponding | ||
# `ProtocolUnitResult` | ||
inputs = _pu_to_pur(unit.inputs, results) | ||
|
||
attempt = 0 | ||
while attempt <= n_retries: | ||
shared = shared_basedir / f'shared_{str(unit.key)}_attempt_{attempt}' | ||
shared_paths.append(shared) | ||
shared.mkdir() | ||
|
||
scratch = scratch_basedir / f'scratch_{str(unit.key)}_attempt_{attempt}' | ||
scratch.mkdir() | ||
|
||
context = Context(shared=shared, | ||
scratch=scratch) | ||
|
||
# execute | ||
result = unit.execute( | ||
context=context, | ||
raise_error=raise_error, | ||
**inputs) | ||
all_results.append(result) | ||
|
||
if not keep_scratch: | ||
shutil.rmtree(scratch) | ||
|
||
if result.ok(): | ||
# attach result to this `ProtocolUnit` | ||
results[unit.key] = result | ||
break | ||
attempt += 1 | ||
|
||
if not result.ok(): | ||
break | ||
|
||
if not keep_shared: | ||
for shared_path in shared_paths: | ||
shutil.rmtree(shared_path) | ||
with storage_manager.running_dag(dag_label) as dag_ctx: | ||
for unit in protocoldag.protocol_units: | ||
# import pdb; pdb.set_trace() | ||
attempt = 0 | ||
while attempt <= n_retries: | ||
# translate each `ProtocolUnit` in input into corresponding | ||
# `ProtocolUnitResult` | ||
inputs = _pu_to_pur(unit.inputs, results) | ||
|
||
label = storage_manager.make_label(dag_label, unit.key, | ||
attempt=attempt) | ||
with dag_ctx.running_unit( | ||
dag_label, unit.key, attempt=attempt | ||
) as context: | ||
_logger.info("Starting unit {label}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing an f-string thingy here |
||
_logger.info(context) | ||
result = unit.execute( | ||
context=context, | ||
raise_error=raise_error, | ||
**inputs) | ||
all_results.append(result) | ||
|
||
if result.ok(): | ||
# attach result to this `ProtocolUnit` | ||
results[unit.key] = result | ||
break | ||
attempt += 1 | ||
|
||
if not result.ok(): | ||
break | ||
|
||
return ProtocolDAGResult( | ||
name=protocoldag.name, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
GufeTokenizable, GufeKey, TOKENIZABLE_REGISTRY | ||
) | ||
|
||
from ..storage.stagingregistry import StagingRegistry | ||
|
||
|
||
@dataclass | ||
class Context: | ||
|
@@ -31,7 +33,8 @@ class Context: | |
|
||
""" | ||
scratch: PathLike | ||
shared: PathLike | ||
shared: StagingRegistry | ||
permanent: StagingRegistry | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these are meant to be stagingpaths not registries? |
||
|
||
|
||
def _list_dependencies(inputs, cls): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we fix the namespace of
storage
so these are all available at the same level?from ..storage import StorageManager, FileStorage, etc