Remove all Dask-ML uses (#886)
* initial pass

* remove all dask ml references

* remove use_dask

* fix failing test

* wrap in try/except

* add gpu test

* separate gpu test

* use gpu_client

* remove imports

Co-authored-by: Ayush Dattagupta <[email protected]>
sarahyurick and ayushdg authored Nov 29, 2022
1 parent 161e276 commit f2913c8
Showing 14 changed files with 60 additions and 47 deletions.
9 changes: 3 additions & 6 deletions .github/workflows/test-upstream.yml
@@ -73,11 +73,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -112,11 +111,10 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Install upstream dev dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         run: |
           if [[ $which_upstream == "Dask" ]]; then
@@ -161,12 +159,11 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
9 changes: 3 additions & 6 deletions .github/workflows/test.yml
@@ -64,11 +64,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -108,11 +107,10 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Optionally install upstream dev dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         env:
           UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }}
@@ -153,12 +151,11 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
1 change: 0 additions & 1 deletion continuous_integration/environment-3.10-dev.yaml
@@ -3,7 +3,6 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - dask-ml>=2022.1.22
   - dask>=2022.3.0
   - fastapi>=0.69.0
   - fugue>=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/environment-3.8-dev.yaml
@@ -3,7 +3,6 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - dask-ml=2022.1.22
   - dask=2022.3.0
   - fastapi=0.69.0
   - fugue=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/environment-3.9-dev.yaml
@@ -3,7 +3,6 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - dask-ml>=2022.1.22
   - dask>=2022.3.0
   - fastapi>=0.69.0
   - fugue>=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/gpuci/environment.yaml
@@ -6,7 +6,6 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - dask-ml>=2022.1.22
   - dask>=2022.3.0
   - fastapi>=0.69.0
   - fugue>=0.7.0
23 changes: 19 additions & 4 deletions dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,9 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import dask.dataframe as dd
+import pandas as pd
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 
@@ -30,8 +33,7 @@ class PredictModelPlugin(BaseRelPlugin):
     Please note however, that it will need to act on Dask dataframes. If you
     are using a model not optimized for this, it might be that you run out of memory if
     your data is larger than the RAM of a single machine.
-    To prevent this, have a look into the dask-ml package,
-    especially the [ParallelPostFit](https://ml.dask.org/meta-estimators.html)
+    To prevent this, have a look into the dask_sql.physical.rel.custom.wrappers.ParallelPostFit
     meta-estimator. If you are using a model trained with `CREATE MODEL`
     and the `wrap_predict` flag, this is done automatically.
@@ -59,8 +61,21 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
 
         model, training_columns = context.schema[schema_name].models[model_name]
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
-        predicted_df = df.assign(target=prediction)
+        try:
+            prediction = model.predict(df[training_columns])
+            predicted_df = df.assign(target=prediction)
+        except TypeError:
+            df = df.set_index(df.columns[0], drop=False)
+            prediction = model.predict(df[training_columns])
+            # Convert numpy.ndarray to Dask Series
+            prediction = dd.from_pandas(
+                pd.Series(prediction, index=df.index),
+                npartitions=df.npartitions,
+            )
+            predicted_df = df.assign(target=prediction)
+            # Need to drop first column to reset index
+            # because the first column is equal to the index
+            predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
 
         # Create a temporary context, which includes the
         # new "table" so that we can use the normal
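Note on the predict.py change: some estimators (for example the distributed cuML KMeans exercised by the new GPU test) return a plain array from predict() instead of a Dask collection, which is what the new TypeError branch handles. A minimal standalone sketch of the same re-wrapping idea, with toy data standing in for the SQL result:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd

    # Toy stand-in for the SELECTed frame; the columns are illustrative only.
    pdf = pd.DataFrame({"x": np.random.rand(8), "y": np.random.rand(8)})
    df = dd.from_pandas(pdf, npartitions=2)

    # Suppose predict() handed back a plain numpy.ndarray.
    prediction = np.random.randint(0, 2, size=len(pdf))

    # Re-wrap it as a Dask Series sharing the frame's index, then assign.
    prediction = dd.from_pandas(
        pd.Series(prediction, index=pdf.index),
        npartitions=df.npartitions,
    )
    predicted_df = df.assign(target=prediction)
    print(predicted_df.compute())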
1 change: 0 additions & 1 deletion docker/conda.txt
@@ -16,7 +16,6 @@ uvicorn>=0.13.4
 pyarrow>=6.0.1
 prompt_toolkit>=3.0.8
 pygments>=2.7.1
-dask-ml>=2022.1.22
 scikit-learn>=1.0.0
 intake>=0.6.0
 pre-commit>=2.11.1
1 change: 0 additions & 1 deletion docker/main.dockerfile
@@ -27,7 +27,6 @@ RUN mamba install -y \
     nest-asyncio \
     # additional dependencies
     "pyarrow>=6.0.1" \
-    "dask-ml>=2022.1.22" \
     "scikit-learn>=1.0.0" \
     "intake>=0.6.0" \
     && conda clean -ay
8 changes: 3 additions & 5 deletions docs/source/machine_learning.rst
@@ -125,8 +125,7 @@ following sql statements
 Want to increase the performance of your model by tuning the
 parameters? Use the hyperparameter tuning directly
 in SQL using below SQL syntax, choose different tuners
-from the dask_ml package based on memory and compute constraints and
-for more details refer to the `dask ml documentation <https://ml.dask.org/hyper-parameter-search.html#incremental-hyperparameter-optimization>`_
+based on memory and compute constraints.
 
 ..
     TODO - add a GPU section to these examples once we have working CREATE EXPERIMENT tests for GPU
@@ -135,7 +134,7 @@ for more details refer to the `dask ml documentation <https://ml.dask.org/hyper-
 
     CREATE EXPERIMENT my_exp WITH (
         model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-        experiment_class = 'dask_ml.model_selection.GridSearchCV',
+        experiment_class = 'sklearn.model_selection.GridSearchCV',
         tune_parameters = (n_estimators = ARRAY [16, 32, 2],
                            learning_rate = ARRAY [0.1,0.01,0.001],
                            max_depth = ARRAY [3,4,5,10]
@@ -258,7 +257,6 @@ and the boolean target ``label``.
     SELECT * FROM training_data
 
     -- We can now train a model from the sklearn package.
-    -- Make sure to install it together with dask-ml with conda or pip.
     CREATE OR REPLACE MODEL my_model WITH (
         model_class = 'sklearn.ensemble.GradientBoostingClassifier',
         wrap_predict = True,
@@ -282,7 +280,7 @@ and the boolean target ``label``.
     -- experiment to tune different hyperparameters
     CREATE EXPERIMENT my_exp WITH(
         model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-        experiment_class = 'dask_ml.model_selection.GridSearchCV',
+        experiment_class = 'sklearn.model_selection.GridSearchCV',
         tune_parameters = (n_estimators = ARRAY [16, 32, 2],
                            learning_rate = ARRAY [0.1,0.01,0.001],
                            max_depth = ARRAY [3,4,5,10]
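The CREATE EXPERIMENT statements above now name scikit-learn's tuner directly. Roughly, the first example corresponds to the following scikit-learn call (a sketch only; the exact wiring inside dask-sql may differ, and X/y stand in for the SELECTed features and target):

    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import GridSearchCV

    # The tune_parameters tuple from the SQL, expressed as a param grid.
    param_grid = {
        "n_estimators": [16, 32, 2],
        "learning_rate": [0.1, 0.01, 0.001],
        "max_depth": [3, 4, 5, 10],
    }
    search = GridSearchCV(GradientBoostingClassifier(), param_grid=param_grid)
    # search.fit(X, y)  # X, y: the SELECTed feature columns and target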
3 changes: 1 addition & 2 deletions docs/source/sql/ml.rst
@@ -127,8 +127,7 @@ A model can be anything which has a ``predict`` function.
 Please note however, that it will need to act on Dask dataframes. If you
 are using a model not optimized for this, it might be that you run out of memory if
 your data is larger than the RAM of a single machine.
-To prevent this, have a look into the dask-ml package,
-especially the `ParallelPostFit <https://ml.dask.org/meta-estimators.html>`_
+To prevent this, have a look into the `dask_sql.physical.rel.custom.wrappers.ParallelPostFit`
 meta-estimator. If you are using a model trained with ``CREATE MODEL``
 and the ``wrap_predict`` flag set to true, this is done automatically.
 
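With ParallelPostFit now vendored into dask-sql, usage looks roughly like this (a sketch assuming the vendored wrapper keeps the dask-ml API it replaces):

    from sklearn.ensemble import GradientBoostingClassifier

    from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

    # Fit on an in-memory sample, then predict partition-wise on Dask frames.
    model = ParallelPostFit(estimator=GradientBoostingClassifier())
    # model.fit(X_sample, y_sample)         # in-memory training data
    # model.predict(dask_df[feature_cols])  # lazy, partition-wise prediction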
4 changes: 2 additions & 2 deletions notebooks/Feature Overview.ipynb
@@ -590,7 +590,7 @@
 "metadata": {},
 "source": [
 "- Tune single model with different Hyperparameters \n",
-" - install **dask_ml** for tunning\n",
+" - install **sklearn** for tuning\n",
 "- Tune multiple model with different Hyperparameters\n",
 " - install **tpot** for Automl"
 ]
@@ -604,7 +604,7 @@
 "%%sql\n",
 "CREATE EXPERIMENT my_exp WITH (\n",
 " model_class = 'sklearn.ensemble.GradientBoostingClassifier',\n",
-" experiment_class = 'dask_ml.model_selection.GridSearchCV',\n",
+" experiment_class = 'sklearn.model_selection.GridSearchCV',\n",
 " tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],\n",
 " max_depth = ARRAY [3,4,5,10]),\n",
 " target_column = 'species'\n",
1 change: 0 additions & 1 deletion setup.py
@@ -59,7 +59,6 @@
         "mock>=4.0.3",
         "sphinx>=3.2.1",
         "pyarrow>=6.0.1",
-        "dask-ml>=2022.1.22",
         "scikit-learn>=1.0.0",
         "intake>=0.6.0",
         "pre-commit",
44 changes: 29 additions & 15 deletions tests/integration/test_model.py
@@ -18,8 +18,6 @@
     xgboost = None
     dask_cudf = None
 
-pytest.importorskip("dask_ml")
-
 
 def check_trained_model(c, model_name=None):
     if model_name is None:
@@ -157,7 +155,24 @@ def test_clustering_and_prediction(c, training_df):
     c.sql(
         """
         CREATE MODEL my_model WITH (
-            model_class = 'dask_ml.cluster.KMeans'
+            model_class = 'sklearn.cluster.KMeans'
+        ) AS (
+            SELECT x, y
+            FROM timeseries
+            LIMIT 100
+        )
+    """
+    )
+
+    check_trained_model(c)
+
+
+@pytest.mark.gpu
+def test_gpu_clustering_and_prediction(c, gpu_training_df, gpu_client):
+    c.sql(
+        """
+        CREATE MODEL my_model WITH (
+            model_class = 'cuml.dask.cluster.KMeans'
         ) AS (
             SELECT x, y
             FROM timeseries
@@ -244,7 +259,7 @@ def test_show_models(c, training_df):
     c.sql(
         """
         CREATE MODEL my_model2 WITH (
-            model_class = 'dask_ml.cluster.KMeans'
+            model_class = 'sklearn.cluster.KMeans'
         ) AS (
             SELECT x, y
             FROM timeseries
@@ -691,7 +706,7 @@ def test_ml_experiment(c, client, training_df):
     c.sql(
         """
         CREATE EXPERIMENT my_exp WITH (
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -731,7 +746,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'that.is.not.a.python.class',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -794,7 +809,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -816,7 +831,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -831,7 +846,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -847,7 +862,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE OR REPLACE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
             max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -867,8 +882,8 @@ def test_ml_experiment(c, client, training_df):
     c.sql(
         """
         CREATE EXPERIMENT my_exp1 WITH (
-            model_class = 'dask_ml.cluster.KMeans',
-            experiment_class = 'dask_ml.model_selection.RandomizedSearchCV',
+            model_class = 'sklearn.cluster.KMeans',
+            experiment_class = 'sklearn.model_selection.RandomizedSearchCV',
             tune_parameters = (n_clusters = ARRAY [3,4,16],tol = ARRAY [0.1,0.01,0.001],
             max_iter = ARRAY [3,4,5,10])
         ) AS (
@@ -889,7 +904,7 @@ def test_experiment_automl_classifier(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp1 WITH (
             automl_class = 'tpot.TPOTClassifier',
-            automl_kwargs = (population_size = 2 ,generations=2,cv=2,n_jobs=-1,use_dask=True),
+            automl_kwargs = (population_size=2, generations=2, cv=2, n_jobs=-1),
             target_column = 'target'
         ) AS (
             SELECT x, y, x*y > 0 AS target
@@ -914,11 +929,10 @@ def test_experiment_automl_regressor(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp2 WITH (
             automl_class = 'tpot.TPOTRegressor',
-            automl_kwargs = (population_size = 2,
+            automl_kwargs = (population_size=2,
                              generations=2,
                              cv=2,
                              n_jobs=-1,
-                             use_dask=True,
                              max_eval_time_mins=1),
             target_column = 'target'
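The automl_kwargs edits above drop use_dask=True, so TPOT presumably falls back to its default single-machine parallelism. Roughly the estimator the updated classifier test asks dask-sql to construct (a sketch; X/y stand in for the SELECTed data):

    from tpot import TPOTClassifier

    automl = TPOTClassifier(population_size=2, generations=2, cv=2, n_jobs=-1)
    # automl.fit(X, y)  # X, y: the SELECTed features and boolean target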
