diff --git a/config.ini b/config.ini index 6ed3f783..b7ead8f8 100644 --- a/config.ini +++ b/config.ini @@ -3,6 +3,7 @@ Port = 3000 BasePath = /htsget/v1 ChunkSize = 1000000 BucketSize = 10000 +MaxTries = 5 AGGREGATE_COUNT_THRESHOLD = [paths] diff --git a/entrypoint.sh b/entrypoint.sh index c2cf9de3..f0a0919c 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -22,7 +22,7 @@ candigv2_logging.logging.initialize()" # use the following for development #python3 htsget_server/server.py -python htsget_server/indexing.py & +bash htsget_server/indexing.sh & # use the following instead for production deployment cd htsget_server diff --git a/htsget_server/config.py b/htsget_server/config.py index c68c7ab1..528a8dd3 100644 --- a/htsget_server/config.py +++ b/htsget_server/config.py @@ -27,6 +27,8 @@ AGGREGATE_COUNT_THRESHOLD = config['DEFAULT']['AGGREGATE_COUNT_THRESHOLD'] +MAX_TRIES = int(config['DEFAULT']['MaxTries']) + TEST_KEY = os.getenv("HTSGET_TEST_KEY", "testtesttest") DEBUG_MODE = False diff --git a/htsget_server/database.py b/htsget_server/database.py index 8dd5cdf0..066b148c 100644 --- a/htsget_server/database.py +++ b/htsget_server/database.py @@ -5,7 +5,7 @@ from datetime import datetime from random import randint from time import sleep -from config import DB_PATH, BUCKET_SIZE, HTSGET_URL +from config import DB_PATH, BUCKET_SIZE, HTSGET_URL, MAX_TRIES from flask import Flask from candigv2_logging.logging import CanDIGLogger @@ -13,7 +13,7 @@ logger = CanDIGLogger(__file__) -engine = create_engine(DB_PATH, echo=False) +engine = create_engine(DB_PATH, echo=False, pool_timeout=5, pool_size=10) ObjectDBBase = declarative_base() @@ -363,23 +363,23 @@ def __repr__(self): """ Helper Functions""" def get_drs_object(object_id, expand=False, tries=1): - if tries > 3: + if tries > MAX_TRIES: raise Exception(f"Exception in get_drs_object {object_id}, too many tries") elif tries > 1: # if this isn't the first try, pause for a bit and then try again sleep(randint(1,10)/2) - with Session() as session: - try: + try: + with Session() as session: result = session.query(DrsObject).filter_by(id=object_id).one_or_none() if result is not None: new_obj = json.loads(str(result)) # if expand: # expand doesn't do anything on this DRS server return new_obj - except Exception as e: - logger.debug(f"Exception in get_drs_object {object_id}: {str(e)}, trying again") - return get_drs_object(object_id, expand, tries=tries+1) - return None + except Exception as e: + logger.debug(f"Exception in get_drs_object {object_id}: {str(e)}, trying again") + return get_drs_object(object_id, expand, tries=tries+1) + return None def list_drs_objects(cohort_id=None): @@ -394,109 +394,130 @@ def list_drs_objects(cohort_id=None): return None -def create_drs_object(obj): - with Session() as session: - new_object = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() - if new_object is None: - new_object = DrsObject() - - # required fields: - new_object.id = obj['id'] - if 'name' in obj: - new_object.name = obj['name'] - else: - new_object.name = obj['id'] - - # optional string fields - new_object.self_uri = f'{HTSGET_URL.replace("http://", "drs://").replace("https://", "drs://")}/{new_object.name}' - if 'created_time' in obj: - new_object.created_time = obj['created_time'] - if 'updated_time' in obj: - new_object.updated_time = obj['updated_time'] - if 'mime_type' in obj: - new_object.mime_type = obj['mime_type'] - if 'version' in obj: - new_object.version = obj['version'] - if 'size' in obj: - new_object.size = obj['size'] - if 'description' in obj: - new_object.description = obj['description'] - if 'cohort' in obj: - cohort = session.query(Cohort).filter_by(id=obj['cohort']).one_or_none() - if cohort is None: - create_cohort({"id": obj["cohort"], "drsobjects": []}) - new_object.cohort_id = obj['cohort'] - - # json arrays stored as strings - if 'checksums' in obj: - new_object.checksums = json.dumps(obj['checksums']) - if 'aliases' in obj: - new_object.aliases = json.dumps(obj['aliases']) - - # access methods is special - if 'access_methods' not in obj: - obj['access_methods'] = [] - # only add access methods after removing any previous ones - if len(new_object.access_methods) != 0: - for method in new_object.access_methods: - session.delete(method) - session.commit() - for method in obj['access_methods']: - new_method = AccessMethod() - new_method.drs_object_id = new_object.id - new_method.type = method['type'] - if 'region' in method: - new_method.region = method['region'] - if 'access_id' in method: - new_method.access_id = method['access_id'] - if 'access_url' in method: - new_method.url = method['access_url']['url'] - if 'headers' in method['access_url']: - new_method.headers = json.dumps(method['access_url']['headers']) - session.add(new_method) - - # contents objects are special - if 'contents' not in obj: - obj['contents'] = [] - if len(new_object.contents) != 0: - for contents in new_object.contents: - session.delete(contents) - session.commit() - for contents in obj['contents']: - new_contents = ContentsObject() - new_contents.drs_object_id = new_object.id - new_contents.name = contents['name'] - if 'drs_uri' in contents: - new_contents.drs_uri = json.dumps(contents['drs_uri']) - if 'contents' in contents: - new_contents.contents = json.dumps(contents['contents']) - if 'id' in contents: - new_contents.contents_id = contents['id'] - session.add(new_contents) - session.add(new_object) - session.commit() +def create_drs_object(obj, tries=1): + logger.debug(f"create_drs_object {obj['id']}") + if tries > MAX_TRIES: + raise Exception(f"Exception in create_drs_object {obj['id']}, too many tries") + elif tries > 1: + # if this isn't the first try, pause for a bit and then try again + sleep(randint(1,10)/2) + try: + with Session() as session: + new_object = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() + if new_object is None: + new_object = DrsObject() + + # required fields: + new_object.id = obj['id'] + if 'name' in obj: + new_object.name = obj['name'] + else: + new_object.name = obj['id'] + + # optional string fields + new_object.self_uri = f'{HTSGET_URL.replace("http://", "drs://").replace("https://", "drs://")}/{new_object.name}' + if 'created_time' in obj: + new_object.created_time = obj['created_time'] + if 'updated_time' in obj: + new_object.updated_time = obj['updated_time'] + if 'mime_type' in obj: + new_object.mime_type = obj['mime_type'] + if 'version' in obj: + new_object.version = obj['version'] + if 'size' in obj: + new_object.size = obj['size'] + if 'description' in obj: + new_object.description = obj['description'] + if 'cohort' in obj: + cohort = session.query(Cohort).filter_by(id=obj['cohort']).one_or_none() + if cohort is None: + create_cohort({"id": obj["cohort"], "drsobjects": []}) + new_object.cohort_id = obj['cohort'] + + # json arrays stored as strings + if 'checksums' in obj: + new_object.checksums = json.dumps(obj['checksums']) + if 'aliases' in obj: + new_object.aliases = json.dumps(obj['aliases']) + + # access methods is special + if 'access_methods' not in obj: + obj['access_methods'] = [] + # only add access methods after removing any previous ones + if len(new_object.access_methods) != 0: + for method in new_object.access_methods: + session.delete(method) + session.commit() + for method in obj['access_methods']: + new_method = AccessMethod() + new_method.drs_object_id = new_object.id + new_method.type = method['type'] + if 'region' in method: + new_method.region = method['region'] + if 'access_id' in method: + new_method.access_id = method['access_id'] + if 'access_url' in method: + new_method.url = method['access_url']['url'] + if 'headers' in method['access_url']: + new_method.headers = json.dumps(method['access_url']['headers']) + session.add(new_method) + + # contents objects are special + if 'contents' not in obj: + obj['contents'] = [] + if len(new_object.contents) != 0: + for contents in new_object.contents: + session.delete(contents) + session.commit() + for contents in obj['contents']: + new_contents = ContentsObject() + new_contents.drs_object_id = new_object.id + new_contents.name = contents['name'] + if 'drs_uri' in contents: + new_contents.drs_uri = json.dumps(contents['drs_uri']) + if 'contents' in contents: + new_contents.contents = json.dumps(contents['contents']) + if 'id' in contents: + new_contents.contents_id = contents['id'] + session.add(new_contents) + session.add(new_object) + session.commit() - # if we have reference_genome info, it's a GenomicDrsObject and needs a variantfile: - if 'reference_genome' in obj: - create_variantfile({"id": obj["id"], "reference_genome": obj["reference_genome"]}) + # if we have reference_genome info, it's a GenomicDrsObject and needs a variantfile: + if 'reference_genome' in obj: + create_variantfile({"id": obj["id"], "reference_genome": obj["reference_genome"]}) - result = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() - return json.loads(str(result)) + result = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() + logger.debug(f"DONE create_drs_object {obj['id']}") + return json.loads(str(result)) + except Exception as e: + logger.debug(f"Exception in create_drs_object {obj['id']}: {str(e)}, trying again") + return create_drs_object(obj, tries=tries+1) -def delete_drs_object(obj_id): - with Session() as session: - new_object = session.query(DrsObject).filter_by(id=obj_id).one() - cohort = session.query(Cohort).filter_by(id=new_object.cohort_id).one_or_none() - if new_object.description in ["wgs", "wts"]: - # this is a GenomicDrsObject; we need to delete any indexed variantfiles - variantfiles = session.query(VariantFile).filter_by(drs_object_id=new_object.id).all() - for vf in variantfiles: - session.delete(vf) - session.commit() - session.delete(new_object) - session.commit() - return json.loads(str(new_object)) +def delete_drs_object(obj_id, tries=1): + if tries > MAX_TRIES: + raise Exception(f"Exception in delete_drs_object {obj_id}, too many tries") + elif tries > 1: + # if this isn't the first try, pause for a bit and then try again + sleep(randint(1,10)/2) + try: + with Session() as session: + new_object = session.query(DrsObject).filter_by(id=obj_id).one() + cohort = session.query(Cohort).filter_by(id=new_object.cohort_id).one_or_none() + if new_object.description in ["wgs", "wts"]: + # this is a GenomicDrsObject; we need to delete any indexed variantfiles + variantfiles = session.query(VariantFile).filter_by(drs_object_id=new_object.id).all() + for vf in variantfiles: + session.delete(vf) + session.commit() + session.delete(new_object) + session.commit() + return json.loads(str(new_object)) + except Exception as e: + logger.debug(f"Exception in delete_drs_object {obj_id}: {str(e)}, trying again") + return delete_drs_object(obj_id, tries=tries+1) + return None def get_cohort(cohort_id): @@ -517,35 +538,54 @@ def list_cohorts(): return None -def create_cohort(obj): - with Session() as session: - new_cohort = session.query(Cohort).filter_by(id=obj['id']).one_or_none() - if new_cohort is None: - new_cohort = Cohort() - new_cohort.id = obj['id'] - for drs_uri in obj['drsobjects']: - new_drs = session.query(DrsObject).filter_by(self_uri=drs_uri).one_or_none() - if new_drs is not None: - new_cohort.associated_drs.append(new_drs) - session.add(new_cohort) - session.commit() - result = session.query(Cohort).filter_by(id=obj['id']).one_or_none() - if result is not None: - return json.loads(str(result)) - return None +def create_cohort(obj, tries=1): + if tries > MAX_TRIES: + raise Exception(f"Exception in create_cohort {obj['id']}, too many tries") + elif tries > 1: + # if this isn't the first try, pause for a bit and then try again + sleep(randint(1,10)/2) + try: + with Session() as session: + new_cohort = session.query(Cohort).filter_by(id=obj['id']).one_or_none() + if new_cohort is None: + new_cohort = Cohort() + new_cohort.id = obj['id'] + for drs_uri in obj['drsobjects']: + new_drs = session.query(DrsObject).filter_by(self_uri=drs_uri).one_or_none() + if new_drs is not None: + new_cohort.associated_drs.append(new_drs) + session.add(new_cohort) + session.commit() + result = session.query(Cohort).filter_by(id=obj['id']).one_or_none() + if result is not None: + return json.loads(str(result)) + except Exception as e: + logger.debug(f"Exception in create_cohort {obj['id']}: {str(e)}, trying again") + return create_cohort(obj, tries=tries+1) + return None -def delete_cohort(cohort_id): - with Session() as session: - cohort_objs = session.query(Cohort).filter_by(id=cohort_id).all() - for cohort_obj in cohort_objs: - for drs_obj in cohort_obj.associated_drs: - session.delete(drs_obj) +def delete_cohort(cohort_id, tries=1): + if tries > MAX_TRIES: + raise Exception(f"Exception in delete_cohort {cohort_id}, too many tries") + elif tries > 1: + # if this isn't the first try, pause for a bit and then try again + sleep(randint(1,10)/2) + try: + with Session() as session: + cohort_objs = session.query(Cohort).filter_by(id=cohort_id).all() + for cohort_obj in cohort_objs: + for drs_obj in cohort_obj.associated_drs: + session.delete(drs_obj) + session.commit() + session.delete(cohort_obj) session.commit() - session.delete(cohort_obj) session.commit() - session.commit() - return json.loads(str(cohort_objs)) + return json.loads(str(cohort_objs)) + except Exception as e: + logger.debug(f"Exception in delete_cohort {cohort_id}: {str(e)}, trying again") + return delete_cohort(cohort_id, tries=tries+1) + return None def list_refseqs(reference_genome="hg38"): @@ -588,43 +628,52 @@ def get_chromosome_for_refseq(refseq=None): def get_variantfile(variantfile_id, tries=1): - if tries > 3: + if tries > MAX_TRIES: raise Exception(f"Exception in get_variantfile {variantfile_id}, too many tries") elif tries > 1: # if this isn't the first try, pause for a bit and then try again sleep(randint(1,10)/2) - with Session() as session: - try: + try: + with Session() as session: result = session.query(VariantFile).filter_by(id=variantfile_id).one_or_none() if result is not None: new_obj = json.loads(str(result)) return new_obj - except Exception as e: - logger.debug(f"Exception in get_variantfile {variantfile_id}: {str(e)}, trying again") - return get_variantfile(variantfile_id, tries=tries+1) - return None + except Exception as e: + logger.debug(f"Exception in get_variantfile {variantfile_id}: {str(e)}, trying again") + return get_variantfile(variantfile_id, tries=tries+1) + return None -def create_variantfile(obj): +def create_variantfile(obj, tries=1): # obj = {"id", "reference_genome"} - with Session() as session: - new_variantfile = session.query(VariantFile).filter_by(id=obj['id']).one_or_none() - if new_variantfile is None: - new_variantfile = VariantFile() - new_variantfile.indexed = 0 - new_variantfile.chr_prefix = '' - new_variantfile.id = obj['id'] - new_variantfile.reference_genome = obj['reference_genome'] - new_drs = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() - if new_drs is not None: - new_variantfile.drs_object_id = new_drs.id - else: - raise Exception(f"Cannot create variantfile {obj['id']}: no corresponding DRS object") - session.add(new_variantfile) - session.commit() - result = session.query(VariantFile).filter_by(id=obj['id']).one_or_none() - if result is not None: - return json.loads(str(result)) + if tries > MAX_TRIES: + raise Exception(f"Exception in create_variantfile {obj['id']}, too many tries") + elif tries > 1: + # if this isn't the first try, pause for a bit and then try again + sleep(randint(1,10)/2) + try: + with Session() as session: + new_variantfile = session.query(VariantFile).filter_by(id=obj['id']).one_or_none() + if new_variantfile is None: + new_variantfile = VariantFile() + new_variantfile.indexed = 0 + new_variantfile.chr_prefix = '' + new_variantfile.id = obj['id'] + new_variantfile.reference_genome = obj['reference_genome'] + new_drs = session.query(DrsObject).filter_by(id=obj['id']).one_or_none() + if new_drs is not None: + new_variantfile.drs_object_id = new_drs.id + else: + raise Exception(f"Cannot create variantfile {obj['id']}: no corresponding DRS object") + session.add(new_variantfile) + session.commit() + result = session.query(VariantFile).filter_by(id=obj['id']).one_or_none() + if result is not None: + return json.loads(str(result)) + except Exception as e: + logger.debug(f"Exception in create_variantfile {obj['id']}: {str(e)}, trying again") + return create_variantfile(obj, tries=tries+1) return None @@ -901,13 +950,13 @@ def get_contig_name_in_variantfile(obj): def search(obj, tries=1): # obj = {'region', 'headers'} - if tries > 3: + if tries > MAX_TRIES: raise Exception(f"Exception in search, too many tries") elif tries > 1: # if this isn't the first try, pause for a bit and then try again sleep(randint(1,10)/2) - with Session() as session: - try: + try: + with Session() as session: vfile = aliased(VariantFile) q = select(vfile.drs_object_id, vfile.reference_genome, PositionBucket.id, PositionBucket.pos_bucket_id).select_from(PositionBucket).join(vfile.associated_pos_buckets).join(vfile.associated_headers) if 'headers' in obj: @@ -942,7 +991,7 @@ def search(obj, tries=1): curr_result['variantcount'] += bv.bucket_count results.append(curr_result) return results - except Exception as e: - logger.debug(f"Exception in search: {str(e)}, trying again") - return search(obj, tries=tries+1) + except Exception as e: + logger.debug(f"Exception in search: {str(e)}, trying again") + return search(obj, tries=tries+1) return None \ No newline at end of file diff --git a/htsget_server/indexing.sh b/htsget_server/indexing.sh new file mode 100644 index 00000000..6f6dd1fc --- /dev/null +++ b/htsget_server/indexing.sh @@ -0,0 +1,4 @@ +until python htsget_server/indexing.py; do + echo "Indexing crashed with exit code $?. Respawning.." >&2 + sleep 1 +done \ No newline at end of file