Skip to content

Commit

Permalink
Consider regex match when issuing "unrecognized flow operation warning.
Browse files Browse the repository at this point in the history
  • Loading branch information
joaander committed Feb 2, 2024
1 parent f36bb3a commit 33eec7b
Showing 1 changed file with 19 additions and 27 deletions.
46 changes: 19 additions & 27 deletions flow/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2647,14 +2647,6 @@ def _fetch_status(
only has to occur one time.
"""
if names is not None:
absent_ops = (set(self._groups.keys()) ^ set(names)) & set(names)
if absent_ops:
print(
f"Unrecognized flow operation(s): {', '.join(absent_ops)}",
file=sys.stderr,
)

if status_parallelization not in ("thread", "process", "none"):
raise RuntimeError(
"Configuration value status_parallelization is invalid. "
Expand Down Expand Up @@ -3613,13 +3605,6 @@ def run(
)
if names is None:
names = list(self.operations)
else:
absent_ops = (set(self._groups.keys()) ^ set(names)) & set(names)
if absent_ops:
print(
f"Unrecognized flow operation(s): {', '.join(absent_ops)}",
file=sys.stderr,
)

# Get default directives
default_directives = self._get_default_directives()
Expand Down Expand Up @@ -3699,7 +3684,7 @@ def select(operation):
# Generate _JobOperation instances for selected groups and aggregates.
with self._buffered():
operations = []
run_groups = set(self._gather_executable_flow_groups(names))
run_groups = set(self._gather_executable_flow_groups(names, i_pass))
for (
aggregate_id,
aggregate,
Expand Down Expand Up @@ -3761,7 +3746,7 @@ def key_func_by_job(operation):
operations, pretend=pretend, np=np, timeout=timeout, progress=progress
)

def _gather_selected_flow_groups(self, names=None):
def _gather_selected_flow_groups(self, names=None, i_pass=1):
r"""Grabs :class:`~.FlowGroup`\ s that match any of a set of names.
The provided names can be any regular expression that fully matches a group name.
Expand All @@ -3771,6 +3756,8 @@ def _gather_selected_flow_groups(self, names=None):
names : iterable of :class:`str`
Only select groups that match the provided set of names (interpreted as regular
expressions), or all if the argument is None. (Default value = None)
i_pass : int
The current pass in `run`. Used to print the warning message only once.
Returns
-------
Expand All @@ -3781,6 +3768,7 @@ def _gather_selected_flow_groups(self, names=None):
if names is None:
return list(self._groups.values())
operations = {}
absent_ops = []
for name in names:
if name in operations:
continue
Expand All @@ -3789,11 +3777,21 @@ def _gather_selected_flow_groups(self, names=None):
for group_name, group in self.groups.items()
if re.fullmatch(name, group_name)
]

if i_pass == 1 and len(groups) == 0:
absent_ops.append(name)

for group in groups:
operations[group.name] = group

if len(absent_ops) > 0:
print(
f"Unrecognized flow operation(s): {', '.join(absent_ops)}",
file=sys.stderr,
)
return list(operations.values())

def _gather_executable_flow_groups(self, names=None):
def _gather_executable_flow_groups(self, names=None, i_pass=1):
r"""Grabs immediately executable flow groups that match any given name.
The provided names can be any regular expression that fully match a group name.
Expand All @@ -3809,6 +3807,8 @@ def _gather_executable_flow_groups(self, names=None):
names : iterable of :class:`str`
Only select groups that match the provided set of names (interpreted as regular
expressions), or all singleton groups if the argument is None. (Default value = None)
i_pass : int
The current pass in `run`. Used to print the warning message only once.
Returns
-------
Expand All @@ -3818,7 +3818,7 @@ def _gather_executable_flow_groups(self, names=None):
"""
if names is None:
return [self._groups[op_name] for op_name in self.operations]
operations = self._gather_selected_flow_groups(names)
operations = self._gather_selected_flow_groups(names, i_pass)
# Have to verify no overlap to ensure all returned groups are
# simultaneously executable.
if not FlowProject._verify_group_compatibility(operations):
Expand Down Expand Up @@ -4149,14 +4149,6 @@ def submit(
Additional keyword arguments forwarded to :meth:`~.ComputeEnvironment.submit`.
"""
if names is not None:
absent_ops = (set(self._groups.keys()) ^ set(names)) & set(names)
if absent_ops:
print(
f"Unrecognized flow operation(s): {', '.join(absent_ops)}",
file=sys.stderr,
)

aggregates = self._convert_jobs_to_aggregates(jobs)

# Regular argument checks and expansion
Expand Down

0 comments on commit 33eec7b

Please sign in to comment.