-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* added list-outputs command * Added option to get workflow level outputs or task level outputs * Added option to print json summary and text * add integration test_list_outputs.py draft * Added options to utility_test_functions.py run cromshell function * Added function to confirm results from cromwell outputs endpoint contain outputs else throws an error. * Added variable to hold workflow id in cromshellconfig.py * add check of outputs for detailed list-outputs option --------- Co-authored-by: bshifaw <[email protected]>
- Loading branch information
Showing
17 changed files
with
843 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
import logging | ||
|
||
import click | ||
import requests | ||
|
||
import cromshell.utilities.http_utils as http_utils | ||
import cromshell.utilities.io_utils as io_utils | ||
from cromshell.metadata import command as metadata_command | ||
from cromshell.utilities import command_setup_utils | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
@click.command(name="list-outputs") | ||
@click.argument("workflow_ids", required=True, nargs=-1) | ||
@click.option( | ||
"-d", | ||
"--detailed", | ||
is_flag=True, | ||
default=False, | ||
help="Get the output for a workflow at the task level", | ||
) | ||
@click.option( | ||
"-j", | ||
"--json-summary", | ||
is_flag=True, | ||
default=False, | ||
help="Print a json summary of outputs, including non-file types.", | ||
) | ||
@click.pass_obj | ||
def main(config, workflow_ids, detailed, json_summary): | ||
"""List all output files produced by a workflow.""" | ||
|
||
LOGGER.info("list-outputs") | ||
|
||
return_code = 0 | ||
|
||
for workflow_id in workflow_ids: | ||
command_setup_utils.resolve_workflow_id_and_server( | ||
workflow_id=workflow_id, cromshell_config=config | ||
) | ||
|
||
if not detailed: | ||
workflow_outputs = get_workflow_level_outputs(config).get("outputs") | ||
|
||
if json_summary: | ||
io_utils.pretty_print_json(format_json=workflow_outputs) | ||
else: | ||
print_file_like_value_in_dict( | ||
outputs_metadata=workflow_outputs, | ||
indent=False, | ||
) | ||
else: | ||
task_outputs = get_task_level_outputs(config) | ||
|
||
if json_summary: | ||
io_utils.pretty_print_json(format_json=task_outputs) | ||
else: | ||
print_task_level_outputs(task_outputs) | ||
|
||
return return_code | ||
|
||
|
||
def get_workflow_level_outputs(config) -> dict: | ||
"""Get the workflow level outputs from the workflow outputs | ||
Args: | ||
config (dict): The cromshell config object | ||
""" | ||
|
||
requests_out = requests.get( | ||
f"{config.cromwell_api_workflow_id}/outputs", | ||
timeout=config.requests_connect_timeout, | ||
verify=config.requests_verify_certs, | ||
headers=http_utils.generate_headers(config), | ||
) | ||
|
||
if requests_out.ok: | ||
check_for_empty_output(requests_out.json().get("outputs"), config.workflow_id) | ||
return requests_out.json() | ||
else: | ||
http_utils.check_http_request_status_code( | ||
short_error_message="Failed to retrieve outputs for " | ||
f"workflow: {config.workflow_id}", | ||
response=requests_out, | ||
# Raising exception is set false to allow | ||
# command to retrieve outputs of remaining workflows. | ||
raise_exception=False, | ||
) | ||
|
||
|
||
def get_task_level_outputs(config) -> dict: | ||
"""Get the task level outputs from the workflow metadata | ||
Args: | ||
config (dict): The cromshell config object | ||
""" | ||
# Get metadata | ||
formatted_metadata_parameter = metadata_command.format_metadata_params( | ||
list_of_keys=config.METADATA_KEYS_TO_OMIT, | ||
exclude_keys=True, | ||
expand_subworkflows=True, | ||
) | ||
|
||
workflow_metadata = metadata_command.get_workflow_metadata( | ||
meta_params=formatted_metadata_parameter, | ||
api_workflow_id=config.cromwell_api_workflow_id, | ||
timeout=config.requests_connect_timeout, | ||
verify_certs=config.requests_verify_certs, | ||
headers=http_utils.generate_headers(config), | ||
) | ||
|
||
return filter_outputs_from_workflow_metadata(workflow_metadata) | ||
|
||
|
||
def filter_outputs_from_workflow_metadata(workflow_metadata: dict) -> dict: | ||
"""Get the outputs from the workflow metadata | ||
Args: | ||
workflow_metadata (dict): The workflow metadata | ||
""" | ||
calls_metadata = workflow_metadata["calls"] | ||
output_metadata = {} | ||
extract_task_key = "outputs" | ||
|
||
for call, index_list in calls_metadata.items(): | ||
if "subWorkflowMetadata" in calls_metadata[call][0]: | ||
output_metadata[call] = [] | ||
for scatter in calls_metadata[call]: | ||
output_metadata[call].append( | ||
filter_outputs_from_workflow_metadata( | ||
scatter["subWorkflowMetadata"] | ||
) | ||
) | ||
else: | ||
output_metadata[call] = [] | ||
for index in index_list: | ||
output_metadata[call].append(index.get(extract_task_key)) | ||
|
||
check_for_empty_output(output_metadata, workflow_metadata["id"]) | ||
|
||
return output_metadata | ||
|
||
|
||
def print_task_level_outputs(output_metadata: dict) -> None: | ||
"""Print the outputs from the workflow metadata | ||
output_metadata: {call_name:[index1{output_name: outputvalue}, index2{...}, ...], call_name:[], ...} | ||
Args: | ||
output_metadata (dict): The output metadata from the workflow | ||
""" | ||
for call, index_list in output_metadata.items(): | ||
print(call) | ||
for call_index in index_list: | ||
if call_index is not None: | ||
print_file_like_value_in_dict(outputs_metadata=call_index, indent=True) | ||
|
||
|
||
def print_file_like_value_in_dict(outputs_metadata: dict, indent: bool) -> None: | ||
"""Print the file like values in the output metadata dictionary | ||
Args: | ||
outputs_metadata (dict): The output metadata | ||
indent (bool): Whether to indent the output | ||
""" | ||
|
||
for output_name, output_value in outputs_metadata.items(): | ||
if isinstance(output_value, str): | ||
print_output_name_and_file(output_name, output_value, indent=indent) | ||
elif isinstance(output_value, list): | ||
for output_value_item in output_value: | ||
print_output_name_and_file( | ||
output_name, output_value_item, indent=indent | ||
) | ||
|
||
|
||
def print_output_name_and_file( | ||
output_name: str, output_value: str, indent: bool = True | ||
) -> None: | ||
"""Print the task name and the file name | ||
Args: | ||
output_name (str): The task output name | ||
output_value (str): The task output value | ||
indent (bool): Whether to indent the output""" | ||
|
||
i = "\t" if indent else "" | ||
|
||
if isinstance(output_value, str): | ||
if is_path_or_url_like(output_value): | ||
print(f"{i}{output_name}: {output_value}") | ||
|
||
|
||
def is_path_or_url_like(in_string: str) -> bool: | ||
"""Check if the string is a path or url | ||
Args: | ||
in_string (str): The string to check for path or url like-ness | ||
""" | ||
if ( | ||
in_string.startswith("gs://") | ||
or in_string.startswith("/") | ||
or in_string.startswith("http://") | ||
or in_string.startswith("https://") | ||
): | ||
return True | ||
else: | ||
return False | ||
|
||
|
||
def check_for_empty_output(workflow_outputs: dict, workflow_id: str) -> None: | ||
"""Check if the workflow outputs are empty | ||
Args: | ||
cromwell_outputs (dict): Dictionary of workflow outputs | ||
:param workflow_id: The workflow id | ||
""" | ||
if not workflow_outputs: | ||
LOGGER.error(f"No outputs found for workflow: {workflow_id}") | ||
raise Exception(f"No outputs found for workflow: {workflow_id}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from tests.integration import utility_test_functions | ||
|
||
workflows_path = Path(__file__).parents[1].joinpath("workflows/") | ||
|
||
|
||
class TestListOutputs: | ||
@pytest.mark.parametrize( | ||
"wdl, json_file, options, output_template", | ||
[ | ||
( | ||
"tests/workflows/helloWorld.wdl", | ||
"tests/workflows/helloWorld.json", | ||
None, | ||
[ | ||
"HelloWorld.output_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout", | ||
"", | ||
], | ||
), | ||
( | ||
"tests/workflows/helloWorld.wdl", | ||
"tests/workflows/helloWorld.json", | ||
["-d"], | ||
[ | ||
"HelloWorld.HelloWorldTask", | ||
"\toutput_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout", | ||
"", | ||
], | ||
), | ||
( | ||
"tests/workflows/helloWorld.wdl", | ||
"tests/workflows/helloWorld.json", | ||
["-j"], | ||
[ | ||
"{", | ||
' "HelloWorld.output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"', | ||
"}", | ||
"", | ||
], | ||
), | ||
( | ||
"tests/workflows/helloWorld.wdl", | ||
"tests/workflows/helloWorld.json", | ||
["-j", "-d"], | ||
[ | ||
"{", | ||
' "HelloWorld.HelloWorldTask": [', | ||
" {", | ||
' "output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"', | ||
" }", | ||
" ]", | ||
"}", | ||
"", | ||
], | ||
), | ||
], | ||
) | ||
def test_list_outputs( | ||
self, | ||
local_cromwell_url: str, | ||
wdl: str, | ||
json_file: str, | ||
options: list, | ||
output_template: list, | ||
ansi_escape, | ||
): | ||
# submit workflow | ||
test_workflow_id = utility_test_functions.submit_workflow( | ||
local_cromwell_url=local_cromwell_url, | ||
wdl=wdl, | ||
json_file=json_file, | ||
exit_code=0, | ||
) | ||
|
||
utility_test_functions.wait_for_workflow_completion( | ||
test_workflow_id=test_workflow_id | ||
) | ||
|
||
# run list-outputs | ||
status_result = utility_test_functions.run_cromshell_command( | ||
command=["list-outputs", test_workflow_id], | ||
exit_code=0, | ||
subcommand_options=options, | ||
) | ||
|
||
status_result_per_line = status_result.stdout.split("\n") | ||
|
||
workflow_outputs = [ | ||
sub.replace("<workflow-id>", test_workflow_id) for sub in output_template | ||
] | ||
|
||
print("Print workflow list-outputs results:") | ||
for line in status_result_per_line: | ||
print(line) | ||
|
||
assert status_result_per_line == workflow_outputs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.