From b55a16150ba43aaac655c3add07d4ddb51095365 Mon Sep 17 00:00:00 2001 From: mattjbr123 Date: Tue, 27 Aug 2024 18:19:39 +0100 Subject: [PATCH] Add preprocessor #3 --- scripts/convert_GEAR_beam.py | 41 +++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/scripts/convert_GEAR_beam.py b/scripts/convert_GEAR_beam.py index 2673c48..50959e2 100644 --- a/scripts/convert_GEAR_beam.py +++ b/scripts/convert_GEAR_beam.py @@ -15,6 +15,8 @@ StoreToZarr, ConsolidateDimensionCoordinates, ConsolidateMetadata, + Indexed, + T, ) startyear = 1990 @@ -44,9 +46,46 @@ def make_path(time): if prune > 0: pattern = pattern.prune(nkeep=prune) +# Add in our own custom Beam PTransform (Parallel Transform) to apply +# some preprocessing to the dataset. In this case to convert the +# 'bounds' variables to coordinate rather than data variables. + +# They are implemented as subclasses of the beam.PTransform class +class DataVarToCoordVar(beam.PTransform): + + # not sure why it needs to be a staticmethod + @staticmethod + # the preprocess function should take in and return an + # object of type Indexed[T]. These are pangeo-forge-recipes + # derived types, internal to the functioning of the + # pangeo-forge-recipes transforms. + # I think they consist of a list of 2-item tuples, + # each containing some type of 'index' and a 'chunk' of + # the dataset or a reference to it, as can be seen in + # the first line of the function below + def _datavar_to_coordvar(item: Indexed[T]) -> Indexed[T]: + index, ds = item + # do something to each ds chunk here + # and leave index untouched. + # Here we convert some of the variables in the file + # to coordinate variables so that pangeo-forge-recipes + # can process them + print(f'Preprocessing before {ds =}') + ds = ds.set_coords(['x_bnds', 'y_bnds', 'time_bnds', 'crs']) + print(f'Preprocessing after {ds =}') + return index, ds + + # this expand function is a necessary part of + # developing your own Beam PTransforms, I think + # it wraps the above preprocess function and applies + # it to the PCollection, i.e. all the 'ds' chunks in Indexed + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | beam.Map(self._datavar_to_coordvar) + recipe = ( beam.Create(pattern.items()) - | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={'preprocess':lambda ds: ds.set_coords(['x_bnds', 'y_bnds', 'time_bnds', 'crs'])}) + | OpenWithXarray(file_type=pattern.file_type) + | DataVarToCoordVar() # the preprocess | StoreToZarr( target_root=td, store_name=tn,