Replace dask_ml.wrappers.ParallelPostFit with custom `ParallelPostFit` class (#832)

* create ParallelPostFit class

* _timer

* update create_experiment

* update comment

* migrate changes from 799

* predict_proba_meta

* fix gpu?

* fix TypeError?

* trying again

* meta to output_meta

* remove _timer

* try import sklearn

* style fix

* Update wrappers.py

* use ImportError
sarahyurick authored Oct 24, 2022
1 parent e9ff9cd commit feecf41
Showing 5 changed files with 589 additions and 19 deletions.
14 changes: 2 additions & 12 deletions dask_sql/physical/rel/custom/create_experiment.py
@@ -168,12 +168,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
                     f"Can not import tuner {experiment_class}. Make sure you spelled it correctly and have installed all packages."
                 )
 
-            try:
-                from dask_ml.wrappers import ParallelPostFit
-            except ImportError:  # pragma: no cover
-                raise ValueError(
-                    "dask_ml must be installed to use automl and tune hyperparameters"
-                )
+            from dask_sql.physical.rel.custom.wrappers import ParallelPostFit
 
             model = ModelClass()

@@ -199,12 +194,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
                     f"Can not import automl model {automl_class}. Make sure you spelled it correctly and have installed all packages."
                 )
 
-            try:
-                from dask_ml.wrappers import ParallelPostFit
-            except ImportError:  # pragma: no cover
-                raise ValueError(
-                    "dask_ml must be installed to use automl and tune hyperparameters"
-                )
+            from dask_sql.physical.rel.custom.wrappers import ParallelPostFit
 
             automl = AutoMLClass(**automl_kwargs)
             # should be avoided if data doesn't fit in memory
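Both call sites in create_experiment.py now import the vendored wrapper unconditionally, so hyperparameter tuning and AutoML no longer require dask-ml at runtime. A minimal sketch of the flow this enables, assuming scikit-learn's GridSearchCV stands in for the tuner (the toy data and search space are illustrative, not from the diff):

```python
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# The vendored replacement for dask_ml.wrappers.ParallelPostFit
from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

# Run the search eagerly, then wrap the winning estimator so later
# predictions can be applied partition-wise to Dask collections.
search = GridSearchCV(SVC(), {"C": [0.1, 1.0]}, cv=2).fit(X, y)
model = ParallelPostFit(estimator=search.best_estimator_)
```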
19 changes: 13 additions & 6 deletions dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
@@ -43,7 +44,7 @@ class CreateModelPlugin(BaseRelPlugin):
         unsupervised algorithms). This means, you typically
         want to set this parameter.
     * wrap_predict: Boolean flag, whether to wrap the selected
-      model with a :class:`dask_ml.wrappers.ParallelPostFit`.
+      model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
       Have a look into the
       [dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
       to learn more about it. Defaults to false. Typically you set
@@ -165,10 +166,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
             model = Incremental(estimator=model)
 
         if wrap_predict:
-            try:
-                from dask_ml.wrappers import ParallelPostFit
-            except ImportError:  # pragma: no cover
-                raise ValueError("Wrapping requires dask-ml to be installed.")
+            from dask_sql.physical.rel.custom.wrappers import ParallelPostFit
 
             # When `wrap_predict` is set to True we train on single partition frames
             # because this is only useful for non dask distributed models
@@ -183,7 +181,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
 
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            if "sklearn" in model_class:
+                output_meta = np.array([])
+                model = ParallelPostFit(
+                    estimator=model,
+                    predict_meta=output_meta,
+                    predict_proba_meta=output_meta,
+                    transform_meta=output_meta,
+                )
+            else:
+                model = ParallelPostFit(estimator=model)
 
         else:
             model.fit(X, y, **fit_kwargs)
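The sklearn branch above passes empty NumPy arrays as `predict_meta`, `predict_proba_meta`, and `transform_meta`, declaring the output type up front instead of letting Dask infer it by calling the estimator on an empty partition, which some estimators reject. A minimal sketch of the wrapped model in use, assuming the vendored wrapper keeps dask-ml's predict semantics for Dask DataFrames (the toy frame and LogisticRegression are illustrative, not from the diff):

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

# Fit eagerly on one in-memory partition, mirroring the wrap_predict path.
train = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0], "y": [0, 0, 1, 1]})
fitted = LogisticRegression().fit(train[["x"]], train["y"])

# Empty-array metas skip the empty-partition probing step entirely.
output_meta = np.array([])
model = ParallelPostFit(
    estimator=fitted,
    predict_meta=output_meta,
    predict_proba_meta=output_meta,
    transform_meta=output_meta,
)

# Predictions stay lazy and run per partition.
ddf = dd.from_pandas(pd.DataFrame({"x": [0.5, 2.5]}), npartitions=2)
preds = model.predict(ddf)  # Dask collection; call .compute() to materialize
```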
(The diff for the new dask_sql/physical/rel/custom/wrappers.py and the remaining changed files is collapsed and not shown here.)
