diff --git a/containers/sat/Containerfile b/containers/sat/Containerfile
index ff76556..c974753 100644
--- a/containers/sat/Containerfile
+++ b/containers/sat/Containerfile
@@ -11,7 +11,7 @@ ENV GDAL_CONFIG=/venv/bin/gdal-config
 # Build the virtualenv
 FROM build-venv as install-reqs
 RUN /venv/bin/python -m pip install -q -U diskcache pyproj \
-    pyresample xarray pyyaml ocf_blosc2 eumdac requests dask zarr
+    pyresample xarray pyyaml ocf_blosc2 eumdac requests dask zarr tqdm
 
 # Copy the virtualenv into a distroless image
 FROM gcr.io/distroless/python3-debian11
diff --git a/containers/sat/download_process_sat.py b/containers/sat/download_process_sat.py
index 65b2802..24d8ac8 100644
--- a/containers/sat/download_process_sat.py
+++ b/containers/sat/download_process_sat.py
@@ -4,38 +4,30 @@
 """
 
 import argparse
-import unittest
 import dataclasses
 import datetime as dt
-import itertools
-import json
 import logging
 import os
 import pathlib
 import shutil
 import sys
 import traceback
-from multiprocessing import Pool, cpu_count
-from typing import Literal
 from collections.abc import Iterator
+from typing import Literal
 
 import dask.delayed
-import dask.distributed
 import dask.diagnostics
+import dask.distributed
 import eumdac
 import eumdac.cli
 import eumdac.product
 import numpy as np
 import pandas as pd
 import pyproj
-import pyresample
-import satpy.dataset.dataid
 import xarray as xr
-import yaml
-import zarr
 from ocf_blosc2 import Blosc2
-
 from satpy import Scene
+from tqdm import tqdm
 
 if sys.stdout.isatty():
     # Simple logging for terminals
@@ -170,7 +162,7 @@ def get_products_iterator(
     start: dt.datetime,
     end: dt.datetime,
     token: eumdac.AccessToken,
-) -> Iterator[eumdac.product.Product]:
+) -> tuple[Iterator[eumdac.product.Product], int]:
     """Get an iterator over the products for a given satellite in a given time range.
 
     Checks that the number of products returned matches the expected number of products.
@@ -185,13 +177,13 @@ def get_products_iterator(
     search_results: eumdac.SearchResults = collection.search(
         dtstart=start,
         dtend=end,
-        sort="start,time,1", # Sort by ascending start time
+        sort="start,time,1",  # Sort by ascending start time
     )
     log.info(
         f"Found {search_results.total_results}/{expected_products_count} products "
-        f"for {sat_config.product_id} "
+        f"for {sat_config.product_id} ",
     )
-    return search_results.__iter__()
+    return search_results.__iter__(), search_results.total_results
 
 
 def download_nat(
@@ -311,17 +303,15 @@ def write_to_zarr(
             promote_attrs=True,
         ).to_zarr(
             store=zarr_path,
-            compute=False,
+            compute=True,
             consolidated=True,
             mode=mode,
             **extra_kwargs,
         )
-        with dask.diagnostics.ProgressBar():
-            write_job.compute()
     except Exception as e:
         log.error(f"Error writing dataset to zarr store {zarr_path} with mode {mode}: {e}")
         traceback.print_tb(e.__traceback__)
-    
+
     return None
 
 
 #def download_scans(
@@ -730,7 +720,7 @@ def _rescale(da: xr.DataArray, channels: list[Channel]) -> xr.DataArray:
 #        if "x_geostationary_coordinates" in ds:
 #            del ds["x_geostationary_coordinates"]
 #        if "y_geostationary_coordinates" in ds:
-#            del ds["y_geostationary_coordinates"] 
+#            del ds["y_geostationary_coordinates"]
 #        # Need to remove these encodings to avoid chunking
 #        del ds.time.encoding["chunks"]
 #        del ds.time.encoding["preferred_chunks"]
@@ -831,14 +821,14 @@ def run(args: argparse.Namespace) -> None:
     end: dt.datetime = (start + pd.DateOffset(months=1, minutes=-1)).to_pydatetime()
     dstype: str = "hrv" if args.hrv else "nonhrv"
 
-    product_iter = get_products_iterator(
+    product_iter, total = get_products_iterator(
         sat_config=sat_config,
         start=start,
         end=end,
         token=_gen_token(),
     )
 
-    for product in product_iter:
+    for product in tqdm(product_iter, total=total):
         nat_filepath = download_nat(
             product=product,
             folder=folder / args.sat,