From d61a0d3346394e45cfba2c0c10365e2f720a22f1 Mon Sep 17 00:00:00 2001 From: Jess <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:49:38 -0500 Subject: [PATCH] Cesm cat builder (#726) * move call_build function to base class and define a class parser attribute that is modified for each child class at initialization * move parsers and json utilities to a separate module * refine gfdl parsers * add function to get metadata from file attributes * add debugging statements to catalog_builder make reference dict definition a function refine the catalog population procedure need to define time_range * fix cesm parser frequency def add procedure to generate time_range to parsers add procedure to fix units in catalog * add entries to CESM and GFDL fieldlists * clean up cesm parser include file name parts in cesm parser frequency search * clean up print statements --- data/fieldlist_CESM.jsonc | 92 +++++- data/fieldlist_GFDL.jsonc | 4 +- tools/catalog_builder/catalog_builder.py | 366 ++--------------------- tools/catalog_builder/parsers.py | 312 +++++++++++++++++++ 4 files changed, 424 insertions(+), 350 deletions(-) create mode 100644 tools/catalog_builder/parsers.py diff --git a/data/fieldlist_CESM.jsonc b/data/fieldlist_CESM.jsonc index 7e500ee3c..f5d426581 100644 --- a/data/fieldlist_CESM.jsonc +++ b/data/fieldlist_CESM.jsonc @@ -37,12 +37,26 @@ }, "lev": { "standard_name": "atmosphere_hybrid_sigma_pressure_coordinate", - "units": "level", // equivalent to '1' + "units": "level", + // equivalent to '1' "positive": "down", "axis": "Z" }, + "hyam": { + "standard_name": "hybrid A coefficient at layer midpoints", + "long_name": "hybrid A coefficient at layer midpoints", + "units": "1", + "axis": "Z" + + }, + "hybm": { + "standard_name": "hybrid B coefficient at layer midpoints", + "long_name": "hybrid B coefficient at layer midpoints", + "units": "1", + "axis": "Z" + }, "z_t": { - "standard_name": "depth from surface to midpoint of layer", + "standard_name": "depth_from_surface_to_midpoint_of_layer", "units": "centimeters", "positive": "down", "axis": "Z" @@ -61,6 +75,27 @@ "scalar_coord_templates": {"plev": "U{value}"}, "ndim": 4 }, + "U250": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 250 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, + "U200": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 200 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, + "U850": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 850 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, "V": { "standard_name": "northward_wind", "realm":"atmos", @@ -68,6 +103,27 @@ "scalar_coord_templates": {"plev": "V{value}"}, "ndim": 4 }, + "V250": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 250 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, + "V200": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 200 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, + "V850": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 850 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, "Z3": { "standard_name": "geopotential_height", "units": "m", @@ -78,10 +134,23 @@ }, "Z500": { "standard_name": "geopotential_height", - "long_name": "geopotential height at 500 hPa", + "long_name": "geopotential height at 500 
mbar pressure surface", + "realm": "atmos", + "units": "m", + "ndim": 3 + }, + "Z850": { + "standard_name": "geopotential_height", + "long_name": "geopotential height at 850 mbar pressure surface", + "realm": "atmos", + "units": "m", + "ndim": 3 + }, + "Z250": { + "standard_name": "geopotential_height", + "long_name": "geopotential height at 250 mbar pressure surface", "realm": "atmos", "units": "m", - // note: 4d name is 'Z3' but Z500 = height at 500 mb, etc. "ndim": 3 }, "Q": { @@ -96,6 +165,13 @@ "units": "Pa s-1", "scalar_coord_templates": {"plev": "OMEGA{value}"}, "ndim": 4 + }, + "OMEGA500": { + "standard_name": "lagrangian_tendency_of_air_pressure", + "long_name": "Vertical velocity at 500 mbar pressure surface", + "realm": "atmos", + "units": "Pa s-1", + "ndim": 3 }, "TS": { "standard_name": "surface_temperature", @@ -349,6 +425,13 @@ "realm": "atmos", "units": "K", "ndim": 4 + }, + "T250": { + "standard_name": "air_temperature", + "long_name": "air temperature at 250 mbar pressure surface", + "realm": "atmos", + "units": "K", + "ndim": 3 }, // prw: Column Water Vapor (precipitable water vapor), units = mm (or kg/m^2) "prw": { @@ -358,6 +441,7 @@ "units": "kg m-2", "ndim": 3 } + // Variables for SM_ET_coupling module // "mrsos": { // "standard_name": "mass_content_of_water_in_soil_layer", diff --git a/data/fieldlist_GFDL.jsonc b/data/fieldlist_GFDL.jsonc index 16c225a38..82ef5dfb0 100644 --- a/data/fieldlist_GFDL.jsonc +++ b/data/fieldlist_GFDL.jsonc @@ -625,14 +625,14 @@ "ndim": 3 }, "zg500": { - "standard_name": "", + "standard_name": "geopotential_height", "long_name": "Geopotential Height at 500 hPa", "realm": "atmos", "units": "m", "ndim": 3 }, "zg": { - "standard_name": "", + "standard_name": "geopotential_heihgt", "long_name": "Geopotential Height", "realm": "atmos", "units": "m", diff --git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index 1c7c1098e..0a9d037b6 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -15,19 +15,13 @@ import click import intake import os -import io -import json -import pathlib import sys import time -import traceback import typing -import collections -import xarray as xr import yaml +import parsers from datetime import timedelta from ecgtools import Builder -from ecgtools.builder import INVALID_ASSET, TRACEBACK from ecgtools.parsers.cmip import parse_cmip6 from ecgtools.parsers.cesm import parse_cesm_timeseries import logging @@ -65,302 +59,6 @@ def __getitem__(self, n): # instantiate the class maker catalog_class = ClassMaker() -def strip_comments(str_: str, delimiter=None): - """Remove comments from *str_*. Comments are taken to start with an - arbitrary *delimiter* and run to the end of the line. - """ - # would be better to use shlex, but that doesn't support multi-character - # comment delimiters like '//' - escaped_quote_placeholder = '\v' # no one uses vertical tab - - if not delimiter: - return str_ - lines = str_.splitlines() - for i in range(len(lines)): - # get rid of lines starting with delimiter - if lines[i].startswith(delimiter): - lines[i] = '' - continue - # handle delimiters midway through a line: - # If delimiter appears quoted in a string, don't want to treat it as - # a comment. So for each occurrence of delimiter, count number of - # "s to its left and only truncate when that's an even number. - # First we get rid of -escaped single "s. 
- replaced_line = lines[i].replace('\\\"', escaped_quote_placeholder) - line_parts = replaced_line.split(delimiter) - quote_counts = [s.count('"') for s in line_parts] - j = 1 - while sum(quote_counts[:j]) % 2 != 0: - if j >= len(quote_counts): - raise ValueError(f"Couldn't parse line {i+1} of string.") - j += 1 - replaced_line = delimiter.join(line_parts[:j]) - lines[i] = replaced_line.replace(escaped_quote_placeholder, '\\\"') - # make lookup table of correct line numbers, taking into account lines we - # dropped - line_nos = [i for i, s in enumerate(lines) if (s and not s.isspace())] - # join lines, stripping blank lines - new_str = '\n'.join([s for s in lines if (s and not s.isspace())]) - return new_str, line_nos -def parse_json(str_: str): - """Parse JSONC (JSON with ``//``-comments) string *str_* into a Python object. - Comments are discarded. Wraps standard library :py:func:`json.loads`. - - Syntax errors in the input (:py:class:`~json.JSONDecodeError`) are passed - through from the Python standard library parser. We correct the line numbers - mentioned in the errors to refer to the original file (i.e., with comments.) - """ - def _pos_from_lc(lineno, colno, str_): - # fix line number, since we stripped commented-out lines. JSONDecodeError - # computes line/col no. in error message from character position in string. - lines = str_.splitlines() - return (colno - 1) + sum((len(line) + 1) for line in lines[:lineno]) - - (strip_str, line_nos) = strip_comments(str_, delimiter='//') - try: - parsed_json = json.loads(strip_str, - object_pairs_hook=collections.OrderedDict) - except json.JSONDecodeError as exc: - # fix reported line number, since we stripped commented-out lines. - assert exc.lineno <= len(line_nos) - raise json.JSONDecodeError( - msg=exc.msg, doc=str_, - pos=_pos_from_lc(line_nos[exc.lineno-1], exc.colno, str_) - ) - except UnicodeDecodeError as exc: - raise json.JSONDecodeError( - msg=f"parse_json received UnicodeDecodeError:\n{exc}", - doc=strip_str, pos=0 - ) - - return parsed_json -def read_json(file_path: str, log=_log) -> dict: - """Reads a struct from a JSONC file at *file_path*. 
- """ - log.debug('Reading file %s', file_path) - try: - with io.open(file_path, 'r', encoding='utf-8') as file_: - str_ = file_.read() - except Exception as exc: - # something more serious than missing file - _log.critical("Caught exception when trying to read %s: %r", file_path, exc) - exit(1) - return parse_json(str_) - -freq_opts = ['mon', - 'day', - 'daily', - '6hr', - '3hr', - '1hr', - 'subhr', - 'annual', - 'year'] - -# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate -# required catalog fields -def parse_gfdl_am5_data(file_name: str): - - file = pathlib.Path(file_name) # uncomment when ready to run - - num_dir_parts = len(file.parts) # file name index = num_parts 1 - # isolate file from rest of path - stem = file.stem - # split the file name into components based on - # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc - split = stem.split('.') - num_file_parts = len(split) - realm = split[0] - cell_methods = "" - cell_measures = "" - time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] - variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" - chunk_freq = file.parts[num_dir_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - long_name = "" - standard_name = "" - units = "" - output_frequency = "" - file_freq = file.parts[num_dir_parts - 3] - - for f in freq_opts: - if f in file_freq: - output_frequency = f - break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' - - # read metadata from the appropriate fieldlist - if 'cmip' in realm.lower(): - gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') - else: - gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') - try: - gfdl_info = read_json(gfdl_fieldlist, log=_log) - except IOError: - print("Unable to open file", gfdl_fieldlist) - sys.exit(1) - - if hasattr(gfdl_info['variables'], variable_id): - var_metadata = gfdl_info['variables'].get(variable_id) - else: - raise KeyError(f'{variable_id} not found in {gfdl_fieldlist}') - - if hasattr(var_metadata, 'standard_name'): - standard_name = var_metadata.standard_name - if hasattr(var_metadata, 'long_name'): - long_name = var_metadata.long_name - if hasattr(var_metadata, 'units'): - units = var_metadata.units - - try: - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } - - return info - - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} - - -# custom parser for pp data stored on GFDL archive filesystem -# assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] - -def parse_gfdl_pp_ts(file_name: str): - # files = 
sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run - # file = pathlib.Path(files[0]) # debug comment when ready to run - file = pathlib.Path(file_name) # uncomment when ready to run - num_parts = len(file.parts) # file name index = num_parts 1 - # isolate file from rest of path - stem = file.stem - # split the file name into components based on _ - split = stem.split('.') - realm = split[0] - cell_methods = "" - cell_measures = "" - time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] - variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" - chunk_freq = file.parts[num_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - - output_frequency = "" - file_freq = file.parts[num_parts - 3] - for f in freq_opts: - if f in file_freq: - output_frequency = f - break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' - - try: - # call to xr.open_dataset required by ecgtoos.builder.Builder - with xr.open_dataset(file, chunks={}, decode_times=False) as ds: - variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] - if variable_id not in variable_list: - print(f'Asset variable {variable_id} not found in {file}') - exit(1) - standard_name = "" - long_name = "" - if 'standard_name' in ds[variable_id].attrs: - standard_name = ds[variable_id].attrs['standard_name'] - standard_name.replace("", "_") - if 'long_name' in ds[variable_id].attrs: - long_name = ds[variable_id].attrs['long_name'] - if len(long_name) == 0 and len(standard_name) == 0: - print('Asset variable does not contain a standard_name or long_name attribute') - exit(1) - - if 'cell_methods' in ds[variable_id].attrs: - cell_methods = ds[variable_id].attrs['cell_methods'] - if 'cell_measures' in ds[variable_id].attrs: - cell_measures = ds[variable_id].attrs['cell_measures'] - - units = ds[variable_id].attrs['units'] - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } - - return info - - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} - - class CatalogBase(object): """Catalog base class\n """ @@ -386,6 +84,7 @@ def __init__(self): self.variable_col_name = "variable_id" self.path_col_name = "path" self.cb = None + self.file_parse_method = "" def cat_builder(self, data_paths: list, exclude_patterns=None, @@ -401,11 +100,18 @@ def cat_builder(self, data_paths: list, depth=dir_depth, exclude_patterns=exclude_patterns, # Exclude the following directories include_patterns=include_patterns, - joblib_parallel_kwargs={'n_jobs': nthreads}, # Number of jobs to execute - + joblib_parallel_kwargs={'n_jobs': nthreads} # Number of jobs to execute - # 
should be equal to # threads you are using - extension='.nc' # extension of target file ) + def call_build(self, file_parse_method=None): + if file_parse_method is None: + file_parse_method = self.file_parse_method + # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py + # for more parsing methods + self.cb = self.cb.build(parsing_func=file_parse_method) + print('Build complete') + def call_save(self, output_dir: str, output_filename: str ): @@ -434,14 +140,7 @@ class CatalogCMIP(CatalogBase): def __init__(self): super().__init__() - - def call_build(self, file_parse_method=None): - if file_parse_method is None: - file_parse_method = parse_cmip6 - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) - print('Build complete') + self.file_parse_method = parse_cmip6 @catalog_class.maker @@ -458,15 +157,7 @@ def __init__(self): 'member_id', 'realm' ] - def call_build(self, - file_parse_method=None): - - if file_parse_method is None: - file_parse_method = parse_gfdl_pp_ts - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) - print('Build complete') + self.file_parse_method = parsers.parse_gfdl_pp_ts @catalog_class.maker @@ -478,27 +169,14 @@ class CatalogCESM(CatalogBase): def __init__(self): super().__init__() self.groupby_attrs = [ - 'component', - 'stream', - 'case', - 'frequency' - ] - - self.xarray_aggregations = [ - {'type': 'union', 'attribute_name': 'variable_id'}, - { - 'type': 'join_existing', - 'attribute_name': 'date', - 'options': {'dim': 'time', 'coords': 'minimal', 'compat': 'override'} - } + 'activity_id', + 'institution_id', + 'experiment_id', + 'frequency', + 'member_id', + 'realm' ] - - def call_build(self, file_parse_method=None): - if file_parse_method is None: - file_parse_method = parse_cesm_timeseries - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cesm.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) + self.file_parse_method = parsers.parse_cesm def load_config(config): @@ -542,7 +220,7 @@ def main(config: str): file_parse_method = None if conf['dataset_id'] is not None: if 'am5' in conf['dataset_id'].lower(): - file_parse_method = parse_gfdl_am5_data + file_parse_method = parsers.parse_gfdl_am5_data # build the catalog print('Building the catalog') @@ -554,7 +232,7 @@ def main(config: str): print("Time to build catalog:", timedelta(seconds=end_time - start_time)) # save the catalog - print('Saving catalog to', conf['output_dir'],'/',conf['output_filename'] + ".csv") + print('Saving catalog to',conf['output_dir'],'/',conf['output_filename'] + ".csv") cat_obj.call_save(output_dir=conf['output_dir'], output_filename=conf['output_filename'] diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py new file mode 100644 index 000000000..258257a99 --- /dev/null +++ b/tools/catalog_builder/parsers.py @@ -0,0 +1,312 @@ +import xarray as xr +import traceback +import pathlib +import os +import io +import sys +import json +import logging +import collections +import cftime +from ecgtools.builder import INVALID_ASSET, TRACEBACK + + +# Define a log object for debugging +_log = logging.getLogger(__name__) + +ROOT_DIR = os.path.dirname(os.path.realpath(__file__)).split('/tools/catalog_builder')[0] + +freq_opts = ['mon', + 'day', + 'daily', + 
'6hr', + '3hr', + '1hr', + 'subhr', + 'annual', + 'year'] + +catalog_keys = [ + 'activity_id', + 'assoc_files', + 'institution_id', + 'member_id', + 'realm', + 'variable_id', + 'table_id', + 'source_id', + 'source_type', + 'cell_methods', + 'cell_measures', + 'experiment_id', + 'variant_label', + 'grid_label', + 'units', + 'time_range', + 'chunk_freq', + 'standard_name', + 'long_name', + 'frequency', + 'file_name', + 'path' + ] + +def strip_comments(str_: str, delimiter=None): + """ Remove comments from *str_*. Comments are taken to start with an + arbitrary *delimiter* and run to the end of the line. + """ + # would be better to use shlex, but that doesn't support multi-character + # comment delimiters like '//' + escaped_quote_placeholder = '\v' # no one uses vertical tab + + if not delimiter: + return str_ + lines = str_.splitlines() + for i in range(len(lines)): + # get rid of lines starting with delimiter + if lines[i].startswith(delimiter): + lines[i] = '' + continue + # handle delimiters midway through a line: + # If delimiter appears quoted in a string, don't want to treat it as + # a comment. So for each occurrence of delimiter, count number of + # 's to its left and only truncate when that's an even number. + # First we get rid of -escaped single "s. + replaced_line = lines[i].replace('\\\"', escaped_quote_placeholder) + line_parts = replaced_line.split(delimiter) + quote_counts = [s.count('"') for s in line_parts] + j = 1 + while sum(quote_counts[:j]) % 2 != 0: + if j >= len(quote_counts): + raise ValueError(f"Couldn't parse line {i+1} of string.") + j += 1 + replaced_line = delimiter.join(line_parts[:j]) + lines[i] = replaced_line.replace(escaped_quote_placeholder, '\\\"') + # make lookup table of correct line numbers, taking into account lines we + # dropped + line_nos = [i for i, s in enumerate(lines) if (s and not s.isspace())] + # join lines, stripping blank lines + new_str = '\n'.join([s for s in lines if (s and not s.isspace())]) + return new_str, line_nos +def parse_json(str_: str): + """Parse JSONC (JSON with ``//``-comments) string *str_* into a Python object. + Comments are discarded. Wraps standard library :py:func:`json.loads`. + + Syntax errors in the input (:py:class:`~json.JSONDecodeError`) are passed + through from the Python standard library parser. We correct the line numbers + mentioned in the errors to refer to the original file (i.e., with comments.) + """ + def _pos_from_lc(lineno, colno, str_): + # fix line number, since we stripped commented-out lines. JSONDecodeError + # computes line/col no. in error message from character position in string. + lines = str_.splitlines() + return (colno - 1) + sum((len(line) + 1) for line in lines[:lineno]) + + (strip_str, line_nos) = strip_comments(str_, delimiter='//') + try: + parsed_json = json.loads(strip_str, + object_pairs_hook=collections.OrderedDict) + except json.JSONDecodeError as exc: + # fix reported line number, since we stripped commented-out lines. + assert exc.lineno <= len(line_nos) + raise json.JSONDecodeError( + msg=exc.msg, doc=str_, + pos=_pos_from_lc(line_nos[exc.lineno-1], exc.colno, str_) + ) + except UnicodeDecodeError as exc: + raise json.JSONDecodeError( + msg=f"parse_json received UnicodeDecodeError:\n{exc}", + doc=strip_str, pos=0 + ) + + return parsed_json +def read_json(file_path: str, log=_log) -> dict: + """Reads a struct from a JSONC file at *file_path*. 
+ """ + log.debug('Reading file %s', file_path) + try: + with io.open(file_path, 'r', encoding='utf-8') as file_: + str_ = file_.read() + except Exception as exc: + # something more serious than missing file + _log.critical("Caught exception when trying to read %s: %r", file_path, exc) + exit(1) + return parse_json(str_) + + +def parse_nc_file(file_path: pathlib.Path, catalog_info: dict) -> dict: + # call to xr.open_dataset required by ecgtools.builder.Builder + exclude_vars = ('time', 'time_bnds', 'date', 'hyam', 'hybm') + with xr.open_dataset(file_path, chunks={}, decode_times=False, engine="netcdf4") as ds: + variable_list = [var for var in ds if 'standard_name' in ds[var].attrs + or 'long_name' in ds[var].attrs and + var not in ds.coords and + var not in exclude_vars] + # append time range + if 'time' in ds.coords: + time_var = ds.coords['time'] + calendar = None + if 'calendar' in time_var.attrs: + calendar = time_var.attrs['calendar'] + if calendar == 'no_leap': + calendar = 'noleap' + start_time = cftime.num2date(time_var.values[0], time_var.attrs['units'], calendar=calendar) + end_time = cftime.num2date(time_var.values[-1], time_var.attrs['units']) + time_range = start_time.strftime("%Y%m%d:%H%M%S") + '-' + end_time.strftime("%Y%m%d:%H%M%S") + catalog_info.update({'time_range': time_range}) + + for var in variable_list: + if len(ds[var].attrs['long_name']) == 0 and len(ds[var].attrs['long_name']) == 0: + print('Asset variable does not contain a standard_name or long_name attribute') + exit(1) + for attr in catalog_keys: + if attr in ds[var].attrs: + catalog_info.update({attr: ds[var].attrs[attr]}) + if catalog_info['variable_id'] == "": + catalog_info.update({'variable_id': var}) + + return catalog_info + +def setup_catalog() -> dict: + catalog_info = dict() + for k in catalog_keys: + catalog_info[k] = "" + return catalog_info + +# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate +# required catalog fields +def parse_gfdl_am5_data(file_name: str): + catalog_info = setup_catalog() + file = pathlib.Path(file_name) # uncomment when ready to run + + num_dir_parts = len(file.parts) # file name index = num_parts 1 + # isolate file from rest of path + stem = file.stem + # split the file name into components based on + # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc + split = stem.split('.') + catalog_info.update({"realm": split[0]}) + catalog_info.update({"time_range": split[1]}) + catalog_info.update({"variable_id": split[2]}) + catalog_info.update({"chunk_freq": file.parts[num_dir_parts - 2]}) + catalog_info.update({"activity_id": "GFDL"}) + catalog_info.update({"institution_id": "GFDL"}) + file_freq = file.parts[num_dir_parts - 3] + + for f in freq_opts: + if f in file_freq: + catalog_info.update({"frequency": f}) + break + if 'daily' in file_freq: + catalog_info.update({"frequency": "day"}) + elif 'monthly' in file_freq: + catalog_info.update({"frequency": "mon"}) + + # read metadata from the appropriate fieldlist + if 'cmip' in catalog_info['realm'].lower(): + gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') + else: + gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') + + try: + gfdl_info = read_json(gfdl_fieldlist, log=_log) + except IOError: + print("Unable to open file", gfdl_fieldlist) + sys.exit(1) + + if hasattr(gfdl_info['variables'], catalog_info['variable_id']): + var_metadata = gfdl_info['variables'].get(catalog_info['variable_id']) + else: + raise 
KeyError(f'{catalog_info['variable_id']} not found in {gfdl_fieldlist}') + if hasattr(var_metadata, 'standard_name'): + catalog_info.update({'standard_name': var_metadata.standard_name}) + if hasattr(var_metadata, 'long_name'): + catalog_info.update({'long_name': var_metadata.long_name}) + if hasattr(var_metadata, 'units'): + catalog_info.update({'units': var_metadata.units}) + try: + # populate information from file metadata + parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + + + +# custom parser for pp data stored on GFDL archive filesystem +# assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] +def parse_gfdl_pp_ts(file_name: str): + catalog_info = setup_catalog() + # files = sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run + # file = pathlib.Path(files[0]) # debug comment when ready to run + file = pathlib.Path(file_name) # uncomment when ready to run + num_parts = len(file.parts) # file name index = num_parts 1 + # isolate file from rest of path + stem = file.stem + # split the file name into components based on _ + split = stem.split('.') + realm = split[0] + time_range = split[1] + variable_id = split[2] + chunk_freq = file.parts[num_parts - 2] # e.g, 1yr, 5yr + + catalog_info.update({"variable_id": variable_id}) + catalog_info.update({"chunk_freq": chunk_freq}) + catalog_info.update({"realm": realm}) + catalog_info.update({"time_range": time_range}) + + file_freq = file.parts[num_parts - 3] + for f in freq_opts: + if f in file_freq: + catalog_info.update({"frequency": f}) + break + if 'daily' in file_freq: + catalog_info.update({"frequency": "day"}) + elif 'monthly' in file_freq: + catalog_info.update({"frequency": "mon"}) + try: + # populate information from file metadata + parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + +# custom parser for CESM data that uses fieldlist metadata and the DRS to populate +# required catalog fields. 
Bas +def parse_cesm(file_name: str): + catalog_info = setup_catalog() + catalog_info.update({"path": file_name}) + catalog_info.update({"activity_id": "CESM"}) + catalog_info.update({"institution_id": "NCAR"}) + # split the file path and name into parts + file = pathlib.Path(file_name) + stem_parts = file.stem.split('.') + # search file and path for output frequency + for p in list(file.parts) + stem_parts: + if p in freq_opts: + catalog_info.update({"frequency": p}) + break + try: + # populate information from file metadata + new_catalog = parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + # read metadata from the appropriate fieldlist + cesm_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CESM.jsonc') + try: + cesm_info = read_json(cesm_fieldlist, log=_log) + except IOError: + print("Unable to open file", cesm_fieldlist) + sys.exit(1) + + units = new_catalog.get('units') + new_catalog.update({'units': units.replace('/s',' s-1').replace('/m2', ' m-2')}) + var_metadata = cesm_info['variables'].get(new_catalog['variable_id'], None) + if var_metadata is not None: + if var_metadata.get('standard_name', None) is not None: + new_catalog.update({'standard_name': var_metadata['standard_name']}) + if var_metadata.get('realm', None) is not None : + new_catalog.update({'realm': var_metadata['realm']}) + + return new_catalog
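
Usage sketches for the refactored modules follow; they assume the parsers.py and catalog_builder.py shown above and are illustrative only.

The JSONC helpers (strip_comments, parse_json, read_json) now live in parsers.py, so fieldlist lookups can be exercised without the rest of the framework. A minimal sketch using the U250 entry added to data/fieldlist_CESM.jsonc:

    import os
    import parsers

    # parse_json strips //-comments before handing the text to json.loads
    snippet = '''
    {
      "variables": {
        // zonal wind on the 250 hPa surface
        "U250": {"standard_name": "eastward_wind", "units": "m s-1", "ndim": 3}
      }
    }
    '''
    fieldlist = parsers.parse_json(snippet)
    print(fieldlist["variables"]["U250"]["units"])  # -> m s-1

    # read_json does the same for a file on disk, e.g. the CESM fieldlist
    cesm_info = parsers.read_json(os.path.join(parsers.ROOT_DIR, "data/fieldlist_CESM.jsonc"))
    print(cesm_info["variables"]["U250"]["standard_name"])  # -> eastward_wind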
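parse_nc_file builds time_range from the undecoded time coordinate with cftime rather than through xarray's time decoding. The same conversion in isolation, with made-up values and a 'days since' encoding chosen purely for illustration:

    import cftime

    def time_range_from_values(first, last, units, calendar):
        # CESM history files write 'no_leap'; cftime expects 'noleap'
        if calendar == 'no_leap':
            calendar = 'noleap'
        start = cftime.num2date(first, units, calendar=calendar)
        end = cftime.num2date(last, units, calendar=calendar)
        return start.strftime("%Y%m%d:%H%M%S") + '-' + end.strftime("%Y%m%d:%H%M%S")

    print(time_range_from_values(0, 364, "days since 1979-01-01", "no_leap"))
    # -> 19790101:000000-19791231:000000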
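parse_cesm rewrites slash-style units from CESM history output into the exponent form used by the fieldlists before doing the fieldlist lookup. The substitution as a standalone helper (the helper name is illustrative; the parser applies the replace inline):

    def to_exponent_units(units: str) -> str:
        # 'm/s' -> 'm s-1', 'W/m2' -> 'W m-2'; anything else passes through unchanged
        return units.replace('/s', ' s-1').replace('/m2', ' m-2')

    assert to_exponent_units('m/s') == 'm s-1'
    assert to_exponent_units('W/m2') == 'W m-2'
    assert to_exponent_units('kg m-2') == 'kg m-2'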
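Every parser follows the same contract that ecgtools' Builder expects from parsing_func: take a file path and return a flat dict keyed by the catalog columns, or a dict with INVALID_ASSET/TRACEBACK when the asset cannot be read. A skeleton for adding another convention, using only helpers defined in parsers.py (the function name and activity_id value are placeholders, not part of the patch):

    import pathlib
    import traceback

    from ecgtools.builder import INVALID_ASSET, TRACEBACK

    import parsers


    def parse_my_convention(file_name: str):
        # start with every catalog column set to an empty string
        catalog_info = parsers.setup_catalog()
        file = pathlib.Path(file_name)
        catalog_info.update({"path": file_name})
        catalog_info.update({"activity_id": "MY_ACTIVITY"})  # placeholder
        # ...fill in whatever the directory and file-name convention encodes...
        try:
            # fall back to the netCDF attributes for everything else
            return parsers.parse_nc_file(file, catalog_info)
        except Exception as exc:
            print(exc)
            return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}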
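With call_build moved onto CatalogBase, supporting a new data source only requires registering a subclass that points file_parse_method at its parser; building and saving are inherited. A sketch of that pattern; the CatalogAM5 name and the commented-out driver calls are illustrative and not part of the patch:

    import parsers
    from catalog_builder import CatalogBase, catalog_class


    @catalog_class.maker
    class CatalogAM5(CatalogBase):
        """Convenience class that pre-wires the GFDL am5 parser."""
        def __init__(self):
            super().__init__()
            self.file_parse_method = parsers.parse_gfdl_am5_data

    # cat = catalog_class["CatalogAM5"]  # however the ClassMaker registry is indexed
    # cat.cat_builder(data_paths=[...])  # plus exclude/include patterns, depth, threads
    # cat.call_build()
    # cat.call_save(output_dir=..., output_filename=...)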
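The catalog is written out as a csv; assuming the save step also writes the usual intake-esm json descriptor alongside it (call_save's body is not shown in this diff), a build can be spot-checked with intake. The path and variable chosen here are illustrative:

    import intake

    cat = intake.open_esm_datastore("/path/to/output_dir/my_catalog.json")
    print(cat.df.columns.tolist())  # the catalog_keys columns the parsers populate
    sub = cat.search(variable_id="U250", frequency="mon")
    print(sub.df[["path", "standard_name", "units", "time_range"]])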