From 7f27005c034a74520f6290f2c8e0aeb8e4d39e09 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 30 Sep 2022 15:11:23 -0700 Subject: [PATCH 01/15] create ParallelPostFit class --- dask_sql/physical/rel/custom/create_model.py | 5 +- dask_sql/physical/rel/custom/wrappers.py | 448 +++++++++++++++++++ 2 files changed, 449 insertions(+), 4 deletions(-) create mode 100644 dask_sql/physical/rel/custom/wrappers.py diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py index 2e6cdeb0a..d961c6cbe 100644 --- a/dask_sql/physical/rel/custom/create_model.py +++ b/dask_sql/physical/rel/custom/create_model.py @@ -165,10 +165,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai model = Incremental(estimator=model) if wrap_predict: - try: - from dask_ml.wrappers import ParallelPostFit - except ImportError: # pragma: no cover - raise ValueError("Wrapping requires dask-ml to be installed.") + from dask_sql.physical.rel.custom.wrappers import ParallelPostFit # When `wrap_predict` is set to True we train on single partition frames # because this is only useful for non dask distributed models diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py new file mode 100644 index 000000000..65ee761d2 --- /dev/null +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -0,0 +1,448 @@ +# Copyright 2017, Dask developers +# Dask-ML project - https://github.com/dask/dask-ml +"""Meta-estimators for parallelizing estimators using the scikit-learn API.""" +import logging +import warnings + +import dask.array as da +import dask.dataframe as dd +import dask.delayed +import numpy as np +import sklearn.base +import sklearn.metrics + +from dask_ml.utils import _timer + +logger = logging.getLogger(__name__) + + +class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixin): + """Meta-estimator for parallel predict and transform. + + Parameters + ---------- + estimator : Estimator + The underlying estimator that is fit. + + scoring : string or callable, optional + A single string (see :ref:`scoring_parameter`) or a callable + (see :ref:`scoring`) to evaluate the predictions on the test set. + + For evaluating multiple metrics, either give a list of (unique) + strings or a dict with names as keys and callables as values. + + NOTE that when using custom scorers, each scorer should return a + single value. Metric functions returning a list/array of values + can be wrapped into multiple scorers that return one value each. + + See :ref:`multimetric_grid_search` for an example. + + .. warning:: + + If None, the estimator's default scorer (if available) is used. + Most scikit-learn estimators will convert large Dask arrays to + a single NumPy array, which may exhaust the memory of your worker. + You probably want to always specify `scoring`. + + predict_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + predict_proba_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict_proba`` call. 
+ This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + transform_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``transform`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + """ + + class_name = "ParallelPostFit" + + def __init__( + self, + estimator=None, + scoring=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, + ): + self.estimator = estimator + self.scoring = scoring + self.predict_meta = predict_meta + self.predict_proba_meta = predict_proba_meta + self.transform_meta = transform_meta + + def _check_array(self, X): + """Validate an array for post-fit tasks. + + Parameters + ---------- + X : Union[Array, DataFrame] + + Returns + ------- + same type as 'X' + + Notes + ----- + The following checks are applied. + + - Ensure that the array is blocked only along the samples. + """ + if isinstance(X, da.Array): + if X.ndim == 2 and X.numblocks[1] > 1: + logger.debug("auto-rechunking 'X'") + if not np.isnan(X.chunks[0]).any(): + X = X.rechunk({0: "auto", 1: -1}) + else: + X = X.rechunk({1: -1}) + return X + + @property + def _postfit_estimator(self): + # The estimator instance to use for postfit tasks like score + return self.estimator + + def fit(self, X, y=None, **kwargs): + """Fit the underlying estimator. + + Parameters + ---------- + X, y : array-like + **kwargs + Additional fit-kwargs for the underlying estimator. + + Returns + ------- + self : object + """ + logger.info("Starting fit") + with _timer("fit", _logger=logger): + result = self.estimator.fit(X, y, **kwargs) + + # Copy over learned attributes + copy_learned_attributes(result, self) + copy_learned_attributes(result, self.estimator) + return self + + def partial_fit(self, X, y=None, **kwargs): + logger.info("Starting partial_fit") + with _timer("fit", _logger=logger): + result = self.estimator.partial_fit(X, y, **kwargs) + + # Copy over learned attributes + copy_learned_attributes(result, self) + copy_learned_attributes(result, self.estimator) + return self + + def transform(self, X): + """Transform block or partition-wise for dask inputs. + + For dask inputs, a dask array or dataframe is returned. For other + inputs (NumPy array, pandas dataframe, scipy sparse matrix), the + regular return value is returned. + + If the underlying estimator does not have a ``transform`` method, then + an ``AttributeError`` is raised. + + Parameters + ---------- + X : array-like + + Returns + ------- + transformed : array-like + """ + self._check_method("transform") + X = self._check_array(X) + meta = self.transform_meta + + if isinstance(X, da.Array): + if meta is None: + meta = _get_output_dask_ar_meta_for_estimator( + _transform, self._postfit_estimator, X + ) + return X.map_blocks( + _transform, estimator=self._postfit_estimator, meta=meta + ) + elif isinstance(X, dd._Frame): + if meta is None: + # dask-dataframe relies on dd.core.no_default + # for infering meta + meta = dd.core.no_default + return X.map_partitions( + _transform, estimator=self._postfit_estimator, meta=meta + ) + else: + return _transform(X, estimator=self._postfit_estimator) + + def score(self, X, y, compute=True): + """Returns the score on the given data. 
+ + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Input data, where n_samples is the number of samples and + n_features is the number of features. + + y : array-like, shape = [n_samples] or [n_samples, n_output], optional + Target relative to X for classification or regression; + None for unsupervised learning. + + Returns + ------- + score : float + return self.estimator.score(X, y) + """ + scoring = self.scoring + X = self._check_array(X) + y = self._check_array(y) + + if not scoring: + if type(self._postfit_estimator).score == sklearn.base.RegressorMixin.score: + scoring = "r2" + elif ( + type(self._postfit_estimator).score + == sklearn.base.ClassifierMixin.score + ): + scoring = "accuracy" + else: + scoring = self.scoring + + if scoring: + if not dask.is_dask_collection(X) and not dask.is_dask_collection(y): + scorer = sklearn.metrics.get_scorer(scoring) + else: + # TODO: implement Dask-ML's get_scorer() function + # scorer = get_scorer(scoring, compute=compute) + pass + return scorer(self, X, y) + else: + return self._postfit_estimator.score(X, y) + + def predict(self, X): + """Predict for X. + + For dask inputs, a dask array or dataframe is returned. For other + inputs (NumPy array, pandas dataframe, scipy sparse matrix), the + regular return value is returned. + + Parameters + ---------- + X : array-like + + Returns + ------- + y : array-like + """ + self._check_method("predict") + X = self._check_array(X) + meta = self.predict_meta + + if isinstance(X, da.Array): + if meta is None: + meta = _get_output_dask_ar_meta_for_estimator( + _predict, self._postfit_estimator, X + ) + + result = X.map_blocks( + _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta + ) + return result + + elif isinstance(X, dd._Frame): + if meta is None: + meta = dd.core.no_default + return X.map_partitions( + _predict, estimator=self._postfit_estimator, meta=meta + ) + else: + return _predict(X, estimator=self._postfit_estimator) + + def predict_proba(self, X): + """Probability estimates. + + For dask inputs, a dask array or dataframe is returned. For other + inputs (NumPy array, pandas dataframe, scipy sparse matrix), the + regular return value is returned. + + If the underlying estimator does not have a ``predict_proba`` + method, then an ``AttributeError`` is raised. + + Parameters + ---------- + X : array or dataframe + + Returns + ------- + y : array-like + """ + X = self._check_array(X) + + self._check_method("predict_proba") + + meta = self.predict_proba_meta + + if isinstance(X, da.Array): + if meta is None: + meta = _get_output_dask_ar_meta_for_estimator( + _predict_proba, self._postfit_estimator, X + ) + # XXX: multiclass + return X.map_blocks( + _predict_proba, + estimator=self._postfit_estimator, + meta=meta, + chunks=(X.chunks[0], len(self._postfit_estimator.classes_)), + ) + elif isinstance(X, dd._Frame): + if meta is None: + meta = dd.core.no_default + return X.map_partitions( + _predict_proba, estimator=self._postfit_estimator, meta=meta + ) + else: + return _predict_proba(X, estimator=self._postfit_estimator) + + def predict_log_proba(self, X): + """Log of probability estimates. + + For dask inputs, a dask array or dataframe is returned. For other + inputs (NumPy array, pandas dataframe, scipy sparse matrix), the + regular return value is returned. + + If the underlying estimator does not have a ``predict_proba`` + method, then an ``AttributeError`` is raised. 
+ + Parameters + ---------- + X : array or dataframe + + Returns + ------- + y : array-like + """ + self._check_method("predict_log_proba") + return da.log(self.predict_proba(X)) + + def _check_method(self, method): + """Check if self.estimator has 'method'. + + Raises + ------ + AttributeError + """ + estimator = self._postfit_estimator + if not hasattr(estimator, method): + msg = "The wrapped estimator '{}' does not have a '{}' method.".format( + estimator, method + ) + raise AttributeError(msg) + return getattr(estimator, method) + + +def _first_block(dask_object): + """Extract the first block / partition from a dask object""" + if isinstance(dask_object, da.Array): + if dask_object.ndim > 1 and dask_object.numblocks[-1] != 1: + raise NotImplementedError( + "IID estimators require that the array " + "blocked only along the first axis. " + "Rechunk your array before fitting." + ) + shape = (dask_object.chunks[0][0],) + if dask_object.ndim > 1: + shape = shape + (dask_object.chunks[1][0],) + + return da.from_delayed( + dask_object.to_delayed().flatten()[0], shape, dask_object.dtype + ) + + if isinstance(dask_object, dd._Frame): + return dask_object.get_partition(0) + + else: + return dask_object + + +def _predict(part, estimator): + return estimator.predict(part) + + +def _predict_proba(part, estimator): + return estimator.predict_proba(part) + + +def _transform(part, estimator): + return estimator.transform(part) + + +def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar): + """ + Returns the output metadata array + for the model function (predict, transform etc) + by running the appropriate function on dummy data + of shape (1, n_features) + + Parameters + ---------- + + model_fun: Model function + _predict, _transform etc + + estimator : Estimator + The underlying estimator that is fit. + + input_dask_ar: The input dask_array + + Returns + ------- + metadata: metadata of output dask array + + """ + # sklearn fails if input array has size size + # It requires at least 1 sample to run successfully + input_meta = input_dask_ar._meta + if hasattr(input_meta, "__array_function__"): + ar = np.zeros( + shape=(1, input_dask_ar.shape[1]), + dtype=input_dask_ar.dtype, + like=input_meta, + ) + elif "scipy.sparse" in type(input_meta).__module__: + # sparse matrices dont support + # `like` due to non implimented __array_function__ + # Refer https://github.com/scipy/scipy/issues/10362 + # Note below works for both cupy and scipy sparse matrices + ar = type(input_meta)((1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype) + else: + func_name = model_fn.__name__.strip("_") + msg = ( + f"Metadata for {func_name} is not provided, so Dask is " + f"running the {func_name} " + "function on a small dataset to guess output metadata. " + "As a result, It is possible that Dask will guess incorrectly.\n" + "To silence this warning, provide explicit " + f"`{func_name}_meta` to the dask_ml.wrapper." 
+ "\nExample: \n" + "wrap_clf = dask_ml.wrappers.Incremental(GradientBoostingClassifier(), " + f"{func_name}_meta = np.array([1],dtype=np.int8))" + ) + warnings.warn(msg) + ar = np.zeros(shape=(1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype) + return model_fn(ar, estimator) + + +def copy_learned_attributes(from_estimator, to_estimator): + attrs = {k: v for k, v in vars(from_estimator).items() if k.endswith("_")} + + for k, v in attrs.items(): + setattr(to_estimator, k, v) From a95741f1370890a06b751787b1f6d7b13608ba2c Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 30 Sep 2022 15:37:08 -0700 Subject: [PATCH 02/15] _timer --- dask_sql/physical/rel/custom/wrappers.py | 35 ++++++++++++++++++------ 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py index 65ee761d2..2bdc231a3 100644 --- a/dask_sql/physical/rel/custom/wrappers.py +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -1,8 +1,11 @@ # Copyright 2017, Dask developers # Dask-ML project - https://github.com/dask/dask-ml """Meta-estimators for parallelizing estimators using the scikit-learn API.""" +import contextlib +import datetime import logging import warnings +from timeit import default_timer as tic import dask.array as da import dask.dataframe as dd @@ -11,8 +14,6 @@ import sklearn.base import sklearn.metrics -from dask_ml.utils import _timer - logger = logging.getLogger(__name__) @@ -429,12 +430,7 @@ def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar): f"Metadata for {func_name} is not provided, so Dask is " f"running the {func_name} " "function on a small dataset to guess output metadata. " - "As a result, It is possible that Dask will guess incorrectly.\n" - "To silence this warning, provide explicit " - f"`{func_name}_meta` to the dask_ml.wrapper." - "\nExample: \n" - "wrap_clf = dask_ml.wrappers.Incremental(GradientBoostingClassifier(), " - f"{func_name}_meta = np.array([1],dtype=np.int8))" + "As a result, It is possible that Dask will guess incorrectly." ) warnings.warn(msg) ar = np.zeros(shape=(1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype) @@ -446,3 +442,26 @@ def copy_learned_attributes(from_estimator, to_estimator): for k, v in attrs.items(): setattr(to_estimator, k, v) + + +@contextlib.contextmanager +def _timer(name, _logger=None, level="info"): + """ + Output execution time of a function to the given logger level + Parameters + ---------- + name : str + How to name the timer (will be in the logs) + logger : logging.logger + The optional logger where to write + level : str + On which level to log the performance measurement + """ + start = tic() + _logger = _logger or logger + _logger.info("Starting %s", name) + yield + stop = tic() + delta = datetime.timedelta(seconds=stop - start) + _logger_level = getattr(_logger, level) + _logger_level("Finished %s in %s", name, delta) # nicer formatting for time. 
From c8d901dc27fd570f70c5aad3903cc9fe0bfd12ef Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 30 Sep 2022 15:55:00 -0700
Subject: [PATCH 03/15] update create_experiment

---
 dask_sql/physical/rel/custom/create_experiment.py | 14 ++------------
 docs/source/sql/ml.rst                            |  2 +-
 2 files changed, 3 insertions(+), 13 deletions(-)

diff --git a/dask_sql/physical/rel/custom/create_experiment.py b/dask_sql/physical/rel/custom/create_experiment.py
index 1bfd27a89..27b829642 100644
--- a/dask_sql/physical/rel/custom/create_experiment.py
+++ b/dask_sql/physical/rel/custom/create_experiment.py
@@ -166,12 +166,7 @@ def convert(
                 f"Can not import tuner {experiment_class}. Make sure you spelled it correctly and have installed all packages."
             )
 
-        try:
-            from dask_ml.wrappers import ParallelPostFit
-        except ImportError:  # pragma: no cover
-            raise ValueError(
-                "dask_ml must be installed to use automl and tune hyperparameters"
-            )
+        from dask_sql.physical.rel.custom.wrappers import ParallelPostFit
 
         model = ModelClass()
 
@@ -197,12 +192,7 @@ def convert(
                 f"Can not import automl model {automl_class}. Make sure you spelled it correctly and have installed all packages."
             )
 
-        try:
-            from dask_ml.wrappers import ParallelPostFit
-        except ImportError:  # pragma: no cover
-            raise ValueError(
-                "dask_ml must be installed to use automl and tune hyperparameters"
-            )
+        from dask_sql.physical.rel.custom.wrappers import ParallelPostFit
 
         automl = AutoMLClass(**automl_kwargs)
         # should be avoided if data doesn't fit in memory
diff --git a/docs/source/sql/ml.rst b/docs/source/sql/ml.rst
index 931cdc5ee..5c3a3b9d1 100644
--- a/docs/source/sql/ml.rst
+++ b/docs/source/sql/ml.rst
@@ -62,7 +62,7 @@ The key-value parameters control, how and which model is trained:
   want to set this parameter.
 
 * ``wrap_predict``: Boolean flag, whether to wrap the selected
-  model with a :class:`dask_ml.wrappers.ParallelPostFit`.
+  model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
   Have a look into the `dask-ml docu on ParallelPostFit
   <https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation>`_
   to learn more about it. Defaults to false. Typically you set

From 272501b89fb28e49f23564c9dab333b326b02bce Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 4 Oct 2022 15:59:58 -0700
Subject: [PATCH 04/15] update comment

---
 dask_sql/physical/rel/custom/create_model.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index d961c6cbe..33dd1d38b 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -43,7 +43,7 @@ class CreateModelPlugin(BaseRelPlugin):
         unsupervised algorithms). This means, you typically want to set
         this parameter.
     * wrap_predict: Boolean flag, whether to wrap the selected
-      model with a :class:`dask_ml.wrappers.ParallelPostFit`.
+      model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
       Have a look into the
       [dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
      to learn more about it. Defaults to false. Typically you set

From 47c9a76ccadeac557ec15b6ab4de2d518f5bf8cd Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 09:53:18 -0700
Subject: [PATCH 05/15] migrate changes from 799

---
 dask_sql/physical/rel/custom/create_model.py |  12 +-
 dask_sql/physical/rel/custom/wrappers.py     | 111 ++++++++++++++-----
 tests/integration/test_model.py              |  76 +++++++++++++
 3 files changed, 169 insertions(+), 30 deletions(-)

diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 33dd1d38b..7df1afbfe 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
@@ -180,7 +181,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            if "sklearn" in model_class:
+                output_meta = np.array([])
+                model = ParallelPostFit(
+                    estimator=model,
+                    predict_meta=output_meta,
+                    predict_proba=output_meta,
+                    transform_meta=output_meta,
+                )
+            else:
+                model = ParallelPostFit(estimator=model)
 
         else:
             model.fit(X, y, **fit_kwargs)
diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 2bdc231a3..d34cec148 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -164,23 +164,22 @@ def transform(self, X):
         """
         self._check_method("transform")
         X = self._check_array(X)
-        meta = self.transform_meta
+        output_meta = self.transform_meta
 
         if isinstance(X, da.Array):
-            if meta is None:
-                meta = _get_output_dask_ar_meta_for_estimator(
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
                     _transform, self._postfit_estimator, X
                 )
             return X.map_blocks(
-                _transform, estimator=self._postfit_estimator, meta=meta
+                _transform, estimator=self._postfit_estimator, meta=output_meta
             )
         elif isinstance(X, dd._Frame):
-            if meta is None:
-                # dask-dataframe relies on dd.core.no_default
-                # for infering meta
-                meta = dd.core.no_default
-            return X.map_partitions(
-                _transform, estimator=self._postfit_estimator, meta=meta
+            return _get_output_df_for_estimator(
+                model_fn=_transform,
+                X=X,
+                output_meta=output_meta,
+                estimator=self._postfit_estimator,
             )
         else:
             return _transform(X, estimator=self._postfit_estimator)
@@ -246,24 +245,28 @@ def predict(self, X):
         """
         self._check_method("predict")
         X = self._check_array(X)
-        meta = self.predict_meta
+        output_meta = self.predict_meta
 
         if isinstance(X, da.Array):
-            if meta is None:
-                meta = _get_output_dask_ar_meta_for_estimator(
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
                     _predict, self._postfit_estimator, X
                 )
 
             result = X.map_blocks(
-                _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta
+                _predict,
+                estimator=self._postfit_estimator,
+                drop_axis=1,
+                meta=output_meta,
             )
             return result
 
         elif isinstance(X, dd._Frame):
-            if meta is None:
-                meta = dd.core.no_default
-            return X.map_partitions(
-                _predict, estimator=self._postfit_estimator, meta=meta
+            return _get_output_df_for_estimator(
+                model_fn=_predict,
+                X=X,
+                output_meta=output_meta,
+                estimator=self._postfit_estimator,
             )
         else:
             return _predict(X, estimator=self._postfit_estimator)
 
     def predict_proba(self, X):
         """Probability estimates.
 
         For dask inputs, a dask array or dataframe is returned. For other
         inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
         regular return value is returned.
 
         If the underlying estimator does not have a ``predict_proba``
         method, then an ``AttributeError`` is raised.
 
         Parameters
         ----------
         X : array or dataframe
 
         Returns
         -------
         y : array-like
         """
         X = self._check_array(X)
         self._check_method("predict_proba")
 
-        meta = self.predict_proba_meta
+        output_meta = self.predict_proba_meta
 
         if isinstance(X, da.Array):
-            if meta is None:
-                meta = _get_output_dask_ar_meta_for_estimator(
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
                     _predict_proba, self._postfit_estimator, X
                 )
             # XXX: multiclass
             return X.map_blocks(
                 _predict_proba,
                 estimator=self._postfit_estimator,
-                meta=meta,
+                meta=output_meta,
                 chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
             )
         elif isinstance(X, dd._Frame):
-            if meta is None:
-                meta = dd.core.no_default
-            return X.map_partitions(
-                _predict_proba, estimator=self._postfit_estimator, meta=meta
+            return _get_output_df_for_estimator(
+                model_fn=_predict_proba,
+                X=X,
+                output_meta=output_meta,
+                estimator=self._postfit_estimator,
             )
         else:
             return _predict_proba(X, estimator=self._postfit_estimator)
@@ -374,18 +378,59 @@ def _first_block(dask_object):
         return dask_object
 
 
-def _predict(part, estimator):
+def _predict(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
     return estimator.predict(part)
 
 
-def _predict_proba(part, estimator):
+def _predict_proba(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
     return estimator.predict_proba(part)
 
 
-def _transform(part, estimator):
+def _transform(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
     return estimator.transform(part)
 
 
+def handle_empty_partitions(output_meta):
+    if hasattr(output_meta, "__array_function__"):
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+        ar = np.zeros(
+            shape=shape,
+            dtype=output_meta.dtype,
+            like=output_meta,
+        )
+        return ar
+    elif "scipy.sparse" in type(output_meta).__module__:
+        # sparse matrices don't support
+        # `like` due to non implemented __array_function__
+        # Refer https://github.com/scipy/scipy/issues/10362
+        # Note below works for both cupy and scipy sparse matrices
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+        ar = type(output_meta)(shape, dtype=output_meta.dtype)
+        return ar
+    elif hasattr(output_meta, "iloc"):
+        return output_meta.iloc[:0, :]
+
+
 def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
     """
     Returns the output metadata array
@@ -440,6 +485,14 @@ def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
     return model_fn(ar, estimator)
 
 
+def _get_output_df_for_estimator(model_fn, X, output_meta, estimator):
+    if output_meta is None:
+        # dask-dataframe relies on dd.core.no_default
+        # for infering meta
+        output_meta = model_fn(X._meta_nonempty, estimator)
+    return X.map_partitions(model_fn, estimator, output_meta, meta=output_meta)
+
+
 def copy_learned_attributes(from_estimator, to_estimator):
     attrs = {k: v for k, v in vars(from_estimator).items() if k.endswith("_")}
 
diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 044a56fcc..8c97b770c 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -924,3 +924,79 @@ def test_experiment_automl_regressor(c, client, training_df):
     ), "Best model was not registered"
 
     check_trained_model(c, "my_automl_exp2")
+
+
+# TODO - many ML tests fail on clusters without sklearn - can we avoid this?
+@skip_if_external_scheduler
+def test_predict_with_nullable_types(c):
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": [0, 1, 2, 3],
+            "prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0],
+            "rained": [False, False, False, True],
+        }
+    )
+    c.create_table("train_set", df)
+
+    model_class = "'sklearn.linear_model.LogisticRegression'"
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    expected = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
+            "prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"),
+            "rained": pd.Series([False, False, False, True]),
+        }
+    )
+    c.create_table("train_set", df)
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    result = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    assert_eq(
+        expected,
+        result,
+        check_dtype=False,
+    )

From 2195275de140ca0965a62475fbd217ba90382236 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 10:53:05 -0700
Subject: [PATCH 06/15] predict_proba_meta

---
 dask_sql/physical/rel/custom/create_model.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 7df1afbfe..179dd7971 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -186,7 +186,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
                 model = ParallelPostFit(
                     estimator=model,
                     predict_meta=output_meta,
-                    predict_proba=output_meta,
+                    predict_proba_meta=output_meta,
                     transform_meta=output_meta,
                 )
             else:

From 063453401a8408e593c103cc0a46d8be61bf3809 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 11:27:14 -0700
Subject: [PATCH 07/15] fix gpu?
--- dask_sql/physical/rel/custom/wrappers.py | 65 ++++++++++++++---------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py index d34cec148..5b7e5dee2 100644 --- a/dask_sql/physical/rel/custom/wrappers.py +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -175,12 +175,19 @@ def transform(self, X): _transform, estimator=self._postfit_estimator, meta=output_meta ) elif isinstance(X, dd._Frame): - return _get_output_df_for_estimator( - model_fn=_transform, - X=X, - output_meta=output_meta, - estimator=self._postfit_estimator, - ) + if output_meta is None: + # dask-dataframe relies on dd.core.no_default + # for infering meta + output_meta = _transform(X._meta_nonempty, self._postfit_estimator) + try: + return X.map_partitions( + _transform, + self._postfit_estimator, + output_meta, + meta=output_meta, + ) + except ValueError: + return _transform(X, estimator=self._postfit_estimator) else: return _transform(X, estimator=self._postfit_estimator) @@ -262,12 +269,19 @@ def predict(self, X): return result elif isinstance(X, dd._Frame): - return _get_output_df_for_estimator( - model_fn=_predict, - X=X, - output_meta=output_meta, - estimator=self._postfit_estimator, - ) + if output_meta is None: + # dask-dataframe relies on dd.core.no_default + # for infering meta + output_meta = _predict(X._meta_nonempty, self._postfit_estimator) + try: + return X.map_partitions( + _predict, + self._postfit_estimator, + output_meta, + meta=output_meta, + ) + except ValueError: + return _predict(X, estimator=self._postfit_estimator) else: return _predict(X, estimator=self._postfit_estimator) @@ -308,12 +322,19 @@ def predict_proba(self, X): chunks=(X.chunks[0], len(self._postfit_estimator.classes_)), ) elif isinstance(X, dd._Frame): - return _get_output_df_for_estimator( - model_fn=_predict_proba, - X=X, - output_meta=output_meta, - estimator=self._postfit_estimator, - ) + if output_meta is None: + # dask-dataframe relies on dd.core.no_default + # for infering meta + output_meta = _predict_proba(X._meta_nonempty, self._postfit_estimator) + try: + return X.map_partitions( + _predict_proba, + self._postfit_estimator, + output_meta, + meta=output_meta, + ) + except ValueError: + return _predict_proba(X, estimator=self._postfit_estimator) else: return _predict_proba(X, estimator=self._postfit_estimator) @@ -482,14 +503,6 @@ def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar): return model_fn(ar, estimator) -def _get_output_df_for_estimator(model_fn, X, output_meta, estimator): - if output_meta is None: - # dask-dataframe relies on dd.core.no_default - # for infering meta - output_meta = model_fn(X._meta_nonempty, estimator) - return X.map_partitions(model_fn, estimator, output_meta, meta=output_meta) - - def copy_learned_attributes(from_estimator, to_estimator): attrs = {k: v for k, v in vars(from_estimator).items() if k.endswith("_")} From c75fab100700d6dba02be570e51cd4e8d8b07290 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 5 Oct 2022 11:49:43 -0700 Subject: [PATCH 08/15] fix TypeError? 
---
 dask_sql/physical/rel/custom/wrappers.py | 30 +++---------------------
 1 file changed, 3 insertions(+), 27 deletions(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 5b7e5dee2..13d561705 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -187,7 +187,7 @@ def transform(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return _transform(X, estimator=self._postfit_estimator)
+                return self._postfit_estimator.transform(X)
         else:
             return _transform(X, estimator=self._postfit_estimator)
 
@@ -281,7 +281,7 @@ def predict(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return _predict(X, estimator=self._postfit_estimator)
+                return self._postfit_estimator.predict(X)
         else:
             return _predict(X, estimator=self._postfit_estimator)
 
@@ -334,7 +334,7 @@ def predict_proba(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return _predict_proba(X, estimator=self._postfit_estimator)
+                return self._postfit_estimator.predict_proba(X)
         else:
             return _predict_proba(X, estimator=self._postfit_estimator)
 
@@ -375,30 +375,6 @@ def _check_method(self, method):
         return getattr(estimator, method)
 
 
-def _first_block(dask_object):
-    """Extract the first block / partition from a dask object"""
-    if isinstance(dask_object, da.Array):
-        if dask_object.ndim > 1 and dask_object.numblocks[-1] != 1:
-            raise NotImplementedError(
-                "IID estimators require that the array "
-                "blocked only along the first axis. "
-                "Rechunk your array before fitting."
-            )
-        shape = (dask_object.chunks[0][0],)
-        if dask_object.ndim > 1:
-            shape = shape + (dask_object.chunks[1][0],)
-
-        return da.from_delayed(
-            dask_object.to_delayed().flatten()[0], shape, dask_object.dtype
-        )
-
-    if isinstance(dask_object, dd._Frame):
-        return dask_object.get_partition(0)
-
-    else:
-        return dask_object
-
-
 def _predict(part, estimator, output_meta=None):
     if part.shape[0] == 0 and output_meta is not None:
         empty_output = handle_empty_partitions(output_meta)

From 68de80843b76c834861acc0b4c2db6a210b51357 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 12:25:44 -0700
Subject: [PATCH 09/15] trying again

---
 dask_sql/physical/rel/custom/wrappers.py | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 13d561705..19140e788 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -176,8 +176,6 @@ def transform(self, X):
             )
         elif isinstance(X, dd._Frame):
             if output_meta is None:
-                # dask-dataframe relies on dd.core.no_default
-                # for infering meta
                 output_meta = _transform(X._meta_nonempty, self._postfit_estimator)
             try:
                 return X.map_partitions(
@@ -187,7 +185,13 @@ def transform(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return self._postfit_estimator.transform(X)
+                if output_meta is None:
+                    # dask-dataframe relies on dd.core.no_default
+                    # for infering meta
+                    meta = dd.core.no_default
+                return X.map_partitions(
+                    _transform, estimator=self._postfit_estimator, meta=meta
+                )
         else:
             return _transform(X, estimator=self._postfit_estimator)
 
@@ -281,7 +285,11 @@ def predict(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return self._postfit_estimator.predict(X)
+                if output_meta is None:
+                    meta = dd.core.no_default
+                return X.map_partitions(
+                    _predict, estimator=self._postfit_estimator, meta=meta
+                )
         else:
             return _predict(X, estimator=self._postfit_estimator)
 
@@ -334,7 +342,11 @@ def predict_proba(self, X):
                     meta=output_meta,
                 )
             except ValueError:
-                return self._postfit_estimator.predict_proba(X)
+                if output_meta is None:
+                    meta = dd.core.no_default
+                return X.map_partitions(
+                    _predict_proba, estimator=self._postfit_estimator, meta=meta
+                )
         else:
             return _predict_proba(X, estimator=self._postfit_estimator)

From b2ca477af472174c06344f5926b4fef5460ce594 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 12:38:12 -0700
Subject: [PATCH 10/15] meta to output_meta

---
 dask_sql/physical/rel/custom/wrappers.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 19140e788..9eb0378df 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -188,9 +188,9 @@ def transform(self, X):
                 if output_meta is None:
                     # dask-dataframe relies on dd.core.no_default
                     # for infering meta
-                    meta = dd.core.no_default
+                    output_meta = dd.core.no_default
                 return X.map_partitions(
-                    _transform, estimator=self._postfit_estimator, meta=meta
+                    _transform, estimator=self._postfit_estimator, meta=output_meta
                 )
         else:
             return _transform(X, estimator=self._postfit_estimator)
@@ -286,9 +286,9 @@ def predict(self, X):
                 )
             except ValueError:
                 if output_meta is None:
-                    meta = dd.core.no_default
+                    output_meta = dd.core.no_default
                 return X.map_partitions(
-                    _predict, estimator=self._postfit_estimator, meta=meta
+                    _predict, estimator=self._postfit_estimator, meta=output_meta
                 )
         else:
             return _predict(X, estimator=self._postfit_estimator)
@@ -343,9 +343,9 @@ def predict_proba(self, X):
                 )
             except ValueError:
                 if output_meta is None:
-                    meta = dd.core.no_default
+                    output_meta = dd.core.no_default
                 return X.map_partitions(
-                    _predict_proba, estimator=self._postfit_estimator, meta=meta
+                    _predict_proba, estimator=self._postfit_estimator, meta=output_meta
                 )
         else:
             return _predict_proba(X, estimator=self._postfit_estimator)

From a44ea195e64c38d415e9694197e440b1b5803b9a Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 11 Oct 2022 15:08:53 -0700
Subject: [PATCH 11/15] remove _timer

---
 dask_sql/physical/rel/custom/wrappers.py | 32 ++----------------------
 1 file changed, 2 insertions(+), 30 deletions(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 9eb0378df..a1605ba0e 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -1,11 +1,8 @@
 # Copyright 2017, Dask developers
 # Dask-ML project - https://github.com/dask/dask-ml
 """Meta-estimators for parallelizing estimators using the scikit-learn API."""
-import contextlib
-import datetime
 import logging
 import warnings
-from timeit import default_timer as tic
 
 import dask.array as da
 import dask.dataframe as dd
@@ -126,8 +123,7 @@ def fit(self, X, y=None, **kwargs):
         self : object
         """
         logger.info("Starting fit")
-        with _timer("fit", _logger=logger):
-            result = self.estimator.fit(X, y, **kwargs)
+        result = self.estimator.fit(X, y, **kwargs)
 
         # Copy over learned attributes
         copy_learned_attributes(result, self)
@@ -136,8 +132,7 @@ def fit(self, X, y=None, **kwargs):
 
     def partial_fit(self, X, y=None, **kwargs):
         logger.info("Starting partial_fit")
-        with _timer("fit", _logger=logger):
-            result = self.estimator.partial_fit(X, y, **kwargs)
+        result = self.estimator.partial_fit(X, y, **kwargs)
 
         # Copy over learned attributes
         copy_learned_attributes(result, self)
@@ -496,26 +491,3 @@ def copy_learned_attributes(from_estimator, to_estimator):
 
     for k, v in attrs.items():
         setattr(to_estimator, k, v)
-
-
-@contextlib.contextmanager
-def _timer(name, _logger=None, level="info"):
-    """
-    Output execution time of a function to the given logger level
-    Parameters
-    ----------
-    name : str
-        How to name the timer (will be in the logs)
-    logger : logging.logger
-        The optional logger where to write
-    level : str
-        On which level to log the performance measurement
-    """
-    start = tic()
-    _logger = _logger or logger
-    _logger.info("Starting %s", name)
-    yield
-    stop = tic()
-    delta = datetime.timedelta(seconds=stop - start)
-    _logger_level = getattr(_logger, level)
-    _logger_level("Finished %s in %s", name, delta)  # nicer formatting for time.

From c26c5f4750a663c461970e6d790ec9ab0c974e40 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Tue, 18 Oct 2022 11:18:26 -0700
Subject: [PATCH 12/15] try import sklearn

---
 dask_sql/physical/rel/custom/wrappers.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index a1605ba0e..af658ed8a 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -8,8 +8,11 @@
 import dask.dataframe as dd
 import dask.delayed
 import numpy as np
-import sklearn.base
-import sklearn.metrics
+try:
+    import sklearn.base
+    import sklearn.metrics
+except ImportError:  # pragma: no cover
+    raise ValueError("sklearn must be installed")
 
 logger = logging.getLogger(__name__)
 

From b31c3558109a7561c4b0ca73696de194149d257b Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Tue, 18 Oct 2022 11:22:21 -0700
Subject: [PATCH 13/15] style fix

---
 dask_sql/physical/rel/custom/wrappers.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index af658ed8a..b6eb2fd36 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -8,6 +8,7 @@
 import dask.dataframe as dd
 import dask.delayed
 import numpy as np
+
 try:
     import sklearn.base
     import sklearn.metrics

From 009fd28eb3f1066f6c57895952ff6f45f66fbcf0 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Mon, 24 Oct 2022 11:01:45 -0700
Subject: [PATCH 14/15] Update wrappers.py

---
 dask_sql/physical/rel/custom/wrappers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index b6eb2fd36..175454c3b 100644
--- a/dask_sql/physical/rel/custom/wrappers.py
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -233,7 +233,7 @@ def score(self, X, y, compute=True):
             else:
                 # TODO: implement Dask-ML's get_scorer() function
                 # scorer = get_scorer(scoring, compute=compute)
-                pass
+                raise NotImplementedError("get_scorer function not implemented")
             return scorer(self, X, y)
         else:
             return self._postfit_estimator.score(X, y)

From ef087dbecb27b758194a0e75adc4e56fed5e13d6 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Mon, 24 Oct 2022 11:21:44 -0700
Subject: [PATCH 15/15] use ImportError

---
 dask_sql/physical/rel/custom/wrappers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
index 175454c3b..7ed0d0dea 100644
--- a/dask_sql/physical/rel/custom/wrappers.py +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -13,7 +13,7 @@ import sklearn.base import sklearn.metrics except ImportError: # pragma: no cover - raise ValueError("sklearn must be installed") + raise ImportError("sklearn must be installed") logger = logging.getLogger(__name__)
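
[Note: for reference, a minimal end-to-end sketch of how the vendored wrapper behaves after this series. The data, column names, and estimator choice are illustrative only, not taken from the patches; the empty predict_meta mirrors the trick CREATE MODEL uses for sklearn models in patch 05 to skip meta inference:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

    df = pd.DataFrame({"x": range(100), "y": [0] * 50 + [1] * 50})
    ddf = dd.from_pandas(df, npartitions=4)

    # Fit eagerly on in-memory data, then predict partition-wise on the dask frame.
    clf = ParallelPostFit(
        estimator=LogisticRegression(),
        predict_meta=np.array([], dtype=np.int64),
    )
    clf.fit(df[["x"]], df["y"])
    predictions = clf.predict(ddf[["x"]])  # lazy dask collection
    print(predictions.compute())

Because the wrapper only parallelizes post-fit methods, the estimator itself never needs to understand dask: each partition is handed to the plain sklearn `predict` via `map_partitions`, and the supplied meta keeps dask from calling the model on a dummy sample just to guess the output type.]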