Skip to content

Commit

Permalink
fix: Chunk new endpoint about "load_collection_from_object_storage"
Browse files Browse the repository at this point in the history
  • Loading branch information
annelhote committed Apr 20, 2023
1 parent 33d797d commit 417831e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 35 deletions.
38 changes: 22 additions & 16 deletions project/server/main/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from project.server.main.aurehal import harvest_and_save_aurehal
from project.server.main.logger import get_logger
from project.server.main.parse import get_aurehal_from_OS, parse_hal
from project.server.main.utils_swift import get_objects_by_prefix, upload_object
from project.server.main.utils_swift import get_objects, get_paths_by_prefix, upload_object

logger = get_logger(__name__)

Expand Down Expand Up @@ -179,18 +179,24 @@ def load_collection_from_object_storage(collection_name: str) -> None:
logger.debug(f'dropping {collection_name} collection before insertion')
myclient = pymongo.MongoClient('mongodb://mongo:27017/')
myclient['hal'][collection_name].drop()
# 2. Collect all files from Object Storage
publications = get_objects_by_prefix(container='hal', prefix=f'{collection_name}/parsed/hal_parsed')
# 3. Extract oa_details from publications
oa_details_data = []
for publication in publications:
result = {
'hal_id': publication.get('hal_id'),
'oa_details': publication.get('oa_details')
}
oa_details_data.append(result)
# 4. Save it into mongo collection
current_file_oa_details = f'hal_oa_details.json'
json.dump(oa_details_data, open(current_file_oa_details, 'w'))
insert_data(collection_name=collection_name, output_file=current_file_oa_details)
return
# 2. Collect all paths from Object Storage container with prefix
paths = get_paths_by_prefix(container='hal', prefix=f'{collection_name}/parsed/hal_parsed')
logger.debug(f'{len(paths)} paths retrieved in the container with prefix')
for path in paths:
# 3. For each path, collect all objects
publications = get_objects(container='hal', path=path)
# publications = [item for sublist in objects for item in sublist]
oa_details_data = []
# 4. Extract oa_details from publications
for publication in publications:
result = {
'hal_id': publication.get('hal_id'),
'oa_details': publication.get('oa_details')
}
oa_details_data.append(result)
# 5. Save it into mongo collection
current_file_oa_details = f'hal_oa_details.json'
json.dump(oa_details_data, open(current_file_oa_details, 'w'))
insert_data(collection_name=collection_name, output_file=current_file_oa_details)
os.system(f'rm -rf {current_file_oa_details}')
return
26 changes: 13 additions & 13 deletions project/server/main/utils_swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_connection() -> swiftclient.Connection:


@retry(delay=2, tries=50)
def upload_object(container: str, source: str, target:str) -> str:
def upload_object(container: str, source: str, target: str) -> str:
logger.debug(f'Uploading {source} in {container} as {target}')
cmd = init_cmd + f' upload {container} {source} --object-name {target}' \
f' --segment-size 1048576000 --segment-threads 100'
Expand All @@ -61,29 +61,29 @@ def download_object(container: str, filename: str, out: str) -> None:
cmd = init_cmd + f' download {container} {filename} -o {out}'
os.system(cmd)


@retry(delay=2, tries=50)
def get_objects(container: str, path: str) -> list:
    """Read one gzip-compressed JSON object from Swift and return its records.

    Args:
        container: Name of the Swift container to read from.
        path: Name of the object inside the container.

    Returns:
        The object content as a list of dicts, one per record
        (``DataFrame.to_dict('records')``). An empty list is returned when
        the object cannot be fetched or parsed.
    """
    # NOTE: every failure raised while connecting, downloading or parsing is
    # handled below and turned into an empty result, so the @retry decorator
    # never sees those errors.
    try:
        connection = get_connection()
        payload = connection.get_object(container, path)[1]
        df = pd.read_json(BytesIO(payload), compression='gzip')
    except Exception as error:
        # Best-effort read: keep returning an empty result, but leave a trace
        # of the failure and let KeyboardInterrupt / SystemExit propagate
        # instead of swallowing them with a bare ``except``.
        logger.error(
            f'Unable to read {path} from container {container}: {error}')
        df = pd.DataFrame([])
    return df.to_dict('records')


@retry(delay=2, tries=50)
def get_paths_by_prefix(container: str, prefix: str) -> list:
    """List the names of all objects in a container that match a prefix.

    The listing is fetched page by page: each request resumes after the
    last name already collected (``marker``), and paging stops as soon as
    a page does not hold exactly ``SWIFT_SIZE`` entries.

    Args:
        container: Name of the Swift container to list.
        prefix: Prefix passed to the container listing request.

    Returns:
        The object names, in the order the listing returned them.
    """
    logger.debug(
        f'Retrieving paths from container {container} and prefix {prefix}')
    paths = []
    last_seen = None
    while True:
        connection = get_connection()
        page = connection.get_container(
            container=container, marker=last_seen, prefix=prefix)[1]
        paths.extend(entry['name'] for entry in page)
        if len(page) > 0:
            # Resume the next listing right after the last name of this page.
            last_seen = page[-1]['name']
        if len(page) != SWIFT_SIZE:
            # A page that is not exactly SWIFT_SIZE long is the last one.
            break
    return paths
11 changes: 5 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
gunicorn==20.0.4
pymongo==3.8.0
lxml==4.6.3
pandas==1.2.5
pycountry==20.7.3
pymongo==3.8.0
python-dateutil~=2.8.1
python-keystoneclient==4.0.0
python-swiftclient==3.9.0
redis==3.5.3
rq==1.9.0
regex==2017.4.5
requests==2.20.0
lxml==4.6.3
python-dateutil~=2.8.1
pycountry==20.7.3
retry~=0.9.2
rq==1.9.0
SPARQLWrapper==1.8.5
tokenizers==0.10.1
Unidecode==1.3.2
SPARQLWrapper==1.8.5

0 comments on commit 417831e

Please sign in to comment.