diff --git a/clock.time b/clock.time index 59937afe..5b2cf238 100644 --- a/clock.time +++ b/clock.time @@ -1 +1 @@ -Last updated at 16:15 10/09/2024 \ No newline at end of file +Last updated at 10:20 07/01/2025 (dd/mm/aaaa) \ No newline at end of file diff --git a/navigation/IMF.py b/navigation/IMF.py index 18ba52e3..00a1d5c4 100644 --- a/navigation/IMF.py +++ b/navigation/IMF.py @@ -188,7 +188,7 @@ def analyse(dna_sequence=None): break else: found_species = "n.d" - regions_prom = ['Promoter', 'Terminator'] + regions_prom = ['promoter', 'terminator', 'rna', 'mrna'] for regions in regions_prom: if regions.lower() in promoter_name.lower(): region = regions[:4] + "." @@ -248,13 +248,16 @@ def analyse(dna_sequence=None): st.success(f"{TF_species} transcription factor {TF_name}") with REcol2: st.image(weblogo) + isUIPAC = True button = False error_input_im = True else: + isUIPAC = False button = True error_input_im = False st.error('Wrong JASPAR_ID') else: + isUIPAC = False button = True error_input_im = False st.warning('Please enter a JASPAR_ID') @@ -265,7 +268,6 @@ def analyse(dna_sequence=None): matrix_type = st.radio('🔹 :blue[**Step 2.2bis**] Matrix:', ('With FASTA sequences', 'With PWM'), label_visibility='collapsed') if matrix_type == 'With PWM': - isUIPAC = True with REcol2: st.markdown("🔹 :blue[**Step 2.3**] Matrix:") matrix_str = st.text_area("🔹 :blue[**Step 2.3**] Matrix:", @@ -303,12 +305,14 @@ def analyse(dna_sequence=None): weblogo.fig.savefig(logo, format='png') logo.seek(0) st.session_state['weblogo'] = logo - + isUIPAC = True error_input_im = True except Exception as e: + isUIPAC = False error_input_im = False REcol2.error(e) else: + isUIPAC = False error_input_im = False REcol2.warning("Please input your PWM :)") else: @@ -322,7 +326,6 @@ def analyse(dna_sequence=None): label_visibility='collapsed') st.session_state['individual_motif_save'] = individual_motif individual_motif = individual_motif.upper() - isUIPAC = True try: matrix, weblogo = IMO.individual_motif_pwm(individual_motif) @@ -339,8 +342,10 @@ def analyse(dna_sequence=None): weblogo.fig.savefig(logo, format='png') logo.seek(0) st.session_state['weblogo'] = logo + isUIPAC = True error_input_im = True except Exception as e: + isUIPAC = False error_input_im = False REcol1.error(e) @@ -357,39 +362,36 @@ def analyse(dna_sequence=None): IUPAC_code = ['A', 'T', 'G', 'C', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N', '-', '.'] - if all(char in IUPAC_code for char in IUPAC): - isUIPAC = True - - sequences = IMO.generate_iupac_variants(IUPAC, max_variant_allowed=1048576) - - if 'Too many' not in sequences: - individual_motif = "" - for i, seq in enumerate(sequences): - individual_motif += f">seq{i + 1}\n{seq}\n" - + if len(IUPAC) > 0: + if all(char in IUPAC_code for char in IUPAC): + isUIPAC = True try: - matrix, weblogo = IMO.individual_motif_pwm(individual_motif) + matrix = IMO.generate_iupac_variants(IUPAC) + _, weblogo = IMO.individual_motif_pwm(matrix, "UIPAC") + matrix_str = "" for base, values in matrix.items(): values_str = " ".join([f"{val:.4f}" for val in values]) matrix_str += f"{base} [ {values_str} ]\n" + with REcol2: matrix_text = st.text_area('PWM', value=matrix_str, height=125, help='Copy to use later. Not editable.', disabled=True) + st.pyplot(weblogo.fig) logo = io.BytesIO() weblogo.fig.savefig(logo, format='png') logo.seek(0) st.session_state['weblogo'] = logo + error_input_im = True + except Exception as e: error_input_im = False REcol1.error(e) else: - st.error(sequences) isUIPAC = False - else: isUIPAC = False @@ -457,7 +459,7 @@ def analyse(dna_sequence=None): "[TFBSTools](https://bioconductor.org/packages/release/bioc/html/TFBSTools.html))." f"") if isUIPAC is True: - pseudocount_auto, _, _ = IMO.transform_PWM(matrix) + pseudocount_auto, _ = IMO.transform_PWM(matrix) pc_col1, pc_col2, pc_col3, pc_col4 = st.columns(4, gap="small") pseudocount_A = pc_col1.number_input('A', value=0.20 if pseudocount_type is False else pseudocount_auto['A'], diff --git a/navigation/regulatory_regions_extractor.py b/navigation/regulatory_regions_extractor.py index 7dc5c95b..9c85d853 100644 --- a/navigation/regulatory_regions_extractor.py +++ b/navigation/regulatory_regions_extractor.py @@ -50,29 +50,6 @@ def extract(): label_visibility='collapsed') gene_ids = gene_id_entry.strip().split("\n") - # Verify if gene is available for all species - if st.button('🔎 Check genes avaibility', - help='Sometimes genes do not have the same name in all species or do not exist.'): - with st.spinner('Please wait...'): - species_list = ['ID', 'Human', 'Mouse', 'Rat', 'Drosophila', 'Zebrafish'] - gene_disponibility_output = [] - pbar = st.progress(0, - text='**:blue[Analyse genes...] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - for i, gene_id in enumerate(gene_ids): - pbar.progress(i / len(gene_ids), - text=f'**:blue[Analyse genes... {gene_id}] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - gene_disponibility_output.append(NCBIdna.analyse_gene(gene_id)) - pbar.progress((i + 1) / len(gene_ids), - text=f'**:blue[Analyse genes... {gene_id}] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - - species_columns = ['Gene'] + species_list - gene_disponibility_output = pd.DataFrame(gene_disponibility_output, columns=species_columns) - - st.session_state['gene_disponibility_output'] = gene_disponibility_output - - if 'gene_disponibility_output' in st.session_state: - st.dataframe(st.session_state['gene_disponibility_output'], hide_index=True) - with colprom2: tab1, tab2 = st.tabs(['Default', 'Advance']) @@ -99,26 +76,40 @@ def extract(): col3.markdown("") col3.markdown("") - all_variants = col3.toggle(label='All variants') - - # Upstream/Downstream Promoter - st.markdown("🔹 :blue[**Step 1.3**] Regulatory region:") - prom_term = st.radio("🔹 :blue[**Step 1.3**] Regulatory region:", ('Promoter', 'Terminator'), - horizontal=True, - label_visibility='collapsed') - if prom_term == 'Promoter': - st.markdown("🔹 :blue[**Step 1.4**] Upstream/downstream from the TSS (bp)") - else: - st.markdown("🔹 :blue[**Step 1.4**] Upstream/downstream from gene end (bp)") + all_slice_forms = col3.toggle(label='All variants') + + st.markdown("🔹 :blue[**Step 1.3**] Sequence type:") + seq_type_display = st.radio( + "🔹 :blue[**Step 1.3**] Sequence type:", + ('Promoter', 'Terminator', "RNA (exon + intron)", "mRNA (exon)"), + horizontal=True, + label_visibility='collapsed' + ) + seq_type_mapping = { + "Promoter": "Promoter", + "Terminator": "Terminator", + "RNA (exon + intron)": "RNA", + "mRNA (exon)": "mRNA" + } - updown_slide = st.slider("🔹 :blue[**Step 1.4**] Upstream/downstream", -10000, 10000, - (-2000, 2000), step=100, label_visibility='collapsed') - if prom_term == 'Promoter': - st.write("Upstream: ", min(updown_slide), " bp from TSS | Downstream: ", max(updown_slide), - " bp from TSS") + seq_type = seq_type_mapping[seq_type_display] + + if seq_type in ['Promoter', 'Terminator']: + if seq_type == 'Promoter': + st.markdown("🔹 :blue[**Step 1.4**] Upstream/downstream from the TSS (bp)") + elif seq_type == "Terminator": + st.markdown("🔹 :blue[**Step 1.4**] Upstream/downstream from gene end (bp)") + + updown_slide = st.slider("🔹 :blue[**Step 1.4**] Upstream/downstream", -10000, 10000, + (-2000, 2000), step=100, label_visibility='collapsed', disabled=True if seq_type in ['mRNA', "RNA"] else False) + if seq_type == 'Promoter': + st.write("Upstream: ", min(updown_slide), " bp from TSS | Downstream: ", max(updown_slide), + " bp from TSS") + elif seq_type == "Terminator": + st.write("Upstream: ", min(updown_slide), " bp from gene end | Downstream: ", max(updown_slide), + " bp from gene end") else: - st.write("Upstream: ", min(updown_slide), " bp from gene end | Downstream: ", max(updown_slide), - " bp from gene end") + updown_slide = [0, 0] upstream_entry = -min(updown_slide) downstream_entry = max(updown_slide) @@ -128,13 +119,14 @@ def extract(): downstream = int(downstream_entry) # Run Promoter Finder - if st.button(f"🧬 :blue[**Step 1.5**] Extract {prom_term}", help='(~5sec/gene)'): + if st.button(f"🧬 :blue[**Step 1.5**] Extract {seq_type}", help='(~5sec/gene)'): response = requests.get( 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=gene&term=nos2[Gene%20Name]+AND+human[Organism]&retmode=json&rettype=xml') ncbi_status = True if response.status_code == 200 else False if ncbi_status is True: + st.session_state['result_promoter_text'] = "" with st.spinner('Please wait...'): with colprom1: @@ -143,16 +135,53 @@ def extract(): for i, gene_id in enumerate(gene_ids): pbar.progress(i / len(gene_ids), text=f'**:blue[Extract sequence... {gene_id}] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - result_promoter_output, message = NCBIdna(gene_id, prom_term, upstream, downstream, - species, gr.lower(), - all_slice_forms=True if all_variants else False).find_sequences() + all_variants, message = NCBIdna(gene_id, species, seq_type.lower(), upstream, downstream, gr.lower(), + all_slice_forms=True if all_slice_forms else False).find_sequences() if message == "OK": - pbar.progress((i + 1) / len(gene_ids), - - text=f'**:blue[Extract sequence... {gene_id}] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - st.toast(f"{prom_term} **{gene_id}** from **{species}** extracted", icon='🧬') + for nm_id, data in all_variants.items(): + exon_coords = data.get('exon_coords', []) + upstream_seq = data.get('upstream') + + dna_sequence = ( + f">{nm_id} {data.get('gene_name', 'Unknown')} | " + f"{data.get('genomic_info', 'No Title')} {data.get('chraccver', 'No chraccver')} | " + f"Strand: {data.get('strand', 'Unknown')} | " + f"Type: {data.get('seq_type', 'Unknown')} | " + ) + + if data.get('seq_type') == 'promoter': + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"{f'| TSS (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + elif data.get('seq_type') == 'terminator': + coord = ( + f"Gene end (on chromosome): {exon_coords[-1][1]} " + f"{f'| Gene end (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + else: + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"| Gene end (on chromosome): {exon_coords[-1][1]}\n" + ) + + sequence = f"{data.get('sequence', 'No sequence available')}\n" + + result_promoter.append(dna_sequence + coord + sequence) + + pbar.progress( + (i + 1) / len(gene_ids), + text=( + f"**:blue[Extract sequence... {gene_id}] " + f"⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**" + ) + ) + + st.toast( + f"{data.get('seq_type', 'Unknown')} **{gene_id}** from **{species}** extracted", + icon='🧬' + ) - result_promoter.append(result_promoter_output) else: st.error(message) continue @@ -161,8 +190,8 @@ def extract(): st.session_state['result_promoter_text'] = result_promoter_text - st.success(f"{prom_term} extraction complete !") - st.toast(f"{prom_term} extraction complete !", icon='😊') + st.success(f"{seq_type} extraction complete !") + st.toast(f"{seq_type} extraction complete !", icon='😊') elif ncbi_status is False: st.warning("⚠ NCBI servers are under maintenance or have an error") @@ -179,13 +208,15 @@ def extract(): "zebrafish": [False] * len(gene_ids), "promoter": [False] * len(gene_ids), "terminator": [False] * len(gene_ids), + "rna": [False] * len(gene_ids), + "mrna": [False] * len(gene_ids), "current": [False] * len(gene_ids), "previous": [False] * len(gene_ids), } ) species_list = ['human', 'mouse', 'rat', 'drosophila', 'zebrafish'] - search_types = ['promoter', 'terminator'] + search_types = ['promoter', 'terminator', "rna", "mrna"] genome_type = ["current", "previous"] st.markdown('**🔹 :blue[Step 1.2]** Select species for all genes:', @@ -207,12 +238,12 @@ def extract(): st.markdown('**🔹 :blue[Step 1.2]** Select regions for all genes:', help='Checking a box allows you to check all the corresponding boxes for each gene. Warning: if you have manually checked boxes in the table, they will be reset.') - region1, region2 = st.columns(2) + region1, region2, region3, region4 = st.columns(4) - with region1: - all_prom = st.toggle("Promoter") - with region2: - all_term = st.toggle("Terminator") + all_prom = region1.toggle("Promoter") + all_term = region2.toggle("Terminator") + all_rna = region3.toggle("RNA") + all_mrna = region4.toggle("mRNA") st.markdown('**🔹 :blue[Step 1.2]** Select genome version for all genes:', help="Human: Current -> GrCh38, Previous -> GrCh37\n\n" @@ -242,6 +273,10 @@ def extract(): data_df["promoter"] = True if all_term: data_df["terminator"] = True + if all_rna: + data_df["rna"] = True + if all_mrna: + data_df["mrna"] = True if gr_current: data_df['current'] = True if gr_previous: @@ -281,6 +316,14 @@ def extract(): "Terminator", default=False, ), + "rna": st.column_config.CheckboxColumn( + "RNA", + default=False, + ), + "mrna": st.column_config.CheckboxColumn( + "mRNA", + default=False, + ), "current": st.column_config.CheckboxColumn( "Current genome", default=False, @@ -310,6 +353,7 @@ def extract(): ncbi_status = True if response.status_code == 200 else False if ncbi_status is True: + st.session_state['result_promoter_text'] = "" with colprom1: st.session_state['upstream'] = upstream_entry upstream = int(upstream_entry) @@ -323,20 +367,55 @@ def extract(): for genome in genome_type: for search_type in search_types: if getattr(gene_info, f'{search_type}') and getattr(gene_info, f'{genome}'): - prom_term = search_type.capitalize() + seq_type = search_type.capitalize() pbar.progress((i + 1) / len(data_dff), - text=f'**:blue[Extract sequence... {prom_term} **{gene_id}** from **{species}**] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - - result_promoter_output, message = NCBIdna(gene_id, prom_term, upstream, - downstream, - genome_version=genome).find_sequences() + text=f'**:blue[Extract sequence... {seq_type} **{gene_id}** from **{species}**] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') + all_variants, message = NCBIdna(gene_id, seq_type=seq_type.lower(), + upstream=upstream, downstream=downstream, genome_version=gr.lower(), + all_slice_forms=True if all_slice_forms else False).find_sequences() if message == "OK": - st.toast(f"{prom_term} **{gene_id}** from **{species}** extracted", - icon='🧬') - - result_promoter.append(result_promoter_output) + for nm_id, data in all_variants.items(): + exon_coords = data.get('exon_coords', []) + upstream_seq = data.get('upstream') + + if not exon_coords: + print( + f"Erreur: 'exon_coords' est manquant ou vide pour {nm_id}.") + continue + + dna_sequence = ( + f">{nm_id} {data.get('gene_name', 'Unknown')} | " + f"{data.get('genomic_info', 'No Title')} {data.get('chraccver', 'No chraccver')} | " + f"Strand: {data.get('strand', 'Unknown')} | " + f"Type: {data.get('seq_type', 'Unknown')} | " + ) + + if data.get('seq_type') == 'promoter': + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"{f'| TSS (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + elif data.get('seq_type') == 'terminator': + coord = ( + f"Gene end (on chromosome): {exon_coords[-1][1]} " + f"{f'| Gene end (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + else: + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"| Gene end (on chromosome): {exon_coords[-1][1]}\n" + ) + + sequence = f"{data.get('sequence', 'No sequence available')}\n" + + result_promoter.append(dna_sequence + coord + sequence) + + st.toast( + f"{data.get('seq_type', 'Unknown')} **{gene_id}** from **{species}** extracted", + icon='🧬' + ) else: st.error(message) continue @@ -348,53 +427,118 @@ def extract(): if getattr(gene_info, f'{species}') and getattr(gene_info, f'{search_type}') and getattr( gene_info, f'{genome}'): - prom_term = search_type.capitalize() + seq_type = search_type.capitalize() pbar.progress((i + 1) / len(data_dff), - text=f'**:blue[Extract sequence... {prom_term} **{gene_id}** from **{species.capitalize()}**] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') - - result_promoter_output, message = NCBIdna(gene_id, prom_term, upstream, - downstream, species, - genome).find_sequences() + text=f'**:blue[Extract sequence... {seq_type} **{gene_id}** from **{species.capitalize()}**] ⚠️:red[PLEASE WAIT UNTIL END WITHOUT CHANGING ANYTHING]**') + all_variants, message = NCBIdna(gene_id, species, seq_type=seq_type.lower(), + upstream=upstream, + downstream=downstream, + genome_version=gr.lower(), + all_slice_forms=True if all_slice_forms else False).find_sequences() if message == "OK": - st.toast(f"{prom_term} **{gene_id}** from **{species}** extracted", - icon='🧬') - - result_promoter.append(result_promoter_output) + for nm_id, data in all_variants.items(): + exon_coords = data.get('exon_coords', []) + upstream_seq = data.get('upstream') + + if not exon_coords: + print( + f"Erreur: 'exon_coords' est manquant ou vide pour {nm_id}.") + continue + + dna_sequence = ( + f">{nm_id} {data.get('gene_name', 'Unknown')} | " + f"{data.get('genomic_info', 'No Title')} {data.get('chraccver', 'No chraccver')} | " + f"Strand: {data.get('strand', 'Unknown')} | " + f"Type: {data.get('seq_type', 'Unknown')} | " + ) + + if data.get('seq_type') == 'promoter': + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"{f'| TSS (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + elif data.get('seq_type') == 'terminator': + coord = ( + f"Gene end (on chromosome): {exon_coords[-1][1]} " + f"{f'| Gene end (on sequence): {upstream_seq}' if upstream_seq is not None else ''}\n" + ) + else: + coord = ( + f"TSS (on chromosome): {exon_coords[0][0] + 1} " + f"| Gene end (on chromosome): {exon_coords[-1][1]}\n" + ) + + sequence = f"{data.get('sequence', 'No sequence available')}\n" + + result_promoter.append(dna_sequence + coord + sequence) + + st.toast( + f"{data.get('seq_type', 'Unknown')} **{gene_id}** from **{species}** extracted", + icon='🧬' + ) else: st.error(message) continue result_promoter_text = "\n".join(result_promoter) st.session_state['result_promoter_text'] = result_promoter_text - st.success(f"{prom_term} extraction complete !") - st.toast(f"{prom_term} extraction complete !", icon='😊') + st.success(f"{seq_type} extraction complete !") + st.toast(f"{seq_type} extraction complete !", icon='😊') elif ncbi_status is False: st.warning("⚠ NCBI servers are under maintenance or have an error") def fasta(result_promoter_text=None): - # Promoter output state st.divider() - promcol1, promcol2 = st.columns([0.9, 0.1], gap='small') - with promcol1: - st.markdown("🔹 :blue[**Step 2.1**] Sequences:", help='Copy: Click in sequence, CTRL+A, CTRL+C') - if 'result_promoter_text' not in st.session_state: - result_promoter_text = '' - st.session_state['result_promoter_text'] = result_promoter_text - dna_sequence = st.text_area("🔹 :blue[**Step 2.1**] Sequences:", - value=st.session_state['result_promoter_text'], - label_visibility='collapsed', height=125) + promcol1, promcol2 = st.columns([0.75, 0.25], gap='small') with promcol2: st.markdown('') - st.markdown('') - st.markdown('') - current_date_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - txt_output = f"{dna_sequence}" - st.download_button(label="💾 Download (.fasta)", data=txt_output, - file_name=f"Sequences_{current_date_time}.fasta", mime="text/plain") + uploaded_files = st.file_uploader( + "Upload FASTA/TXT files:", + type=["fasta", "txt"], + accept_multiple_files=True, + help="Upload multiple FASTA/TXT files" + ) + if uploaded_files: + concatenated_sequences = "" + for uploaded_file in uploaded_files: + uploaded_content = uploaded_file.getvalue().decode("utf-8") + concatenated_sequences += uploaded_content + "\n" + st.session_state['result_promoter_text'] = concatenated_sequences.strip() + + with promcol1: + st.markdown("🔹 :blue[**Step 2.1**] Sequences:", help='Copy: Click in sequence, CTRL+A, CTRL+C') + + dna_sequence = st.text_area( + "🔹 :blue[**Step 2.1**] Sequences:", + value="" if 'result_promoter_text' not in st.session_state + else st.session_state['result_promoter_text'], + label_visibility='collapsed', + height=125 + ) + st.session_state['result_promoter_text'] = dna_sequence + + if '>' in dna_sequence: + num_sequences = dna_sequence.count('>') + st.markdown(f"**Number of sequences**: {num_sequences}") + elif dna_sequence.strip(): + num_sequences = 1 + st.markdown("**Single sequence detected**") + else: + num_sequences = 0 + st.markdown("**No sequence entered**") + + current_date_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + txt_output = f"{dna_sequence}" + promcol2.download_button( + label="💾 Download sequence(s) (.fasta)", + data=txt_output, + file_name=f"Sequences_{current_date_time}.fasta", + mime="text/plain" + ) return dna_sequence diff --git a/tfinder/__init__.py b/tfinder/__init__.py index 66532e73..e4a3c17f 100644 --- a/tfinder/__init__.py +++ b/tfinder/__init__.py @@ -58,80 +58,38 @@ class bcolors: class NCBIdna: - def __init__(self, - gene_id, - prom_term, - upstream, - downstream, - species=None, genome_version="current", all_slice_forms=None): + def __init__(self, gene_id, species=None, seq_type="mrna", upstream=2000, downstream=2000, genome_version="Current", all_slice_forms=None): self.gene_id = gene_id - self.prom_term = prom_term if prom_term is not None else "promoter" - self.upstream = upstream if upstream is not None else -2000 - self.downstream = downstream if downstream is not None else 2000 self.species = species if species is not None else "human" + self.seq_type = seq_type if seq_type is not None else "mrna" + self.upstream = upstream if upstream is not None and seq_type in ["promoter", "terminator"] else None + self.downstream = downstream if downstream is not None and seq_type in ["promoter", "terminator"] else None self.genome_version = genome_version if genome_version is not None else "current" self.all_slice_forms = True if all_slice_forms is True else False @staticmethod def XMNM_to_gene_ID(variant): - uids = f"https://www.ncbi.nlm.nih.gov/nuccore/{variant}" - - response = requests.get(uids) - - if response.status_code == 200: - soup = BeautifulSoup(response.text, 'html.parser') - - pattern = r"list_uids=(\d+)" - matches = re.search(pattern, str(soup)) - - if matches: - entrez_id = matches.group(1) - else: - entrez_id = "UIDs not founds" - else: - entrez_id = "Error during process of retrieving UIDs" + global headers - return entrez_id + while True: + uids = f"https://www.ncbi.nlm.nih.gov/nuccore/{variant}" - @staticmethod - # Analyse if gene is available - def analyse_gene(gene_id): - disponibility_list = ['ID', 'Human', 'Mouse', 'Rat', 'Drosophila', 'Zebrafish'] - time.sleep(0.25) - gene_analyse = [gene_id] - for species_test in disponibility_list: - if not gene_id.isdigit(): - if species_test == 'ID': - gene_analyse.append('n.d') - else: - time.sleep(0.5) - url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=gene&term={gene_id}[Gene%20Name]+AND+{species_test}[Organism]&retmode=json&rettype=xml" - response = requests.get(url) + response = requests.get(uids, headers=headers) - if response.status_code == 200: - response_data = response.json() + if response.status_code == 200: + soup = BeautifulSoup(response.text, 'html.parser') - if response_data['esearchresult']['count'] != '0': - gene_analyse.append("✅") - else: - gene_analyse.append("❌") + pattern = r"list_uids=(\d+)" + matches = re.search(pattern, str(soup)) - if gene_id.isdigit(): - if species_test != 'ID': - gene_analyse.append('n.d') + if matches: + entrez_id = matches.group(1) else: - url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id={gene_id}&retmode=json&rettype=xml" - response = requests.get(url) - - if response.status_code == 200: - response_data = response.json() - - if 'chraccver' in str(response_data): - gene_analyse.append("✅") - else: - gene_analyse.append("❌") + entrez_id = "UIDs not founds" - return gene_analyse + return entrez_id + else: + print(bcolors.FAIL + "Error during process of retrieving UIDs" + bcolors.ENDC) # Sequence extractor def find_sequences(self): @@ -139,13 +97,9 @@ def find_sequences(self): if self.gene_id.startswith('XM_') or self.gene_id.startswith('NM_') or self.gene_id.startswith( 'XR_') or self.gene_id.startswith('NR_'): entrez_id = NCBIdna.XMNM_to_gene_ID(self.gene_id) - if entrez_id == 'UIDs not founds' or entrez_id == 'Error during process of retrieving UIDs': + if entrez_id == 'UIDs not founds': result_promoter = f'Please verify {self.gene_id} variant' - return result_promoter - else: - variant, gene_name, title, chraccver, chrstart, chrstop, strand, species_API = NCBIdna.get_variant_info( - entrez_id, - self.gene_id) + return result_promoter, result_promoter else: if self.gene_id.isdigit(): entrez_id = self.gene_id @@ -155,57 +109,23 @@ def find_sequences(self): if entrez_id == "Error 200": return entrez_id, message - variant, gene_name, title, chraccver, chrstart, chrstop, strand, species_API, message = NCBIdna.get_gene_info( - entrez_id, self.genome_version, gene_name_error=self.gene_id) - if variant == "Error 200": - return variant, message - - if self.all_slice_forms: - all_variants, message = NCBIdna.all_variant(entrez_id) - if all_variants == "Error 200": - all_variants = [(variant, gene_name, chraccver, exon_coords, normalized_exon_coords, species_API)] - - prom_term = self.prom_term.lower() - if prom_term not in ['promoter', 'terminator']: - result_promoter = f"'{self.prom_term}' not valid. Please use 'Promoter' or 'Terminator'." - return result_promoter, "OK" - - if isinstance(self.upstream, int) and isinstance(self.downstream, int): - upstream = int(self.upstream) - downstream = int(self.downstream) - else: - result_window = f'Upstream {self.upstream} and Downstream {self.downstream} must be integer' - return result_window, "OK" - - if not self.all_slice_forms or self.all_slice_forms and self.gene_id.startswith( - 'XM_') or self.gene_id.startswith('NM_') or self.gene_id.startswith('XR_') or self.gene_id.startswith( - 'NR_'): - dna_sequence = NCBIdna.get_dna_sequence(gene_name, prom_term, upstream, downstream, chraccver, chrstart, - chrstop) - - if prom_term == 'promoter': - dna_sequence = f">{variant} {gene_name} | {title} {chraccver} | Strand: {strand} | {self.prom_term} | TSS (on chromosome): {chrstart + 1} | TSS (on sequence): {self.upstream}\n{dna_sequence}\n" - else: - dna_sequence = f">{variant} {gene_name} | {title} {chraccver} | Strand: {strand} | {self.prom_term} | Gene end (on chromosome): {chrstop} | Gene end (on sequence): {self.upstream}\n{dna_sequence}\n" - - return dna_sequence, "OK" - - elif self.all_slice_forms: - result_compil = [] - for variant, gene_name, chraccver, exon_coords, _, species_API in all_variants: - chrstart = exon_coords[0][0] - chrstop = exon_coords[-1][1] - dna_sequence = NCBIdna.get_dna_sequence(gene_name, prom_term, upstream, downstream, chraccver, chrstart, - chrstop) - if prom_term == 'promoter': - results = f">{variant} {gene_name} | {title} | {chraccver} | Strand: {strand} | {self.prom_term} | TSS (on chromosome): {chrstart + 1} | TSS (on sequence): {self.upstream}\n{dna_sequence}\n" + all_variants, message = NCBIdna.all_variant(entrez_id, self.genome_version, self.all_slice_forms) + if "Error 200" not in all_variants: + for nm_id, data in all_variants.items(): + exon_coords = data.get('exon_coords') + data['upstream'] = self.upstream + data['seq_type'] = self.seq_type + sequence = NCBIdna.get_dna_sequence(data.get("entrez_id"), data.get("chraccver"), exon_coords[0][0], + exon_coords[-1][1], self.seq_type, self.upstream, self.downstream) + if self.seq_type in ['mrna']: + if self.seq_type == 'mrna': + data['sequence'] = "".join(sequence[start:end + 1] for start, end in data["normalized_exon_coords"]) else: - results = f">{variant} {gene_name} | {title} | {chraccver} | Strand: {strand} | {self.prom_term} | Gene end (on chromosome): {chrstop} | Gene end (on sequence): {self.upstream}\n{dna_sequence}\n" - - result_compil.append(results) + data['sequence'] = sequence - result_output = "\n".join(result_compil) - return result_output, "OK" + return all_variants, "OK" + else: + return all_variants, "Error 200" @staticmethod # Convert gene to ENTREZ_GENE_ID @@ -245,275 +165,40 @@ def convert_gene_to_entrez_id(gene_name, species): @staticmethod # Get gene information - def get_gene_info(entrez_id, genome_version="current", from_id=True, gene_name_error=None): + def all_variant(entrez_id, genome_version="current", all_slice_forms=False): global headers while True: - url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id={entrez_id}&retmode=json&rettype=xml" - response = requests.get(url, headers=headers) + url2 = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id={entrez_id}&retmode=json&rettype=xml" + response = requests.get(url2, headers=headers) if response.status_code == 200: response_data = response.json() try: gene_info = response_data['result'][str(entrez_id)] - gene_name = gene_info['name'] species_API = gene_info['organism']['scientificname'] + title, chraccver = NCBIdna.extract_genomic_info(entrez_id, response_data, genome_version, species_API) + print( + bcolors.OKGREEN + f"Response 200: Chromosome {chraccver} found for {entrez_id}: {response.text}" + bcolors.ENDC) + break - title, chraccver, chrstart, chrstop = NCBIdna.extract_genomic_info(entrez_id, response_data, - genome_version, species_API) - - - if from_id: - variant = NCBIdna.all_variant(entrez_id, from_id=True) - if not variant: - variant = gene_name - - strand = "plus" if chrstart < chrstop else "minus" - - print(bcolors.OKGREEN + f"Response 200: Info for {entrez_id} {variant} {gene_name} retrieved." - f"Entrez_ID: {entrez_id} | Gene name: {gene_name} | Genome Assembly: {title} | ChrAccVer: {chraccver}" - f"ChrStart/ChrStop: {chrstart}/{chrstop} | Strand: {strand} | Species: {species_API}" + bcolors.ENDC) - - return variant if variant else None, gene_name, title, chraccver, chrstart, chrstop, strand, species_API, ( - f"Response 200: Info for {entrez_id} {variant} {gene_name} retrieved." - f"Entrez_ID: {entrez_id} | Gene name: {gene_name} | Genome Assembly: {title} | ChrAccVer: {chraccver}" - f"ChrStart/ChrStop: {chrstart}/{chrstop} | Strand: {strand} | Species: {species_API}") except Exception as e: print( - bcolors.WARNING + f"Response 200: Info for {gene_name_error} {entrez_id} not found: {e} {traceback.print_exc()}" + bcolors.ENDC) - return "Error 200", None, None, None, None, None, None, None, f"Info for {gene_name_error} {entrez_id} not found." + bcolors.WARNING + f"Response 200: Chromosome not found for {entrez_id}: {response.text} {e} {traceback.print_exc()}" + bcolors.ENDC) + all_variants = [("Error 200", None, None, None, None, None)] + print( + bcolors.WARNING + f"Response 200: Transcript not found(s) for {entrez_id}." + bcolors.ENDC) + return "Error 200", f"Transcript not found(s) for {entrez_id}." elif response.status_code == 429: print( - bcolors.FAIL + f"Error 429: API rate limit exceeded during get {entrez_id} info, try again: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - else: - print(bcolors.FAIL + f"Error {response.status_code}: {response.text}" + bcolors.ENDC) + bcolors.ENDC + f"Error {response.status_code}: API rate limit exceeded during get chromosome of {entrez_id}: {response.text}" + bcolors.ENDC) time.sleep(random.uniform(0.25, 0.5)) - - @staticmethod - def extract_genomic_info(gene_id, gene_info, genome_version, species=None): - if gene_info and 'result' in gene_info and gene_id in gene_info['result']: - accession_dict = {} - gene_details = gene_info['result'][gene_id] - - time.sleep(1) - - location_hist = gene_details.get('locationhist', []) - if len(location_hist) == 0: - location_hist = gene_details.get('genomicinfo', []) - - for loc in location_hist: - nc_accver = loc.get('chraccver') - chrstart = loc.get('chrstart') - chrstop = loc.get('chrstop') - - if nc_accver: - base_accession = nc_accver - if base_accession not in accession_dict: - accession_dict[base_accession] = (chrstart, chrstop) - else: - existing_start, existing_stop = accession_dict[base_accession] - accession_dict[base_accession] = (min(existing_start, chrstart), max(existing_stop, chrstop)) - - nc_dict = accession_dict - - nc_dict = {base_accver: (chrstart, chrstop) for base_accver, (chrstart, chrstop) in nc_dict.items() if - base_accver.startswith(("NC_", "NT_"))} - - if species == "Rattus norvegicus" and 'locationhist' in gene_details: - location_hist = gene_details['locationhist'] - rs_annotations = [ - loc for loc in location_hist if loc.get('annotationrelease', "").startswith("RS_") - ] - rs_annotations.sort(key=lambda x: int(x['annotationrelease'].split('_')[1])) - if rs_annotations: - selected_annotation = rs_annotations[-1] if genome_version == "current" else rs_annotations[0] - selected_nc_accver = selected_annotation.get('chraccver') - if selected_nc_accver: - nc_dict = { - selected_nc_accver: (selected_annotation['chrstart'], selected_annotation['chrstop']) - } - - if nc_dict: - first_base = next(iter(nc_dict)).split('.')[0] - nc_dict = {base_accver: (chrstart, chrstop) for base_accver, (chrstart, chrstop) in nc_dict.items() if - base_accver.split('.')[0] == first_base} - - max_version = -1 - max_accver = None - max_coords = None - min_version = float('inf') - min_accver = None - min_coords = None - - for base_accver in nc_dict.keys(): - version = int(base_accver.split('.')[1]) - - if version > max_version: - max_version = version - max_accver = base_accver - max_coords = nc_dict[base_accver] - - if version < min_version: - min_version = version - min_accver = base_accver - min_coords = nc_dict[base_accver] - - if genome_version != "current": - title = NCBIdna.fetch_nc_info(min_accver) - return title, min_accver, min_coords[0], min_coords[1] - else: - title = NCBIdna.fetch_nc_info(max_accver) - return title, max_accver, max_coords[0], max_coords[1] - - @staticmethod - def fetch_nc_info(nc_accver): - global headers - - while True: - url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=nuccore&id={nc_accver}&retmode=json" - response = requests.get(url, headers=headers) - if response.status_code == 200: - nc_info = response.json() - try: - uid = nc_info['result']['uids'][0] - title = nc_info['result'][uid]['title'] - return title - except Exception as e: - time.sleep(random.uniform(0.25, 0.5)) else: - time.sleep(random.uniform(0.25, 0.5)) - - @staticmethod - # Get gene information - def get_variant_info(entrez_id, variant): - global headers - variant = variant.split(".")[0] - - while True: - url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=gene&id={entrez_id}&retmode=xml" - response = requests.get(url, headers=headers) - if response.status_code == 200: - root = ET.fromstring(response.text) - - chromosome = "" - found_variant = False - k_found = False - start_coords = [] - end_coords = [] - orientation = "" - break - - elif response.status_code == 429: print( - bcolors.FAIL + f"1 Error 429: API rate limit exceeded during get {entrez_id} {variant} info, try again: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - else: - print(bcolors.FAIL + f"Error {response.status_code}: {response.text}" + bcolors.ENDC) + bcolors.ENDC + f"Error {response.status_code}: Error during get chromosome of {entrez_id}: {response.text}" + bcolors.ENDC) time.sleep(random.uniform(0.25, 0.5)) - if response.status_code == 200: - for elem in root.iter('Gene-commentary_accession'): - if elem.text.startswith('NC_'): - chromosome = elem.text - break - - for elem in root.iter(): - if elem.tag == "Gene-commentary_accession" and elem.text != variant: - if elem.text == chromosome: - k_found = True - elif len(start_coords) < 2 and len(end_coords) < 2: - continue - else: - break - - if k_found and elem.tag == "Gene-commentary_accession": - found_variant = True if elem.text == variant else False - elif k_found and found_variant and elem.tag == "Seq-interval_from": - start_coords.append(elem.text) - elif k_found and found_variant and elem.tag == "Seq-interval_to": - end_coords.append(elem.text) - elif k_found and found_variant and elem.tag == "Na-strand" and orientation == "": - orientation += elem.attrib.get("value") - - elif elem.tag == "Org-ref_taxname": - species_API = elem.text - - elif elem.tag == 'Gene-ref_locus': - gene_name = elem.text - - if orientation != "minus": - chrstart = int(start_coords[0]) + 1 - chrstop = int(end_coords[-1]) + 1 - else: - chrstart = int(end_coords[0]) + 1 - chrstop = int(start_coords[-1]) + 1 - - strand = "plus" if chrstart < chrstop else "minus" - - while True: - url2 = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id={entrez_id}&retmode=json&rettype=xml" - response = requests.get(url2, headers=headers) - if response.status_code == 200: - response_data = response.json() - if 'result' in response_data and str(entrez_id) in response_data['result']: - gene_info = response_data['result'][str(entrez_id)] - if 'chraccver' in str(gene_info): - chraccver = gene_info['genomicinfo'][0]['chraccver'] - title = NCBIdna.fetch_nc_info(chraccver) - else: - gene_name = 'Bad ID' - - print(bcolors.OKGREEN + f"Response 200: Info for {entrez_id} {variant} {gene_name} retrieved. " - f"Entrez_ID: {entrez_id} | Gene name: {gene_name} | ChrAccVer: {chraccver}" - f"ChrStart/ChrStop: {chrstart}/{chrstop} | Species: {species_API}" + bcolors.ENDC) - return variant, gene_name, title, chraccver, chrstart, chrstop, strand, species_API - - elif response.status_code == 429: - print( - bcolors.FAIL + f"2 Error 429: API rate limit exceeded during get {entrez_id} {variant} info, try again: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - else: - print(bcolors.FAIL + f"Error {response.status_code}: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - - @staticmethod - # Get gene information - def all_variant(entrez_id, from_id=False): - global headers - - if not from_id: - while True: - url2 = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id={entrez_id}&retmode=json&rettype=xml" - response = requests.get(url2, headers=headers) - - if response.status_code == 200: - response_data = response.json() - try: - gene_info = response_data['result'][str(entrez_id)] - chraccver = gene_info['genomicinfo'][0]['chraccver'] - print( - bcolors.OKGREEN + f"Response 200: Chromosome {chraccver} found for {entrez_id}: {response.text}" + bcolors.ENDC) - break - - except Exception as e: - print( - bcolors.WARNING + f"Response 200: Chromosome not found for {entrez_id}: {response.text} {e} {traceback.print_exc()}" + bcolors.ENDC) - all_variants = [("Error 200", None, None, None, None, None)] - print( - bcolors.WARNING + f"Response 200: Transcript not found(s) for {entrez_id}." + bcolors.ENDC) - return "Error 200", f"Transcript not found(s) for {entrez_id}." - - elif response.status_code == 429: - print( - bcolors.ENDC + f"Error {response.status_code}: API rate limit exceeded during get chromosome of {entrez_id}: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - else: - print( - bcolors.ENDC + f"Error {response.status_code}: Error during get chromosome of {entrez_id}: {response.text}" + bcolors.ENDC) - time.sleep(random.uniform(0.25, 0.5)) - while True: url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=gene&id={entrez_id}&retmode=xml" response = requests.get(url, headers=headers) @@ -524,11 +209,8 @@ def all_variant(entrez_id, from_id=False): tv = [] variants = [] gene_name = [] - species_API = [] chromosome = "" - all_variants = [] - for elem in root.iter(): if elem.tag == "Gene-commentary_label": if elem.text.startswith('transcript variant'): @@ -540,24 +222,22 @@ def all_variant(entrez_id, from_id=False): if elem.text not in variants: variants.append(elem.text) - elif elem.tag == "Org-ref_taxname": - species_API = elem.text - elif elem.tag == 'Gene-ref_locus': gene_name = elem.text - if not from_id: - for elem in root.iter('Gene-commentary_accession'): - if elem.text.startswith('NC_'): - chromosome = elem.text - break + for elem in root.iter('Gene-commentary_accession'): + if elem.text.startswith(("NC_", "NT_")): + chromosome = elem.text + break + + def calc_exon(root, variants): + all_variants = {} for variant in variants: - exon_coords = [] # Liste pour stocker toutes les paires (start, end) + exon_coords = [] found_variant = False k_found = False orientation = "" - for elem in root.iter(): if elem.tag == "Gene-commentary_accession" and elem.text != variant: if elem.text == chromosome: @@ -570,10 +250,10 @@ def all_variant(entrez_id, from_id=False): if k_found and elem.tag == "Gene-commentary_accession": found_variant = True if elem.text == variant else False elif k_found and found_variant and elem.tag == "Seq-interval_from": - start = int(elem.text) # Ajuster pour 1-based + start = int(elem.text) elif k_found and found_variant and elem.tag == "Seq-interval_to": - end = int(elem.text) # Ajuster pour 1-based - exon_coords.append((start, end)) # Ajouter le couple (start, end) pour chaque exon + end = int(elem.text) + exon_coords.append((start, end)) elif k_found and found_variant and elem.tag == "Na-strand" and orientation == "": orientation += elem.attrib.get("value") @@ -583,46 +263,63 @@ def all_variant(entrez_id, from_id=False): elif elem.tag == 'Gene-ref_locus': gene_name = elem.text - # Si des exons sont trouvés, normalisez les coordonnées if exon_coords: - if orientation == "minus": - # Inverser les exons pour le brin négatif exon_coords = [(end, start) for start, end in exon_coords] - # La première coordonnée du premier exon (start) devient la référence pour la normalisation first_exon_start = exon_coords[0][0] - # Normalisation: soustraction de la première coordonnée normalized_exon_coords = [ (abs(start - first_exon_start), abs(end - first_exon_start)) for start, end in exon_coords] - # Ajouter les coordonnées chromosomiques et normalisées à la liste - all_variants.append( - (variant, gene_name, chraccver, exon_coords, normalized_exon_coords, species_API)) + all_variants[variant] = { + 'entrez_id': entrez_id, + 'gene_name': gene_name, + 'genomic_info': title, + 'chraccver': chraccver, + 'strand': orientation, + 'exon_coords': exon_coords, + 'normalized_exon_coords': normalized_exon_coords, + 'species': species_API + } if len(all_variants) > 0: print( bcolors.OKGREEN + f"Response 200: Transcript(s) found(s) for {entrez_id}: {all_variants}" + bcolors.ENDC) - return all_variants, f"Transcript(s) found(s) for {entrez_id}: {all_variants}" + return all_variants, f"Transcript(s) found(s) for {entrez_id}: {list(all_variants.keys())}" else: - all_variants.append(("Error 200", None, None, None, None, None)) + all_variants["Error 200"] = { + "entrez_id": f"Transcript not found for {entrez_id}.", + "gene_name": None, + "chraccver": None, + "exon_coords": None, + "normalized_exon_coords": None, + "species": None + } print( bcolors.WARNING + f"Error 200: Transcript not found(s) for {entrez_id}." + bcolors.ENDC) - return "Error 200", f"Transcript not found(s) for {entrez_id}." + return all_variants, f"Error 200: Transcript not found(s) for {entrez_id}." - elif from_id: + if all_slice_forms is True: + all_variants, message = calc_exon(root, variants) + return all_variants, message + + elif all_slice_forms is False: if len(tv) > 0: if "transcript variant 1" in tv: associations = dict(zip(tv, variants)) - return associations["transcript variant 1"] + variant = associations["transcript variant 1"] else: - return variants[0] + variant = variants[0] elif len(tv) == 0 and len(variants) > 0: - return variants[0] + variant = variants[0] else: - return None + variant = None + + if variant is not None: + all_variants, message = calc_exon(root, [variant]) + return all_variants, message elif response.status_code == 429: print( @@ -635,37 +332,45 @@ def all_variant(entrez_id, from_id=False): @staticmethod # Get DNA sequence - def get_dna_sequence(gene_name, prom_term, upstream, downstream, chraccver, chrstart, chrstop): + def get_dna_sequence(gene_name, chraccver, chrstart, chrstop, seq_type, upstream=2000, downstream=2000): global headers - # Determine sens of gene + coordinate for upstream and downstream - if chrstop > chrstart: - start = (chrstart if prom_term.lower() == 'promoter' else chrstop) - upstream + 1 - end = (chrstart if prom_term.lower() == 'promoter' else chrstop) + downstream - else: - start = (chrstart if prom_term.lower() == 'promoter' else chrstop) + upstream + 1 - end = (chrstart if prom_term.lower() == 'promoter' else chrstop) - downstream + 2 - # Request for DNA sequence + if seq_type in ['mrna', 'rna']: + if chrstop > chrstart: + start = chrstart + 1 + end = chrstop + 1 + else: + start = chrstop + 1 + end = chrstart + 1 + elif seq_type in ['promoter', 'terminator']: + if chrstop > chrstart: + start = (chrstart if seq_type == 'promoter' else chrstop) - upstream + 1 + end = (chrstart if seq_type == 'promoter' else chrstop) + downstream + else: + start = (chrstart if seq_type == 'promoter' else chrstop) + upstream + 1 + end = (chrstart if seq_type == 'promoter' else chrstop) - downstream + 2 + while True: url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nuccore&id={chraccver}&from={start}&to={end}&rettype=fasta&retmode=text" response = requests.get(url, headers=headers) + if response.status_code == 200: - dna_sequence = response.text.split('\n', 1)[1].replace('\n', '') - if chrstop > chrstart: - sequence = dna_sequence - else: + dna_sequence = response.text.split('\n', 1)[1].replace('\n', + '') + + if chrstop < chrstart: sequence = NCBIdna.reverse_complement(dna_sequence) + else: + sequence = dna_sequence - print( - bcolors.OKGREEN + f"Response 200: DNA sequence for {gene_name} extracted: {sequence}" + bcolors.ENDC) + print(bcolors.OKGREEN + f"Response 200: DNA sequence for {gene_name} extracted: {sequence}" + bcolors.ENDC) return sequence elif response.status_code == 429: - print( - bcolors.FAIL + f"Error 429: API rate limit exceeded for DNA extraction of {gene_name}, try again: {response.text}" + bcolors.ENDC) + print(bcolors.FAIL + f"Error 429: API rate limit exceeded for DNA extraction of {gene_name}, try again." + bcolors.ENDC) time.sleep(random.uniform(0.25, 0.5)) else: - print(bcolors.FAIL + f"Error {response.status_code}: {response.text}" + bcolors.ENDC) + print(bcolors.OKGREEN + f"Error {response.status_code}: {response.text}" + bcolors.ENDC) time.sleep(random.uniform(0.25, 0.5)) @staticmethod @@ -679,6 +384,100 @@ def reverse_complement(dna_sequence): complement_sequence = ''.join(complement_dict.get(base, base) for base in reverse_sequence) return complement_sequence + @staticmethod + def extract_genomic_info(gene_id, gene_info, genome_version, species=None): + if gene_info and 'result' in gene_info and gene_id in gene_info['result']: + accession_dict = {} + gene_details = gene_info['result'][gene_id] + + time.sleep(1) + + location_hist = gene_details.get('locationhist', []) + if len(location_hist) == 0: + location_hist = gene_details.get('genomicinfo', []) + + for loc in location_hist: + nc_accver = loc.get('chraccver') + chrstart = loc.get('chrstart') + chrstop = loc.get('chrstop') + + if nc_accver: + base_accession = nc_accver + if base_accession not in accession_dict: + accession_dict[base_accession] = (chrstart, chrstop) + else: + existing_start, existing_stop = accession_dict[base_accession] + accession_dict[base_accession] = (min(existing_start, chrstart), max(existing_stop, chrstop)) + + nc_dict = accession_dict + + nc_dict = {base_accver: (chrstart, chrstop) for base_accver, (chrstart, chrstop) in nc_dict.items() if + base_accver.startswith(("NC_", "NT_"))} + + if species == "Rattus norvegicus" and 'locationhist' in gene_details: + location_hist = gene_details['locationhist'] + rs_annotations = [ + loc for loc in location_hist if loc.get('annotationrelease', "").startswith("RS_") + ] + rs_annotations.sort(key=lambda x: int(x['annotationrelease'].split('_')[1])) + if rs_annotations: + selected_annotation = rs_annotations[-1] if genome_version == "current" else rs_annotations[0] + selected_nc_accver = selected_annotation.get('chraccver') + if selected_nc_accver: + nc_dict = { + selected_nc_accver: (selected_annotation['chrstart'], selected_annotation['chrstop']) + } + + if nc_dict: + first_base = next(iter(nc_dict)).split('.')[0] + nc_dict = {base_accver: (chrstart, chrstop) for base_accver, (chrstart, chrstop) in nc_dict.items() if + base_accver.split('.')[0] == first_base} + + max_version = -1 + max_accver = None + max_coords = None + min_version = float('inf') + min_accver = None + min_coords = None + + for base_accver in nc_dict.keys(): + version = int(base_accver.split('.')[1]) + + if version > max_version: + max_version = version + max_accver = base_accver + max_coords = nc_dict[base_accver] + + if version < min_version: + min_version = version + min_accver = base_accver + min_coords = nc_dict[base_accver] + + if genome_version != "current": + title = NCBIdna.fetch_nc_info(min_accver) + return title, min_accver + else: + title = NCBIdna.fetch_nc_info(max_accver) + return title, max_accver + + @staticmethod + def fetch_nc_info(nc_accver): + global headers + + while True: + url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=nuccore&id={nc_accver}&retmode=json" + response = requests.get(url, headers=headers) + if response.status_code == 200: + nc_info = response.json() + try: + uid = nc_info['result']['uids'][0] + title = nc_info['result'][uid]['title'] + return title + except Exception as e: + time.sleep(random.uniform(0.25, 0.5)) + else: + time.sleep(random.uniform(0.25, 0.5)) + class IMO: @@ -759,48 +558,6 @@ def calculate_score(sequence, matrix): score = sum(matrix[base][position] for position, base in enumerate(sequence)) return score - ### - - @staticmethod - def calculate_weight(sequence, pwm, background_freq): - - def calculate_pwm_probability(sequence, pwm): - probability = 1.0 - for i, base in enumerate(sequence): - probability *= pwm[base][i] - return probability - - def calculate_background_probability(sequence, background_freq): - probability = 1.0 - for base in sequence: - probability *= background_freq.get(base) - return probability - - pwm_prob = calculate_pwm_probability(sequence, pwm) - background_prob = calculate_background_probability(sequence, background_freq) - return math.log2(pwm_prob / background_prob) - - @staticmethod - def calculate_min_max_weights(pwm, background_freq): - min_prob = 1.0 - max_prob = 1.0 - - for i in range(len(next(iter(pwm.values())))): - col = {base: pwm[base][i] / (background_freq.get(base) or 1e-64) for base in pwm} - - min_value = min(col.values()) - max_value = max(col.values()) - - min_prob *= min_value - max_prob *= max_value - - min_weight = math.log2(min_prob) - max_weight = math.log2(max_prob) - - return min_weight, max_weight - - ### - @staticmethod # Generate random sequences def generate_random_sequence(length, probabilities): @@ -820,27 +577,26 @@ def is_dna(dna_sequence): return isfasta @staticmethod - def transform_PWM(matrix, pseudocount=None): + def transform_PWM(matrix, pseudocount=None, bg=None): motif = motifs.Motif(counts=matrix) pseudocount_auto = calculate_pseudocounts(motif) pwm = motif.counts.normalize(pseudocounts=pseudocount_auto if pseudocount is None else pseudocount) background_preset = {'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25} - log_odds_matrix = pwm.log_odds(background=background_preset) + log_odds_matrix = pwm.log_odds(background=bg if bg is not None else background_preset) - return pseudocount_auto, pwm, log_odds_matrix + return pseudocount_auto, log_odds_matrix @staticmethod # Find with JASPAR and manual matrix - def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, calc_pvalue=None, + def individual_motif_finder(dna_sequences, threshold, matrix_input, progress_bar=None, calc_pvalue=None, tss_ge_distance=None, alldirection=None, pseudocount=None, bgnf=None): if calc_pvalue is not None: if calc_pvalue not in ["ATGCPreset", "ATGCProportion"]: raise ValueError("Use 'ATGCPreset' or 'ATGCProportion'") individual_motif_occurrences = [] - _, pwm, log_odds_matrix = IMO.transform_PWM(matrix, pseudocount) + _, log_odds_matrix = IMO.transform_PWM(matrix_input, pseudocount) matrices = IMO.transform_matrix(log_odds_matrix, alldirection) - pwm_weight = IMO.transform_matrix(pwm, alldirection) seq_length = len(matrices['+ f']['A']) @@ -881,6 +637,12 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, percentage_g = count_g / length_prom percentage_c = count_c / length_prom + background_freq = {'A': percentage_a, 'C': percentage_c, + 'G': percentage_g, 'T': percentage_t} if bgnf is None else bgnf + + _, pwm = IMO.transform_PWM(matrix_input, pseudocount, background_freq) + pwm_weight = IMO.transform_matrix(pwm, alldirection) + if calc_pvalue == 'ATGCProportion': random_sequences = IMO.generate_ranseq([percentage_a, percentage_c, percentage_g, percentage_t], seq_length, progress_bar) @@ -902,6 +664,8 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, # Max score per matrix max_score = sum(max(matrix[base][i] for base in matrix.keys()) for i in range(seq_length)) min_score = sum(min(matrix[base][i] for base in matrix.keys()) for i in range(seq_length)) + max_score_pwm = sum(max(pwm_weight[matrix_name][base][i] for base in pwm_weight[matrix_name].keys()) for i in range(seq_length)) + min_score_pwm = sum(min(pwm_weight[matrix_name][base][i] for base in pwm_weight[matrix_name].keys()) for i in range(seq_length)) if calc_pvalue == 'ATGCProportion': matrix_random_scores = [] @@ -921,15 +685,11 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, normalized_score = (score - min_score) / (max_score - min_score) position = int(i) + 1 - background_freq = {'A': percentage_a, 'C': percentage_c, - 'G': percentage_g, 'T': percentage_t} if bgnf is None else bgnf - weight = IMO.calculate_weight(seq, pwm_weight[matrix_name], background_freq) - min_weight, max_weight = IMO.calculate_min_max_weights(pwm_weight[matrix_name], background_freq) - relative_weight = (weight - min_weight) / (max_weight - min_weight) - found_positions.append((position, seq, normalized_score, score, weight, relative_weight)) + found_positions.append((position, seq, normalized_score, score)) if progress_bar: - progress_bar.update(1) + pb = 1 if region in ["prom.", "term.", "n.d"] else 2 + progress_bar.update(pb) # Sort positions in descending order of score percentage found_positions.sort(key=lambda x: x[1], reverse=True) @@ -938,43 +698,53 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, if len(found_positions) > 0: if threshold < 0.5: highest_normalized_score = max( - [normalized_score for _, _, normalized_score, _, _, _ in found_positions]) + [normalized_score for _, _, normalized_score, _, in found_positions]) if highest_normalized_score >= 0.6: threshold = highest_normalized_score - 0.10 else: threshold = 0.5 - for position, seq, normalized_score, score, weight, relative_weight in found_positions: - start_position = max(0, position - 4) - end_position = min(len(dna_sequence), position + len(seq) + 2) - sequence_with_context = dna_sequence[start_position:end_position] + for position, seq, normalized_score, score in found_positions: + if normalized_score >= threshold: + + weight = IMO.calculate_score(seq, pwm_weight[matrix_name]) + relative_weight = (weight - min_score_pwm) / (max_score_pwm - min_score_pwm) - sequence_parts = [] - for j in range(start_position, end_position): - if j < position - 1 or j + 2 > position + len(seq): - sequence_parts.append(sequence_with_context[j - start_position].lower()) - else: - sequence_parts.append(sequence_with_context[j - start_position].upper()) + start_position = max(0, position - 4) + end_position = min(len(dna_sequence), position + len(seq) + 2) + sequence_with_context = dna_sequence[start_position:end_position] - sequence_with_context = ''.join(sequence_parts) + sequence_parts = [] + for j in range(start_position, end_position): + if j < position - 1 or j + 2 > position + len(seq): + sequence_parts.append(sequence_with_context[j - start_position].lower()) + else: + sequence_parts.append(sequence_with_context[j - start_position].upper()) - if tss_ge_distance is not None: - tis_position = position - tss_ge_distance - 1 + sequence_with_context = ''.join(sequence_parts) - if strand_seq in ["plus", "minus"]: - ch_pos = int(tss_ch) - tis_position if strand_seq == "minus" else int( - tss_ch) + tis_position - else: - ch_pos = "n.d" + tis_position = None + ch_pos = None + if region in ["prom.", "term."]: + if tss_ge_distance is not None: + tis_position = position - tss_ge_distance - 1 - if normalized_score >= threshold: + if strand_seq in ["plus", "minus"] and tss_ch != 0: + ch_pos = int(tss_ch) - tis_position if strand_seq == "minus" else int( + tss_ch) + tis_position + + elif region in ["rna.", "mrna."]: + if strand_seq in ["plus", "minus"] and tss_ch != 0: + ch_pos = int(tss_ch) - position if strand_seq == "minus" else int( + tss_ch) + position if calc_pvalue is not None: p_value = (random_scores >= normalized_score).sum() / len(random_scores) row = [position] - if tss_ge_distance is not None: + if tis_position is not None: row.append(tis_position) + if ch_pos is not None: row.append(ch_pos) row += [sequence_with_context, "{:.6f}".format(normalized_score).ljust(12), @@ -986,10 +756,14 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, row += [strand, direction, name, species, region] individual_motif_occurrences.append(row) + if region in ["rna.", "mrna."]: + break + if len(individual_motif_occurrences) > 0: header = ["Position"] - if tss_ge_distance is not None: + if tis_position is not None: header.append("Rel Position") + if ch_pos is not None: header.append("Ch Position") header += ["Sequence", "Rel Score", 'Score', 'Rel Score Adj', 'Score Adj'] if calc_pvalue is not None: @@ -1005,24 +779,7 @@ def individual_motif_finder(dna_sequences, threshold, matrix, progress_bar=None, return "No consensus sequence found with the specified threshold.", False @staticmethod - # IUPAC code - def generate_iupac_variants(sequence, max_variant_allowed=None, progress_bar=None): - iupac_codes = { - "R": ["A", "G"], - "Y": ["C", "T"], - "M": ["A", "C"], - "K": ["G", "T"], - "W": ["A", "T"], - "S": ["C", "G"], - "B": ["C", "G", "T"], - "D": ["A", "G", "T"], - "H": ["A", "C", "T"], - "V": ["A", "C", "G"], - "N": ["A", "C", "G", "T"], - "-": ['-'], - ".": ['-'] - } - + def generate_iupac_variants(sequence): iupac_codes_score = { "R": 2, "Y": 2, @@ -1041,26 +798,36 @@ def generate_iupac_variants(sequence, max_variant_allowed=None, progress_bar=Non for base in sequence: if base.upper() in iupac_codes_score: total_variants *= iupac_codes_score[base.upper()] - if max_variant_allowed is not None: - if total_variants > max_variant_allowed: - sequence = f'Too many variants. Use - or . instead N. Limit: {max_variant_allowed} | Total variants : {total_variants}' - return sequence - if progress_bar is True: - pbar = tqdm(total=total_variants, desc='Generate variants from IUPAC code...', mininterval=0.1) - sequences = [sequence] - for i, base in enumerate(sequence): - if base.upper() in iupac_codes: - new_sequences = [] - for seq in sequences: - for alternative in iupac_codes[base.upper()]: - new_sequence = seq[:i] + alternative + seq[i + 1:] - new_sequences.append(new_sequence) - if progress_bar is True: - pbar.update(1) - sequences = new_sequences - - return sequences + iupac_frequencies = { + "A": {"A": 1.0, "C": 0.0, "G": 0.0, "T": 0.0}, + "C": {"A": 0.0, "C": 1.0, "G": 0.0, "T": 0.0}, + "G": {"A": 0.0, "C": 0.0, "G": 1.0, "T": 0.0}, + "T": {"A": 0.0, "C": 0.0, "G": 0.0, "T": 1.0}, + "R": {"A": 0.5, "C": 0.0, "G": 0.5, "T": 0.0}, + "Y": {"A": 0.0, "C": 0.5, "G": 0.0, "T": 0.5}, + "M": {"A": 0.5, "C": 0.5, "G": 0.0, "T": 0.0}, + "K": {"A": 0.0, "C": 0.0, "G": 0.5, "T": 0.5}, + "W": {"A": 0.5, "C": 0.0, "G": 0.0, "T": 0.5}, + "S": {"A": 0.0, "C": 0.5, "G": 0.5, "T": 0.0}, + "B": {"A": 0.0, "C": 0.333, "G": 0.333, "T": 0.333}, + "D": {"A": 0.333, "C": 0.0, "G": 0.333, "T": 0.333}, + "H": {"A": 0.333, "C": 0.333, "G": 0.0, "T": 0.333}, + "V": {"A": 0.333, "C": 0.333, "G": 0.333, "T": 0.0}, + "N": {"A": 0.25, "C": 0.25, "G": 0.25, "T": 0.25}, + "-": {"A": 0.0, "C": 0.0, "G": 0.0, "T": 0.0}, + ".": {"A": 0.0, "C": 0.0, "G": 0.0, "T": 0.0}} + + pwm_dict = {'A': [], 'C': [], 'G': [], 'T': []} + for char in sequence: + freq = iupac_frequencies.get(char, {"A": 0.0, "C": 0.0, "G": 0.0, "T": 0.0}) + + pwm_dict['A'].append(math.ceil(freq['A'] * total_variants)) + pwm_dict['C'].append(math.ceil(freq['C'] * total_variants)) + pwm_dict['G'].append(math.ceil(freq['G'] * total_variants)) + pwm_dict['T'].append(math.ceil(freq['T'] * total_variants)) + + return pwm_dict @staticmethod # is PWM good ? @@ -1143,9 +910,11 @@ def parse_fasta(individual_motif): return sequences @staticmethod - # generate Weblogo - def create_web_logo(sequences): - matrix = logomaker.alignment_to_matrix(sequences) + def create_web_logo(sequences, type_input): + if type_input == "PWM": + matrix = logomaker.alignment_to_matrix(sequences) + elif type_input == "UIPAC": + matrix = pd.DataFrame(sequences) logo = logomaker.Logo(matrix, color_scheme='classic') logo.style_spines(visible=True) logo.style_xticks(fmt='%d') @@ -1153,46 +922,51 @@ def create_web_logo(sequences): return logo @staticmethod - # Individual motif PWM and weblogo - def individual_motif_pwm(individual_motif): - sequences = IMO.parse_fasta(individual_motif) - sequences = [seq.upper() for seq in sequences] - - if len(sequences) > 0: - sequence_length = len(sequences[0]) - inconsistent_lengths = False - - for sequence in sequences[1:]: - if len(sequence) != sequence_length: - inconsistent_lengths = True - break + def individual_motif_pwm(individual_motif, type_input="PWM"): + if type_input == "PWM": + sequences = IMO.parse_fasta(individual_motif) + sequences = [seq.upper() for seq in sequences] + + if len(sequences) > 0: + sequence_length = len(sequences[0]) + inconsistent_lengths = False + + for sequence in sequences[1:]: + if len(sequence) != sequence_length: + inconsistent_lengths = True + break - if inconsistent_lengths: - raise Exception(f"Sequence lengths are not consistent.") + if inconsistent_lengths: + raise Exception(f"Sequence lengths are not consistent.") - else: - matrix = IMO.calculate_pwm(sequences) + else: + matrix = IMO.calculate_pwm(sequences) + + sequences_text = individual_motif + sequences = [] + current_sequence = "" + for line in sequences_text.splitlines(): + line = line.strip() + if line.startswith(">"): + if current_sequence: + sequences.append(current_sequence) + current_sequence = "" + else: + current_sequence += line - sequences_text = individual_motif - sequences = [] - current_sequence = "" - for line in sequences_text.splitlines(): - line = line.strip() - if line.startswith(">"): - if current_sequence: - sequences.append(current_sequence) - current_sequence = "" - else: - current_sequence += line + sequences.append(current_sequence) - sequences.append(current_sequence) + weblogo = IMO.create_web_logo(sequences, type_input) - weblogo = IMO.create_web_logo(sequences) + return matrix, weblogo - return matrix, weblogo + else: + raise Exception(f"You forget to input something :)") - else: - raise Exception(f"You forget to input something :)") + elif type_input == "UIPAC": + weblogo = IMO.create_web_logo(individual_motif, type_input) + + return None, weblogo @staticmethod def PWM_to_weblogo(pwm_str):