Skip to content

Commit

Permalink
Merge pull request #276 from broadinstitute/jlc_ingest_h5ad_cluster
Browse files Browse the repository at this point in the history
Extract clustering from h5ad and delocalize intermediate files to study bucket (SCP-4771)
  • Loading branch information
jlchang authored Nov 15, 2022
2 parents 3249e71 + 2e07bee commit ae99818
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 10 deletions.
83 changes: 79 additions & 4 deletions ingest/anndata_.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pandas as pd # NOqa: F821

try:
from ingest_files import IngestFiles
from monitor import log_exception
Expand All @@ -14,13 +16,20 @@ def __init__(self, file_path, study_file_id, study_id, **kwargs):
IngestFiles.__init__(
self, file_path, allowed_file_types=self.ALLOWED_FILE_TYPES
)
pass
# If performing cluster extraction, set obsm_keys
extract_cluster = kwargs.get("extract_cluster")
if extract_cluster:
self.obsm_keys = kwargs["obsm_keys"]
else:
pass

def obtain_adata(self):
    """
    Open the AnnData file and return the parsed AnnData object.

    Also caches the object on self.adata so existing consumers that read
    the attribute (e.g. ingest_pipeline cluster extraction) keep working.

    :returns: AnnData object parsed from self.file_path
    :raises ValueError: when scanpy cannot parse the file
    """
    try:
        # open_file returns a tuple; the parsed object is the first element
        adata = self.open_file(self.file_path)[0]
        # cache for callers that access the attribute rather than the return value
        self.adata = adata
        # for faster dev, print adata info to screen, may want to remove in future
        print(adata)
        IngestFiles.dev_logger.info(str(adata))
        return adata
    except ValueError:
        # re-raise as-is to preserve the original message and traceback
        raise

Expand All @@ -35,3 +44,69 @@ def validate(self):
except ValueError:
return False

@staticmethod
def generate_cluster_header(adata, clustering_name):
"""
Based on clustering dimensions, write clustering NAME line to file
"""
dim = ['NAME', 'X', 'Y']
clustering_dimension = adata.obsm[clustering_name].shape[1]
if clustering_dimension == 3:
headers = dim.append('Z')
elif clustering_dimension == 3:
headers = dim
elif clustering_dimension > 3:
msg = f"Too many dimensions for visualization in obsm \"{clustering_name}\", found {clustering_dimension}, expected 2 or 3."
raise ValueError(msg)
else:
msg = f"Too few dimensions for visualization in obsm \"{clustering_name}\", found {clustering_dimension}, expected 2 or 3."
raise ValueError(msg)
with open(f"{clustering_name}.cluster.anndata_segment.tsv", "w") as f:
f.write('\t'.join(headers) + '\n')

@staticmethod
def generate_cluster_type_declaration(adata, clustering_name):
"""
Based on clustering dimensions, write clustering TYPE line to file
"""
clustering_dimension = adata.obsm[clustering_name].shape[1]
types = ["TYPE", *["numeric"] * clustering_dimension]
with open(f"{clustering_name}.cluster.anndata_segment.tsv", "a") as f:
f.write('\t'.join(types) + '\n')

@staticmethod
def generate_cluster_body(adata, clustering_name):
"""
Append clustering data to clustering file
"""
cluster_cells = pd.DataFrame(adata.obs_names)
cluster_body = pd.concat(
[cluster_cells, pd.DataFrame(adata.obsm[clustering_name])], axis=1
)
pd.DataFrame(cluster_body).to_csv(
f"{clustering_name}.cluster.anndata_segment.tsv",
sep="\t",
mode="a",
header=None,
index=False,
)

@staticmethod
def files_to_delocalize(arguments):
# ToDo - check if names using obsm_keys need sanitization
cluster_file_names = [name + ".tsv" for name in arguments["obsm_keys"]]
return cluster_file_names

@staticmethod
def delocalize_cluster_files(file_path, study_file_id, files_to_delocalize):
    """ Copy cluster files to study bucket
    """
    for cluster_file in files_to_delocalize:
        # place extracted files under the internal anndata ingest prefix
        remote_path = f"_scp_internal/anndata_ingest/{cluster_file}"
        IngestFiles.delocalize_file(
            study_file_id, None, file_path, cluster_file, remote_path
        )
24 changes: 21 additions & 3 deletions ingest/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,28 @@ def create_parser():
"--anndata-file", required=True, help="Path to AnnData file"
)

parser_anndata.add_argument(
"--obsm-keys",
type=ast.literal_eval,
help="Array of obsm key(s) to extract as cluster files",
)

parser_anndata.add_argument(
"--extract-cluster",
action="store_true",
help="Indicates clustering data should be extracted",
)

parser_expression_writer = subparsers.add_parser(
"render_expression_arrays",
help="Indicates preprocessing of cluster/expression files for image pipeline"
help="Indicates preprocessing of cluster/expression files for image pipeline",
)

parser_expression_writer.add_argument(
'--render-expression-arrays', action="store_true", help='Invoke expression_writer.py', required=True
'--render-expression-arrays',
action="store_true",
help='Invoke expression_writer.py',
required=True,
)

parser_expression_writer.add_argument(
Expand All @@ -366,7 +381,10 @@ def create_parser():
'--matrix-file-path', help='path to matrix file', required=True
)
parser_expression_writer.add_argument(
'--matrix-file-type', help='type to matrix file (dense or mtx)', required=True, choices=['dense', 'mtx']
'--matrix-file-type',
help='type to matrix file (dense or mtx)',
required=True,
choices=['dense', 'mtx'],
)
parser_expression_writer.add_argument(
'--gene-file', help='path to gene file (omit for dense matrix files)'
Expand Down
22 changes: 19 additions & 3 deletions ingest/ingest_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_expression --taxon-name 'Homo sapiens' --taxon-common-name human --ncbi-taxid 9606 --matrix-file ../tests/data/dense_matrix_19_genes_1000_cells.txt --matrix-file-type dense
# Ingest AnnData file
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --anndata-file ../tests/data/anndata/test.h5ad
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --ingest-anndata --anndata-file ../tests/data/anndata/test.h5ad
# Subsample cluster and metadata file
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_subsample --cluster-file ../tests/data/test_1k_cluster_Data.csv --name cluster1 --cell-metadata-file ../tests/data/test_1k_metadata_Data.csv --subsample
Expand Down Expand Up @@ -103,7 +103,7 @@
from .clusters import Clusters
from .expression_files.dense_ingestor import DenseIngestor
from .expression_files.mtx import MTXIngestor
from .anndata import AnnDataIngestor
from .anndata_ import AnnDataIngestor
from .cli_parser import create_parser, validate_arguments
from .de import DifferentialExpression
from .expression_writer import ExpressionWriter
Expand Down Expand Up @@ -487,6 +487,13 @@ def ingest_anndata(self):
)
if self.anndata.validate():
self.report_validation("success")
if self.kwargs["extract_cluster"]:
for key in self.kwargs["obsm_keys"]:
AnnDataIngestor.generate_cluster_header(self.anndata.adata, key)
AnnDataIngestor.generate_cluster_type_declaration(
self.anndata.adata, key
)
AnnDataIngestor.generate_cluster_body(self.anndata.adata, key)
return 0
# scanpy unable to open AnnData file
else:
Expand Down Expand Up @@ -605,7 +612,16 @@ def exit_pipeline(ingest, status, status_cell_metadata, arguments):
DifferentialExpression.delocalize_de_files(
file_path, study_file_id, files_to_match
)
# all non-DE ingest jobs can exit on success
# for successful anndata jobs, need to delocalize intermediate ingest files
elif "extract_cluster" in arguments and all(i < 1 for i in status):
file_path, study_file_id = get_delocalization_info(arguments)
# append status?
if IngestFiles.is_remote_file(file_path):
files_to_delocalize = AnnDataIngestor.files_to_delocalize(arguments)
AnnDataIngestor.delocalize_cluster_files(
file_path, study_file_id, files_to_delocalize
)
# all non-DE, non-anndata ingest jobs can exit on success
elif all(i < 1 for i in status):
sys.exit(os.EX_OK)
else:
Expand Down

0 comments on commit ae99818

Please sign in to comment.