Skip to content

Commit

Permalink
finally sorted out datasinking
Browse files Browse the repository at this point in the history
  • Loading branch information
GalKepler committed Jul 31, 2024
1 parent 174a42b commit c635ab5
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/kepost/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ class execution(_Config):
"""Folder where derivatives will be stored."""
run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid4()}"
"""Unique identifier of this particular run."""
participant_label = None
participant_label: list | None = None
"""List of participant identifiers that are to be preprocessed."""
work_dir = Path("work").absolute()
"""Path to a working directory where intermediate results will be available."""
Expand Down
6 changes: 3 additions & 3 deletions src/kepost/interfaces/bids/static/kepost.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@
"[acq-{acquisition}_][ce-{ceagent}_]dir-{direction}[_run-{run}][_space-{space}][_desc-{desc}]_{fmap<epi>}{extension<.json>|.json}",
"task-{task}[_acq-{acquisition}][_rec-{reconstruction}][_run-{run}][_echo-{echo}][_recording-{recording}][_desc-{desc}]_{suffix<events>}{extension<.json>|.json}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/{reconstruction_software<dipy|mrtrix|mrtrix3|fsl|qc>}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}]_{suffix<dwiref|qc>}{extension<.nii|.nii.gz|.json|.csv|.json>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/{reconstruction_software<dipy|mrtrix|mrtrix3|fsl|qc>}[/{subtype<parcellations>/{atlas<atlas>]/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}]_{suffix<dwiref|qc|parc>}{extension<.nii|.nii.gz|.json|.csv|.json>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/{reconstruction_software<dipy|mrtrix|mrtrix3|fsl|qc>}/{subtype<parcellations>}/{atlas<atlas>}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}]_{suffix<dwiref|qc|parc>}{extension<.nii|.nii.gz|.json|.csv|.json>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_space-{space}][_res-{res}][_desc-{desc}][_part-{part}]_{suffix<dwi|dwiref|epiref|mask>}{extension<.bval|.bvec|.b|.json|.nii.gz|.nii>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<anat>|anat}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_ce-{ceagent}][_rec-{reconstruction}][_space-{space}][_res-{res}][_part-{part}]_{suffix<T1w|T2w|T1rho|T1map|T2map|T2star|FLAIR|FLASH|PDmap|PD|PDT2|inplaneT[12]|angio>}{extension<.nii|.nii.gz|.json>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<anat|dwi>|anat}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_ce-{ceagent}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}][_meas-{measure}][_atlas-{atlas}]_{suffix<dseg|probseg|5TT>}{extension<.csv|.tsv|.pickle|.pkl|.nii|.nii.gz|.json|.mif>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<anat|dwi>|anat}/{reconstruction_software<dipy|mrtrix|mrtrix3|fsl|qc>}[/{subtype<parcellations>/{atlas<atlas>]/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_ce-{ceagent}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}][_meas-{measure}][_atlas-{atlas}]_{suffix<parc|dseg|probseg|5TT>}{extension<.csv|.tsv|.pickle|.pkl|.nii|.nii.gz|.json|.mif>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<anat|dwi>|anat}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_ce-{ceagent}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}][_meas-{measure}][_atlas-{atlas}]_{suffix<dseg|probseg|5TT>}{extension<.csv|.tsv|.pickle|.nii|.nii.gz|.json|.mif>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<anat|dwi>|anat}/{reconstruction_software<dipy|mrtrix|mrtrix3|fsl|qc>}[/{subtype<parcellations>}][/{atlas<atlas>}]/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_ce-{ceagent}][_rec-{reconstruction}][_space-{space}][_res-{res}][_desc-{desc}][_label-{label}][_meas-{measure}][_atlas-{atlas}]_{suffix<parc|dseg|probseg|5TT>}{extension<.csv|.tsv|.pickle|.pkl|.nii|.nii.gz|.json|.mif>|.nii.gz}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_rec-{reconstruction}][_space-{space}][_desc-{desc}]_{suffix<tracts>}{extension<.tck|.trk|.fib.gz>|.tck}",
"sub-{subject}[/ses-{session}]/{datatype<dwi>|dwi}/sub-{subject}[_ses-{session}][_acq-{acquisition}][_dir-{direction}][_rec-{reconstruction}][_space-{space}][_desc-{desc}][_label-{label}][_meas-{measure}][_atlas-{atlas}]_{suffix<connectome|assignments>}{extension<.pickle|.csv|.tsv>|.csv}"
]
Expand Down
20 changes: 17 additions & 3 deletions src/kepost/workflows/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from copy import deepcopy
from pathlib import Path

from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe
from packaging.version import Version
Expand Down Expand Up @@ -38,15 +41,26 @@ def init_kepost_wf():
kepost_wf.base_dir = config.execution.work_dir
for subject_id in config.execution.participant_label:
name = f"single_subject_{subject_id}_wf"
single_subject_wf = init_single_subject_wf(
subject_id=subject_id, atlases=atlases, name=name
)
single_subject_wf = init_single_subject_wf(subject_id=subject_id, name=name)
single_subject_wf.config["execution"]["crashdump_dir"] = str(
config.execution.output_dir
/ f"sub-{subject_id}"
/ "log"
/ config.execution.run_uuid
)
for node in single_subject_wf._get_all_nodes():
node.config = deepcopy(single_subject_wf.config)
kepost_wf.add_nodes([single_subject_wf])
# Dump a copy of the config file into the log directory
log_dir = (
Path(config.execution.output_dir) # type: ignore[operator, attr-defined]
/ f"sub-{subject_id}"
/ "log"
/ config.execution.run_uuid
)
log_dir.mkdir(exist_ok=True, parents=True)
config.to_filename(log_dir / "kepost.toml")
return kepost_wf


def init_single_subject_wf(subject_id: str, name: str):
Expand Down
57 changes: 55 additions & 2 deletions src/kepost/workflows/diffusion/diffusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from kepost.workflows.diffusion.procedures.tensor_estimations.dipy.dipy import (
TENSOR_PARAMETERS as dipy_parameters,
)
from kepost.workflows.diffusion.procedures.tensor_estimations.mrtrix3.mrtrix3 import (
TENSOR_PARAMETERS as mrtrix3_parameters,
)
from kepost.workflows.diffusion.procedures.tensor_estimations.tensor_estimation import (
init_tensor_estimation_wf,
)
Expand Down Expand Up @@ -210,6 +213,9 @@ def init_diffusion_wf(
dipy_parcellations_wf = init_parcellations_wf(
inputs=dipy_parameters, software="dipy"
)
mrtrix3_parcellations_wf = init_parcellations_wf(
inputs=mrtrix3_parameters, software="mrtrix3"
)
workflow.connect(
[
(
Expand All @@ -221,6 +227,15 @@ def init_diffusion_wf(
("atlas_name", "inputnode.atlas_name"),
],
),
(
inputnode,
mrtrix3_parcellations_wf,
[
("base_directory", "inputnode.base_directory"),
("dwi_nifti", "inputnode.source_file"),
("atlas_name", "inputnode.atlas_name"),
],
),
(
tensor_estimation_wf,
dipy_parcellations_wf,
Expand All @@ -229,12 +244,30 @@ def init_diffusion_wf(
for param in dipy_parameters
],
),
(
tensor_estimation_wf,
mrtrix3_parcellations_wf,
[
(f"mrtrix3_tensor_wf.outputnode.{param}", f"inputnode.{param}")
for param in mrtrix3_parameters
],
),
(
tensor_estimation_wf,
dipy_parcellations_wf,
[
(
"gen_acq_label.acq_label",
"outputnode.acq_label",
"inputnode.acq_label",
)
],
),
(
tensor_estimation_wf,
mrtrix3_parcellations_wf,
[
(
"outputnode.acq_label",
"inputnode.acq_label",
)
],
Expand All @@ -254,6 +287,16 @@ def init_diffusion_wf(
),
],
),
(
coregister_wf,
mrtrix3_parcellations_wf,
[
(
"outputnode.gm_cropped_parcellation",
"inputnode.atlas_nifti",
),
],
),
]
)
else:
Expand All @@ -268,7 +311,17 @@ def init_diffusion_wf(
"inputnode.atlas_nifti",
),
],
)
),
(
coregister_wf,
mrtrix3_parcellations_wf,
[
(
"outputnode.whole_brain_parcellation",
"inputnode.atlas_nifti",
),
],
),
]
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe

from kepost.interfaces.bids import DerivativesDataSink
from kepost.workflows.diffusion.procedures.utils.derivatives import (
DIFFUSION_WF_OUTPUT_ENTITIES,
)


def parcellate_all_measures(in_file: str, atlas_nifti: str):
    """
    Compute every available regional measure of a metric image over an atlas.

    Designed to run inside a nipype ``Function`` node, so all imports are
    local to the function body.

    Parameters
    ----------
    in_file : str
        Path to the metric image to be parcellated.
    atlas_nifti : str
        Path to the atlas parcellation NIfTI; its ``atlas`` BIDS entity
        identifies which atlas properties to load.

    Returns
    -------
    tuple
        Path to the pickled results DataFrame and the parsed atlas name.
    """
    import os

    import pandas as pd
    from bids.layout import parse_file_entities

    from kepost.atlases.utils import get_atlas_properties, parcellate
    from kepost.workflows.diffusion.procedures.parcellations.available_measures import (
        AVAILABLE_MEASURES,
    )

    # The atlas name is encoded as a BIDS entity in the parcellation filename.
    atlas_name = parse_file_entities(atlas_nifti)["atlas"]
    _, description, region_col, index_col = get_atlas_properties(atlas_name)
    result = pd.read_csv(description, index_col=index_col).copy()
    # One column per measure; parcellate() returns a per-region "value" series.
    for name, func in AVAILABLE_MEASURES.items():
        parcellated = parcellate(
            atlas_description=description,
            index_col=index_col,
            atlas_nifti=atlas_nifti,
            region_col=region_col,
            metric_image=in_file,
            measure=func,  # type: ignore[arg-type]
        )
        result[name] = parcellated["value"]
    out_file = f"{os.getcwd()}/parcellations.pkl"
    result.to_pickle(out_file)
    return out_file, atlas_name


def init_parcellations_wf(
    inputs: list, software: str, name: str = "parcellations_wf"
) -> pe.Workflow:
    """
    Build a workflow that parcellates a set of scalar maps and sinks the results.

    Parameters
    ----------
    inputs : list
        Names of the scalar-map inputs (e.g. tensor-derived parameters); each
        becomes an inputnode field and a ``desc`` entity in the datasink.
    software : str
        Reconstruction software label; prefixes the workflow name and is
        recorded as the ``reconstruction_software`` datasink entity.
    name : str, optional
        Base name of the workflow (prefixed by *software*).

    Returns
    -------
    pe.Workflow
        The configured parcellation workflow.
    """
    workflow = pe.Workflow(name=f"{software}_{name}")

    common_fields = [
        "base_directory",
        "acq_label",
        "source_file",
        "atlas_name",
        "atlas_nifti",
    ]
    inputnode = pe.Node(
        niu.IdentityInterface(fields=common_fields + inputs),
        name="inputnode",
    )

    # Collect the individual scalar-map inputs into one list so the
    # parcellation node can map over them.
    listify_inputs_node = pe.Node(
        niu.Merge(len(inputs)),
        name="listify_inputs_node",
    )
    parcellate_node = pe.MapNode(
        niu.Function(
            input_names=["in_file", "atlas_nifti"],
            output_names=["out_file", "atlas_name"],
            function=parcellate_all_measures,
        ),
        name="parcellate_node",
        iterfield=["in_file"],
    )
    # One datasink entry per scalar map, each tagged by its input name.
    ds_parcellation_node = pe.MapNode(
        DerivativesDataSink(  # type: ignore[arg-type]
            **DIFFUSION_WF_OUTPUT_ENTITIES.get("parcellations"),
            reconstruction_software=software,
            desc=inputs,
        ),
        iterfield=["desc", "in_file"],
        name="ds_parcellation_node",
    )

    connections = [
        (
            inputnode,
            listify_inputs_node,
            [(field, f"in{index}") for index, field in enumerate(inputs, start=1)],
        ),
        (listify_inputs_node, parcellate_node, [("out", "in_file")]),
        (inputnode, parcellate_node, [("atlas_nifti", "atlas_nifti")]),
        (
            parcellate_node,
            ds_parcellation_node,
            [("out_file", "in_file"), ("atlas_name", "atlas")],
        ),
        (
            inputnode,
            ds_parcellation_node,
            [
                ("acq_label", "acquisition"),
                ("source_file", "source_file"),
                ("base_directory", "base_directory"),
            ],
        ),
    ]
    workflow.connect(connections)

    return workflow
Loading

0 comments on commit c635ab5

Please sign in to comment.