Refactor dask cluster init
ghiggi committed Dec 16, 2024
1 parent 055384c commit 7874379
Showing 9 changed files with 88 additions and 135 deletions.
8 changes: 5 additions & 3 deletions disdrodb/api/checks.py
@@ -92,6 +92,7 @@ def check_directories_inside(dir_path):
 def check_base_dir(base_dir: str):
     """Raise an error if the path does not end with ``DISDRODB``."""
     base_dir = str(base_dir)  # convert Pathlib to string
+    base_dir = os.path.normpath(base_dir)
     if not base_dir.endswith("DISDRODB"):
         raise ValueError(f"The path {base_dir} does not end with DISDRODB. Please check the path.")
     return base_dir
@@ -291,6 +292,7 @@ def check_issue_dir(data_source, campaign_name, base_dir=None):
 def check_issue_file(data_source, campaign_name, station_name, base_dir=None):
     """Check the existence of a valid issue YAML file. If it does not exist, create a default one."""
     from disdrodb.issue.checks import check_issue_compliance
+    from disdrodb.issue.writer import create_station_issue

     _ = check_issue_dir(
         base_dir=base_dir,
@@ -306,9 +308,9 @@ def check_issue_file(data_source, campaign_name, station_name, base_dir=None):
     )
     # Check existence
     if not os.path.exists(issue_filepath):
-        msg = f"The issue YAML file of {data_source} {campaign_name} {station_name} does not exist at {issue_filepath}."
-        logger.error(msg)
-        raise ValueError(msg)
+        create_station_issue(
+            base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name
+        )

     # Check validity
     check_issue_compliance(
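The os.path.normpath call added to check_base_dir matters because a trailing path separator would otherwise defeat the endswith test. A minimal standard-library illustration (the path below is hypothetical):

import os

# Without normalization, a trailing separator makes the check fail:
"/data/DISDRODB/".endswith("DISDRODB")                      # False
# normpath strips the trailing separator (and collapses "//" and "/./"):
os.path.normpath("/data/DISDRODB/").endswith("DISDRODB")    # True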
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l0a_station.py
@@ -85,34 +85,15 @@ def disdrodb_run_l0a_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l0.l0_processing import run_l0a_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()
     # -------------------------------------------------------------------------.

     run_l0a_station(
@@ -131,4 +112,4 @@ def disdrodb_run_l0a_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
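The same initialize/close pair replaces identical copy-pasted cluster setup in the five station CLIs that follow. One subtlety remains in these scripts: if the processing routine raises, the cluster is never closed. A hypothetical context manager (not part of this commit) built on the new helpers would guarantee cleanup:

from contextlib import contextmanager

@contextmanager
def dask_cluster_if(parallel):
    """Hypothetical helper: start a local dask cluster when parallel=True."""
    if not parallel:
        yield None, None
        return
    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster
    cluster, client = initialize_dask_cluster()
    try:
        yield cluster, client
    finally:
        # Runs even if the processing routine raises
        close_dask_cluster(cluster, client)

# Usage inside a CLI command:
# with dask_cluster_if(parallel):
#     run_l0a_station(base_dir=base_dir, ...)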
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l0b_station.py
@@ -87,34 +87,15 @@ def disdrodb_run_l0b_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l0.l0_processing import run_l0b_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()

     # -------------------------------------------------------------------------.
     run_l0b_station(
@@ -134,4 +115,4 @@ def disdrodb_run_l0b_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l0c_station.py
@@ -90,34 +90,15 @@ def disdrodb_run_l0c_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l0.l0_processing import run_l0c_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()

     # -------------------------------------------------------------------------.
     run_l0c_station(
@@ -138,4 +119,4 @@ def disdrodb_run_l0c_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l1_station.py
@@ -85,34 +85,15 @@ def disdrodb_run_l1_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l1.routines import run_l1_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()

     # -------------------------------------------------------------------------.
     run_l1_station(
@@ -131,4 +112,4 @@ def disdrodb_run_l1_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l2e_station.py
@@ -85,34 +85,15 @@ def disdrodb_run_l2e_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l2.routines import run_l2e_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()

     # -------------------------------------------------------------------------.
     run_l2e_station(
@@ -131,4 +112,4 @@ def disdrodb_run_l2e_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
25 changes: 3 additions & 22 deletions disdrodb/cli/disdrodb_run_l2m_station.py
@@ -85,34 +85,15 @@ def disdrodb_run_l2m_station(
     Format: <...>/DISDRODB
     If not specified, uses path specified in the DISDRODB active configuration.
     """
-    import os
-
-    import dask
-    from dask.distributed import Client, LocalCluster
-
     from disdrodb.l2.routines import run_l2m_station
+    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

     base_dir = parse_base_dir(base_dir)

     # -------------------------------------------------------------------------.
     # If parallel=True, set the dask environment
     if parallel:
-        # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
-        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
-        # Retrieve the number of process to run
-        available_workers = os.cpu_count() - 2  # if not set, all CPUs
-        num_workers = dask.config.get("num_workers", available_workers)
-        # Silence dask warnings
-        dask.config.set({"logging.distributed": "error"})
-        # Create dask.distributed local cluster
-        cluster = LocalCluster(
-            n_workers=num_workers,
-            threads_per_worker=1,
-            processes=True,
-            # memory_limit='8GB',
-            # silence_logs=False,
-        )
-        Client(cluster)
+        cluster, client = initialize_dask_cluster()

     # -------------------------------------------------------------------------.
     run_l2m_station(
@@ -131,4 +112,4 @@ def disdrodb_run_l2m_station(
     # -------------------------------------------------------------------------.
     # Close the cluster
     if parallel:
-        cluster.close()
+        close_dask_cluster(cluster, client)
1 change: 1 addition & 0 deletions disdrodb/psd/fitting.py
@@ -1125,6 +1125,7 @@ def get_gamma_moment_v1(N0, mu, Lambda, moment):
     Combining Reflectivity Profile and Path-integrated Attenuation.
     J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2
     """
+    # Zhang et al 2001: N0 * gamma(mu + moment + 1) * Lambda ** (-(mu + moment + 1))
     return N0 * gamma(mu + moment + 1) / Lambda ** (mu + moment + 1)
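get_gamma_moment_v1 evaluates the analytic moments of a gamma drop size distribution N(D) = N0 * D**mu * exp(-Lambda * D), whose moment of order n is N0 * gamma(mu + n + 1) / Lambda**(mu + n + 1); the added comment records the algebraically identical form quoted from Zhang et al 2001. A quick numerical check of that equivalence, with hypothetical parameter values:

from scipy.special import gamma

N0, mu, Lambda, moment = 8000.0, 2.0, 2.5, 3  # hypothetical DSD parameters

m_code = N0 * gamma(mu + moment + 1) / Lambda ** (mu + moment + 1)      # form used in the code
m_zhang = N0 * gamma(mu + moment + 1) * Lambda ** (-(mu + moment + 1))  # Zhang et al 2001 form
assert abs(m_code - m_zhang) <= 1e-12 * abs(m_code)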
64 changes: 64 additions & 0 deletions disdrodb/utils/dask.py
@@ -0,0 +1,64 @@
#!/usr/bin/env python3

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for Dask Distributed computations."""
import logging
import os


def initialize_dask_cluster():
    """Initialize a dask.distributed local cluster and client."""
    import dask
    from dask.distributed import Client, LocalCluster

    # Set HDF5_USE_FILE_LOCKING to avoid getting stuck with HDF5 file locking
    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
    # Retrieve the number of worker processes to use
    # - Use the dask config key "num_workers" if set; otherwise leave 2 CPUs free
    available_workers = max(os.cpu_count() - 2, 1)
    num_workers = dask.config.get("num_workers", available_workers)
    # Silence dask warnings
    dask.config.set({"logging.distributed": "error"})
    # dask.config.set({"distributed.admin.system-monitor.gil.enabled": False})
    # Create the dask.distributed local cluster
    cluster = LocalCluster(
        n_workers=num_workers,
        threads_per_worker=1,
        processes=True,
        # memory_limit='8GB',
        # silence_logs=False,
    )
    client = Client(cluster)
    return cluster, client


def close_dask_cluster(cluster, client):
    """Close the dask cluster and client, suppressing shutdown log noise."""
    logger = logging.getLogger()
    # Back up the current log level and suppress all logs during shutdown
    # - Avoids 'distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.'
    original_level = logger.level
    logger.setLevel(logging.CRITICAL + 1)
    try:
        cluster.close()
        client.close()
    finally:
        # Restore the original log level
        logger.setLevel(original_level)
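initialize_dask_cluster takes its worker count from the "num_workers" dask configuration key when set, falling back to os.cpu_count() - 2. A usage sketch showing how a caller could cap the worker count through that key:

import dask

from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

# Cap the local cluster at 4 worker processes; initialize_dask_cluster()
# reads this value via dask.config.get("num_workers", ...).
with dask.config.set({"num_workers": 4}):
    cluster, client = initialize_dask_cluster()

try:
    pass  # ... submit work through `client` or run dask-backed computations ...
finally:
    close_dask_cluster(cluster, client)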
