Replace dask_ml.wrappers.Incremental with custom Incremental class #855

Merged (14 commits) on Nov 14, 2022
16 changes: 6 additions & 10 deletions dask_sql/physical/rel/custom/create_experiment.py
@@ -30,17 +30,9 @@ class CreateExperimentPlugin(BaseRelPlugin):
* model_class: Full path to the class of the model which has to be tuned.
Any model class with sklearn interface is valid, but might or
might not work well with Dask dataframes.
Have a look into the
[dask-ml documentation](https://ml.dask.org/index.html)
for more information on which models work best.
You might need to install necessary packages to use
the models.
* experiment_class: Full path of the hyperparameter tuner
from dask_ml; choose the dask tuner class carefully based on what you
exactly need (memory vs compute constraints), refer to the
[dask-ml documentation](https://ml.dask.org/hyper-parameter-search.html).
(For tuning the hyperparameters of a model, both model_class and experiment_class
are required parameters.)
* tune_parameters:
Key-value pairs of hyperparameters to tune, i.e. the search space for the
particular model to tune
@@ -64,7 +56,7 @@ class CreateExperimentPlugin(BaseRelPlugin):

CREATE EXPERIMENT my_exp WITH(
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
experiment_class = 'dask_ml.model_selection.GridSearchCV',
experiment_class = 'sklearn.model_selection.GridSearchCV',
tune_parameters = (n_estimators = ARRAY [16, 32, 2],
learning_rate = ARRAY [0.1,0.01,0.001],
max_depth = ARRAY [3,4,5,10]
@@ -174,7 +166,11 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

search = ExperimentClass(model, {**parameters}, **experiment_kwargs)
logger.info(tune_fit_kwargs)
search.fit(X, y, **tune_fit_kwargs)
search.fit(
    X.to_dask_array(lengths=True),
    y.to_dask_array(lengths=True),
    **tune_fit_kwargs,
)
df = pd.DataFrame(search.cv_results_)
df["model_class"] = model_class

Collaborator: Could experimentClass be a gpu based model or is it limited to cpu based ones only?

Collaborator (Author): I'm not sure, since I think every example I've seen with experiment_class has been with a CPU dask_ml model... I can try to get a better idea of the scope when the pytests in #886 are updated with other non-dask_ml models. We can see about adding GPU tests there too.

33 changes: 11 additions & 22 deletions dask_sql/physical/rel/custom/create_model.py
@@ -32,9 +32,6 @@ class CreateModelPlugin(BaseRelPlugin):
* model_class: Full path to the class of the model to train.
Any model class with sklearn interface is valid, but might or
might not work well with Dask dataframes.
Have a look into the
[dask-ml documentation](https://ml.dask.org/index.html)
for more information on which models work best.
You might need to install necessary packages to use
the models.
* target_column: Which column from the data to use as target.
@@ -45,16 +42,12 @@
want to set this parameter.
* wrap_predict: Boolean flag, whether to wrap the selected
model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
Have a look into the
[dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
to learn more about it. Defaults to false. Typically you set
it to true for sklearn models if predicting on big data.
Defaults to false. Typically you set it to true for
sklearn models if predicting on big data.
Collaborator (commenting on lines 44 to +46): Unrelated to this PR, can you file an issue to clean up the wrap_predict and wrap_fit arguments? I think we can get rid of this or do a better default based on the class name of the model.

For sklearn and single-GPU cuML models, switch this to true, else switch this to False.

* wrap_fit: Boolean flag, whether to wrap the selected
model with a :class:`dask_ml.wrappers.Incremental`.
Have a look into the
[dask-ml docu](https://ml.dask.org/incremental.html)
to learn more about it. Defaults to false. Typically you set
it to true for sklearn models if training on big data.
model with a :class:`dask_sql.physical.rel.custom.wrappers.Incremental`.
Defaults to false. Typically you set it to true for
sklearn models if training on big data.
* fit_kwargs: keyword arguments sent to the call to fit().

All other arguments are passed to the constructor of the
@@ -76,7 +69,7 @@ class CreateModelPlugin(BaseRelPlugin):
Examples:

CREATE MODEL my_model WITH (
model_class = 'dask_ml.xgboost.XGBClassifier',
model_class = 'xgboost.XGBClassifier',
target_column = 'target'
) AS (
SELECT x, y, target
@@ -95,11 +88,10 @@ class CreateModelPlugin(BaseRelPlugin):
dask dataframes.

* if you are training on relatively small amounts
of data but predicting on large data samples
(and you are not using a model built for usage with dask
from the dask-ml package), you might want to set
`wrap_predict` to True. With this option,
model inference will be parallelized/distributed.
of data but predicting on large data samples,
you might want to set `wrap_predict` to True.
With this option, model inference will be
parallelized/distributed.
* If you are training on large amounts of data,
you can try setting wrap_fit to True. This will
do the same on the training step, but works only on
@@ -158,10 +150,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

model = ModelClass(**kwargs)
if wrap_fit:
try:
from dask_ml.wrappers import Incremental
except ImportError: # pragma: no cover
raise ValueError("Wrapping requires dask-ml to be installed.")
from dask_sql.physical.rel.custom.wrappers import Incremental

model = Incremental(estimator=model)
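To make the replacement concrete: an Incremental-style wrapper feeds the underlying estimator one block of the training data at a time through `partial_fit`, instead of materializing the whole dataset in memory. The sketch below illustrates that general idea only, assuming an estimator that exposes `partial_fit`; it is not the actual `dask_sql.physical.rel.custom.wrappers.Incremental` implementation:

    import numpy as np
    import dask.array as da
    from sklearn.linear_model import SGDClassifier

    # Hypothetical data; the real plugin builds X and y from the SQL query.
    X = da.random.random((1000, 2), chunks=(250, 2))
    y = da.from_array(np.tile([0, 1], 500), chunks=250)

    est = SGDClassifier()
    classes = [0, 1]

    # Incremental-style fitting: stream one block at a time through partial_fit.
    for xb, yb in zip(X.to_delayed().ravel(), y.to_delayed().ravel()):
        est.partial_fit(xb.compute(), yb.compute(), classes=classes)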

206 changes: 206 additions & 0 deletions dask_sql/physical/rel/custom/metrics.py
@@ -0,0 +1,206 @@
# Copyright 2017, Dask developers
# Dask-ML project - https://github.com/dask/dask-ml
from typing import Optional, TypeVar

import dask
import dask.array as da
import numpy as np
import sklearn.metrics
import sklearn.utils.multiclass
from dask.array import Array
from dask.utils import derived_from

ArrayLike = TypeVar("ArrayLike", Array, np.ndarray)


def accuracy_score(
y_true: ArrayLike,
y_pred: ArrayLike,
normalize: bool = True,
sample_weight: Optional[ArrayLike] = None,
compute: bool = True,
) -> ArrayLike:
"""Accuracy classification score.
In multilabel classification, this function computes subset accuracy:
the set of labels predicted for a sample must *exactly* match the
corresponding set of labels in y_true.
Read more in the :ref:`User Guide <accuracy_score>`.
Parameters
----------
y_true : 1d array-like, or label indicator array
Ground truth (correct) labels.
y_pred : 1d array-like, or label indicator array
Predicted labels, as returned by a classifier.
normalize : bool, optional (default=True)
If ``False``, return the number of correctly classified samples.
Otherwise, return the fraction of correctly classified samples.
sample_weight : 1d array-like, optional
Sample weights.
.. versionadded:: 0.7.0
Returns
-------
score : scalar dask Array
If ``normalize == True``, return the fraction of correctly classified samples
(float), else return the number of correctly classified samples
(int).
The best performance is 1 with ``normalize == True`` and the number
of samples with ``normalize == False``.
Notes
-----
In binary and multiclass classification, this function is equal
to the ``jaccard_similarity_score`` function.

"""

if y_true.ndim > 1:
differing_labels = ((y_true - y_pred) == 0).all(1)
score = differing_labels != 0
else:
score = y_true == y_pred

if normalize:
score = da.average(score, weights=sample_weight)
elif sample_weight is not None:
score = da.dot(score, sample_weight)
else:
score = score.sum()

if compute:
score = score.compute()
return score
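A small usage sketch for accuracy_score, assuming the function above is in scope (for example, imported from dask_sql.physical.rel.custom.metrics); the values are made up for illustration:

    import numpy as np
    import dask.array as da

    y_true = da.from_array(np.array([0, 1, 1, 0, 1, 0]), chunks=3)
    y_pred = da.from_array(np.array([0, 1, 0, 0, 1, 1]), chunks=3)

    # Fraction of correct predictions; computed eagerly since compute=True by default.
    print(accuracy_score(y_true, y_pred))                   # 0.666...
    # Count of correct predictions instead of the fraction.
    print(accuracy_score(y_true, y_pred, normalize=False))  # 4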


def _log_loss_inner(
x: ArrayLike, y: ArrayLike, sample_weight: Optional[ArrayLike], **kwargs
):
# da.map_blocks wasn't able to concatenate together the results
# when we reduce down to a scalar per block. So we make an
# array with 1 element.
if sample_weight is not None:
sample_weight = sample_weight.ravel()
return np.array(
[sklearn.metrics.log_loss(x, y, sample_weight=sample_weight, **kwargs)]
)


def log_loss(
y_true, y_pred, eps=1e-15, normalize=True, sample_weight=None, labels=None
):
if not (dask.is_dask_collection(y_true) and dask.is_dask_collection(y_pred)):
return sklearn.metrics.log_loss(
y_true,
y_pred,
eps=eps,
normalize=normalize,
sample_weight=sample_weight,
labels=labels,
)

if y_pred.ndim > 1 and y_true.ndim == 1:
y_true = y_true.reshape(-1, 1)
drop_axis: Optional[int] = 1
if sample_weight is not None:
sample_weight = sample_weight.reshape(-1, 1)
else:
drop_axis = None

result = da.map_blocks(
_log_loss_inner,
y_true,
y_pred,
sample_weight,
chunks=(1,),
drop_axis=drop_axis,
dtype="f8",
eps=eps,
normalize=normalize,
labels=labels,
)
if normalize and sample_weight is not None:
sample_weight = sample_weight.ravel()
block_weights = sample_weight.map_blocks(np.sum, chunks=(1,), keepdims=True)
return da.average(result, 0, weights=block_weights)
elif normalize:
return result.mean()
else:
return result.sum()


def _check_sample_weight(sample_weight: Optional[ArrayLike]):
if sample_weight is not None:
raise ValueError("'sample_weight' is not supported.")


@derived_from(sklearn.metrics)
def mean_squared_error(
y_true: ArrayLike,
y_pred: ArrayLike,
sample_weight: Optional[ArrayLike] = None,
multioutput: Optional[str] = "uniform_average",
squared: bool = True,
compute: bool = True,
) -> ArrayLike:
_check_sample_weight(sample_weight)
output_errors = ((y_pred - y_true) ** 2).mean(axis=0)

if isinstance(multioutput, str) or multioutput is None:
if multioutput == "raw_values":
if compute:
return output_errors.compute()
else:
return output_errors
else:
raise ValueError("Weighted 'multioutput' not supported.")
result = output_errors.mean()
if not squared:
result = da.sqrt(result)
if compute:
result = result.compute()
return result
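A usage sketch for mean_squared_error, again with made-up values and the function above assumed to be in scope:

    import numpy as np
    import dask.array as da

    y_true = da.from_array(np.array([3.0, -0.5, 2.0, 7.0]), chunks=2)
    y_pred = da.from_array(np.array([2.5, 0.0, 2.0, 8.0]), chunks=2)

    print(mean_squared_error(y_true, y_pred))                 # 0.375
    # squared=False returns the root mean squared error instead.
    print(mean_squared_error(y_true, y_pred, squared=False))  # ~0.612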


def _check_reg_targets(
y_true: ArrayLike, y_pred: ArrayLike, multioutput: Optional[str]
):
if multioutput is not None and multioutput != "uniform_average":
raise NotImplementedError("'multioutput' must be 'uniform_average'")

if y_true.ndim == 1:
y_true = y_true.reshape((-1, 1))
if y_pred.ndim == 1:
y_pred = y_pred.reshape((-1, 1))

# TODO: y_type, multioutput
return None, y_true, y_pred, multioutput


@derived_from(sklearn.metrics)
def r2_score(
y_true: ArrayLike,
y_pred: ArrayLike,
sample_weight: Optional[ArrayLike] = None,
multioutput: Optional[str] = "uniform_average",
compute: bool = True,
) -> ArrayLike:
_check_sample_weight(sample_weight)
_, y_true, y_pred, _ = _check_reg_targets(y_true, y_pred, multioutput)
weight = 1.0

numerator = (weight * (y_true - y_pred) ** 2).sum(axis=0, dtype="f8")
denominator = (weight * (y_true - y_true.mean(axis=0)) ** 2).sum(axis=0, dtype="f8")

nonzero_denominator = denominator != 0
nonzero_numerator = numerator != 0
valid_score = nonzero_denominator & nonzero_numerator
output_chunks = getattr(y_true, "chunks", [None, None])[1]
output_scores = da.ones([y_true.shape[1]], chunks=output_chunks)
with np.errstate(all="ignore"):
output_scores[valid_score] = 1 - (
numerator[valid_score] / denominator[valid_score]
)
output_scores[nonzero_numerator & ~nonzero_denominator] = 0.0

result = output_scores.mean(axis=0)
if compute:
result = result.compute()
return result
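And a usage sketch for r2_score, with the same made-up values:

    import numpy as np
    import dask.array as da

    y_true = da.from_array(np.array([3.0, -0.5, 2.0, 7.0]), chunks=2)
    y_pred = da.from_array(np.array([2.5, 0.0, 2.0, 8.0]), chunks=2)

    # Coefficient of determination over dask-backed inputs (computed eagerly).
    print(r2_score(y_true, y_pred))  # ~0.949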