feat(sat-etl): Add tqdm
devsjc committed Dec 24, 2024
1 parent 3c7ec16 commit e534f23
Showing 2 changed files with 13 additions and 23 deletions.
2 changes: 1 addition & 1 deletion containers/sat/Containerfile
@@ -11,7 +11,7 @@ ENV GDAL_CONFIG=/venv/bin/gdal-config

 # Build the virtualenv
 FROM build-venv as install-reqs
 RUN /venv/bin/python -m pip install -q -U diskcache pyproj \
-    pyresample xarray pyyaml ocf_blosc2 eumdac requests dask zarr
+    pyresample xarray pyyaml ocf_blosc2 eumdac requests dask zarr tqdm

 # Copy the virtualenv into a distroless image
 FROM gcr.io/distroless/python3-debian11
34 changes: 12 additions & 22 deletions containers/sat/download_process_sat.py
@@ -4,38 +4,30 @@
 """

 import argparse
-import unittest
 import dataclasses
 import datetime as dt
 import itertools
 import json
 import logging
 import os
 import pathlib
 import shutil
 import sys
 import traceback
-from multiprocessing import Pool, cpu_count
-from typing import Literal
 from collections.abc import Iterator
+from typing import Literal

-import dask.delayed
-import dask.distributed
-import dask.diagnostics
+import dask.distributed
 import eumdac
 import eumdac.cli
 import eumdac.product
 import numpy as np
 import pandas as pd
 import pyproj
 import pyresample
 import satpy.dataset.dataid
 import xarray as xr
 import yaml
 import zarr
 from ocf_blosc2 import Blosc2

 from satpy import Scene
+from tqdm import tqdm

 if sys.stdout.isatty():
     # Simple logging for terminals
@@ -170,7 +162,7 @@ def get_products_iterator(
     start: dt.datetime,
     end: dt.datetime,
     token: eumdac.AccessToken,
-) -> Iterator[eumdac.product.Product]:
+) -> tuple[Iterator[eumdac.product.Product], int]:
     """Get an iterator over the products for a given satellite in a given time range.

     Checks that the number of products returned matches the expected number of products.
@@ -185,13 +177,13 @@ def get_products_iterator(
     search_results: eumdac.SearchResults = collection.search(
         dtstart=start,
         dtend=end,
-        sort="start,time,1", # Sort by ascending start time
+        sort="start,time,1",  # Sort by ascending start time
     )
     log.info(
         f"Found {search_results.total_results}/{expected_products_count} products "
-        f"for {sat_config.product_id} "
+        f"for {sat_config.product_id} ",
     )
-    return search_results.__iter__()
+    return search_results.__iter__(), search_results.total_results


 def download_nat(
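The signature now returns the product count alongside the lazy iterator: eumdac search results are consumed lazily, so tqdm cannot discover their length on its own and needs an explicit `total=` to render a percentage and ETA. A minimal sketch of the pattern, with hypothetical stand-in names (`fetch_products`, the `product-{i}` strings) rather than this repo's code:

```python
from collections.abc import Iterator

from tqdm import tqdm


def fetch_products() -> tuple[Iterator[str], int]:
    """Return a lazy iterator plus its known length (stand-in for eumdac search results)."""
    results = [f"product-{i}" for i in range(100)]
    return iter(results), len(results)


products, total = fetch_products()
# Without total=, tqdm over a bare iterator can only show a running count,
# not a percentage bar or time remaining.
for product in tqdm(products, total=total):
    pass  # download and process each product here
```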
@@ -311,17 +303,15 @@ def write_to_zarr(
             promote_attrs=True,
         ).to_zarr(
             store=zarr_path,
-            compute=False,
+            compute=True,
             consolidated=True,
             mode=mode,
             **extra_kwargs,
         )
-        with dask.diagnostics.ProgressBar():
-            write_job.compute()
     except Exception as e:
         log.error(f"Error writing dataset to zarr store {zarr_path} with mode {mode}: {e}")
         traceback.print_tb(e.__traceback__)

     return None

 #def download_scans(
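The change from `compute=False` to `compute=True` makes the zarr write eager: previously the delayed job was executed under `dask.diagnostics.ProgressBar()`, whereas now overall progress is reported by the per-product tqdm loop added in `run()`. A rough sketch of the two strategies, using a toy dataset and a hypothetical store path:

```python
import xarray as xr

ds = xr.Dataset({"data": ("x", list(range(10)))})

# Before: build a delayed write job, then execute it under a Dask progress bar.
# import dask.diagnostics
# job = ds.to_zarr("example.zarr", mode="w", compute=False)
# with dask.diagnostics.ProgressBar():
#     job.compute()

# After: write eagerly in a single call; no separate compute step is needed.
ds.to_zarr("example.zarr", mode="w", compute=True)
```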
@@ -730,7 +720,7 @@ def _rescale(da: xr.DataArray, channels: list[Channel]) -> xr.DataArray:
 #     if "x_geostationary_coordinates" in ds:
 #         del ds["x_geostationary_coordinates"]
 #     if "y_geostationary_coordinates" in ds:
-#        del ds["y_geostationary_coordinates"]
+#         del ds["y_geostationary_coordinates"]
 #     # Need to remove these encodings to avoid chunking
 #     del ds.time.encoding["chunks"]
 #     del ds.time.encoding["preferred_chunks"]
@@ -831,14 +821,14 @@ def run(args: argparse.Namespace) -> None:
     end: dt.datetime = (start + pd.DateOffset(months=1, minutes=-1)).to_pydatetime()
     dstype: str = "hrv" if args.hrv else "nonhrv"

-    product_iter = get_products_iterator(
+    product_iter, total = get_products_iterator(
         sat_config=sat_config,
         start=start,
         end=end,
         token=_gen_token(),
     )

-    for product in product_iter:
+    for product in tqdm(product_iter, total=total):
         nat_filepath = download_nat(
             product=product,
             folder=folder / args.sat,
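One consideration when wrapping the loop in tqdm: the script already branches on `sys.stdout.isatty()` for its logging setup, and tqdm (which writes to stderr by default) degrades to plain periodic output on non-interactive streams. If bar refreshes are unwanted in container logs, they can be silenced explicitly; a small sketch using tqdm's standard `disable` flag (not something this commit adds):

```python
import sys

from tqdm import tqdm

# Silence the bar entirely when stderr is not a terminal, e.g. in a
# container log, so refreshes do not become separate log lines.
for product in tqdm(range(3), total=3, disable=not sys.stderr.isatty()):
    pass  # per-product download/process work
```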
