Replace dask_ml.wrappers.ParallelPostFit with custom ParallelPostFit class #832

Merged: 17 commits, merged Oct 24, 2022
14 changes: 2 additions & 12 deletions dask_sql/physical/rel/custom/create_experiment.py
@@ -168,12 +168,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
f"Can not import tuner {experiment_class}. Make sure you spelled it correctly and have installed all packages."
)

try:
from dask_ml.wrappers import ParallelPostFit
except ImportError: # pragma: no cover
raise ValueError(
"dask_ml must be installed to use automl and tune hyperparameters"
)
from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

model = ModelClass()

@@ -199,12 +194,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
f"Can not import automl model {automl_class}. Make sure you spelled it correctly and have installed all packages."
)

try:
from dask_ml.wrappers import ParallelPostFit
except ImportError: # pragma: no cover
raise ValueError(
"dask_ml must be installed to use automl and tune hyperparameters"
)
from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

automl = AutoMLClass(**automl_kwargs)
# should be avoided if data doesn't fit in memory
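The vendored `ParallelPostFit` imported in the hunks above follows the same pattern as the dask_ml original: fit once on a single partition, then fan `predict` out over many partitions. A minimal stdlib-only sketch of that pattern (illustrative class and method names, not the actual dask_sql implementation):

```python
# Hedged sketch of the ParallelPostFit idea: training is eager and happens
# once, while predict() maps over partitions in parallel. The estimator and
# wrapper classes here are toys invented for illustration.
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence


class MeanThresholdModel:
    """Toy sklearn-style estimator: predicts 1 for values above the training mean."""

    def fit(self, X: Sequence[float]) -> "MeanThresholdModel":
        self.mean_ = sum(X) / len(X)
        return self

    def predict(self, X: Sequence[float]) -> List[int]:
        return [1 if x > self.mean_ else 0 for x in X]


class SimpleParallelPostFit:
    """Wraps a fitted estimator; predict() runs per-partition in parallel."""

    def __init__(self, estimator):
        self.estimator = estimator

    def predict(self, partitions: List[Sequence[float]]) -> List[int]:
        with ThreadPoolExecutor() as ex:
            chunks = ex.map(self.estimator.predict, partitions)
        return [label for chunk in chunks for label in chunk]


model = MeanThresholdModel().fit([0.0, 1.0, 2.0, 3.0])  # mean_ == 1.5
wrapped = SimpleParallelPostFit(model)
print(wrapped.predict([[0.0, 1.0], [2.0, 3.0]]))  # [0, 0, 1, 1]
```

The real wrapper operates on dask collections rather than plain lists, but the division of labor is the same: the fit happens on one worker, and only the cheap post-fit methods are parallelized.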
19 changes: 13 additions & 6 deletions dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
import logging
from typing import TYPE_CHECKING

import numpy as np
from dask import delayed

from dask_sql.datacontainer import DataContainer
@@ -43,7 +44,7 @@ class CreateModelPlugin(BaseRelPlugin):
unsupervised algorithms). This means, you typically
want to set this parameter.
* wrap_predict: Boolean flag, whether to wrap the selected
model with a :class:`dask_ml.wrappers.ParallelPostFit`.
model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
Have a look into the
[dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
Collaborator:
Remove this documentation link too, as well as our own documentation in dask_sql. I would like us to move away from the wrap_predict functionality toward auto-detecting whether a model is a sklearn/single-GPU cuML/xgboost model vs. a dask model.

Collaborator:
Will be resolved by #855
to learn more about it. Defaults to false. Typically you set
@@ -165,10 +166,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
model = Incremental(estimator=model)

if wrap_predict:
try:
from dask_ml.wrappers import ParallelPostFit
except ImportError: # pragma: no cover
raise ValueError("Wrapping requires dask-ml to be installed.")
from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

# When `wrap_predict` is set to True we train on single partition frames
# because this is only useful for non dask distributed models
@@ -183,7 +181,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
if "sklearn" in model_class:
output_meta = np.array([])
model = ParallelPostFit(
estimator=model,
predict_meta=output_meta,
predict_proba_meta=output_meta,
transform_meta=output_meta,
)
else:
model = ParallelPostFit(estimator=model)

else:
model.fit(X, y, **fit_kwargs)
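The hunk above passes an empty `np.array([])` as `predict_meta`, `predict_proba_meta`, and `transform_meta` for sklearn models. A plausible motivation (my reading, consistent with dask's general meta mechanism rather than a statement about this PR): without explicit meta, a dask-style wrapper must infer output metadata by calling the method on an empty partition, which many sklearn estimators reject. An illustrative sketch with invented names:

```python
# Hedged sketch of why explicit predict_meta helps. StrictEstimator and
# resolve_predict_meta are toys invented for illustration.
import numpy as np


class StrictEstimator:
    """Mimics sklearn estimators that raise on zero-sample input."""

    def predict(self, X):
        if len(X) == 0:
            raise ValueError("Found array with 0 sample(s)")
        return np.ones(len(X))


def resolve_predict_meta(estimator, predict_meta=None):
    """Return output metadata, probing with an empty partition only as a fallback."""
    if predict_meta is not None:
        # Explicit meta skips the empty-input probe entirely.
        return predict_meta
    # The fallback probe fails for estimators like StrictEstimator.
    return estimator.predict(np.empty((0, 1)))


est = StrictEstimator()
meta = resolve_predict_meta(est, predict_meta=np.array([]))
print(meta.size)  # 0
```

Passing an empty array up front tells the scheduler the output's type and dtype without ever invoking `predict` on zero rows.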