Skip to content

Commit

Permalink
✅ Add test for resource inventory error message
Browse files Browse the repository at this point in the history
  • Loading branch information
shnizzedy committed Jan 13, 2025
1 parent 962a1e5 commit 4cdff4e
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CPAC/_entrypoints/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ def run_main():
args.data_config_file, args.participant_label, args.aws_input_creds
)
sub_list = sub_list_filter_by_labels(
sub_list, {"T1w": args.T1w_label, "bold": args.bold_label}
list(sub_list), {"T1w": args.T1w_label, "bold": args.bold_label}
)

# C-PAC only handles single anatomical images (for now)
Expand Down
2 changes: 1 addition & 1 deletion CPAC/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
def bids_examples(tmp_path_factory: TempPathFactory) -> Path:
"""Get the BIDS examples dataset."""
example_dir = tmp_path_factory.mktemp("bids-examples")
if not example_dir.exists():
if not example_dir.exists() or not any(example_dir.iterdir()):
Repo.clone_from(
"https://github.com/bids-standard/bids-examples.git", str(example_dir)
)
Expand Down
2 changes: 1 addition & 1 deletion CPAC/pipeline/resource_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def where_to_find(resources: list[str] | str) -> str:
inventory = resource_inventory("CPAC")
output = ""
for resource in resources:
output += f"'{resource}' is output from:\n"
output += f"'{resource}' can be output from:\n"
if resource in inventory:
for source in inventory[resource].output_from:
output += f" {source}\n"
Expand Down
140 changes: 137 additions & 3 deletions CPAC/pipeline/test/test_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Copyright (C) 2021-2025 C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Unit tests for the C-PAC pipeline engine."""

from argparse import Namespace
import os
from pathlib import Path
from typing import cast

from _pytest.logging import LogCaptureFixture
import pytest

from CPAC.pipeline.cpac_pipeline import (
Expand Down Expand Up @@ -138,17 +160,129 @@ def test_build_workflow(pipe_config, bids_dir, test_dir):
wf.run()


def test_missing_resource(
bids_examples: Path, caplog: LogCaptureFixture, tmp_path: Path
) -> None:
"""Test the error message thrown when a resource is missing."""
from datetime import datetime

import yaml

from CPAC.pipeline.cpac_runner import run
from CPAC.utils.bids_utils import sub_list_filter_by_labels
from CPAC.utils.configuration import Preconfiguration, set_subject
from CPAC.utils.configuration.yaml_template import create_yaml_from_template

st = datetime.now().strftime("%Y-%m-%dT%H-%M-%SZ")
namespace = Namespace(
bids_dir=str(bids_examples / "ds113b"),
output_dir=str(tmp_path / "output"),
analysis_level="test_config",
participant_label="sub-01",
)
c = Preconfiguration("anat-only")
c["pipeline_setup", "output_directory", "path"] = namespace.output_dir
c["pipeline_setup", "log_directory", "path"] = str(tmp_path / "logs")
c["pipeline_setup", "working_directory", "path"] = str(tmp_path / "work")
c["pipeline_setup", "system_config", "maximum_memory_per_participant"] = 1.0
c["pipeline_setup", "system_config", "max_cores_per_participant"] = 1
c["pipeline_setup", "system_config", "num_participants_at_once"] = 1
c["pipeline_setup", "system_config", "num_ants_threads"] = 1
c["pipeline_setup", "working_directory", "remove_working_dir"] = True
sub_list = create_cpac_data_config(
namespace.bids_dir,
namespace.participant_label,
None,
True,
only_one_anat=False,
)
sub_list = sub_list_filter_by_labels(list(sub_list), {"T1w": None, "bold": None})
for i, sub in enumerate(sub_list):
if isinstance(sub.get("anat"), dict):
for anat_key in sub["anat"]:
if isinstance(sub["anat"][anat_key], list) and len(
sub["anat"][anat_key]
):
sub_list[i]["anat"][anat_key] = sub["anat"][anat_key][0]
if isinstance(sub.get("anat"), list) and len(sub["anat"]):
sub_list[i]["anat"] = sub["anat"][0]
data_config_file = f"cpac_data_config_{st}.yml"
sublogdirs = [set_subject(sub, c)[2] for sub in sub_list]
# write out the data configuration file
data_config_file = os.path.join(sublogdirs[0], data_config_file)
with open(data_config_file, "w", encoding="utf-8") as _f:
noalias_dumper = yaml.dumper.SafeDumper
noalias_dumper.ignore_aliases = lambda self, data: True
yaml.dump(sub_list, _f, default_flow_style=False, Dumper=noalias_dumper)

# update and write out pipeline config file
pipeline_config_file = os.path.join(sublogdirs[0], f"cpac_pipeline_config_{st}.yml")
with open(pipeline_config_file, "w", encoding="utf-8") as _f:
_f.write(create_yaml_from_template(c))
minimized_config = f"{pipeline_config_file[:-4]}_min.yml"
with open(minimized_config, "w", encoding="utf-8") as _f:
_f.write(create_yaml_from_template(c, import_from="blank"))
for config_file in (data_config_file, pipeline_config_file, minimized_config):
os.chmod(config_file, 0o444) # Make config files readonly

if len(sublogdirs) > 1:
# If more than one run is included in the given data config
# file, an identical copy of the data and pipeline config
# will be included in the log directory for each run
for sublogdir in sublogdirs[1:]:
for config_file in (
data_config_file,
pipeline_config_file,
minimized_config,
):
try:
os.link(config_file, config_file.replace(sublogdirs[0], sublogdir))
except FileExistsError:
pass

run(
data_config_file,
pipeline_config_file,
plugin="Linear",
plugin_args={
"n_procs": int(
cast(
int | str,
c["pipeline_setup", "system_config", "max_cores_per_participant"],
)
),
"memory_gb": int(
cast(
int | str,
c[
"pipeline_setup",
"system_config",
"maximum_memory_per_participant",
],
)
),
"raise_insufficient": c[
"pipeline_setup", "system_config", "raise_insufficient"
],
},
tracking=False,
test_config=namespace.analysis_level == "test_config",
)

assert "can be output from" in caplog.text


# bids_dir = "/Users/steven.giavasis/data/HBN-SI_dataset/rawdata"
# test_dir = "/test_dir"

# cfg = "/Users/hecheng.jin/GitHub/DevBranch/CPAC/resources/configs/pipeline_config_monkey-ABCD.yml"
cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml"
bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis"
test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc"

# test_ingress_func_raw_data(cfg, bids_dir, test_dir)
# test_ingress_anat_raw_data(cfg, bids_dir, test_dir)
# test_ingress_pipeconfig_data(cfg, bids_dir, test_dir)
# test_build_anat_preproc_stack(cfg, bids_dir, test_dir)
if __name__ == "__main__":
cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml"
bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis"
test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc"
test_build_workflow(cfg, bids_dir, test_dir)

0 comments on commit 4cdff4e

Please sign in to comment.