Skip to content

Commit

Permalink
First attempt at recipe for GEAR #3
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjbr123 committed Aug 16, 2024
1 parent cdeb448 commit a892352
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions scripts/convert_GEAR_beam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# MJB (UKCEH) Aug-2024
# Example script for a pangeo-forge-recipe to convert
# gridded netcdf files to a zarr datastore ready for upload
# to object storage.
# See jupyter notebook for more details and explanations/comments
# Please note that this script/notebook is intended to serve as an example only,
# and be adapted for your own datasets.

import os
import apache_beam as beam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from apache_beam.options.pipeline_options import PipelineOptions
from pangeo_forge_recipes.transforms import (
OpenWithXarray,
StoreToZarr,
ConsolidateDimensionCoordinates,
ConsolidateMetadata,
)

startyear = 1990
endyear = 2016 # inclusive
indir = "/home/users/mattjbr/fdri/data/gear-hrly"
pre = "CEH-GEAR-1hr-v2_"
suf = ".nc"
td = "/work/scratch-pw2/mattjbr"
tn = "gear_1hrly_fulloutput_yearly_100km_chunks.zarr"
target_chunks = {"time": 365.25*24, "y": 100, "x": 100}
#nprocs = 64
prune = 12 # no. of files to process, set to 0 to use all

if not os.path.exists(td):
os.makedirs(td)

def make_path(yearmonth):
filename = pre + yearmonth + suf
return os.path.join(indir, filename)

years = list(range(startyear, endyear + 1))
months = list(range(1, 13))
ymonths = [f"{year}{month:02d}" for month in months for year in years]
time_concat_dim = ConcatDim("time", ymonths)

pattern = FilePattern(make_path, time_concat_dim)
if prune > 0:
pattern = pattern.prune(nkeep=prune)

recipe = (
beam.Create(pattern.items())
| OpenWithXarray(file_type=pattern.file_type)
| StoreToZarr(
target_root=td,
store_name=tn,
combine_dims=pattern.combine_dim_keys,
target_chunks=target_chunks,
)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
)

#beam_options = PipelineOptions(
# direct_num_workers=nprocs, direct_running_mode="multi_processing"
#)
#with beam.Pipeline(options=beam_options) as p:
# p | recipe

with beam.Pipeline() as p:
p | recipe

0 comments on commit a892352

Please sign in to comment.