diff --git a/ingest/scripts/call_loculus.py b/ingest/scripts/call_loculus.py index 15d42fd5f..24854e1a5 100644 --- a/ingest/scripts/call_loculus.py +++ b/ingest/scripts/call_loculus.py @@ -182,13 +182,13 @@ def post_fasta_batches( ) -> requests.Response: """Chunks metadata files, joins with sequences and submits each chunk via POST.""" - def submit(metadata_output, sequences_output, number_of_submissions): - batch_num = -(number_of_submissions // - chunk_size) # ceiling division + def submit(metadata_batch_output, sequences_batch_output, number_of_submissions): + batch_num = -(number_of_submissions // -chunk_size) # ceiling division logger.info(f"Submitting batch {batch_num}") - metadata_in_memory = BytesIO(metadata_output.encode("utf-8")) - fasta_in_memory = BytesIO(sequences_output.encode("utf-8")) - # Prepare the files dictionary for the POST request + metadata_in_memory = BytesIO(metadata_batch_output.encode("utf-8")) + fasta_in_memory = BytesIO(sequences_batch_output.encode("utf-8")) + files = { "metadataFile": ("metadata.tsv", metadata_in_memory, "text/tab-separated-values"), "sequenceFile": ("sequences.fasta", fasta_in_memory, "text/plain"), @@ -201,13 +201,12 @@ def submit(metadata_output, sequences_output, number_of_submissions): return response number_of_submissions = -1 - submission_id_chunk = [] fasta_submission_id = None - fasta_header = None - table_header = None + fasta_record_header = None + metadata_header = None - sequences_output = "" - metadata_output = "" + sequences_batch_output = "" + metadata_batch_output = "" with ( open(fasta_file, encoding="utf-8") as fasta_file_stream, @@ -215,65 +214,60 @@ def submit(metadata_output, sequences_output, number_of_submissions): ): for record in metadata_file_stream: number_of_submissions += 1 + + # process metadata header if number_of_submissions == 0: - # get column index of submissionId - print(record.split("\t")) - header_index = record.strip().split("\t").index("submissionId") - table_header = record - metadata_output += table_header + submission_id_index = record.strip().split("\t").index("submissionId") + metadata_header = record + metadata_batch_output += metadata_header continue - if number_of_submissions > 1 and number_of_submissions % chunk_size == 1: - metadata_output += table_header - metadata_output += record + # add header to batch metadata output + if number_of_submissions > 1 and number_of_submissions % chunk_size == 1: + metadata_batch_output += metadata_header - metadata_submission_id = record.split("\t")[header_index].strip() + metadata_batch_output += record + metadata_submission_id = record.split("\t")[submission_id_index].strip() if fasta_submission_id and metadata_submission_id != fasta_submission_id: msg = f"Fasta SubmissionId {fasta_submission_id} not in correct order in metadata" logger.error(msg) raise ValueError(msg) - searching = True - - while searching: + while True: + # get all fasta sequences for the current metadata submissionId line = fasta_file_stream.readline() - if not line: - searching = False + if not line: # EOF break if line.startswith(">"): - fasta_header = line + fasta_record_header = line if config.segmented: - submission_id = "_".join(fasta_header[1:].strip().split("_")[:-1]) + fasta_submission_id = "_".join(fasta_record_header[1:].strip().split("_")[:-1]) else: - submission_id = fasta_header[1:].strip() - if submission_id == metadata_submission_id: + fasta_submission_id = fasta_record_header[1:].strip() + if fasta_submission_id == metadata_submission_id: continue - if submission_id < metadata_submission_id: + if fasta_submission_id < metadata_submission_id: msg = "Fasta file is not sorted by submissionId" logger.error(msg) raise ValueError(msg) - fasta_submission_id = submission_id - submission_id_chunk.append(submission_id) - searching = False break - # add to sequences file - sequences_output += fasta_header - sequences_output += line + # add to batch sequences output + sequences_batch_output += fasta_record_header + sequences_batch_output += line if number_of_submissions % chunk_size == 0: response = submit( - metadata_output, sequences_output, number_of_submissions + metadata_batch_output, sequences_batch_output, number_of_submissions ) - submission_id_chunk = [] - sequences_output = "" - metadata_output = "" + sequences_batch_output = "" + metadata_batch_output = "" - if submission_id_chunk: + if number_of_submissions % chunk_size != 0: # submit the last chunk - response = submit(metadata_output, sequences_output, number_of_submissions) + response = submit(metadata_batch_output, sequences_batch_output, number_of_submissions) return response