
Commit

Change dask configs
ghiggi committed Dec 11, 2024
1 parent de09ad9 commit 6298811
Showing 14 changed files with 94 additions and 48 deletions.
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l0a_station.py
@@ -102,6 +102,8 @@ def disdrodb_run_l0a_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
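The same two-line change recurs in each station CLI below (L0B, L0C, L1, L2E, L2M): silence dask.distributed logging below the ERROR level, then start the local cluster. A minimal standalone sketch of the pattern — the processes=True flag and the Client/teardown wiring are illustrative additions, not lines from this commit:

import os

import dask
from dask.distributed import Client, LocalCluster

# Silence dask.distributed warnings/info messages (only errors get logged)
dask.config.set({"logging.distributed": "error"})

# Use the configured number of workers, defaulting to all CPUs minus two
available_workers = os.cpu_count() - 2
num_workers = dask.config.get("num_workers", available_workers)

# Spin up a local cluster and attach a client to it
cluster = LocalCluster(n_workers=num_workers, processes=True)
client = Client(cluster)
try:
    pass  # station processing would run here
finally:
    client.close()
    cluster.close()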
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l0b_station.py
@@ -104,6 +104,8 @@ def disdrodb_run_l0b_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l0c_station.py
@@ -107,6 +107,8 @@ def disdrodb_run_l0c_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l1_station.py
@@ -102,6 +102,8 @@ def disdrodb_run_l1_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l2e_station.py
@@ -102,6 +102,8 @@ def disdrodb_run_l2e_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
2 changes: 2 additions & 0 deletions disdrodb/cli/disdrodb_run_l2m_station.py
@@ -102,6 +102,8 @@ def disdrodb_run_l2m_station(
# Retrieve the number of processes to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
12 changes: 9 additions & 3 deletions disdrodb/l0/io.py
@@ -201,14 +201,20 @@ def read_l0a_dataframe(
if isinstance(filepaths, str):
filepaths = [filepaths]
# ---------------------------------------------------
# - If debugging_mode=True, it reads only the first 3 filepaths
# If debugging_mode=True, it reads only the first 3 filepaths
if debugging_mode:
filepaths = filepaths[0:3] # select first 3 filepaths

# - Define the list of dataframe
# ---------------------------------------------------
# Define the list of dataframes
list_df = [_read_l0a(filepath, verbose=verbose, debugging_mode=debugging_mode) for filepath in filepaths]
# - Concatenate dataframe

# Concatenate dataframes
df = concatenate_dataframe(list_df, verbose=verbose)

# Ensure time is in nanoseconds
df["time"] = df["time"].astype("M8[ns]")

# ---------------------------------------------------
# Return dataframe
return df
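The explicit cast normalizes the concatenated time column to nanosecond precision, whatever resolution each per-file reader returned (with pandas 2.x some readers can hand back coarser datetime64 units). A toy illustration with made-up column names:

import numpy as np
import pandas as pd

# Two toy L0A chunks: one with second-resolution timestamps, one with nanosecond resolution
df1 = pd.DataFrame({"time": np.array(["2024-01-01T00:00:00"], dtype="M8[s]"), "value": [1]})
df2 = pd.DataFrame({"time": pd.to_datetime(["2024-01-01 00:01:00"]), "value": [2]})

df = pd.concat([df1, df2], ignore_index=True)

# Ensure time is in nanoseconds
df["time"] = df["time"].astype("M8[ns]")
print(df["time"].dtype)  # datetime64[ns]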
65 changes: 43 additions & 22 deletions disdrodb/l0/l0_processing.py
@@ -80,6 +80,7 @@


@delayed_if_parallel
@single_threaded_if_parallel
def _generate_l0a(
filepath,
data_dir,
@@ -161,6 +162,8 @@ def _generate_l0a(
return logger_filepath


@delayed_if_parallel
@single_threaded_if_parallel
def _generate_l0b(
filepath,
data_dir,
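The commit does not show how these decorators are implemented; a plausible sketch of what a delayed_if_parallel / single_threaded_if_parallel pair could look like (purely an assumption for illustration, not the repository's code):

import functools

import dask


def delayed_if_parallel(function):
    # Sketch: wrap the call in dask.delayed only when parallel=True
    @functools.wraps(function)
    def wrapper(*args, parallel=False, **kwargs):
        if parallel:
            return dask.delayed(function)(*args, parallel=parallel, **kwargs)
        return function(*args, parallel=parallel, **kwargs)
    return wrapper


def single_threaded_if_parallel(function):
    # Sketch: when parallel=True, run any nested dask computations synchronously
    # inside the worker, so each delayed task does not oversubscribe its CPU
    @functools.wraps(function)
    def wrapper(*args, parallel=False, **kwargs):
        if parallel:
            with dask.config.set(scheduler="synchronous"):
                return function(*args, parallel=parallel, **kwargs)
        return function(*args, parallel=parallel, **kwargs)
    return wrapper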
@@ -968,27 +971,11 @@ def run_l0b_station(
# - If parallel=True, it does that in parallel using dask.bag
# Setting npartitions=len(filepaths) ensures that a prior task on a core
# finishes before a new one starts.
if not parallel:
list_logs = [
_generate_l0b(
filepath=filepath,
data_dir=data_dir,
logs_dir=logs_dir,
metadata=metadata,
campaign_name=campaign_name,
station_name=station_name,
force=force,
verbose=verbose,
debugging_mode=debugging_mode,
parallel=parallel,
)
for filepath in filepaths
]

else:
bag = db.from_sequence(filepaths, npartitions=len(filepaths))
list_logs = bag.map(
_generate_l0b,
# BUG: If debugging_mode=True and parallel=True, a subtle bug can currently occur when
# two processes with subsetted L0A files try to create the same L0B file !
list_tasks = [
_generate_l0b(
filepath=filepath,
data_dir=data_dir,
logs_dir=logs_dir,
metadata=metadata,
@@ -998,7 +985,41 @@
verbose=verbose,
debugging_mode=debugging_mode,
parallel=parallel,
).compute()
)
for filepath in filepaths
]
list_logs = dask.compute(*list_tasks) if parallel else list_tasks
# if not parallel:
# list_logs = [
# _generate_l0b(
# filepath=filepath,
# data_dir=data_dir,
# logs_dir=logs_dir,
# metadata=metadata,
# campaign_name=campaign_name,
# station_name=station_name,
# force=force,
# verbose=verbose,
# debugging_mode=debugging_mode,
# parallel=parallel,
# )
# for filepath in filepaths
# ]

# else:
# bag = db.from_sequence(filepaths, npartitions=len(filepaths))
# list_logs = bag.map(
# _generate_l0b,
# data_dir=data_dir,
# logs_dir=logs_dir,
# metadata=metadata,
# campaign_name=campaign_name,
# station_name=station_name,
# force=force,
# verbose=verbose,
# debugging_mode=debugging_mode,
# parallel=parallel,
# ).compute()

# -----------------------------------------------------------------.
# Define L0B summary logs
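The new pattern above builds a plain list of dask.delayed tasks and resolves them with a single dask.compute call, instead of routing the work through dask.bag. A minimal, self-contained sketch with a stand-in task function and made-up file names:

import dask


@dask.delayed
def _process_file(filepath):
    # Stand-in for _generate_l0b: pretend to process the file and return a log path
    return filepath + ".log"


filepaths = ["file_0.parquet", "file_1.parquet", "file_2.parquet"]
list_tasks = [_process_file(filepath) for filepath in filepaths]

# A single compute() call resolves all tasks on the active scheduler/cluster
list_logs = dask.compute(*list_tasks)
print(list(list_logs))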
5 changes: 5 additions & 0 deletions disdrodb/l0/l0a_processing.py
@@ -208,6 +208,8 @@ def remove_duplicated_timesteps(df: pd.DataFrame, verbose: bool = False):
values_duplicates = values[idx_duplicates].astype("M8[s]")
# If there are duplicated timesteps
if len(values_duplicates) > 0:
# TODO: raise error if duplicated timesteps have different values !

# Drop duplicated timesteps (keeping the first occurrence)
df = df.drop_duplicates(subset="time", keep="first")
# Report the values of duplicated timesteps
@@ -653,6 +655,9 @@ def process_raw_file(
# - Replace invalid values with np.nan
df = set_nan_invalid_values(df, sensor_name=sensor_name, verbose=verbose)

# - Sort by time
df = df.sort_values("time")

# ------------------------------------------------------.
# - Check column names agrees to DISDRODB standards
check_l0a_column_names(df, sensor_name=sensor_name)
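The additions boil down to two pandas calls on the L0A dataframe: drop duplicated timesteps keeping the first occurrence, and sort chronologically before the DISDRODB column checks. A toy example with made-up values:

import pandas as pd

# Toy dataframe with an out-of-order and a duplicated timestep
df = pd.DataFrame(
    {
        "time": pd.to_datetime(["2024-01-01 00:01:00", "2024-01-01 00:00:00", "2024-01-01 00:01:00"]),
        "value": [2, 1, 3],
    }
)

# Drop duplicated timesteps (keeping the first occurrence)
df = df.drop_duplicates(subset="time", keep="first")

# Sort by time
df = df.sort_values("time")
print(df)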
3 changes: 2 additions & 1 deletion disdrodb/l0/l0b_processing.py
@@ -51,6 +51,7 @@
log_error,
log_info,
)
from disdrodb.utils.time import ensure_sorted_by_time

logger = logging.getLogger(__name__)

@@ -456,7 +457,7 @@ def set_geolocation_coordinates(ds, attrs):
def finalize_dataset(ds, sensor_name, attrs):
"""Finalize DISDRODB L0B Dataset."""
# Ensure sorted by time
ds = ds.sortby("time")
ds = ensure_sorted_by_time(ds)

# Set diameter and velocity bin coordinates
ds = ds.assign_coords(get_bin_coords_dict(sensor_name=sensor_name))
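ensure_sorted_by_time replaces the unconditional ds.sortby("time"); its actual implementation is not part of this diff. A hypothetical minimal version that sorts (and logs) only when needed might look like:

import logging

import numpy as np
import xarray as xr

logger = logging.getLogger(__name__)


def ensure_sorted_by_time_sketch(ds: xr.Dataset) -> xr.Dataset:
    # Hypothetical stand-in for disdrodb.utils.time.ensure_sorted_by_time
    time_values = ds["time"].to_numpy()
    if not np.all(time_values[:-1] <= time_values[1:]):
        logger.info("Dataset is not sorted by time. Sorting it.")
        ds = ds.sortby("time")
    return ds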
3 changes: 2 additions & 1 deletion disdrodb/l2/event.py
@@ -24,6 +24,7 @@
import xarray as xr

from disdrodb.api.info import get_start_end_time_from_filepaths
from disdrodb.utils.time import ensure_sorted_by_time


@dask.delayed
@@ -54,7 +55,7 @@ def identify_events(filepaths, parallel):
if len(set(sample_intervals)) > 1:
raise ValueError("Sample intervals are not constant across files.")

# Sort dataset by time
ds = ds.sortby("time")
ds = ensure_sorted_by_time(ds)

# Select events
# TODO:
minimum_n_drops = 5
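identify_events gathers per-file information in parallel through dask.delayed before requiring a constant sample interval across files. A rough, hypothetical sketch of that pattern (the helper, the interval inference, and the file names are illustrative, not the repository's code):

import dask
import numpy as np
import xarray as xr


@dask.delayed
def _get_sample_interval(filepath):
    # Open the file and infer its (median) time step in seconds
    with xr.open_dataset(filepath) as ds:
        deltas = np.diff(ds["time"].to_numpy().astype("M8[s]")).astype(int)
    return int(np.median(deltas))


filepaths = ["station_file_0.nc", "station_file_1.nc"]  # hypothetical paths
sample_intervals = dask.compute(*[_get_sample_interval(fp) for fp in filepaths])
if len(set(sample_intervals)) > 1:
    raise ValueError("Sample intervals are not constant across files.")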
14 changes: 7 additions & 7 deletions disdrodb/l2/processing_options.py
@@ -14,13 +14,13 @@
},
# PSD models fitting options
"psd_models": {
# 'gamma': {
# 'probability_method': 'cdf',
# 'likelihood': 'multinomial',
# 'truncated_likelihood': True,
# 'optimizer': 'Nelder-Mead',
# "add_gof_metrics": True
# },
"gamma": {
"probability_method": "cdf",
"likelihood": "multinomial",
"truncated_likelihood": True,
"optimizer": "Nelder-Mead",
"add_gof_metrics": True,
},
"normalized_gamma": {
"optimizer": "Nelder-Mead",
"order": 2,
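With the "gamma" entry re-enabled, both PSD models are now dispatched with their own fitting options. A hypothetical sketch of how such an options mapping could be consumed (fit_psd_model and its signature are made up for illustration):

psd_models = {
    "gamma": {
        "probability_method": "cdf",
        "likelihood": "multinomial",
        "truncated_likelihood": True,
        "optimizer": "Nelder-Mead",
        "add_gof_metrics": True,
    },
    "normalized_gamma": {"optimizer": "Nelder-Mead", "order": 2},
}


def fit_psd_model(psd_model, **fit_kwargs):
    # Placeholder: a real implementation would fit the chosen PSD model to the spectra
    print(f"Fitting {psd_model} with {fit_kwargs}")


for psd_model, fit_kwargs in psd_models.items():
    fit_psd_model(psd_model, **fit_kwargs)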
11 changes: 2 additions & 9 deletions disdrodb/routines.py
@@ -22,11 +22,6 @@

from disdrodb.api.io import available_stations, get_required_product
from disdrodb.utils.cli import _execute_cmd
from disdrodb.utils.logger import (
# log_warning,
# log_error,
log_info,
)

logger = logging.getLogger(__name__)

@@ -514,8 +509,7 @@ def run_disdrodb_l0_station(
"""
# ---------------------------------------------------------------------.
t_i = time.time()
msg = f"L0 processing of station {station_name} has started."
log_info(logger=logger, msg=msg, verbose=verbose)
print(f"L0 processing of station {station_name} has started.")

# ------------------------------------------------------------------.
# L0A processing
@@ -571,8 +565,7 @@
# -------------------------------------------------------------------------.
# End of L0 processing for all stations
timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i))
msg = f"L0 processing of stations {station_name} completed in {timedelta_str}"
log_info(logger, msg, verbose)
print(f"L0 processing of stations {station_name} completed in {timedelta_str}")


####---------------------------------------------------------------------------.
17 changes: 12 additions & 5 deletions disdrodb/utils/time.py
@@ -15,13 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""This module contains utilities related to the processing of temporal dataset."""
import logging
from typing import Optional

import numpy as np
import pandas as pd
import xarray as xr
from xarray.core import dtypes

from disdrodb.utils.logger import log_info

logger = logging.getLogger(__name__)


def get_dataset_start_end_time(ds: xr.Dataset, time_dim="time"):
"""Retrieves dataset starting and ending time.
@@ -183,13 +188,15 @@ def infer_sample_interval(ds, verbose=False, robust=False):
unexpected_intervals_counts = counts[unique_deltas != sample_interval]
unexpected_intervals_fractions = fractions[unique_deltas != sample_interval]
if verbose and len(unexpected_intervals) > 0:
print("Warning: Irregular timesteps detected.")
msg = "Irregular timesteps detected."
log_info(logger=logger, msg=msg, verbose=verbose)
for interval, count, fraction in zip(
unexpected_intervals,
unexpected_intervals_counts,
unexpected_intervals_fractions,
):
print(f" Interval: {interval} seconds, Occurrence: {count}, Frequency: {fraction} %")
msg = f" Interval: {interval} seconds, Occurrence: {count}, Frequency: {fraction} %"
log_info(logger=logger, msg=msg, verbose=verbose)

# Perform checks

# - Raise error if negative or zero time intervals are present
@@ -256,11 +263,11 @@ def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=Tru
times = pd.to_datetime(ds["time"].values)

# Determine the start and end times
start_time = times[0].floor(f"{sample_interval}S")
end_time = times[-1].ceil(f"{sample_interval}S")
start_time = times[0].floor(f"{sample_interval}s")
end_time = times[-1].ceil(f"{sample_interval}s")

# Create the expected time grid
expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}S")
expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s")

# Convert to numpy arrays
times = times.to_numpy(dtype="M8[s]")
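The floor/ceil/date_range calls now use the lowercase "s" offset alias, since recent pandas versions deprecate the uppercase "S". A small standalone example of building the regular time grid this way (the interval and timestamps are made up):

import pandas as pd

sample_interval = 30  # seconds
times = pd.to_datetime(["2024-01-01 00:00:07", "2024-01-01 00:02:31"])

# Snap the observed start/end times onto the sampling grid
start_time = times[0].floor(f"{sample_interval}s")
end_time = times[-1].ceil(f"{sample_interval}s")

# Regular grid of expected timesteps
expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s")
print(expected_times)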
