From f2913c8d83d64bc106b2be2c587dce7e1713ea04 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Tue, 29 Nov 2022 10:38:03 -0800
Subject: [PATCH] Remove all Dask-ML uses (#886)

* initial pass

* remove all dask ml references

* remove use_dask

* fix failing test

* wrap in try/except

* add gpu test

* separate gpu test

* use gpu_client

* remove imports

Co-authored-by: Ayush Dattagupta
---
 .github/workflows/test-upstream.yml            |  9 ++--
 .github/workflows/test.yml                     |  9 ++--
 .../environment-3.10-dev.yaml                  |  1 -
 .../environment-3.8-dev.yaml                   |  1 -
 .../environment-3.9-dev.yaml                   |  1 -
 continuous_integration/gpuci/environment.yaml  |  1 -
 dask_sql/physical/rel/custom/predict.py        | 23 ++++++++--
 docker/conda.txt                               |  1 -
 docker/main.dockerfile                         |  1 -
 docs/source/machine_learning.rst               |  8 ++--
 docs/source/sql/ml.rst                         |  3 +-
 notebooks/Feature Overview.ipynb               |  4 +-
 setup.py                                       |  1 -
 tests/integration/test_model.py                | 44 ++++++++++++-------
 14 files changed, 60 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/test-upstream.yml b/.github/workflows/test-upstream.yml
index bd682c114..584adcfaa 100644
--- a/.github/workflows/test-upstream.yml
+++ b/.github/workflows/test-upstream.yml
@@ -73,11 +73,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -112,11 +111,10 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Install upstream dev dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         run: |
           if [[ $which_upstream == "Dask" ]]; then
@@ -161,12 +159,11 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
        if: env.which_upstream == 'Dask'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 624ec0022..82d81e6d4 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -64,11 +64,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -108,11 +107,10 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Optionally install upstream dev dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         env:
           UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }}
@@ -153,12 +151,11 @@ jobs:
           which python
           pip list
           mamba list
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
diff --git a/continuous_integration/environment-3.10-dev.yaml b/continuous_integration/environment-3.10-dev.yaml
index e86d4e62f..d5ffa777a 100644
--- a/continuous_integration/environment-3.10-dev.yaml
+++ b/continuous_integration/environment-3.10-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/continuous_integration/environment-3.8-dev.yaml b/continuous_integration/environment-3.8-dev.yaml
index 33b6492db..93bab825c 100644
--- a/continuous_integration/environment-3.8-dev.yaml
+++ b/continuous_integration/environment-3.8-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml=2022.1.22
 - dask=2022.3.0
 - fastapi=0.69.0
 - fugue=0.7.0
diff --git a/continuous_integration/environment-3.9-dev.yaml b/continuous_integration/environment-3.9-dev.yaml
index 8a2a2bcb0..4c5c23511 100644
--- a/continuous_integration/environment-3.9-dev.yaml
+++ b/continuous_integration/environment-3.9-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/continuous_integration/gpuci/environment.yaml b/continuous_integration/gpuci/environment.yaml
index c839083e6..ba248a2ff 100644
--- a/continuous_integration/gpuci/environment.yaml
+++ b/continuous_integration/gpuci/environment.yaml
@@ -6,7 +6,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index eb5e4b69f..1d1f2fd10 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,9 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import dask.dataframe as dd
+import pandas as pd
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 
@@ -30,8 +33,7 @@ class PredictModelPlugin(BaseRelPlugin):
     Please note however, that it will need to act on Dask dataframes. If you
     are using a model not optimized for this, it might be that you run out of
     memory if your data is larger than the RAM of a single machine.
-    To prevent this, have a look into the dask-ml package,
-    especially the [ParallelPostFit](https://ml.dask.org/meta-estimators.html)
+    To prevent this, have a look into the dask_sql.physical.rel.custom.wrappers.ParallelPostFit
     meta-estimator. If you are using a model trained with `CREATE MODEL`
     and the `wrap_predict` flag, this is done automatically.
 
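Note on the docstring change above: the meta-estimator it now points to is the wrapper vendored into dask-sql itself rather than the one from dask-ml. As a rough sketch of the idea only (this assumes the vendored ParallelPostFit keeps the dask-ml style interface of an estimator= argument and a blockwise .predict; check dask_sql/physical/rel/custom/wrappers.py for the actual signature), wrapping a fitted scikit-learn estimator lets predictions run partition-wise over a Dask dataframe instead of pulling everything into memory at once:

# Rough sketch only: assumes the wrapper vendored into dask-sql mirrors the
# dask-ml ParallelPostFit interface (estimator= argument, blockwise .predict).
import dask.dataframe as dd
import pandas as pd
from sklearn.linear_model import LogisticRegression

from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

# Fit on a small in-memory sample ...
train = pd.DataFrame({"x": range(100), "y": range(-50, 50)})
train["target"] = (train.x * train.y > 0).astype(int)
estimator = LogisticRegression().fit(train[["x", "y"]], train["target"])

# ... then predict partition-wise on a (potentially much larger) Dask dataframe.
ddf = dd.from_pandas(train[["x", "y"]], npartitions=4)
wrapped = ParallelPostFit(estimator=estimator)
predictions = wrapped.predict(ddf)  # lazy; one numpy-sized call per partition
print(predictions.compute()[:10])

This is the same effect that the wrap_predict flag of CREATE MODEL gives automatically, as the docstring notes.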
@@ -59,8 +61,21 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
 
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
-        predicted_df = df.assign(target=prediction)
+        try:
+            prediction = model.predict(df[training_columns])
+            predicted_df = df.assign(target=prediction)
+        except TypeError:
+            df = df.set_index(df.columns[0], drop=False)
+            prediction = model.predict(df[training_columns])
+            # Convert numpy.ndarray to Dask Series
+            prediction = dd.from_pandas(
+                pd.Series(prediction, index=df.index),
+                npartitions=df.npartitions,
+            )
+            predicted_df = df.assign(target=prediction)
+            # Need to drop first column to reset index
+            # because the first column is equal to the index
+            predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
 
         # Create a temporary context, which includes the
         # new "table" so that we can use the normal
diff --git a/docker/conda.txt b/docker/conda.txt
index 32a08c7a9..3d57e18dc 100644
--- a/docker/conda.txt
+++ b/docker/conda.txt
@@ -16,7 +16,6 @@ uvicorn>=0.13.4
 pyarrow>=6.0.1
 prompt_toolkit>=3.0.8
 pygments>=2.7.1
-dask-ml>=2022.1.22
 scikit-learn>=1.0.0
 intake>=0.6.0
 pre-commit>=2.11.1
diff --git a/docker/main.dockerfile b/docker/main.dockerfile
index 8b3c7f07b..8c908ce19 100644
--- a/docker/main.dockerfile
+++ b/docker/main.dockerfile
@@ -27,7 +27,6 @@ RUN mamba install -y \
     nest-asyncio \
     # additional dependencies
     "pyarrow>=6.0.1" \
-    "dask-ml>=2022.1.22" \
     "scikit-learn>=1.0.0" \
     "intake>=0.6.0" \
     && conda clean -ay
diff --git a/docs/source/machine_learning.rst b/docs/source/machine_learning.rst
index fac0daacb..3dd301863 100644
--- a/docs/source/machine_learning.rst
+++ b/docs/source/machine_learning.rst
@@ -125,8 +125,7 @@ following sql statements
 
 Want to increase the performance of your model by tuning the
 parameters? Use the hyperparameter tuning directly
 in SQL using below SQL syntax, choose different tuners
-from the dask_ml package based on memory and compute constraints and
-for more details refer to the `dask ml documentation `_
+based on memory and compute constraints.
 
 .. TODO - add a GPU section to these examples once we have working CREATE EXPERIMENT tests for GPU
@@ -135,7 +134,7 @@ for more details refer to the `dask ml documentation `_
+To prevent this, have a look into the `dask_sql.physical.rel.custom.wrappers.ParallelPostFit`
 meta-estimator. If you are using a model trained with ``CREATE MODEL``
 and the ``wrap_predict`` flag set to true, this is done automatically.
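The fallback added to dask_sql/physical/rel/custom/predict.py above handles models whose predict() returns a plain numpy.ndarray instead of a Dask collection: the frame is indexed by its first column, the ndarray is wrapped into a Dask Series on that same index, and the helper column is dropped again afterwards. A small standalone sketch of the same alignment pattern (illustrative only; the data, the column names, and the explicit compute() calls are made up for this example and are not the dask-sql code path):

# Illustrative sketch of the fallback above: align a numpy prediction array
# back onto a Dask dataframe by index so it can be assigned as a column.
import dask.dataframe as dd
import pandas as pd
from sklearn.cluster import KMeans

pdf = pd.DataFrame({"id": range(8), "x": [0, 1, 2, 3, 10, 11, 12, 13]})
df = dd.from_pandas(pdf, npartitions=2)

model = KMeans(n_clusters=2, n_init=10).fit(pdf[["x"]])

# Index the frame by its first column (kept as a column as well) ...
df = df.set_index(df.columns[0], drop=False)
# ... predict with a non-Dask-aware model (here via an explicit compute()) ...
prediction = model.predict(df[["x"]].compute())  # numpy.ndarray
# ... and wrap the ndarray as a Dask Series on the same index, as in the patch.
prediction = dd.from_pandas(
    pd.Series(prediction, index=df.index.compute()),
    npartitions=df.npartitions,
)
predicted_df = df.assign(target=prediction)
# Drop the duplicated index column and restore a default index, as in the patch.
predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
print(predicted_df.compute())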
diff --git a/notebooks/Feature Overview.ipynb b/notebooks/Feature Overview.ipynb
index 28538ab64..ac23a9777 100644
--- a/notebooks/Feature Overview.ipynb
+++ b/notebooks/Feature Overview.ipynb
@@ -590,7 +590,7 @@
    "metadata": {},
    "source": [
     "- Tune single model with different Hyperparameters \n",
-    "    - install **dask_ml** for tunning\n",
+    "    - install **sklearn** for tuning\n",
     "- Tune multiple model with different Hyperparameters\n",
     "    - install **tpot** for Automl"
    ]
@@ -604,7 +604,7 @@
     "%%sql\n",
     "CREATE EXPERIMENT my_exp WITH (\n",
     "    model_class = 'sklearn.ensemble.GradientBoostingClassifier',\n",
-    "    experiment_class = 'dask_ml.model_selection.GridSearchCV',\n",
+    "    experiment_class = 'sklearn.model_selection.GridSearchCV',\n",
     "    tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],\n",
     "                       max_depth = ARRAY [3,4,5,10]),\n",
     "    target_column = 'species'\n",
diff --git a/setup.py b/setup.py
index c982e40a0..0f8520de9 100644
--- a/setup.py
+++ b/setup.py
@@ -59,7 +59,6 @@
         "mock>=4.0.3",
         "sphinx>=3.2.1",
         "pyarrow>=6.0.1",
-        "dask-ml>=2022.1.22",
         "scikit-learn>=1.0.0",
         "intake>=0.6.0",
         "pre-commit",
diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index ad48e5b44..d1d89248f 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -18,8 +18,6 @@
 xgboost = None
 dask_cudf = None
 
-pytest.importorskip("dask_ml")
-
 
 def check_trained_model(c, model_name=None):
     if model_name is None:
@@ -157,7 +155,24 @@ def test_clustering_and_prediction(c, training_df):
     c.sql(
         """
         CREATE MODEL my_model WITH (
-            model_class = 'dask_ml.cluster.KMeans'
+            model_class = 'sklearn.cluster.KMeans'
         ) AS (
             SELECT x, y
             FROM timeseries
             LIMIT 100
         )
     """
     )
 
     check_trained_model(c)
 
 
+@pytest.mark.gpu
+def test_gpu_clustering_and_prediction(c, gpu_training_df, gpu_client):
+    c.sql(
+        """
+        CREATE MODEL my_model WITH (
+            model_class = 'cuml.dask.cluster.KMeans'
+        ) AS (
+            SELECT x, y
+            FROM timeseries
+            LIMIT 100
+        )
+    """
+    )
+
+    check_trained_model(c)
@@ -244,7 +259,7 @@ def test_show_models(c, training_df):
     c.sql(
         """
         CREATE MODEL my_model2 WITH (
-            model_class = 'dask_ml.cluster.KMeans'
+            model_class = 'sklearn.cluster.KMeans'
         ) AS (
             SELECT x, y
             FROM timeseries
@@ -691,7 +706,7 @@ def test_ml_experiment(c, client, training_df):
     c.sql(
         """
         CREATE EXPERIMENT my_exp WITH (
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -731,7 +746,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'that.is.not.a.python.class',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -794,7 +809,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -816,7 +831,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -831,7 +846,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -847,7 +862,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE OR REPLACE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -867,8 +882,8 @@ def test_ml_experiment(c, client, training_df):
     c.sql(
         """
         CREATE EXPERIMENT my_exp1 WITH (
-            model_class = 'dask_ml.cluster.KMeans',
-            experiment_class = 'dask_ml.model_selection.RandomizedSearchCV',
+            model_class = 'sklearn.cluster.KMeans',
+            experiment_class = 'sklearn.model_selection.RandomizedSearchCV',
             tune_parameters = (n_clusters = ARRAY [3,4,16],tol = ARRAY [0.1,0.01,0.001],
                                max_iter = ARRAY [3,4,5,10])
         ) AS (
@@ -889,7 +904,7 @@ def test_experiment_automl_classifier(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp1 WITH (
             automl_class = 'tpot.TPOTClassifier',
-            automl_kwargs = (population_size = 2 ,generations=2,cv=2,n_jobs=-1,use_dask=True),
+            automl_kwargs = (population_size=2, generations=2, cv=2, n_jobs=-1),
             target_column = 'target'
         ) AS (
             SELECT x, y, x*y > 0 AS target
@@ -914,11 +929,10 @@ def test_experiment_automl_regressor(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp2 WITH (
             automl_class = 'tpot.TPOTRegressor',
-            automl_kwargs = (population_size = 2,
+            automl_kwargs = (population_size=2,
                              generations=2,
                              cv=2,
                              n_jobs=-1,
-                             use_dask=True,
                              max_eval_time_mins=1),
             target_column = 'target'
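For context on the test changes above: after this patch the same SQL statements run against plain scikit-learn classes instead of dask-ml ones. A minimal end-to-end usage sketch (a standalone script, not taken from the test suite; the timeseries table is built here from dask.datasets.timeseries() rather than the tests' training_df fixture):

# Minimal sketch: train and apply a model through dask-sql without dask-ml.
# Assumes dask-sql with CREATE MODEL / PREDICT support and scikit-learn installed.
from dask.datasets import timeseries
from dask_sql import Context

c = Context()
c.create_table("timeseries", timeseries())  # columns include x and y

# Same SQL shape as in test_clustering_and_prediction above.
c.sql(
    """
    CREATE MODEL my_model WITH (
        model_class = 'sklearn.cluster.KMeans'
    ) AS (
        SELECT x, y
        FROM timeseries
        LIMIT 100
    )
    """
)

# Apply the trained model; the result is a lazy Dask dataframe with a target column.
result = c.sql(
    """
    SELECT * FROM PREDICT (
        MODEL my_model,
        SELECT x, y FROM timeseries LIMIT 100
    )
    """
)
print(result.compute().head())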