diff --git a/.github/workflows/test-report.yaml b/.github/workflows/test-report.yaml index 0583aec7bb5..86d341d61e7 100644 --- a/.github/workflows/test-report.yaml +++ b/.github/workflows/test-report.yaml @@ -42,7 +42,7 @@ jobs: - name: mamba list run: mamba list - - uses: actions/cache@v3 + - uses: actions/cache@v4 id: cache with: # Suffix is depending on the backend / OS. Let's be agnostic here diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 538bcd74548..ff83e41f3ea 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -21,9 +21,9 @@ dependencies: - pytest-rerunfailures - pytest-timeout - pytest-xdist - - moto + - moto<5 # Optional dependencies - - mimesis + - mimesis<13.1.0 # https://github.com/dask/dask/issues/10858 - numpy=1.23 - pandas=1.5 - flask diff --git a/continuous_integration/environment-3.11.yaml b/continuous_integration/environment-3.11.yaml index fa8c713e932..32f6d2bd82a 100644 --- a/continuous_integration/environment-3.11.yaml +++ b/continuous_integration/environment-3.11.yaml @@ -21,9 +21,9 @@ dependencies: - pytest-rerunfailures - pytest-timeout - pytest-xdist - - moto + - moto<5 # Optional dependencies - - mimesis + - mimesis<13.1.0 # https://github.com/dask/dask/issues/10858 - numpy - pandas - flask diff --git a/continuous_integration/environment-3.12.yaml b/continuous_integration/environment-3.12.yaml index 90282cae2bd..bb3a1222d40 100644 --- a/continuous_integration/environment-3.12.yaml +++ b/continuous_integration/environment-3.12.yaml @@ -21,9 +21,9 @@ dependencies: - pytest-rerunfailures - pytest-timeout - pytest-xdist - - moto + - moto<5 # Optional dependencies - - mimesis + - mimesis<13.1.0 # https://github.com/dask/dask/issues/10858 - numpy - pandas - flask diff --git a/continuous_integration/environment-3.9.yaml b/continuous_integration/environment-3.9.yaml index be35b6aab61..0d210c73039 100644 --- a/continuous_integration/environment-3.9.yaml +++ b/continuous_integration/environment-3.9.yaml @@ -21,7 +21,7 @@ dependencies: - pytest-rerunfailures - pytest-timeout - pytest-xdist - - moto + - moto<5 # Optional dependencies - mimesis<12 - numpy=1.22 diff --git a/dask/array/fft.py b/dask/array/fft.py index e4cf9742f37..acbb2f92635 100644 --- a/dask/array/fft.py +++ b/dask/array/fft.py @@ -15,7 +15,7 @@ from dask.array.core import asarray from dask.array.core import concatenate as _concatenate from dask.array.creation import arange as _arange -from dask.array.numpy_compat import _numpy_200 +from dask.array.numpy_compat import NUMPY_GE_200 from dask.utils import derived_from, skip_doctest chunk_error = ( @@ -167,7 +167,7 @@ def func(a, s=None, axes=None): if s is None: axes = tuple(range(a.ndim)) else: - if _numpy_200: + if NUMPY_GE_200: # Match deprecation in numpy warnings.warn( "DeprecationWarning: `axes` should not be `None` " diff --git a/dask/array/numpy_compat.py b/dask/array/numpy_compat.py index 05e92dd0888..5489263ab06 100644 --- a/dask/array/numpy_compat.py +++ b/dask/array/numpy_compat.py @@ -8,16 +8,18 @@ from dask.utils import derived_from _np_version = parse_version(np.__version__) -_numpy_122 = _np_version >= parse_version("1.22.0") -_numpy_123 = _np_version >= parse_version("1.23.0") -_numpy_124 = _np_version >= parse_version("1.24.0") -_numpy_125 = _np_version.release >= (1, 25, 0) -_numpy_200 = _np_version.release >= (2, 0, 0) +NUMPY_GE_122 = _np_version.release >= (1, 22) +NUMPY_GE_123 = _np_version.release >= (1, 23) 
+NUMPY_GE_124 = _np_version.release >= (1, 24) +NUMPY_GE_125 = _np_version.release >= (1, 25) +NUMPY_GE_200 = _np_version.release >= (2, 0) -if _numpy_200: +if NUMPY_GE_200: + from numpy.exceptions import AxisError, ComplexWarning # noqa: F401 from numpy.lib.array_utils import normalize_axis_index, normalize_axis_tuple else: + from numpy import AxisError, ComplexWarning # noqa: F401 from numpy.core.numeric import normalize_axis_index # type: ignore[attr-defined] from numpy.core.numeric import normalize_axis_tuple # type: ignore[attr-defined] @@ -183,11 +185,7 @@ def rollaxis(a, axis, start=0): # kwarg is renamed in numpy 1.22.0 def percentile(a, q, method="linear"): - if _numpy_122: + if NUMPY_GE_122: return np.percentile(a, q, method=method) else: return np.percentile(a, q, interpolation=method) - - -ComplexWarning = np.exceptions.ComplexWarning if _numpy_200 else np.ComplexWarning -AxisError = np.exceptions.AxisError if _numpy_200 else np.AxisError diff --git a/dask/array/percentile.py b/dask/array/percentile.py index e4ee436a247..a7f347446b7 100644 --- a/dask/array/percentile.py +++ b/dask/array/percentile.py @@ -9,7 +9,7 @@ from tlz import merge from dask.array.core import Array -from dask.array.numpy_compat import _numpy_122 +from dask.array.numpy_compat import NUMPY_GE_122 from dask.array.numpy_compat import percentile as np_percentile from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph @@ -126,7 +126,7 @@ def percentile(a, q, method="linear", internal_method="default", **kwargs): internal_method = method if "interpolation" in kwargs: - if _numpy_122: + if NUMPY_GE_122: warnings.warn( "In Dask 2022.1.0, the `interpolation=` argument to percentile was renamed to " "`method= ` ", diff --git a/dask/array/routines.py b/dask/array/routines.py index f089df2f8ae..8a5c0f9223c 100644 --- a/dask/array/routines.py +++ b/dask/array/routines.py @@ -29,7 +29,7 @@ ) from dask.array.creation import arange, diag, empty, indices, tri from dask.array.einsumfuncs import einsum # noqa -from dask.array.numpy_compat import _numpy_200 +from dask.array.numpy_compat import NUMPY_GE_200 from dask.array.reductions import reduction from dask.array.ufunc import multiply, sqrt from dask.array.utils import ( @@ -1801,7 +1801,7 @@ def unique(ar, return_index=False, return_inverse=False, return_counts=False): # mapping of the original values. 
matches = (ar[:, None] == out["values"][None, :]).astype(np.intp) inverse = (matches * out["inverse"]).sum(axis=1) - if _numpy_200: + if NUMPY_GE_200: inverse = inverse.reshape(orig_shape) result.append(inverse) if return_counts: diff --git a/dask/array/slicing.py b/dask/array/slicing.py index e2a6b34a083..20df6fe0259 100644 --- a/dask/array/slicing.py +++ b/dask/array/slicing.py @@ -559,9 +559,14 @@ def slicing_plan(chunks, index): if not is_arraylike(index): index = np.asanyarray(index) - cum_chunks = cached_cumsum(chunks) - cum_chunks = asarray_safe(cum_chunks, like=index) + cum_chunks_tup = cached_cumsum(chunks) + cum_chunks = asarray_safe(cum_chunks_tup, like=index) + if cum_chunks.dtype.kind != "f": # Don't cast NaN chunks to int + # This is important when index.dtype=uint64 (or uint32 on 32-bit hosts) to + # prevent accidental automatic casting during `index - cum_chunks` below + cum_chunks = cum_chunks.astype(index.dtype) + # this dispactches to the array library chunk_locations = np.searchsorted(cum_chunks, index, side="right") @@ -634,8 +639,6 @@ def take(outname, inname, chunks, index, itemsize, axis=0): PerformanceWarning, stacklevel=6, ) - if not is_arraylike(index): - index = np.asarray(index) # Check for chunks from the plan that would violate the user's # configured chunk size. diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py index d87283b35bd..f4b190f2f83 100644 --- a/dask/array/tests/test_array_core.py +++ b/dask/array/tests/test_array_core.py @@ -51,7 +51,7 @@ stack, store, ) -from dask.array.numpy_compat import _numpy_200 +from dask.array.numpy_compat import NUMPY_GE_200 from dask.array.reshape import _not_implemented_message from dask.array.tests.test_dispatch import EncapsulateNDArray from dask.array.utils import assert_eq, same_keys @@ -888,7 +888,7 @@ def test_elemwise_on_scalars(): dy = from_array(ny, chunks=(5,)) dz = dx.sum() * dy - if _numpy_200: + if NUMPY_GE_200: assert_eq(dz, nz) else: # Dask 0-d arrays do not behave like numpy scalars for type promotion @@ -3112,25 +3112,19 @@ def test_slice_with_floats(): d[[1, 1.5]] -def test_slice_with_integer_types(): +@pytest.mark.parametrize("dtype", [np.int32, np.int64, np.uint32, np.uint64]) +def test_slice_with_integer_types(dtype): x = np.arange(10) dx = da.from_array(x, chunks=5) - inds = np.array([0, 3, 6], dtype="u8") + inds = np.array([0, 3, 6], dtype=dtype) assert_eq(dx[inds], x[inds]) - assert_eq(dx[inds.astype("u4")], x[inds.astype("u4")]) - - inds = np.array([0, 3, 6], dtype=np.int64) - assert_eq(dx[inds], x[inds]) - assert_eq(dx[inds.astype("u4")], x[inds.astype("u4")]) -def test_index_with_integer_types(): +@pytest.mark.parametrize("cls", [int, np.int32, np.int64, np.uint32, np.uint64]) +def test_index_with_integer_types(cls): x = np.arange(10) dx = da.from_array(x, chunks=5) - inds = int(3) - assert_eq(dx[inds], x[inds]) - - inds = np.int64(3) + inds = cls(3) assert_eq(dx[inds], x[inds]) diff --git a/dask/array/tests/test_fft.py b/dask/array/tests/test_fft.py index 59d9a3e1779..6b07b16c1c7 100644 --- a/dask/array/tests/test_fft.py +++ b/dask/array/tests/test_fft.py @@ -10,7 +10,7 @@ import dask.array.fft from dask.array.core import normalize_chunks from dask.array.fft import fft_wrap -from dask.array.numpy_compat import _numpy_200 +from dask.array.numpy_compat import NUMPY_GE_200 from dask.array.utils import assert_eq, same_keys all_1d_funcnames = ["fft", "ifft", "rfft", "irfft", "hfft", "ihfft"] @@ -60,7 +60,7 @@ def test_fft2n_shapes(funcname): da_fft(darr3, (12, 
11), axes=(1, 0)), np_fft(nparr, (12, 11), axes=(1, 0)) ) - if _numpy_200 and funcname.endswith("fftn"): + if NUMPY_GE_200 and funcname.endswith("fftn"): ctx = pytest.warns( DeprecationWarning, match="`axes` should not be `None` if `s` is not `None`", diff --git a/dask/array/tests/test_masked.py b/dask/array/tests/test_masked.py index 851e0eab570..4f45ea9db1d 100644 --- a/dask/array/tests/test_masked.py +++ b/dask/array/tests/test_masked.py @@ -9,7 +9,7 @@ import pytest import dask.array as da -from dask.array.numpy_compat import ComplexWarning, _numpy_123 +from dask.array.numpy_compat import NUMPY_GE_123, ComplexWarning from dask.array.utils import assert_eq from dask.base import tokenize from dask.utils import typename @@ -387,7 +387,7 @@ def test_average_weights_with_masked_array(keepdims): da_avg = da.ma.average(d_a, weights=d_weights, axis=1, keepdims=keepdims) - if _numpy_123: + if NUMPY_GE_123: assert_eq(da_avg, np.ma.average(a, weights=weights, axis=1, keepdims=keepdims)) elif not keepdims: assert_eq(da_avg, np.ma.average(a, weights=weights, axis=1)) diff --git a/dask/array/tests/test_reductions.py b/dask/array/tests/test_reductions.py index 0a2c1a89490..2af27e08970 100644 --- a/dask/array/tests/test_reductions.py +++ b/dask/array/tests/test_reductions.py @@ -13,7 +13,7 @@ import dask.array as da import dask.config as config -from dask.array.numpy_compat import ComplexWarning, _numpy_122 +from dask.array.numpy_compat import NUMPY_GE_122, ComplexWarning from dask.array.utils import assert_eq, same_keys from dask.core import get_deps @@ -261,7 +261,7 @@ def test_arg_reductions(dfunc, func): assert_eq(dfunc(a, 0), func(x, 0)) assert_eq(dfunc(a, 1), func(x, 1)) assert_eq(dfunc(a, 2), func(x, 2)) - if _numpy_122: + if NUMPY_GE_122: assert_eq(dfunc(a, keepdims=True), func(x, keepdims=True)) pytest.raises(ValueError, lambda: dfunc(a, 3)) diff --git a/dask/array/tests/test_routines.py b/dask/array/tests/test_routines.py index a1d6f5528da..5a0e6f28559 100644 --- a/dask/array/tests/test_routines.py +++ b/dask/array/tests/test_routines.py @@ -14,7 +14,7 @@ np = pytest.importorskip("numpy") import dask.array as da -from dask.array.numpy_compat import AxisError, _numpy_123, _numpy_200 +from dask.array.numpy_compat import NUMPY_GE_123, NUMPY_GE_200, AxisError from dask.array.utils import assert_eq, same_keys @@ -2351,7 +2351,7 @@ def test_result_type(): # Effect of scalars depends on their value assert da.result_type(1, b) == np.int16 assert da.result_type(1.0, a) == np.float32 - if _numpy_200: + if NUMPY_GE_200: assert da.result_type(np.int64(1), b) == np.int64 assert da.result_type(np.ones((), np.int64), b) == np.int64 assert da.result_type(1e200, a) == np.float32 @@ -2594,7 +2594,7 @@ def test_average_keepdims(a): da_avg = da.average(d_a, keepdims=True) - if _numpy_123: + if NUMPY_GE_123: np_avg = np.average(a, keepdims=True) assert_eq(np_avg, da_avg) @@ -2609,7 +2609,7 @@ def test_average_weights(keepdims): da_avg = da.average(d_a, weights=d_weights, axis=1, keepdims=keepdims) - if _numpy_123: + if NUMPY_GE_123: assert_eq(da_avg, np.average(a, weights=weights, axis=1, keepdims=keepdims)) elif not keepdims: assert_eq(da_avg, np.average(a, weights=weights, axis=1)) diff --git a/dask/bytes/tests/test_s3.py b/dask/bytes/tests/test_s3.py index e0729d55711..33f0065eb80 100644 --- a/dask/bytes/tests/test_s3.py +++ b/dask/bytes/tests/test_s3.py @@ -137,7 +137,6 @@ def s3_context(bucket=test_bucket_name, files=files): @pytest.fixture() -@pytest.mark.slow def s3_with_yellow_tripdata(s3): """ Fixture 
with sample yellowtrip CSVs loaded into S3. diff --git a/dask/dataframe/__init__.py b/dask/dataframe/__init__.py index 17395d8816a..0206f27dd5c 100644 --- a/dask/dataframe/__init__.py +++ b/dask/dataframe/__init__.py @@ -10,7 +10,7 @@ def _dask_expr_enabled() -> bool: import dask_expr # noqa: F401 except ImportError: raise ValueError("Must install dask-expr to activate query planning.") - return use_dask_expr + return use_dask_expr if use_dask_expr is not None else False if _dask_expr_enabled(): diff --git a/dask/dataframe/_testing.py b/dask/dataframe/_testing.py index 120a91d9d6f..77c036c920c 100644 --- a/dask/dataframe/_testing.py +++ b/dask/dataframe/_testing.py @@ -6,15 +6,13 @@ PKG = os.path.dirname(os.path.dirname(__file__)) -def test_dataframe(query_planning="True") -> None: - import os - +def test_dataframe(query_planning: bool = True) -> None: import pytest cmd = PKG + "/dataframe" print(f"running: pytest {cmd}") - os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = query_planning - sys.exit(pytest.main(["-n 4"] + [cmd])) + os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = str(query_planning) + sys.exit(pytest.main(["-n 4", cmd])) __all__ = ["test_dataframe"] diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py index ae629539bb2..88b33478637 100644 --- a/dask/dataframe/backends.py +++ b/dask/dataframe/backends.py @@ -11,7 +11,7 @@ from dask.array.dispatch import percentile_lookup from dask.array.percentile import _percentile from dask.backends import CreationDispatch, DaskBackendEntrypoint -from dask.dataframe._compat import is_any_real_numeric_dtype +from dask.dataframe._compat import PANDAS_GE_220, is_any_real_numeric_dtype from dask.dataframe.core import DataFrame, Index, Scalar, Series, _Frame from dask.dataframe.dispatch import ( categorical_dtype_dispatch, @@ -623,11 +623,11 @@ def concat_pandas( if uniform else any(isinstance(df, pd.DataFrame) for df in dfs2) ): - if uniform: + if uniform or PANDAS_GE_220: dfs3 = dfs2 cat_mask = dfs2[0].dtypes == "category" else: - # When concatenating mixed dataframes and series on axis 1, Pandas + # When concatenating mixed dataframes and series on axis 1, Pandas <2.2 # converts series to dataframes with a single column named 0, then # concatenates. 
dfs3 = [ @@ -647,7 +647,7 @@ def concat_pandas( **kwargs, ).any() - if cat_mask.any(): + if isinstance(cat_mask, pd.Series) and cat_mask.any(): not_cat = cat_mask[~cat_mask].index # this should be aligned, so no need to filter warning out = pd.concat( diff --git a/dask/dataframe/io/tests/test_csv.py b/dask/dataframe/io/tests/test_csv.py index 198b3a9cc1e..a0924d2acc8 100644 --- a/dask/dataframe/io/tests/test_csv.py +++ b/dask/dataframe/io/tests/test_csv.py @@ -1219,8 +1219,15 @@ def test_parse_dates_multi_column(): """ ) - warn = FutureWarning if PANDAS_GE_220 else None - with pytest.warns(warn, match="nested"): + if PANDAS_GE_220: + with pytest.warns(FutureWarning, match="nested"): + with filetext(pdmc_text) as fn: + ddf = dd.read_csv(fn, parse_dates=[["date", "time"]]) + df = pd.read_csv(fn, parse_dates=[["date", "time"]]) + + assert (df.columns == ddf.columns).all() + assert len(df) == len(ddf) + else: with filetext(pdmc_text) as fn: ddf = dd.read_csv(fn, parse_dates=[["date", "time"]]) df = pd.read_csv(fn, parse_dates=[["date", "time"]]) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 1887ff5ff03..a54f034f56c 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -17,7 +17,7 @@ import dask import dask.dataframe as dd import dask.multiprocessing -from dask.array.numpy_compat import _numpy_124 +from dask.array.numpy_compat import NUMPY_GE_124 from dask.blockwise import Blockwise, optimize_blockwise from dask.dataframe._compat import PANDAS_GE_150, PANDAS_GE_200, PANDAS_GE_202 from dask.dataframe.io.parquet.core import get_engine @@ -789,7 +789,8 @@ def test_append_create(tmpdir, engine): assert_eq(df, ddf3) -def test_append_with_partition(tmpdir, engine): +@PYARROW_MARK +def test_append_with_partition(tmpdir): tmp = str(tmpdir) df0 = pd.DataFrame( { @@ -816,18 +817,18 @@ def test_append_with_partition(tmpdir, engine): dd_df0 = dd.from_pandas(df0, npartitions=1) dd_df1 = dd.from_pandas(df1, npartitions=1) - dd.to_parquet(dd_df0, tmp, partition_on=["lon"], engine=engine) + dd.to_parquet(dd_df0, tmp, partition_on=["lon"], engine="pyarrow") dd.to_parquet( dd_df1, tmp, partition_on=["lon"], append=True, ignore_divisions=True, - engine=engine, + engine="pyarrow", ) out = dd.read_parquet( - tmp, engine=engine, index="index", calculate_divisions=True + tmp, engine="pyarrow", index="index", calculate_divisions=True ).compute() # convert categorical to plain int just to pass assert out["lon"] = out.lon.astype("int64") @@ -3308,7 +3309,7 @@ def test_pandas_timestamp_overflow_pyarrow(tmpdir): info = np.iinfo(np.dtype("int64")) # In `numpy=1.24.0` NumPy warns when an overflow is encountered when casting from float to int # https://numpy.org/doc/stable/release/1.24.0-notes.html#numpy-now-gives-floating-point-errors-in-casts - if _numpy_124: + if NUMPY_GE_124: ctx = pytest.warns(RuntimeWarning, match="invalid value encountered in cast") else: ctx = contextlib.nullcontext() diff --git a/dask/dataframe/shuffle.py b/dask/dataframe/shuffle.py index bdf98663461..66c4954ff64 100644 --- a/dask/dataframe/shuffle.py +++ b/dask/dataframe/shuffle.py @@ -152,6 +152,13 @@ def sort_values( "You passed %s" % str(by) ) + if ( + ascending is not None + and not isinstance(ascending, bool) + and not len(ascending) == len(by) + ): + raise ValueError(f"Length of {ascending=} != length of {by=}") + sort_kwargs = { "by": by, "ascending": ascending, @@ -179,22 +186,18 @@ def sort_values( repartition = False sort_by_col = 
df[by[0]] - - if not isinstance(ascending, bool): - # support [True] as input - if ( - isinstance(ascending, list) - and len(ascending) == 1 - and isinstance(ascending[0], bool) - ): - ascending = ascending[0] - else: - raise NotImplementedError( - f"Dask currently only supports a single boolean for ascending. You passed {str(ascending)}" - ) - + divisions_ascending = ascending + if divisions_ascending and not isinstance(divisions_ascending, bool): + divisions_ascending = divisions_ascending[0] + assert divisions_ascending is None or isinstance(divisions_ascending, bool) divisions, _, _, presorted = _calculate_divisions( - df, sort_by_col, repartition, npartitions, upsample, partition_size, ascending + df, + sort_by_col, + repartition, + npartitions, + upsample, + partition_size, + divisions_ascending, ) if len(divisions) == 2: @@ -211,7 +214,7 @@ def sort_values( by[0], divisions, shuffle_method=shuffle_method, - ascending=ascending, + ascending=divisions_ascending, na_position=na_position, duplicates=False, ) diff --git a/dask/dataframe/tests/test_accessors.py b/dask/dataframe/tests/test_accessors.py index 34c90900473..16b1033de05 100644 --- a/dask/dataframe/tests/test_accessors.py +++ b/dask/dataframe/tests/test_accessors.py @@ -95,6 +95,7 @@ def df_ddf(): return df, ddf +@pytest.mark.skipif(not PANDAS_GE_210, reason="warning is None") def test_dt_accessor(df_ddf): df, ddf = df_ddf @@ -104,19 +105,18 @@ def test_dt_accessor(df_ddf): # see https://github.com/pydata/pandas/issues/10712 assert_eq(ddf.dt_col.dt.date, df.dt_col.dt.date, check_names=False) - warning = FutureWarning if PANDAS_GE_210 else None # to_pydatetime returns a numpy array in pandas, but a Series in dask # pandas will start returning a Series with 3.0 as well - with pytest.warns(warning, match="will return a Series"): + with pytest.warns(FutureWarning, match="will return a Series"): ddf_result = ddf.dt_col.dt.to_pydatetime() - with pytest.warns(warning, match="will return a Series"): + with pytest.warns(FutureWarning, match="will return a Series"): pd_result = pd.Series( df.dt_col.dt.to_pydatetime(), index=df.index, dtype=object ) assert_eq(ddf_result, pd_result) assert set(ddf.dt_col.dt.date.dask) == set(ddf.dt_col.dt.date.dask) - with pytest.warns(warning, match="will return a Series"): + with pytest.warns(FutureWarning, match="will return a Series"): assert set(ddf.dt_col.dt.to_pydatetime().dask) == set( ddf.dt_col.dt.to_pydatetime().dask ) diff --git a/dask/dataframe/tests/test_arithmetics_reduction.py b/dask/dataframe/tests/test_arithmetics_reduction.py index a3a22b4adcb..86b5d1feb91 100644 --- a/dask/dataframe/tests/test_arithmetics_reduction.py +++ b/dask/dataframe/tests/test_arithmetics_reduction.py @@ -10,7 +10,7 @@ from pandas.api.types import is_scalar import dask.dataframe as dd -from dask.array.numpy_compat import _numpy_125 +from dask.array.numpy_compat import NUMPY_GE_125 from dask.dataframe._compat import ( PANDAS_GE_140, PANDAS_GE_150, @@ -889,7 +889,7 @@ def test_reductions_out(axis, redfunc): # explicitly when calling np.var(dask) with ctx: np_redfunc(dsk_in, axis=axis, ddof=1, out=out) - elif _numpy_125 and redfunc == "product" and out is None: + elif NUMPY_GE_125 and redfunc == "product" and out is None: with pytest.warns(DeprecationWarning, match="`product` is deprecated"): np_redfunc(dsk_in, axis=axis, out=out) else: @@ -923,7 +923,7 @@ def test_reductions_numpy_dispatch(axis, redfunc): # explicitly when calling np.var(dask) expect = np_redfunc(pdf, axis=axis, ddof=1) actual = np_redfunc(df, 
axis=axis, ddof=1) - elif _numpy_125 and redfunc == "product": + elif NUMPY_GE_125 and redfunc == "product": expect = np_redfunc(pdf, axis=axis) with pytest.warns(DeprecationWarning, match="`product` is deprecated"): actual = np_redfunc(df, axis=axis) @@ -1423,18 +1423,26 @@ def test_reductions_frame_dtypes_numeric_only_supported(func): getattr(ddf, func)(), ) elif PANDAS_GE_150: - with pytest.warns(warning, match="The default value of numeric_only"): + if warning is None: pd_result = getattr(df, func)() - with pytest.warns(warning, match="The default value of numeric_only"): dd_result = getattr(ddf, func)() + else: + with pytest.warns(warning, match="The default value of numeric_only"): + pd_result = getattr(df, func)() + with pytest.warns(warning, match="The default value of numeric_only"): + dd_result = getattr(ddf, func)() assert_eq(pd_result, dd_result) else: - if func in ["std", "var", "quantile"]: + if func in ["quantile"]: warning = None - with pytest.warns(warning, match="Dropping of nuisance"): + if warning is None: pd_result = getattr(df, func)() - with pytest.warns(warning, match="Dropping of nuisance"): dd_result = getattr(ddf, func)() + else: + with pytest.warns(warning, match="Dropping of nuisance"): + pd_result = getattr(df, func)() + with pytest.warns(warning, match="Dropping of nuisance"): + dd_result = getattr(ddf, func)() assert_eq(pd_result, dd_result) num_cols = ["int", "float"] diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 82fec6fd28b..808af64ebd2 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -3486,18 +3486,26 @@ def test_applymap(): df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]}) ddf = dd.from_pandas(df, npartitions=2) msg = "DataFrame.applymap has been deprecated" - warning = FutureWarning if PANDAS_GE_210 else None - with pytest.warns(warning, match=msg): + if PANDAS_GE_210: + with pytest.warns(FutureWarning, match=msg): + ddf_result = ddf.applymap(lambda x: x + 1) + with pytest.warns(FutureWarning, match=msg): + pdf_result = df.applymap(lambda x: x + 1) + assert_eq(ddf_result, pdf_result) + + with pytest.warns(FutureWarning, match=msg): + ddf_result = ddf.applymap(lambda x: (x, x)) + with pytest.warns(FutureWarning, match=msg): + pdf_result = df.applymap(lambda x: (x, x)) + assert_eq(ddf_result, pdf_result) + else: ddf_result = ddf.applymap(lambda x: x + 1) - with pytest.warns(warning, match=msg): pdf_result = df.applymap(lambda x: x + 1) - assert_eq(ddf_result, pdf_result) + assert_eq(ddf_result, pdf_result) - with pytest.warns(warning, match=msg): ddf_result = ddf.applymap(lambda x: (x, x)) - with pytest.warns(warning, match=msg): pdf_result = df.applymap(lambda x: (x, x)) - assert_eq(ddf_result, pdf_result) + assert_eq(ddf_result, pdf_result) def test_add_prefix(): diff --git a/dask/dataframe/tests/test_groupby.py b/dask/dataframe/tests/test_groupby.py index 7dee5c7fe80..5b5f62f8f8d 100644 --- a/dask/dataframe/tests/test_groupby.py +++ b/dask/dataframe/tests/test_groupby.py @@ -1081,8 +1081,6 @@ def test_groupby_normalize_by(): def test_aggregate__single_element_groups(agg_func): spec = agg_func - if DASK_EXPR_ENABLED and spec == "median": - pytest.xfail("not yet implemented") # nunique/cov is not supported in specs if spec in ("nunique", "cov", "corr"): @@ -1220,7 +1218,6 @@ def test_shuffle_aggregate_defaults(shuffle_method): assert any("shuffle" in l for l in dsk.layers) -@pytest.mark.xfail(DASK_EXPR_ENABLED, reason="median not yet 
supported") @pytest.mark.parametrize("spec", [{"c": "median"}, {"b": "median", "c": "max"}]) @pytest.mark.parametrize("keys", ["a", ["a", "d"]]) def test_aggregate_median(spec, keys, shuffle_method): @@ -1238,10 +1235,11 @@ def test_aggregate_median(spec, keys, shuffle_method): expected = pdf.groupby(keys).aggregate(spec) assert_eq(actual, expected) - with pytest.raises(ValueError, match="must use shuffl"): - ddf.groupby(keys).aggregate(spec, shuffle_method=False) - with pytest.raises(ValueError, match="must use shuffl"): - ddf.groupby(keys).median(shuffle_method=False) + if not DASK_EXPR_ENABLED: + with pytest.raises(ValueError, match="must use shuffl"): + ddf.groupby(keys).aggregate(spec, shuffle_method=False).compute() + with pytest.raises(ValueError, match="must use shuffl"): + ddf.groupby(keys).median(shuffle_method=False).compute() @pytest.mark.skipif(DASK_EXPR_ENABLED, reason="deprecated in pandas") @@ -2888,14 +2886,22 @@ def test_groupby_aggregate_partial_function_unexpected_args(agg): with pytest.raises( TypeError, - match="doesn't support positional arguments|'Series' object cannot be interpreted as an integer", + match=( + "doesn't support positional arguments" + "|'Series' object cannot be interpreted as an integer" + "|cannot convert the series to " + ), ): agg(ddf.groupby("a")) # SeriesGroupBy with pytest.raises( TypeError, - match="doesn't support positional arguments|'Series' object cannot be interpreted as an integer", + match=( + "doesn't support positional arguments" + "|'Series' object cannot be interpreted as an integer" + "|cannot convert the series to " + ), ): agg(ddf.groupby("a")["b"]) @@ -3244,10 +3250,8 @@ def test_groupby_sort_true_split_out(): M.sum(ddf.groupby("x", sort=True), split_out=1) M.sum(ddf.groupby("x", sort=False), split_out=2) - # Warns for sort=None - with pytest.warns(None): - ddf.groupby("x").sum(split_out=2) - ddf.groupby("x").agg("sum", split_out=2) + ddf.groupby("x").sum(split_out=2) + ddf.groupby("x").agg("sum", split_out=2) with pytest.raises(NotImplementedError): # Cannot use sort=True with split_out>1 using non-shuffle-based approach diff --git a/dask/dataframe/tests/test_shuffle.py b/dask/dataframe/tests/test_shuffle.py index d048f031ebf..b0714b33021 100644 --- a/dask/dataframe/tests/test_shuffle.py +++ b/dask/dataframe/tests/test_shuffle.py @@ -1720,8 +1720,18 @@ def test_sort_values_bool_ascending(): ddf = dd.from_pandas(df, npartitions=10) # attempt to sort with list of ascending booleans - with pytest.raises(NotImplementedError): + with pytest.raises(ValueError, match="length"): ddf.sort_values(by="a", ascending=[True, False]) + with pytest.raises(ValueError, match="length"): + ddf.sort_values(by=["a", "b"], ascending=[True]) + assert_eq( + ddf.sort_values(by="a", ascending=[True]), + df.sort_values(by="a", ascending=[True]), + ) + assert_eq( + ddf.sort_values(by=["a", "b"], ascending=[True, False]), + df.sort_values(by=["a", "b"], ascending=[True, False]), + ) @pytest.mark.parametrize("npartitions", [1, 3]) diff --git a/dask/sizeof.py b/dask/sizeof.py index 5a98d946553..9ad81b2a1c9 100644 --- a/dask/sizeof.py +++ b/dask/sizeof.py @@ -213,10 +213,7 @@ def sizeof_pandas_index(i): @sizeof.register(pd.MultiIndex) def sizeof_pandas_multiindex(i): - p = sum(sizeof(lev) for lev in i.levels) - for c in i.codes: - p += c.nbytes - return p + return sum(sizeof(l) for l in i.levels) + sum(c.nbytes for c in i.codes) @sizeof.register_lazy("scipy") diff --git a/dask/tests/test_base.py b/dask/tests/test_base.py index 962855cab2b..273917687c6 100644 
--- a/dask/tests/test_base.py +++ b/dask/tests/test_base.py @@ -1,22 +1,18 @@ from __future__ import annotations import dataclasses -import datetime -import decimal import inspect import os -import pathlib import subprocess import sys import time from collections import OrderedDict from concurrent.futures import Executor -from enum import Enum, Flag, IntEnum, IntFlag from operator import add, mul -from typing import NamedTuple, Union +from typing import NamedTuple import pytest -from tlz import compose, curry, merge, partial +from tlz import merge, partial import dask import dask.bag as db @@ -26,14 +22,11 @@ collections_to_dsk, compute, compute_as_if_collection, - function_cache, get_collection_names, get_name_from_key, get_scheduler, is_dask_collection, named_schedulers, - normalize_function, - normalize_token, optimize, persist, replace_name_in_key, @@ -41,7 +34,6 @@ unpack_collections, visualize, ) -from dask.core import literal from dask.delayed import Delayed, delayed from dask.diagnostics import Profiler from dask.highlevelgraph import HighLevelGraph @@ -51,7 +43,6 @@ da = import_or_none("dask.array") dd = import_or_none("dask.dataframe") np = import_or_none("numpy") -sp = import_or_none("scipy.sparse") pd = import_or_none("pandas") # Arbitrary dask keys @@ -59,624 +50,6 @@ h2 = "h2" -def f1(a, b, c=1): - pass - - -def f2(a, b=1, c=2): - pass - - -def f3(a): - pass - - -def test_normalize_function(): - assert normalize_function(f2) - - assert normalize_function(lambda a: a) - - assert normalize_function(partial(f2, b=2)) == normalize_function(partial(f2, b=2)) - - assert normalize_function(partial(f2, b=2)) != normalize_function(partial(f2, b=3)) - - assert normalize_function(partial(f1, b=2)) != normalize_function(partial(f2, b=2)) - - assert normalize_function(compose(f2, f3)) == normalize_function(compose(f2, f3)) - - assert normalize_function(compose(f2, f3)) != normalize_function(compose(f2, f1)) - - assert normalize_function(curry(f2)) == normalize_function(curry(f2)) - assert normalize_function(curry(f2)) != normalize_function(curry(f1)) - assert normalize_function(curry(f2, b=1)) == normalize_function(curry(f2, b=1)) - assert normalize_function(curry(f2, b=1)) != normalize_function(curry(f2, b=2)) - - -def test_tokenize(): - a = (1, 2, 3) - assert isinstance(tokenize(a), (str, bytes)) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_array_consistent_on_values(): - assert tokenize(np.random.RandomState(1234).random_sample(1000)) == tokenize( - np.random.RandomState(1234).random_sample(1000) - ) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_array_supports_uneven_sizes(): - tokenize(np.random.random(7).astype(dtype="i2")) - - -@pytest.mark.skipif("not np") -def test_tokenize_discontiguous_numpy_array(): - tokenize(np.random.random(8)[::2]) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_datetime(): - tokenize(np.array(["2000-01-01T12:00:00"], dtype="M8[ns]")) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_scalar(): - assert tokenize(np.array(1.0, dtype="f8")) == tokenize(np.array(1.0, dtype="f8")) - assert tokenize( - np.array([(1, 2)], dtype=[("a", "i4"), ("b", "i8")])[0] - ) == tokenize(np.array([(1, 2)], dtype=[("a", "i4"), ("b", "i8")])[0]) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_scalar_string_rep(): - # Test tokenizing numpy scalars doesn't depend on their string representation - with np.printoptions(formatter={"all": lambda x: "foo"}): - assert tokenize(np.array(1)) != tokenize(np.array(2)) - - 
-@pytest.mark.skipif("not np") -def test_tokenize_numpy_array_on_object_dtype(): - a = np.array(["a", "aa", "aaa"], dtype=object) - assert tokenize(a) == tokenize(a) - assert tokenize(np.array(["a", None, "aaa"], dtype=object)) == tokenize( - np.array(["a", None, "aaa"], dtype=object) - ) - assert tokenize( - np.array([(1, "a"), (1, None), (1, "aaa")], dtype=object) - ) == tokenize(np.array([(1, "a"), (1, None), (1, "aaa")], dtype=object)) - - # Trigger non-deterministic hashing for object dtype - class NoPickle: - pass - - x = np.array(["a", None, NoPickle], dtype=object) - assert tokenize(x) != tokenize(x) - - with dask.config.set({"tokenize.ensure-deterministic": True}): - with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): - tokenize(x) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_memmap_offset(tmpdir): - # Test two different memmaps into the same numpy file - fn = str(tmpdir.join("demo_data")) - - with open(fn, "wb") as f: - f.write(b"ashekwicht") - - with open(fn, "rb") as f: - mmap1 = np.memmap(f, dtype=np.uint8, mode="r", offset=0, shape=5) - mmap2 = np.memmap(f, dtype=np.uint8, mode="r", offset=5, shape=5) - - assert tokenize(mmap1) != tokenize(mmap2) - # also make sure that they tokenize correctly when taking sub-arrays - sub1 = mmap1[1:-1] - sub2 = mmap2[1:-1] - assert tokenize(sub1) != tokenize(sub2) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_memmap(): - with tmpfile(".npy") as fn: - x = np.arange(5) - np.save(fn, x) - y = tokenize(np.load(fn, mmap_mode="r")) - - with tmpfile(".npy") as fn: - x = np.arange(5) - np.save(fn, x) - z = tokenize(np.load(fn, mmap_mode="r")) - - assert y != z - - with tmpfile(".npy") as fn: - x = np.random.normal(size=(10, 10)) - np.save(fn, x) - mm = np.load(fn, mmap_mode="r") - mm2 = np.load(fn, mmap_mode="r") - a = tokenize(mm[0, :]) - b = tokenize(mm[1, :]) - c = tokenize(mm[0:3, :]) - d = tokenize(mm[:, 0]) - assert len({a, b, c, d}) == 4 - assert tokenize(mm) == tokenize(mm2) - assert tokenize(mm[1, :]) == tokenize(mm2[1, :]) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_memmap_no_filename(): - # GH 1562: - with tmpfile(".npy") as fn1, tmpfile(".npy") as fn2: - x = np.arange(5) - np.save(fn1, x) - np.save(fn2, x) - - a = np.load(fn1, mmap_mode="r") - b = a + a - assert tokenize(b) == tokenize(b) - - -@pytest.mark.skipif("not np") -def test_tokenize_numpy_ufunc_consistent(): - assert tokenize(np.sin) == "02106e2c67daf452fb480d264e0dac21" - assert tokenize(np.cos) == "c99e52e912e4379882a9a4b387957a0b" - - # Make a ufunc that isn't in the numpy namespace. Similar to - # any found in other packages. 
- inc = np.frompyfunc(lambda x: x + 1, 1, 1) - assert tokenize(inc) == tokenize(inc) - - -def test_tokenize_partial_func_args_kwargs_consistent(): - f = partial(f3, f2, c=f1) - res = normalize_token(f) - sol = ( - b"\x80\x04\x95\x1f\x00\x00\x00\x00\x00\x00\x00\x8c\x14dask.tests.test_base\x94\x8c\x02f3\x94\x93\x94.", - ( - b"\x80\x04\x95\x1f\x00\x00\x00\x00\x00\x00\x00\x8c\x14dask.tests.test_base\x94\x8c\x02f2\x94\x93\x94.", - ), - ( - ( - "c", - b"\x80\x04\x95\x1f\x00\x00\x00\x00\x00\x00\x00\x8c\x14dask.tests.test_base\x94\x8c\x02f1\x94\x93\x94.", - ), - ), - ) - assert res == sol - - -def test_normalize_base(): - for i in [ - 1, - 1.1, - "1", - slice(1, 2, 3), - decimal.Decimal("1.1"), - datetime.date(2021, 6, 25), - pathlib.PurePath("/this/that"), - ]: - assert normalize_token(i) is i - - -def test_tokenize_object(): - o = object() - # Defaults to non-deterministic tokenization - assert normalize_token(o) != normalize_token(o) - - with dask.config.set({"tokenize.ensure-deterministic": True}): - with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): - normalize_token(o) - - -def test_tokenize_function_cloudpickle(): - a, b = (lambda x: x, lambda x: x) - # No error by default - tokenize(a) - - try: - import dask.dataframe as dd - - if dd._dask_expr_enabled(): - pytest.xfail("dask-expr does a check and serializes if possible") - except ImportError: - pass - - with dask.config.set({"tokenize.ensure-deterministic": True}): - with pytest.raises(RuntimeError, match="may not be deterministically hashed"): - tokenize(b) - - -def test_tokenize_callable(): - def my_func(a, b, c=1): - return a + b + c - - assert tokenize(my_func) == tokenize(my_func) # Consistent token - - -@pytest.mark.skipif("not pd") -def test_tokenize_pandas(): - a = pd.DataFrame({"x": [1, 2, 3], "y": ["4", "asd", None]}, index=[1, 2, 3]) - b = pd.DataFrame({"x": [1, 2, 3], "y": ["4", "asd", None]}, index=[1, 2, 3]) - - assert tokenize(a) == tokenize(b) - b.index.name = "foo" - assert tokenize(a) != tokenize(b) - - a = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "a"]}) - b = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "a"]}) - a["z"] = a.y.astype("category") - assert tokenize(a) != tokenize(b) - b["z"] = a.y.astype("category") - assert tokenize(a) == tokenize(b) - - -@pytest.mark.skipif("not pd") -def test_tokenize_pandas_invalid_unicode(): - # see https://github.com/dask/dask/issues/2713 - df = pd.DataFrame( - {"x\ud83d": [1, 2, 3], "y\ud83d": ["4", "asd\ud83d", None]}, index=[1, 2, 3] - ) - tokenize(df) - - -@pytest.mark.skipif("not pd") -def test_tokenize_pandas_mixed_unicode_bytes(): - df = pd.DataFrame( - {"ö".encode(): [1, 2, 3], "ö": ["ö", "ö".encode(), None]}, - index=[1, 2, 3], - ) - tokenize(df) - - -@pytest.mark.skipif("not pd") -def test_tokenize_pandas_no_pickle(): - class NoPickle: - # pickling not supported because it is a local class - pass - - df = pd.DataFrame({"x": ["foo", None, NoPickle()]}) - tokenize(df) - - -@pytest.mark.skipif("not dd") -def test_tokenize_pandas_extension_array(): - arrays = [ - pd.array([1, 0, None], dtype="Int64"), - pd.array(["2000"], dtype="Period[D]"), - pd.array([1, 0, 0], dtype="Sparse[int]"), - pd.array([pd.Timestamp("2000")], dtype="datetime64[ns]"), - pd.array([pd.Timestamp("2000", tz="CET")], dtype="datetime64[ns, CET]"), - pd.array( - ["a", "b"], - dtype=pd.api.types.CategoricalDtype(["a", "b", "c"], ordered=False), - ), - ] - - arrays.extend( - [ - pd.array(["a", "b", None], dtype="string"), - pd.array([True, False, None], dtype="boolean"), - ] - ) 
- - for arr in arrays: - assert tokenize(arr) == tokenize(arr) - - -@pytest.mark.skipif("not pd") -def test_tokenize_na(): - assert tokenize(pd.NA) == tokenize(pd.NA) - - -@pytest.mark.skipif("not pd") -def test_tokenize_offset(): - for offset in [ - pd.offsets.Second(1), - pd.offsets.MonthBegin(2), - pd.offsets.Day(1), - pd.offsets.BQuarterEnd(2), - pd.DateOffset(years=1), - pd.DateOffset(months=7), - pd.DateOffset(days=10), - ]: - assert tokenize(offset) == tokenize(offset) - - -@pytest.mark.skipif("not pd") -def test_tokenize_pandas_index(): - idx = pd.Index(["a", "b"]) - assert tokenize(idx) == tokenize(idx) - - idx = pd.MultiIndex.from_product([["a", "b"], [0, 1]]) - assert tokenize(idx) == tokenize(idx) - - -def test_tokenize_kwargs(): - assert tokenize(5, x=1) == tokenize(5, x=1) - assert tokenize(5) != tokenize(5, x=1) - assert tokenize(5, x=1) != tokenize(5, x=2) - assert tokenize(5, x=1) != tokenize(5, y=1) - assert tokenize(5, foo="bar") != tokenize(5, {"foo": "bar"}) - - -def test_tokenize_same_repr(): - class Foo: - def __init__(self, x): - self.x = x - - def __repr__(self): - return "a foo" - - assert tokenize(Foo(1)) != tokenize(Foo(2)) - - -def test_tokenize_method(): - class Foo: - def __init__(self, x): - self.x = x - - def __dask_tokenize__(self): - return self.x - - a, b = Foo(1), Foo(2) - assert tokenize(a) == tokenize(a) - assert tokenize(a) != tokenize(b) - - for ensure in [True, False]: - with dask.config.set({"tokenize.ensure-deterministic": ensure}): - assert tokenize(a) == tokenize(a) - - # dispatch takes precedence - before = tokenize(a) - normalize_token.register(Foo, lambda self: self.x + 1) - after = tokenize(a) - assert before != after - - -@pytest.mark.skipif("not np") -def test_tokenize_sequences(): - assert tokenize([1]) != tokenize([2]) - assert tokenize([1]) != tokenize((1,)) - assert tokenize([1]) == tokenize([1]) - - x = np.arange(2000) # long enough to drop information in repr - y = np.arange(2000) - y[1000] = 0 # middle isn't printed in repr - assert tokenize([x]) != tokenize([y]) - - -def test_tokenize_dict(): - assert tokenize({"x": 1, 1: "x"}) == tokenize({"x": 1, 1: "x"}) - - -def test_tokenize_set(): - assert tokenize({1, 2, "x", (1, "x")}) == tokenize({1, 2, "x", (1, "x")}) - - -def test_tokenize_ordered_dict(): - from collections import OrderedDict - - a = OrderedDict([("a", 1), ("b", 2)]) - b = OrderedDict([("a", 1), ("b", 2)]) - c = OrderedDict([("b", 2), ("a", 1)]) - - assert tokenize(a) == tokenize(b) - assert tokenize(a) != tokenize(c) - - -def test_tokenize_timedelta(): - assert tokenize(datetime.timedelta(days=1)) == tokenize(datetime.timedelta(days=1)) - assert tokenize(datetime.timedelta(days=1)) != tokenize(datetime.timedelta(days=2)) - - -@pytest.mark.parametrize("enum_type", [Enum, IntEnum, IntFlag, Flag]) -def test_tokenize_enum(enum_type): - class Color(enum_type): - RED = 1 - BLUE = 2 - - assert tokenize(Color.RED) == tokenize(Color.RED) - assert tokenize(Color.RED) != tokenize(Color.BLUE) - - -@dataclasses.dataclass -class ADataClass: - a: int - - -@dataclasses.dataclass -class BDataClass: - a: float - - -def test_tokenize_dataclass(): - a1 = ADataClass(1) - a2 = ADataClass(2) - assert tokenize(a1) == tokenize(a1) - assert tokenize(a1) != tokenize(a2) - - # Same field names and values, but dataclass types are different - b1 = BDataClass(1) - assert tokenize(a1) != tokenize(b1) - - class SubA(ADataClass): - pass - - assert dataclasses.is_dataclass( - SubA - ), "Python regression: SubA should be considered a dataclass" - assert 
tokenize(SubA(1)) == tokenize(SubA(1)) - assert tokenize(SubA(1)) != tokenize(a1) - - # Same name, same values, new definition: tokenize differently - ADataClassRedefinedDifferently = dataclasses.make_dataclass( - "ADataClass", [("a", Union[int, str])] - ) - assert tokenize(a1) != tokenize(ADataClassRedefinedDifferently(1)) - - -def test_tokenize_range(): - assert tokenize(range(5, 10, 2)) == tokenize(range(5, 10, 2)) # Identical ranges - assert tokenize(range(5, 10, 2)) != tokenize(range(1, 10, 2)) # Different start - assert tokenize(range(5, 10, 2)) != tokenize(range(5, 15, 2)) # Different stop - assert tokenize(range(5, 10, 2)) != tokenize(range(5, 10, 1)) # Different step - - -@pytest.mark.skipif("not np") -def test_tokenize_object_array_with_nans(): - a = np.array(["foo", "Jos\xe9", np.nan], dtype="O") - assert tokenize(a) == tokenize(a) - - -@pytest.mark.parametrize( - "x", [1, True, "a", b"a", 1.0, 1j, 1.0j, [], (), {}, None, str, int] -) -def test_tokenize_base_types(x): - assert tokenize(x) == tokenize(x), x - - -def test_tokenize_literal(): - assert tokenize(literal(["x", 1])) == tokenize(literal(["x", 1])) - - -@pytest.mark.skipif("not np") -@pytest.mark.filterwarnings("ignore:the matrix:PendingDeprecationWarning") -def test_tokenize_numpy_matrix(): - rng = np.random.RandomState(1234) - a = np.asmatrix(rng.rand(100)) - b = a.copy() - assert tokenize(a) == tokenize(b) - - b[:10] = 1 - assert tokenize(a) != tokenize(b) - - -@pytest.mark.skipif("not sp") -@pytest.mark.parametrize("cls_name", ("dia", "bsr", "coo", "csc", "csr", "dok", "lil")) -def test_tokenize_dense_sparse_array(cls_name): - rng = np.random.RandomState(1234) - - a = sp.rand(10, 10000, random_state=rng).asformat(cls_name) - b = a.copy() - - assert tokenize(a) == tokenize(b) - - # modifying the data values - if hasattr(b, "data"): - b.data[:10] = 1 - elif cls_name == "dok": - b[3, 3] = 1 - else: - raise ValueError - - assert tokenize(a) != tokenize(b) - - # modifying the data indices - b = a.copy().asformat("coo") - b.row[:10] = np.arange(10) - b = b.asformat(cls_name) - assert tokenize(a) != tokenize(b) - - -@pytest.mark.skipif( - sys.platform == "win32" and sys.version_info[:2] == (3, 9), - reason="https://github.com/ipython/ipython/issues/12197", -) -def test_tokenize_object_with_recursion_error(): - cycle = dict(a=None) - cycle["a"] = cycle - - assert len(tokenize(cycle)) == 32 - - with dask.config.set({"tokenize.ensure-deterministic": True}): - with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): - tokenize(cycle) - - -def test_tokenize_datetime_date(): - # Same date - assert tokenize(datetime.date(2021, 6, 25)) == tokenize(datetime.date(2021, 6, 25)) - # Different year - assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2022, 6, 25)) - # Different month - assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2021, 7, 25)) - # Different day - assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2021, 6, 26)) - - -def test_tokenize_datetime_time(): - # Same time - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) == tokenize( - datetime.time(1, 2, 3, 4, datetime.timezone.utc) - ) - assert tokenize(datetime.time(1, 2, 3, 4)) == tokenize(datetime.time(1, 2, 3, 4)) - assert tokenize(datetime.time(1, 2, 3)) == tokenize(datetime.time(1, 2, 3)) - assert tokenize(datetime.time(1, 2)) == tokenize(datetime.time(1, 2)) - # Different hour - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( - 
datetime.time(2, 2, 3, 4, datetime.timezone.utc) - ) - # Different minute - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( - datetime.time(1, 3, 3, 4, datetime.timezone.utc) - ) - # Different second - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( - datetime.time(1, 2, 4, 4, datetime.timezone.utc) - ) - # Different micros - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( - datetime.time(1, 2, 3, 5, datetime.timezone.utc) - ) - # Different tz - assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( - datetime.time(1, 2, 3, 4) - ) - - -def test_tokenize_datetime_datetime(): - # Same datetime - required = [1, 2, 3] # year, month, day - optional = [4, 5, 6, 7, datetime.timezone.utc] - for i in range(len(optional) + 1): - args = required + optional[:i] - assert tokenize(datetime.datetime(*args)) == tokenize(datetime.datetime(*args)) - - # Different year - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(2, 2, 3, 4, 5, 6, 7, datetime.timezone.utc)) - # Different month - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 1, 3, 4, 5, 6, 7, datetime.timezone.utc)) - # Different day - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 2, 4, 5, 6, 7, datetime.timezone.utc)) - # Different hour - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 3, 3, 5, 6, 7, datetime.timezone.utc)) - # Different minute - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 3, 4, 4, 6, 7, datetime.timezone.utc)) - # Different second - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 5, 7, datetime.timezone.utc)) - # Different micros - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 6, 6, datetime.timezone.utc)) - # Different tz - assert tokenize( - datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) - ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 6, 7, None)) - - def test_is_dask_collection(): class DummyCollection: def __init__(self, dsk): @@ -730,6 +103,10 @@ def optimize(self, fuse=False): def test_unpack_collections(): + @dataclasses.dataclass + class ADataClass: + a: int + class ANamedTuple(NamedTuple): a: int # type: ignore[annotation-unchecked] @@ -1269,24 +646,6 @@ def test_visualize_order(): assert 'color="#' in text -def test_use_cloudpickle_to_tokenize_functions_in__main__(): - from textwrap import dedent - - defn = dedent( - """ - def inc(): - return x - """ - ) - - __main__ = sys.modules["__main__"] - exec(compile(defn, "", "exec"), __main__.__dict__) - f = __main__.inc - - t = normalize_token(f) - assert b"cloudpickle" in t - - def inc_to_dec(dsk, keys): dsk = dict(dsk) for key in dsk: @@ -1522,26 +881,6 @@ def test_persist_item_change_name(): db.utils.assert_eq(b, 4) -def test_normalize_function_limited_size(): - for _ in range(1000): - normalize_function(lambda x: x) - - assert 50 < len(function_cache) < 600 - - -def test_normalize_function_dataclass_field_no_repr(): - A = dataclasses.make_dataclass( - "A", - [("param", float, dataclasses.field(repr=False))], - 
namespace={"__dask_tokenize__": lambda self: self.param}, - ) - - a1, a2 = A(1), A(2) - - assert normalize_function(a1) == normalize_function(a1) - assert normalize_function(a1) != normalize_function(a2) - - def test_optimize_globals(): da = pytest.importorskip("dask.array") diff --git a/dask/tests/test_sizeof.py b/dask/tests/test_sizeof.py index 9b43a7ae48e..ef3d6ba37b8 100644 --- a/dask/tests/test_sizeof.py +++ b/dask/tests/test_sizeof.py @@ -78,7 +78,7 @@ def test_pandas_contiguous_dtypes(): @requires_pandas def test_pandas_multiindex(): - index = pd.MultiIndex.from_product([range(5), ["a", "b", "c", "d", "e"]]) + index = pd.MultiIndex.from_product([range(50), list("abcdefghilmnopqrstuvwxyz")]) actual_size = sys.getsizeof(index) assert 0.5 * actual_size < sizeof(index) < 2 * actual_size diff --git a/dask/tests/test_tokenize.py b/dask/tests/test_tokenize.py new file mode 100644 index 00000000000..745970d1af8 --- /dev/null +++ b/dask/tests/test_tokenize.py @@ -0,0 +1,707 @@ +from __future__ import annotations + +import dataclasses +import datetime +import decimal +import pathlib +import subprocess +import sys +import textwrap +from enum import Enum, Flag, IntEnum, IntFlag +from typing import Union + +import pytest +from tlz import compose, curry, partial + +import dask +from dask.base import function_cache, normalize_function, normalize_token, tokenize +from dask.core import literal +from dask.utils import tmpfile +from dask.utils_test import import_or_none + +dd = import_or_none("dask.dataframe") +np = import_or_none("numpy") +sp = import_or_none("scipy.sparse") +pd = import_or_none("pandas") + + +def f1(a, b, c=1): + pass + + +def f2(a, b=1, c=2): + pass + + +def f3(a): + pass + + +def test_normalize_function(): + assert normalize_function(f2) + + assert normalize_function(lambda a: a) + + assert normalize_function(partial(f2, b=2)) == normalize_function(partial(f2, b=2)) + + assert normalize_function(partial(f2, b=2)) != normalize_function(partial(f2, b=3)) + + assert normalize_function(partial(f1, b=2)) != normalize_function(partial(f2, b=2)) + + assert normalize_function(compose(f2, f3)) == normalize_function(compose(f2, f3)) + + assert normalize_function(compose(f2, f3)) != normalize_function(compose(f2, f1)) + + assert normalize_function(curry(f2)) == normalize_function(curry(f2)) + assert normalize_function(curry(f2)) != normalize_function(curry(f1)) + assert normalize_function(curry(f2, b=1)) == normalize_function(curry(f2, b=1)) + assert normalize_function(curry(f2, b=1)) != normalize_function(curry(f2, b=2)) + + +def test_tokenize(): + a = (1, 2, 3) + assert isinstance(tokenize(a), (str, bytes)) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_array_consistent_on_values(): + assert tokenize(np.random.RandomState(1234).random_sample(1000)) == tokenize( + np.random.RandomState(1234).random_sample(1000) + ) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_array_supports_uneven_sizes(): + tokenize(np.random.random(7).astype(dtype="i2")) + + +@pytest.mark.skipif("not np") +def test_tokenize_discontiguous_numpy_array(): + tokenize(np.random.random(8)[::2]) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_datetime(): + tokenize(np.array(["2000-01-01T12:00:00"], dtype="M8[ns]")) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_scalar(): + assert tokenize(np.array(1.0, dtype="f8")) == tokenize(np.array(1.0, dtype="f8")) + assert tokenize( + np.array([(1, 2)], dtype=[("a", "i4"), ("b", "i8")])[0] + ) == tokenize(np.array([(1, 2)], dtype=[("a", 
"i4"), ("b", "i8")])[0]) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_scalar_string_rep(): + # Test tokenizing numpy scalars doesn't depend on their string representation + with np.printoptions(formatter={"all": lambda x: "foo"}): + assert tokenize(np.array(1)) != tokenize(np.array(2)) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_array_on_object_dtype(): + a = np.array(["a", "aa", "aaa"], dtype=object) + assert tokenize(a) == tokenize(a) + assert tokenize(np.array(["a", None, "aaa"], dtype=object)) == tokenize( + np.array(["a", None, "aaa"], dtype=object) + ) + assert tokenize( + np.array([(1, "a"), (1, None), (1, "aaa")], dtype=object) + ) == tokenize(np.array([(1, "a"), (1, None), (1, "aaa")], dtype=object)) + + # Trigger non-deterministic hashing for object dtype + class NoPickle: + pass + + x = np.array(["a", None, NoPickle], dtype=object) + assert tokenize(x) != tokenize(x) + + with dask.config.set({"tokenize.ensure-deterministic": True}): + with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): + tokenize(x) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_memmap_offset(tmpdir): + # Test two different memmaps into the same numpy file + fn = str(tmpdir.join("demo_data")) + + with open(fn, "wb") as f: + f.write(b"ashekwicht") + + with open(fn, "rb") as f: + mmap1 = np.memmap(f, dtype=np.uint8, mode="r", offset=0, shape=5) + mmap2 = np.memmap(f, dtype=np.uint8, mode="r", offset=5, shape=5) + + assert tokenize(mmap1) != tokenize(mmap2) + # also make sure that they tokenize correctly when taking sub-arrays + sub1 = mmap1[1:-1] + sub2 = mmap2[1:-1] + assert tokenize(sub1) != tokenize(sub2) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_memmap(): + with tmpfile(".npy") as fn: + x = np.arange(5) + np.save(fn, x) + y = tokenize(np.load(fn, mmap_mode="r")) + + with tmpfile(".npy") as fn: + x = np.arange(5) + np.save(fn, x) + z = tokenize(np.load(fn, mmap_mode="r")) + + assert y != z + + with tmpfile(".npy") as fn: + x = np.random.normal(size=(10, 10)) + np.save(fn, x) + mm = np.load(fn, mmap_mode="r") + mm2 = np.load(fn, mmap_mode="r") + a = tokenize(mm[0, :]) + b = tokenize(mm[1, :]) + c = tokenize(mm[0:3, :]) + d = tokenize(mm[:, 0]) + assert len({a, b, c, d}) == 4 + assert tokenize(mm) == tokenize(mm2) + assert tokenize(mm[1, :]) == tokenize(mm2[1, :]) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_memmap_no_filename(): + # GH 1562: + with tmpfile(".npy") as fn1, tmpfile(".npy") as fn2: + x = np.arange(5) + np.save(fn1, x) + np.save(fn2, x) + + a = np.load(fn1, mmap_mode="r") + b = a + a + assert tokenize(b) == tokenize(b) + + +@pytest.mark.skipif("not np") +def test_tokenize_numpy_ufunc_consistent(): + assert tokenize(np.sin) == "02106e2c67daf452fb480d264e0dac21" + assert tokenize(np.cos) == "c99e52e912e4379882a9a4b387957a0b" + + # Make a ufunc that isn't in the numpy namespace. Similar to + # any found in other packages. 
+ inc = np.frompyfunc(lambda x: x + 1, 1, 1) + assert tokenize(inc) == tokenize(inc) + + +def test_tokenize_partial_func_args_kwargs_consistent(): + f = partial(f3, f2, c=f1) + res = normalize_token(f) + sol = ( + b"\x80\x04\x95#\x00\x00\x00\x00\x00\x00\x00\x8c\x18" + b"dask.tests.test_tokenize\x94\x8c\x02f3\x94\x93\x94.", + ( + b"\x80\x04\x95#\x00\x00\x00\x00\x00\x00\x00\x8c\x18" + b"dask.tests.test_tokenize\x94\x8c\x02f2\x94\x93\x94.", + ), + ( + ( + "c", + b"\x80\x04\x95#\x00\x00\x00\x00\x00\x00\x00\x8c\x18" + b"dask.tests.test_tokenize\x94\x8c\x02f1\x94\x93\x94.", + ), + ), + ) + assert res == sol + + +def test_normalize_base(): + for i in [ + 1, + 1.1, + "1", + slice(1, 2, 3), + decimal.Decimal("1.1"), + datetime.date(2021, 6, 25), + pathlib.PurePath("/this/that"), + ]: + assert normalize_token(i) is i + + +def test_tokenize_object(): + o = object() + # Defaults to non-deterministic tokenization + assert normalize_token(o) != normalize_token(o) + + with dask.config.set({"tokenize.ensure-deterministic": True}): + with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): + normalize_token(o) + + +def test_tokenize_function_cloudpickle(): + a, b = (lambda x: x, lambda x: x) + # No error by default + tokenize(a) + + try: + import dask.dataframe as dd + + if dd._dask_expr_enabled(): + pytest.xfail("dask-expr does a check and serializes if possible") + except ImportError: + pass + + with dask.config.set({"tokenize.ensure-deterministic": True}): + with pytest.raises(RuntimeError, match="may not be deterministically hashed"): + tokenize(b) + + +def test_tokenize_callable(): + def my_func(a, b, c=1): + return a + b + c + + assert tokenize(my_func) == tokenize(my_func) # Consistent token + + +@pytest.mark.skipif("not pd") +def test_tokenize_pandas(): + a = pd.DataFrame({"x": [1, 2, 3], "y": ["4", "asd", None]}, index=[1, 2, 3]) + b = pd.DataFrame({"x": [1, 2, 3], "y": ["4", "asd", None]}, index=[1, 2, 3]) + + assert tokenize(a) == tokenize(b) + b.index.name = "foo" + assert tokenize(a) != tokenize(b) + + a = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "a"]}) + b = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "a"]}) + a["z"] = a.y.astype("category") + assert tokenize(a) != tokenize(b) + b["z"] = a.y.astype("category") + assert tokenize(a) == tokenize(b) + + +@pytest.mark.skipif("not pd") +def test_tokenize_pandas_invalid_unicode(): + # see https://github.com/dask/dask/issues/2713 + df = pd.DataFrame( + {"x\ud83d": [1, 2, 3], "y\ud83d": ["4", "asd\ud83d", None]}, index=[1, 2, 3] + ) + tokenize(df) + + +@pytest.mark.skipif("not pd") +def test_tokenize_pandas_mixed_unicode_bytes(): + df = pd.DataFrame( + {"ö".encode(): [1, 2, 3], "ö": ["ö", "ö".encode(), None]}, + index=[1, 2, 3], + ) + tokenize(df) + + +@pytest.mark.skipif("not pd") +def test_tokenize_pandas_no_pickle(): + class NoPickle: + # pickling not supported because it is a local class + pass + + df = pd.DataFrame({"x": ["foo", None, NoPickle()]}) + tokenize(df) + + +@pytest.mark.skipif("not dd") +def test_tokenize_pandas_extension_array(): + arrays = [ + pd.array([1, 0, None], dtype="Int64"), + pd.array(["2000"], dtype="Period[D]"), + pd.array([1, 0, 0], dtype="Sparse[int]"), + pd.array([pd.Timestamp("2000")], dtype="datetime64[ns]"), + pd.array([pd.Timestamp("2000", tz="CET")], dtype="datetime64[ns, CET]"), + pd.array( + ["a", "b"], + dtype=pd.api.types.CategoricalDtype(["a", "b", "c"], ordered=False), + ), + ] + + arrays.extend( + [ + pd.array(["a", "b", None], dtype="string"), + pd.array([True, False, None], 
dtype="boolean"), + ] + ) + + for arr in arrays: + assert tokenize(arr) == tokenize(arr) + + +@pytest.mark.skipif("not pd") +def test_tokenize_na(): + assert tokenize(pd.NA) == tokenize(pd.NA) + + +@pytest.mark.skipif("not pd") +def test_tokenize_offset(): + for offset in [ + pd.offsets.Second(1), + pd.offsets.MonthBegin(2), + pd.offsets.Day(1), + pd.offsets.BQuarterEnd(2), + pd.DateOffset(years=1), + pd.DateOffset(months=7), + pd.DateOffset(days=10), + ]: + assert tokenize(offset) == tokenize(offset) + + +@pytest.mark.skipif("not pd") +def test_tokenize_pandas_index(): + idx = pd.Index(["a", "b"]) + assert tokenize(idx) == tokenize(idx) + + idx = pd.MultiIndex.from_product([["a", "b"], [0, 1]]) + assert tokenize(idx) == tokenize(idx) + + +def test_tokenize_kwargs(): + assert tokenize(5, x=1) == tokenize(5, x=1) + assert tokenize(5) != tokenize(5, x=1) + assert tokenize(5, x=1) != tokenize(5, x=2) + assert tokenize(5, x=1) != tokenize(5, y=1) + assert tokenize(5, foo="bar") != tokenize(5, {"foo": "bar"}) + + +def test_tokenize_same_repr(): + class Foo: + def __init__(self, x): + self.x = x + + def __repr__(self): + return "a foo" + + assert tokenize(Foo(1)) != tokenize(Foo(2)) + + +def test_tokenize_method(): + class Foo: + def __init__(self, x): + self.x = x + + def __dask_tokenize__(self): + return self.x + + a, b = Foo(1), Foo(2) + assert tokenize(a) == tokenize(a) + assert tokenize(a) != tokenize(b) + + for ensure in [True, False]: + with dask.config.set({"tokenize.ensure-deterministic": ensure}): + assert tokenize(a) == tokenize(a) + + # dispatch takes precedence + before = tokenize(a) + normalize_token.register(Foo, lambda self: self.x + 1) + after = tokenize(a) + assert before != after + + +@pytest.mark.skipif("not np") +def test_tokenize_sequences(): + assert tokenize([1]) != tokenize([2]) + assert tokenize([1]) != tokenize((1,)) + assert tokenize([1]) == tokenize([1]) + + x = np.arange(2000) # long enough to drop information in repr + y = np.arange(2000) + y[1000] = 0 # middle isn't printed in repr + assert tokenize([x]) != tokenize([y]) + + +def test_tokenize_dict(): + assert tokenize({"x": 1, 1: "x"}) == tokenize({"x": 1, 1: "x"}) + + +def test_tokenize_set(): + assert tokenize({1, 2, "x", (1, "x")}) == tokenize({1, 2, "x", (1, "x")}) + + +def test_tokenize_ordered_dict(): + from collections import OrderedDict + + a = OrderedDict([("a", 1), ("b", 2)]) + b = OrderedDict([("a", 1), ("b", 2)]) + c = OrderedDict([("b", 2), ("a", 1)]) + + assert tokenize(a) == tokenize(b) + assert tokenize(a) != tokenize(c) + + +def test_tokenize_timedelta(): + assert tokenize(datetime.timedelta(days=1)) == tokenize(datetime.timedelta(days=1)) + assert tokenize(datetime.timedelta(days=1)) != tokenize(datetime.timedelta(days=2)) + + +@pytest.mark.parametrize("enum_type", [Enum, IntEnum, IntFlag, Flag]) +def test_tokenize_enum(enum_type): + class Color(enum_type): + RED = 1 + BLUE = 2 + + assert tokenize(Color.RED) == tokenize(Color.RED) + assert tokenize(Color.RED) != tokenize(Color.BLUE) + + +@dataclasses.dataclass +class ADataClass: + a: int + + +@dataclasses.dataclass +class BDataClass: + a: float + + +def test_tokenize_dataclass(): + a1 = ADataClass(1) + a2 = ADataClass(2) + assert tokenize(a1) == tokenize(a1) + assert tokenize(a1) != tokenize(a2) + + # Same field names and values, but dataclass types are different + b1 = BDataClass(1) + assert tokenize(a1) != tokenize(b1) + + class SubA(ADataClass): + pass + + assert dataclasses.is_dataclass( + SubA + ), "Python regression: SubA should be 
considered a dataclass" + assert tokenize(SubA(1)) == tokenize(SubA(1)) + assert tokenize(SubA(1)) != tokenize(a1) + + # Same name, same values, new definition: tokenize differently + ADataClassRedefinedDifferently = dataclasses.make_dataclass( + "ADataClass", [("a", Union[int, str])] + ) + assert tokenize(a1) != tokenize(ADataClassRedefinedDifferently(1)) + + +def test_tokenize_range(): + assert tokenize(range(5, 10, 2)) == tokenize(range(5, 10, 2)) # Identical ranges + assert tokenize(range(5, 10, 2)) != tokenize(range(1, 10, 2)) # Different start + assert tokenize(range(5, 10, 2)) != tokenize(range(5, 15, 2)) # Different stop + assert tokenize(range(5, 10, 2)) != tokenize(range(5, 10, 1)) # Different step + + +@pytest.mark.skipif("not np") +def test_tokenize_object_array_with_nans(): + a = np.array(["foo", "Jos\xe9", np.nan], dtype="O") + assert tokenize(a) == tokenize(a) + + +@pytest.mark.parametrize( + "x", [1, True, "a", b"a", 1.0, 1j, 1.0j, [], (), {}, None, str, int] +) +def test_tokenize_base_types(x): + assert tokenize(x) == tokenize(x), x + + +def test_tokenize_literal(): + assert tokenize(literal(["x", 1])) == tokenize(literal(["x", 1])) + + +@pytest.mark.skipif("not np") +@pytest.mark.filterwarnings("ignore:the matrix:PendingDeprecationWarning") +def test_tokenize_numpy_matrix(): + rng = np.random.RandomState(1234) + a = np.asmatrix(rng.rand(100)) + b = a.copy() + assert tokenize(a) == tokenize(b) + + b[:10] = 1 + assert tokenize(a) != tokenize(b) + + +@pytest.mark.skipif("not sp") +@pytest.mark.parametrize("cls_name", ("dia", "bsr", "coo", "csc", "csr", "dok", "lil")) +def test_tokenize_dense_sparse_array(cls_name): + rng = np.random.RandomState(1234) + + a = sp.rand(10, 10000, random_state=rng).asformat(cls_name) + b = a.copy() + + assert tokenize(a) == tokenize(b) + + # modifying the data values + if hasattr(b, "data"): + b.data[:10] = 1 + elif cls_name == "dok": + b[3, 3] = 1 + else: + raise ValueError + + assert tokenize(a) != tokenize(b) + + # modifying the data indices + b = a.copy().asformat("coo") + b.row[:10] = np.arange(10) + b = b.asformat(cls_name) + assert tokenize(a) != tokenize(b) + + +@pytest.mark.skipif( + sys.platform == "win32" and sys.version_info[:2] == (3, 9), + reason="https://github.com/ipython/ipython/issues/12197", +) +def test_tokenize_object_with_recursion_error(): + cycle = dict(a=None) + cycle["a"] = cycle + + assert len(tokenize(cycle)) == 32 + + with dask.config.set({"tokenize.ensure-deterministic": True}): + with pytest.raises(RuntimeError, match="cannot be deterministically hashed"): + tokenize(cycle) + + +def test_tokenize_datetime_date(): + # Same date + assert tokenize(datetime.date(2021, 6, 25)) == tokenize(datetime.date(2021, 6, 25)) + # Different year + assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2022, 6, 25)) + # Different month + assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2021, 7, 25)) + # Different day + assert tokenize(datetime.date(2021, 6, 25)) != tokenize(datetime.date(2021, 6, 26)) + + +def test_tokenize_datetime_time(): + # Same time + assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) == tokenize( + datetime.time(1, 2, 3, 4, datetime.timezone.utc) + ) + assert tokenize(datetime.time(1, 2, 3, 4)) == tokenize(datetime.time(1, 2, 3, 4)) + assert tokenize(datetime.time(1, 2, 3)) == tokenize(datetime.time(1, 2, 3)) + assert tokenize(datetime.time(1, 2)) == tokenize(datetime.time(1, 2)) + # Different hour + assert tokenize(datetime.time(1, 2, 3, 4, 
datetime.timezone.utc)) != tokenize( + datetime.time(2, 2, 3, 4, datetime.timezone.utc) + ) + # Different minute + assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( + datetime.time(1, 3, 3, 4, datetime.timezone.utc) + ) + # Different second + assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( + datetime.time(1, 2, 4, 4, datetime.timezone.utc) + ) + # Different micros + assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( + datetime.time(1, 2, 3, 5, datetime.timezone.utc) + ) + # Different tz + assert tokenize(datetime.time(1, 2, 3, 4, datetime.timezone.utc)) != tokenize( + datetime.time(1, 2, 3, 4) + ) + + +def test_tokenize_datetime_datetime(): + # Same datetime + required = [1, 2, 3] # year, month, day + optional = [4, 5, 6, 7, datetime.timezone.utc] + for i in range(len(optional) + 1): + args = required + optional[:i] + assert tokenize(datetime.datetime(*args)) == tokenize(datetime.datetime(*args)) + + # Different year + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(2, 2, 3, 4, 5, 6, 7, datetime.timezone.utc)) + # Different month + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 1, 3, 4, 5, 6, 7, datetime.timezone.utc)) + # Different day + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 2, 4, 5, 6, 7, datetime.timezone.utc)) + # Different hour + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 3, 3, 5, 6, 7, datetime.timezone.utc)) + # Different minute + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 3, 4, 4, 6, 7, datetime.timezone.utc)) + # Different second + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 5, 7, datetime.timezone.utc)) + # Different micros + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 6, 6, datetime.timezone.utc)) + # Different tz + assert tokenize( + datetime.datetime(1, 2, 3, 4, 5, 6, 7, datetime.timezone.utc) + ) != tokenize(datetime.datetime(1, 2, 3, 4, 5, 6, 7, None)) + + +def test_tokenize_functions_main(): + script = """ + + def inc(x): + return x + 1 + + inc2 = inc + def sum(x, y): + return x + y + + from dask.base import tokenize + assert tokenize(inc) != tokenize(sum) + # That this is an alias shouldn't matter + assert tokenize(inc) == tokenize(inc2) + + def inc(x): + return x + 1 + + assert tokenize(inc2) != tokenize(inc) + + def inc(y): + return y + 1 + + assert tokenize(inc2) != tokenize(inc) + + def inc(x): + # Foo + return x + 1 + + assert tokenize(inc2) != tokenize(inc) + + def inc(x): + y = x + return y + 1 + + assert tokenize(inc2) != tokenize(inc) + """ + proc = subprocess.run([sys.executable, "-c", textwrap.dedent(script)]) + proc.check_returncode() + + +def test_normalize_function_limited_size(): + for _ in range(1000): + normalize_function(lambda x: x) + + assert 50 < len(function_cache) < 600 + + +def test_normalize_function_dataclass_field_no_repr(): + A = dataclasses.make_dataclass( + "A", + [("param", float, dataclasses.field(repr=False))], + namespace={"__dask_tokenize__": lambda self: self.param}, + ) + + a1, a2 = A(1), A(2) + + assert 
normalize_function(a1) == normalize_function(a1) + assert normalize_function(a1) != normalize_function(a2) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 4aa396d3805..b99d0ec80c5 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,6 +1,85 @@ Changelog ========= +.. _v2024.1.1: + +2024.1.1 +-------- + +Released on January 26, 2024 + +Highlights +^^^^^^^^^^ + +Pandas 2.2 and Scipy 1.12 support +""""""""""""""""""""""""""""""""" +This release contains compatibility updates for the latest ``pandas`` and ``scipy`` releases. + +See :pr:`10834`, :pr:`10849`, :pr:`10845`, and :pr-distributed:`8474` from `crusaderky`_ for details. + +Deprecations +"""""""""""" +- Deprecate ``convert_dtype`` in ``apply`` (:pr:`10827`) `Miles`_ +- Deprecate ``axis`` in ``DataFrame.rolling`` (:pr:`10803`) `Miles`_ +- Deprecate ``out=`` and ``dtype=`` parameter in most DataFrame methods (:pr:`10800`) `crusaderky`_ +- Deprecate ``axis`` in ``groupby`` cumulative transformers (:pr:`10796`) `Miles`_ +- Rename ``shuffle`` to ``shuffle_method`` in remaining methods (:pr:`10797`) `Miles`_ + +.. dropdown:: Additional changes + + - Add recommended deployment options to deployment docs (:pr:`10866`) `James Bourbeau`_ + - Improve ``_agg_finalize`` to confirm to output expectation (:pr:`10835`) `Hendrik Makait`_ + - Implement deterministic tokenization for hlg (:pr:`10817`) `Patrick Hoefler`_ + - Refactor: move tests for ``tokenize()`` to its own module (:pr:`10863`) `crusaderky`_ + - Update DataFrame examples section (:pr:`10856`) `James Bourbeau`_ + - Temporarily pin ``mimesis<13.1.0`` (:pr:`10860`) `James Bourbeau`_ + - Trivial cosmetic tweaks to ``_testing.py`` (:pr:`10857`) `crusaderky`_ + - Unskip and adjust tests for ``groupby``-aggregate with ``median`` using ``dask-expr`` (:pr:`10832`) `Hendrik Makait`_ + - Fix test for ``sizeof(pd.MultiIndex)`` in upstream CI (:pr:`10850`) `crusaderky`_ + - ``numpy`` 2.0: fix slicing by ``uint64`` array (:pr:`10854`) `crusaderky`_ + - Rename ``numpy`` version constants to match ``pandas`` (:pr:`10843`) `crusaderky`_ + - Bump ``actions/cache`` from 3 to 4 (:pr:`10852`) + - Update gpuCI ``RAPIDS_VER`` to ``24.04`` (:pr:`10841`) + - Fix deprecations in doctest (:pr:`10844`) `crusaderky`_ + - Changed ``dtype`` arithmetics in ``numpy`` 2.x (:pr:`10831`) `crusaderky`_ + - Adjust tests for ``median`` support in ``dask-expr`` (:pr:`10839`) `Patrick Hoefler`_ + - Adjust tests for ``median`` support in ``groupby-aggregate`` in ``dask-expr`` (:pr:`10840`) `Hendrik Makait`_ + - ``numpy`` 2.x: fix ``std()`` on ``MaskedArray`` (:pr:`10837`) `crusaderky`_ + - Fail ``dask-expr`` ci if tests fail (:pr:`10829`) `Patrick Hoefler`_ + - Activate ``query_planning`` when exporting tests (:pr:`10833`) `Patrick Hoefler`_ + - Expose dataframe tests (:pr:`10830`) `Patrick Hoefler`_ + - ``numpy`` 2: deprecations in n-dimensional ``fft`` functions (:pr:`10821`) `crusaderky`_ + - Generalize ``CreationDispatch`` for ``dask-expr`` (:pr:`10794`) `Richard (Rick) Zamora`_ + - Remove circular import when ``dask-expr`` enabled (:pr:`10824`) `Miles`_ + - Minor[CI]: ``publish-test-results`` not marked as failed (:pr:`10825`) `Miles`_ + - Fix more tests to use ``pytest.warns()`` (:pr:`10818`) `Michał Górny`_ + - ``np.unique()``: inverse is shaped in ``numpy`` 2 (:pr:`10819`) `crusaderky`_ + - Pin ``test_split_adaptive_files`` to ``pyarrow`` engine (:pr:`10820`) `Patrick Hoefler`_ + - Adjust remaining tests in ``dask/dask`` (:pr:`10813`) `Patrick Hoefler`_ + - Restrict test to 
Arrow only (:pr:`10814`) `Patrick Hoefler`_ + - Filter warnings from ``std`` test (:pr:`10815`) `Patrick Hoefler`_ + - Adjust mostly indexing tests (:pr:`10790`) `Patrick Hoefler`_ + - Updates to deployment docs (:pr:`10778`) `Sarah Charlotte Johnson`_ + - Unblock documentation build (:pr:`10807`) `Miles`_ + - Adjust ``test_to_datetime`` for ``dask-expr`` compatibility `Hendrik Makait`_ + - Upstream CI tweaks (:pr:`10806`) `crusaderky`_ + - Improve tests for ``to_numeric`` (:pr:`10804`) `Hendrik Makait`_ + - Fix test-report cache key indent (:pr:`10798`) `Miles`_ + - Add test-report workflow (:pr:`10783`) `Miles`_ + + - Handle matrix subclass serialization (:pr-distributed:`8480`) `Florian Jetter`_ + - Use smallest data type for partition column in P2P (:pr-distributed:`8479`) `Florian Jetter`_ + - ``pandas`` 2.2: fix ``test_dataframe_groupby_tasks`` (:pr-distributed:`8475`) `crusaderky`_ + - Bump ``actions/cache`` from 3 to 4 (:pr-distributed:`8477`) + - ``pandas`` 2.2 vs. ``pyarrow`` 14: deprecated ``DatetimeTZBlock`` (:pr-distributed:`8476`) `crusaderky`_ + - ``pandas`` 2.2.0: Deprecated frequency alias ``M`` in favor of ``ME`` (:pr-distributed:`8473`) `Hendrik Makait`_ + - Fix docs build (:pr-distributed:`8472`) `Hendrik Makait`_ + - Fix P2P-based joins with explicit ``npartitions`` (:pr-distributed:`8470`) `Hendrik Makait`_ + - Ignore ``dask-expr`` in ``test_report.py`` script (:pr-distributed:`8464`) `Miles`_ + - Nit: hardcode Python version in test report environment (:pr-distributed:`8462`) `crusaderky`_ + - Change ``test_report.py`` - skip bad artifacts in ``dask/dask`` (:pr-distributed:`8461`) `Miles`_ + - Replace all occurrences of ``sys.is_finalizing`` (:pr-distributed:`8449`) `Florian Jetter`_ + .. _v2024.1.0: 2024.1.0 diff --git a/docs/source/dataframe.rst b/docs/source/dataframe.rst index a80dc9d5965..2152375093d 100644 --- a/docs/source/dataframe.rst +++ b/docs/source/dataframe.rst @@ -174,8 +174,19 @@ You should probably stick to just using pandas if ... Examples -------- -Visit https://examples.dask.org/dataframe.html to see and run examples using -Dask DataFrame. +Dask DataFrame is used across a wide variety of applications — anywhere you're working +with large tabular datasets. Here are a few large-scale examples: + +- `Parquet ETL with Dask DataFrame `_ +- `XGBoost model training with Dask DataFrame `_ +- `Visualize 1,000,000,000 points `_ + +These examples all process larger-than-memory datasets on Dask clusters deployed with +`Coiled `_, +but there are many options for managing and deploying Dask. +See our :doc:`deploying` documentation for more information on deployment options. + +You can also visit https://examples.dask.org/dataframe.html for a collection of additional examples. .. raw:: html diff --git a/docs/source/deploying-cloud.rst b/docs/source/deploying-cloud.rst index fa56966c0c4..b4a57b863d6 100644 --- a/docs/source/deploying-cloud.rst +++ b/docs/source/deploying-cloud.rst @@ -1,76 +1,131 @@ Cloud ===== -There are a variety of ways to deploy Dask on cloud providers. -Cloud providers provide managed services, +There are a variety of ways to deploy Dask on the cloud. +Cloud providers offer managed services, like VMs, Kubernetes, Yarn, or custom APIs with which Dask can connect easily. -You may want to consider the following options: -1. A managed Kubernetes service and Dask's - :doc:`Kubernetes integration `. -2. A managed Yarn service, - like `Amazon EMR `_ - or `Google Cloud DataProc `_ - and `Dask-Yarn `_. +.. 
grid:: 1 1 2 2 - Specific documentation for the popular Amazon EMR service can be found - `here `_. -3. Directly launching cloud resources such as VMs or containers via a cluster manager with - `Dask Cloud Provider `_. -4. A commercial Dask deployment option like `Coiled `_ to handle the creation and management of Dask clusters on a cloud computing environment (AWS and GCP). + .. grid-item:: + :columns: 7 -Cloud Deployment Example ------------------------- + Some common deployment options you may want to consider are: -Using `Dask Cloud Provider `_ to launch a cluster of -VMs on a platform like `DigitalOcean `_ can be as convenient as -launching a local cluster. + - A commercial Dask deployment option like `Coiled `_ + to handle the creation and management of Dask clusters on AWS, GCP, and Azure. + - A managed Kubernetes service and Dask's + :doc:`Kubernetes integration `. + - Directly launching cloud resources such as VMs or containers via a cluster manager with + `Dask Cloud Provider `_. + - A managed Yarn service, + like `Amazon EMR `_ + or `Google Cloud DataProc `_ + and `Dask-Yarn `_ + (specific documentation for the popular Amazon EMR service can be found + `here `_.) -.. code-block:: python - >>> import dask.config + .. grid-item:: + :columns: 5 + :child-align: center - >>> dask.config.set({"cloudprovider.digitalocean.token": "yourAPItoken"}) + .. figure:: images/cloud-provider-logos.svg - >>> from dask_cloudprovider.digitalocean import DropletCluster +Cloud Deployment Examples +------------------------- - >>> cluster = DropletCluster(n_workers=1) - Creating scheduler instance - Created droplet dask-38b817c1-scheduler - Waiting for scheduler to run - Scheduler is running - Creating worker instance - Created droplet dask-38b817c1-worker-dc95260d +.. tab-set:: -Many of the cluster managers in Dask Cloud Provider work by launching VMs with a startup script -that pulls down the :doc:`Dask Docker image ` and runs Dask components within that container. -As with all cluster managers the VM resources, Docker image, etc are all configurable. + .. tab-item:: Coiled -You can then connect a client and work with the cluster as if it were on your local machine. + `Coiled `_ + deploys managed Dask clusters on AWS, GCP, and Azure. It's free for most users and + has several features that address common + :doc:`deployment pain points ` like: -.. code-block:: python + - Easy to use API + - Automatic software synchronization + - Easy access to any cloud hardware (like GPUs) in any region + - Robust logging, cost controls, and metrics collection - >>> from dask.distributed import Client + .. code-block:: python - >>> client = Client(cluster) + >>> import coiled + >>> cluster = coiled.Cluster( + ... n_workers=100, # Size of cluster + ... region="us-west-2", # Same region as data + ... vm_type="m6i.xlarge", # Hardware of your choosing + ... ) + >>> client = cluster.get_client() + + Coiled is recommended for deploying Dask on the cloud. + Though there are non-commercial, open source options like + Dask Cloud Provider, Dask-Gateway, and Dask-Yarn that are also available + (see :ref:`cloud deployment options ` + for additional options.) + + + .. tab-item:: Dask Cloud Provider + + Using `Dask Cloud Provider `_ to launch a cluster of + VMs on a platform like `DigitalOcean `_ can be as convenient as + launching a local cluster. + + .. 
code-block:: python + + >>> import dask.config + >>> dask.config.set({"cloudprovider.digitalocean.token": "yourAPItoken"}) + >>> from dask_cloudprovider.digitalocean import DropletCluster + >>> cluster = DropletCluster(n_workers=1) + Creating scheduler instance + Created droplet dask-38b817c1-scheduler + Waiting for scheduler to run + Scheduler is running + Creating worker instance + Created droplet dask-38b817c1-worker-dc95260d + + Many of the cluster managers in Dask Cloud Provider work by launching VMs with a startup script + that pulls down the :doc:`Dask Docker image ` and runs Dask components within that container. + As with all cluster managers the VM resources, Docker image, etc are all configurable. + + You can then connect a client and work with the cluster as if it were on your local machine. + + .. code-block:: python + + >>> client = cluster.get_client() Data Access ----------- -You may want to install additional libraries in your Jupyter and worker images -to access the object stores of each cloud (see :doc:`how-to/connect-to-remote-data`): +In addition to deploying Dask clusters on the cloud, most cloud users will also want +to access cloud-hosted data on their respective cloud provider. + +We recommend installing additional libraries (listed below) for easy data access on your cloud provider. +See :doc:`how-to/connect-to-remote-data` for more information. + +.. tab-set:: + + .. tab-item:: AWS + + Use `s3fs `_ for accessing data on Amazon's S3. + + .. code-block:: bash + + pip install s3fs + + .. tab-item:: GCP + + Use `gcsfs `_ for accessing data on Google's GCS. + + .. code-block:: bash + + pip install gcsfs + + .. tab-item:: Azure -- `s3fs `_ for Amazon's S3 -- `gcsfs `_ for Google's GCS -- `adlfs `_ for Microsoft's ADL + Use `adlfs `_ for accessing data on Microsoft's Data Lake or Blob Storage. -Historical Libraries --------------------- + .. code-block:: bash -Dask previously maintained libraries for deploying Dask on -Amazon's EC2 and Google GKE. -Due to sporadic interest, -and churn both within the Dask library and EC2 itself, -these were not well maintained. -They have since been deprecated in favor of the -:doc:`Kubernetes ` solutions. + pip install adlfs diff --git a/docs/source/deploying.rst b/docs/source/deploying.rst index 139ffc5e980..11df5d5e763 100644 --- a/docs/source/deploying.rst +++ b/docs/source/deploying.rst @@ -7,23 +7,23 @@ Deploy Dask Clusters :columns: 12 12 5 5 Dask works well at many scales ranging from a single machine to clusters of - many machines. This section describes the many ways to deploy and run Dask, including the following: + many machines. This page describes the many ways to deploy and run Dask, including the following: - :doc:`deploying-python` + - :doc:`deploying-cloud` - :doc:`deploying-hpc` - :doc:`deploying-kubernetes` - - :doc:`deploying-cloud` .. toctree:: :maxdepth: 1 :hidden: deploying-python.rst - deploying-cli.rst - deploying-ssh.rst + deploying-cloud.rst deploying-hpc.rst deploying-kubernetes.rst - deploying-cloud.rst + deploying-cli.rst + deploying-ssh.rst deploying-extra.rst .. grid-item:: @@ -38,14 +38,14 @@ Deploy Dask Clusters Local Machine ------------- -You don't need to do any setup to run Dask. In this case, it will use threads -in your local process. +You don't need to do any setup to run Dask. Dask will use threads +on your local machine by default. .. code-block:: python import dask.dataframe as dd df = dd.read_csv(...) 
- df.x.sum().compute() # This uses threads in your local process by default + df.x.sum().compute() # This uses threads on your local machine Alternatively, you can set up a fully-featured Dask cluster on your local machine. This gives you access to multi-process computation and diagnostic @@ -54,7 +54,7 @@ dashboards. .. code-block:: python from dask.distributed import LocalCluster - cluster = LocalCluster() + cluster = LocalCluster() # Fully-featured local Dask cluster client = cluster.get_client() # Dask works as normal and leverages the infrastructure defined above @@ -80,65 +80,71 @@ cluster managers, and so it's easy to swap out when you're ready to scale up. The following resources explain how to set up Dask on a variety of local and distributed hardware. +.. _cloud-deployment-options: + +Cloud +----- +|Coiled|_ **is recommended for deploying Dask on the cloud.** +Though there are other options you may consider depending on your specific needs: + +- `Coiled `_: Commercial Dask deployment option, which handles the creation and management of Dask clusters on cloud computing environments (AWS, GCP, and Azure). +- `Dask Cloud Provider `_: Constructing and managing ephemeral Dask clusters on AWS, DigitalOcean, Google Cloud, Azure, and Hetzner. +- `Dask-Yarn `_: Deploy Dask on YARN clusters, such as are found in traditional Hadoop installations. + +See :doc:`deploying-cloud` for more details. + +.. _Coiled: https://coiled.io?utm_source=dask-docs&utm_medium=deploying +.. |Coiled| replace:: **Coiled** + + High Performance Computing -------------------------- +|Dask-Jobqueue|_ **is recommended for deploying Dask on HPC systems.** +Though there are other options you may consider depending on your specific needs: -- `Dask-Jobqueue `_ - Provides cluster managers for PBS, SLURM, LSF, SGE and other resource managers. -- `Dask-MPI `_ - Deploy Dask from within an existing MPI environment. -- `Dask Gateway for Jobqueue `_ - Multi-tenant, secure clusters. Once configured, users can launch clusters without direct access to the underlying HPC backend. +- `Dask-Jobqueue `_: Provides cluster managers for PBS, SLURM, LSF, SGE and other resource managers. +- `Dask-MPI `_: Deploy Dask from within an existing MPI environment. +- `Dask Gateway for Jobqueue `_: Multi-tenant, secure clusters. Once configured, users can launch clusters without direct access to the underlying HPC backend. See :doc:`deploying-hpc` for more details. +.. _Dask-Jobqueue: https://jobqueue.dask.org +.. |Dask-Jobqueue| replace:: **Dask-Jobqueue** + Kubernetes ---------- +|Dask-Kubernetes|_ **is recommended for deploying Dask on Kubernetes.** +Though there are other options you may consider depending on your specific needs: -- `Dask Kubernetes Operator `_ - For native Kubernetes integration for fast moving or ephemeral deployments. -- `Dask Gateway for Kubernetes `_ - Multi-tenant, secure clusters. Once configured, users can launch clusters without direct access to the underlying Kubernetes backend. -- `Single Cluster Helm Chart `_ - Single Dask cluster and (optionally) Jupyter on deployed with Helm. +- `Dask Kubernetes Operator `_: Native Kubernetes integration for fast-moving or ephemeral deployments. +- `Dask Gateway for Kubernetes `_: Multi-tenant, secure clusters. Once configured, users can launch clusters without direct access to the underlying Kubernetes backend. +- `Single Cluster Helm Chart `_: Single Dask cluster and (optionally) Jupyter deployed with Helm. + +See :doc:`deploying-kubernetes` for more details.
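+
+As a rough sketch of what this looks like in practice (the cluster name, worker count,
+and scaling numbers below are illustrative placeholders, and this assumes the Dask
+operator is already installed in your Kubernetes cluster), launching an ephemeral
+cluster with the Dask Kubernetes Operator and connecting to it might look like:
+
+.. code-block:: python
+
+    from dask_kubernetes.operator import KubeCluster
+
+    # Placeholder name and size; choose values appropriate for your cluster
+    cluster = KubeCluster(name="my-dask-cluster", n_workers=2)
+    client = cluster.get_client()
+
+    # Scale up for the heavy part of the workload, then clean up
+    cluster.scale(10)
+    cluster.close()
+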
-Cloud ------ - -- `Dask-Yarn `_ - Deploy Dask on YARN clusters, such as are found in traditional Hadoop installations. -- `Dask Cloud Provider `_ - Constructing and managing ephemeral Dask clusters on AWS, DigitalOcean, Google Cloud, Azure, and Hetzner -- `Coiled `_ - Commercial Dask deployment option, which handles the creation and management of Dask clusters on cloud computing environments (AWS and GCP). - -See :doc:`deploying-cloud` for more details. +.. _Dask-Kubernetes: https://kubernetes.dask.org/en/latest/operator.html +.. |Dask-Kubernetes| replace:: **Dask Kubernetes Operator** .. _managed-cluster-solutions: Managed Solutions ----------------- +|Coiled|_ **is recommended for deploying managed Dask clusters.** +Though there are other options you may consider depending on your specific needs: + +- `Coiled `_: Manages the creation and management of Dask clusters on cloud computing environments (AWS, GCP, and Azure). +- `Domino Data Lab `_: Lets users create Dask clusters in a hosted platform. +- `Saturn Cloud `_: Lets users create Dask clusters in a hosted platform or within their own AWS accounts. -- `Coiled `_ - Manages the creation and management of Dask clusters on cloud computing environments (AWS and GCP). -- `Domino Data Lab `_ - Lets users create Dask clusters in a hosted platform. -- `Saturn Cloud `_ - Lets users create Dask clusters in a hosted platform or within their own AWS accounts. Manual deployments (not recommended) ------------------------------------ You can set up Dask clusters by hand, or with tools like SSH. -- :doc:`Manual Setup ` - The command line interface to set up ``dask-scheduler`` and ``dask-worker`` processes. -- :doc:`deploying-ssh` - Use SSH to set up Dask across an un-managed cluster. -- :doc:`Python API (advanced) ` - Create ``Scheduler`` and ``Worker`` objects from Python as part of a distributed Tornado TCP application. +- :doc:`Manual Setup `: The command line interface to set up ``dask-scheduler`` and ``dask-worker`` processes. +- :doc:`deploying-ssh`: Use SSH to set up Dask across an un-managed cluster. +- :doc:`Python API (advanced) `: Create ``Scheduler`` and ``Worker`` objects from Python as part of a distributed Tornado TCP application. However, we don't recommend this path. Instead, we recommend that you use some common resource manager to help you manage your machines, and then deploy diff --git a/docs/source/images/cloud-provider-logos.svg b/docs/source/images/cloud-provider-logos.svg new file mode 100644 index 00000000000..25c7fdd52f5 --- /dev/null +++ b/docs/source/images/cloud-provider-logos.svg @@ -0,0 +1 @@ + diff --git a/pyproject.toml b/pyproject.toml index 2e410847f1b..82b277a6b96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,7 @@ dataframe = [ "dask[array]", "pandas >= 1.3", ] -distributed = ["distributed == 2024.1.0"] +distributed = ["distributed == 2024.1.1"] diagnostics = [ "bokeh >= 2.4.2", "jinja2 >= 2.10.3", @@ -144,7 +144,6 @@ filterwarnings = [ "ignore:Not prepending group keys:FutureWarning", "ignore:.*:dask.tests.warning_aliases.RemovedIn20Warning", "ignore:When grouping with a length-1 list-like, you will need to pass a length-1 tuple to get_group in a future version of pandas:FutureWarning", - 'ignore:DataFrameGroupBy\.apply operated on the grouping columns\. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation\. 
Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning\.:FutureWarning', 'ignore:Passing a BlockManager to DataFrame is deprecated and will raise in a future version. Use public APIs instead:DeprecationWarning', # https://github.com/apache/arrow/issues/35081 'ignore:The previous implementation of stack is deprecated and will be removed in a future version of pandas\.:FutureWarning', "ignore:Minimal version of pyarrow will soon be increased to 14.0.1",