Skip to content

Commit

Permalink
Remove flatten from dask bag
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 28, 2023
1 parent 194d61b commit 47267eb
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
branches: []
paths-ignore:
- 'README.md'
workflow_dispatch:

# Specify concurrency such that only one workflow can run at a time
# * Different workflow files are not affected
Expand Down Expand Up @@ -112,7 +113,9 @@ jobs:
runs-on: ubuntu-latest
container: quay.io/condaforge/miniforge3:latest
needs: build-venv
if: contains(github.ref, 'refs/tags/v') && github.event_name == 'push'
if: |
github.event_name == 'workflow_dispatch' ||
(contains(github.ref, 'refs/tags/v') && github.event_name == 'push')
steps:
- name: Checkout repository
Expand Down
4 changes: 0 additions & 4 deletions src/nwp_consumer/internal/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def DownloadRawDataset(self, *, start: dt.date, end: dt.date) -> list[pathlib.Pa
src=infoPathTuple[1],
dst=self.rawdir/infoPathTuple[0].it().strftime(internal.IT_FOLDER_FMTSTR)/(infoPathTuple[0].filename())
)) \
.flatten() \
.compute()

return storedFiles
Expand Down Expand Up @@ -132,7 +131,6 @@ def ConvertRawDatasetToZarr(self, *, start: dt.date, end: dt.date) -> list[pathl
.filter(_dataQualityFilter) \
.map(lambda ds: _saveAsTempZipZarr(ds=ds)) \
.map(lambda path: self.storer.store(src=path, dst=self.zarrdir / path.name)) \
.flatten() \
.compute(num_workers=1) # AWS ECS only has 1 CPU which amounts to half a physical core

if not isinstance(storedfiles, list):
Expand Down Expand Up @@ -189,15 +187,13 @@ def CreateLatestZarr(self) -> list[pathlib.Path]:
self.storer.delete(p=self.zarrdir / 'latest.zarr.zip')
storedFiles = datasets.map(lambda ds: _saveAsTempZipZarr(ds=ds)) \
.map(lambda path: self.storer.store(src=path, dst=self.zarrdir / 'latest.zarr.zip')) \
.flatten() \
.compute()

# Save as regular zarr
if self.storer.exists(dst=self.zarrdir / 'latest.zarr'):
self.storer.delete(p=self.zarrdir / 'latest.zarr')
storedFiles += datasets.map(lambda ds: _saveAsTempRegularZarr(ds=ds)) \
.map(lambda path: self.storer.store(src=path, dst=self.zarrdir / 'latest.zarr')) \
.flatten() \
.compute()

# Delete the temporary files
Expand Down
15 changes: 10 additions & 5 deletions src/nwp_consumer/internal/service/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ def exists(self, *, dst: pathlib.Path) -> bool:
return False

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
    """Fake store for tests: discard *src* from disk and report it as stored at *dst*.

    Removes *src* (directory tree or single file) if it is present, mirroring
    the real storer's "consume the local temp file" contract, then returns the
    destination path unchanged.
    """
    if not src.exists():
        # Nothing on disk to consume; still report the destination.
        return dst
    if src.is_dir():
        shutil.rmtree(src.as_posix(), ignore_errors=True)
    else:
        src.unlink(missing_ok=True)
    return dst

def listInitTimes(self, prefix: pathlib.Path) -> list[dt.datetime]:
Expand All @@ -41,7 +42,11 @@ def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
return [pathlib.Path(f'{it:%Y%m%d%H%M}/{f}.grib') for f in INIT_TIME_FILES]

def delete(self, *, p: pathlib.Path) -> None:
    """Fake delete for tests: remove *p* from disk if it exists.

    Handles both directory trees and single files; a missing path is a no-op.
    """
    if not p.exists():
        return
    if p.is_dir():
        shutil.rmtree(p.as_posix(), ignore_errors=True)
    else:
        p.unlink(missing_ok=True)


class DummyFileInfo(internal.FileInfoModel):
Expand Down

0 comments on commit 47267eb

Please sign in to comment.