Skip to content

Commit

Permalink
Remove legacy Dask-cuDF handling (#1417)
Browse files Browse the repository at this point in the history
Removes testing/handling for "legacy" Dask cuDF (i.e. `DASK_DATAFRAME__QUERY_PLANNING=False`).

This PR also adds support for the `"explicit-comms"` config with query-planning enabled (we used to raise an error telling the user to disable query planning).

This should be merged **before** rapidsai/cudf#17558 (otherwise Dask-CUDA CI will break).
This PR is marked as "breaking", because it technically breaks the `"explicit-comms"` config with the "legacy" version of Dask cuDF (which we are about to remove in 25.02 anyway).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - James Lamb (https://github.com/jameslamb)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #1417
  • Loading branch information
rjzamora authored Jan 7, 2025
1 parent a1d7874 commit c19a1a7
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 151 deletions.
49 changes: 2 additions & 47 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ set_exit_code() {
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda (dask-expr)"
rapids-logger "pytest dask-cuda"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
Expand All @@ -65,67 +64,40 @@ timeout 90m pytest \
tests -k "not ucxx"
popd

rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 60m pytest \
-vv \
--durations=50 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
--cov-config=../pyproject.toml \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
rapids-logger "Run local benchmark"
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm-pool \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Test script exiting with latest error code: $EXITCODE"
exit ${EXITCODE}
34 changes: 9 additions & 25 deletions dask_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,28 @@
import dask.utils
import dask.dataframe.core
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
from .explicit_comms.dataframe.shuffle import patch_shuffle_expression
from dask.dataframe import DASK_EXPR_ENABLED
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.protocol.serialize import dask_deserialize, dask_serialize

from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
from .explicit_comms.dataframe.shuffle import (
get_rearrange_by_column_wrapper,
get_default_shuffle_method,
)

from .local_cuda_cluster import LocalCUDACluster
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
"explicit-comms", False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the shuffle "
"API directly, or use the legacy dask-dataframe API "
"(set the 'dataframe.query-planning' config to `False`"
"before importing `dask.dataframe`).",
if not DASK_EXPR_ENABLED:
raise ValueError(
"Dask-CUDA no longer supports the legacy Dask DataFrame API. "
"Please set the 'dataframe.query-planning' config to `True` "
"or None, or downgrade RAPIDS to <=24.12."
)


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
)
# We have to replace all modules that import Dask's `get_default_shuffle_method()`
# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack
dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method
dask.dataframe.multi.get_default_shuffle_method = get_default_shuffle_method
dask.bag.core.get_default_shuffle_method = get_default_shuffle_method


patch_shuffle_expression()
# Monkey patching Dask to make use of proxify and unproxify in compatibility mode
dask.dataframe.shuffle.shuffle_group = proxify_decorator(
dask.dataframe.shuffle.shuffle_group
Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ def parse_args():
return parse_benchmark_args(
description="Distributed shuffle (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ def parse_args():
args = parse_benchmark_args(
description="Parquet read benchmark",
args_list=special_args,
check_explicit_comms=False,
)
args.no_show_p2p_bandwidth = True
return args
Expand Down
20 changes: 0 additions & 20 deletions dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
Expand Down Expand Up @@ -52,7 +51,6 @@ def as_noop(dsk):
def parse_benchmark_args(
description="Generic dask-cuda Benchmark",
args_list=[],
check_explicit_comms=True,
):
parser = argparse.ArgumentParser(description=description)
worker_args = parser.add_argument_group(description="Worker configuration")
Expand Down Expand Up @@ -377,24 +375,6 @@ def parse_benchmark_args(
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")

# Raise error early if "explicit-comms" is not allowed
if (
check_explicit_comms
and args.backend == "explicit-comms"
and config.get(
"dataframe.query-planning",
None,
)
is not False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API by setting the following environment "
"variable before executing:",
" DASK_DATAFRAME__QUERY_PLANNING=False",
)

return args


Expand Down
79 changes: 43 additions & 36 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations

import asyncio
import functools
import inspect
from collections import defaultdict
from math import ceil
from operator import getitem
Expand Down Expand Up @@ -570,40 +568,49 @@ def _use_explicit_comms() -> bool:
return False


def get_rearrange_by_column_wrapper(func):
"""Returns a function wrapper that dispatches the shuffle to explicit-comms.
def patch_shuffle_expression() -> None:
"""Patch Dask's Shuffle expression.
Notice, this is monkey patched into Dask at dask_cuda import
Notice, this is monkey patched into Dask at dask_cuda
import, and it changes `Shuffle._lower` to lower into
an `ECShuffle` expression when the 'explicit-comms'
config is set to `True`.
"""
import dask_expr

class ECShuffle(dask_expr._shuffle.TaskShuffle):
"""Explicit-Comms Shuffle Expression."""

def _layer(self):
# Execute an explicit-comms shuffle
if not hasattr(self, "_ec_shuffled"):
on = self.partitioning_index
df = dask_expr._collection.new_collection(self.frame)
self._ec_shuffled = shuffle(
df,
[on] if isinstance(on, str) else on,
self.npartitions_out,
self.ignore_index,
)
graph = self._ec_shuffled.dask.copy()
shuffled_name = self._ec_shuffled._name
for i in range(self.npartitions_out):
graph[(self._name, i)] = graph[(shuffled_name, i)]
return graph

_base_lower = dask_expr._shuffle.Shuffle._lower

def _patched_lower(self):
if self.method in (None, "tasks") and _use_explicit_comms():
return ECShuffle(
self.frame,
self.partitioning_index,
self.npartitions_out,
self.ignore_index,
self.options,
self.original_partitioning_index,
)
else:
return _base_lower(self)

func_sig = inspect.signature(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
if _use_explicit_comms():
# Convert `*args, **kwargs` to a dict of `keyword -> values`
kw = func_sig.bind(*args, **kwargs)
kw.apply_defaults()
kw = kw.arguments
# Notice, we only overwrite the default and the "tasks" shuffle
# algorithm. We don't touch the "disk" and "p2p" algorithms.
if kw["shuffle_method"] in ("tasks", None):
col = kw["col"]
if isinstance(col, str):
col = [col]
return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"])
return func(*args, **kwargs)

return wrapper


def get_default_shuffle_method() -> str:
"""Return the default shuffle algorithm used by Dask
This changes the default shuffle algorithm from "p2p" to "tasks"
when explicit comms is enabled.
"""
ret = dask.config.get("dataframe.shuffle.algorithm", None)
if ret is None and _use_explicit_comms():
return "tasks"
return dask.utils.get_default_shuffle_method()
dask_expr._shuffle.Shuffle._lower = _patched_lower
15 changes: 1 addition & 14 deletions dask_cuda/tests/test_explicit_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@
mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")

QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

# Skip these tests when dask-expr is active (for now)
query_planning_skip = pytest.mark.skipif(
QUERY_PLANNING_ON,
reason=(
"The 'explicit-comms' config is not supported "
"when query planning is enabled."
),
)

# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
Expand Down Expand Up @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
pd.testing.assert_frame_equal(got, expected)


@query_planning_skip
def test_dataframe_merge_empty_partitions():
# Notice, we use more partitions than rows
p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
Expand Down Expand Up @@ -250,7 +239,7 @@ def check_shuffle():
):
dask.config.refresh() # Trigger re-read of the environment variables
with pytest.raises(ValueError, match="explicit-comms-batchsize"):
ddf.shuffle(on="key", npartitions=4)
ddf.shuffle(on="key", npartitions=4).dask

if in_cluster:
with LocalCluster(
Expand All @@ -267,7 +256,6 @@ def check_shuffle():
check_shuffle()


@query_planning_skip
@pytest.mark.parametrize("in_cluster", [True, False])
def test_dask_use_explicit_comms(in_cluster):
def _timeout(process, function, timeout):
Expand Down Expand Up @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
assert_eq(got, expected)


@query_planning_skip
@pytest.mark.parametrize("nworkers", [1, 2, 4])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
Expand Down
7 changes: 0 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ filterwarnings = [
"error::FutureWarning",
# remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed
"ignore:There is no current event loop:DeprecationWarning:tornado",
# This warning must be filtered until dask-expr support
# is enabled in both dask-cudf and dask-cuda.
# See: https://github.com/rapidsai/dask-cuda/issues/1311
"ignore:Dask DataFrame implementation is deprecated:DeprecationWarning",
# Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437
# When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False`
"ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning",
]

[tool.rapids-build-backend]
Expand Down

0 comments on commit c19a1a7

Please sign in to comment.