Remove all Dask-ML uses #886

Merged
merged 13 commits on Nov 29, 2022
9 changes: 3 additions & 6 deletions .github/workflows/test-upstream.yml
@@ -73,11 +73,10 @@ jobs:
mamba install -c conda-forge "sasl>=0.3.1"
docker pull bde2020/hive:2.3.2-postgresql-metastore
docker pull bde2020/hive-metastore-postgresql:2.3.0
- name: Install upstream dev Dask / dask-ml
- name: Install upstream dev Dask
if: env.which_upstream == 'Dask'
run: |
mamba update dask
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: Test with pytest
run: |
pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -112,11 +111,10 @@ jobs:
which python
pip list
mamba list
- name: Install upstream dev dask-ml
- name: Install upstream dev Dask
if: env.which_upstream == 'Dask'
run: |
mamba update dask
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: run a dask cluster
run: |
if [[ $which_upstream == "Dask" ]]; then
@@ -161,12 +159,11 @@ jobs:
which python
pip list
mamba list
- name: Install upstream dev Dask / dask-ml
- name: Install upstream dev Dask
if: env.which_upstream == 'Dask'
run: |
python -m pip install --no-deps git+https://github.com/dask/dask
python -m pip install --no-deps git+https://github.com/dask/distributed
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: Try to import dask-sql
run: |
python -c "import dask_sql; print('ok')"
9 changes: 3 additions & 6 deletions .github/workflows/test.yml
@@ -64,11 +64,10 @@ jobs:
mamba install -c conda-forge "sasl>=0.3.1"
docker pull bde2020/hive:2.3.2-postgresql-metastore
docker pull bde2020/hive-metastore-postgresql:2.3.0
- name: Optionally install upstream dev Dask / dask-ml
- name: Optionally install upstream dev Dask
if: needs.detect-ci-trigger.outputs.triggered == 'true'
run: |
mamba update dask
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: Test with pytest
run: |
pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -108,11 +107,10 @@ jobs:
which python
pip list
mamba list
- name: Optionally install upstream dev dask-ml
- name: Optionally install upstream dev Dask
if: needs.detect-ci-trigger.outputs.triggered == 'true'
run: |
mamba update dask
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: run a dask cluster
env:
UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }}
@@ -153,12 +151,11 @@ jobs:
which python
pip list
mamba list
- name: Optionally install upstream dev Dask / dask-ml
- name: Optionally install upstream dev Dask
if: needs.detect-ci-trigger.outputs.triggered == 'true'
run: |
python -m pip install --no-deps git+https://github.com/dask/dask
python -m pip install --no-deps git+https://github.com/dask/distributed
python -m pip install --no-deps git+https://github.com/dask/dask-ml
- name: Try to import dask-sql
run: |
python -c "import dask_sql; print('ok')"
1 change: 0 additions & 1 deletion continuous_integration/environment-3.10-dev.yaml
@@ -3,7 +3,6 @@ channels:
- conda-forge
- nodefaults
dependencies:
- dask-ml>=2022.1.22
- dask>=2022.3.0
- fastapi>=0.69.0
- fugue>=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/environment-3.8-dev.yaml
@@ -3,7 +3,6 @@ channels:
- conda-forge
- nodefaults
dependencies:
- dask-ml=2022.1.22
- dask=2022.3.0
- fastapi=0.69.0
- fugue=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/environment-3.9-dev.yaml
@@ -3,7 +3,6 @@ channels:
- conda-forge
- nodefaults
dependencies:
- dask-ml>=2022.1.22
- dask>=2022.3.0
- fastapi>=0.69.0
- fugue>=0.7.0
1 change: 0 additions & 1 deletion continuous_integration/gpuci/environment.yaml
@@ -6,7 +6,6 @@ channels:
- conda-forge
- nodefaults
dependencies:
- dask-ml>=2022.1.22
- dask>=2022.3.0
- fastapi>=0.69.0
- fugue>=0.7.0
23 changes: 19 additions & 4 deletions dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,9 @@
import uuid
from typing import TYPE_CHECKING

import dask.dataframe as dd
import pandas as pd

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin

@@ -30,8 +33,7 @@ class PredictModelPlugin(BaseRelPlugin):
Please note however, that it will need to act on Dask dataframes. If you
are using a model not optimized for this, it might be that you run out of memory if
your data is larger than the RAM of a single machine.
To prevent this, have a look into the dask-ml package,
especially the [ParallelPostFit](https://ml.dask.org/meta-estimators.html)
To prevent this, have a look into the dask_sql.physical.rel.custom.wrappers.ParallelPostFit
meta-estimator. If you are using a model trained with `CREATE MODEL`
and the `wrap_predict` flag, this is done automatically.
@@ -59,8 +61,21 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

model, training_columns = context.schema[schema_name].models[model_name]
df = context.sql(sql_select)
prediction = model.predict(df[training_columns])
predicted_df = df.assign(target=prediction)
try:
prediction = model.predict(df[training_columns])
predicted_df = df.assign(target=prediction)
except TypeError:
Collaborator: In what scenarios do we hit this case?

Author (Collaborator): This is to allow test_clustering_and_prediction to pass. It would error with `TypeError: Column assignment doesn't support type numpy.ndarray`, because sklearn returns the list of clusters as a NumPy array (like `array([1, 3, 4, 7, 7, 2, 0, 7, 5, 1, 2, 0, 1, 6, 0, 3, 6, 6, 4, 7, 0, 3, 2, 0, 3, 4, 5, 4, 1, 0], dtype=int32)`), but Dask does not support column assignment with this datatype, so we have to convert it to a Dask Series before assignment.

I think it should be OK to use pandas in the except block, because this should only happen in the CPU case with sklearn. For example, test_gpu_clustering_and_prediction uses cuml.dask and doesn't need to go into the except block.

Collaborator: That makes sense. So this is specifically for the case where predict returns a NumPy array instead of a Dask Array.
df = df.set_index(df.columns[0], drop=False)
prediction = model.predict(df[training_columns])
# Convert numpy.ndarray to Dask Series
prediction = dd.from_pandas(
pd.Series(prediction, index=df.index),
npartitions=df.npartitions,
)
predicted_df = df.assign(target=prediction)
# Need to drop first column to reset index
# because the first column is equal to the index
predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()

# Create a temporary context, which includes the
# new "table" so that we can use the normal
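To make the fallback discussed in the review thread concrete, here is a minimal, standalone sketch (the column names, data, and cluster count are made up for illustration; it assumes scikit-learn and Dask are installed) of the failure mode and the Series conversion that works around it:

```python
import dask.dataframe as dd
import pandas as pd
from sklearn.cluster import KMeans

# Illustrative data only; names and sizes are hypothetical.
pdf = pd.DataFrame({"x": range(30), "y": range(30)})
df = dd.from_pandas(pdf, npartitions=3)

model = KMeans(n_clusters=3, n_init=10).fit(pdf)

# sklearn's predict() returns a plain numpy.ndarray...
prediction = model.predict(pdf)

# ...so df.assign(target=prediction) would raise
# "TypeError: Column assignment doesn't support type numpy.ndarray".
# Wrapping the array in a Dask Series with a matching index works:
target = dd.from_pandas(
    pd.Series(prediction, index=pdf.index), npartitions=df.npartitions
)
predicted_df = df.assign(target=target)

print(predicted_df.head())
```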
1 change: 0 additions & 1 deletion docker/conda.txt
@@ -16,7 +16,6 @@ uvicorn>=0.13.4
pyarrow>=6.0.1
prompt_toolkit>=3.0.8
pygments>=2.7.1
dask-ml>=2022.1.22
scikit-learn>=1.0.0
intake>=0.6.0
pre-commit>=2.11.1
1 change: 0 additions & 1 deletion docker/main.dockerfile
@@ -27,7 +27,6 @@ RUN mamba install --freeze-installed -y \
nest-asyncio \
# additional dependencies
"pyarrow>=6.0.1" \
"dask-ml>=2022.1.22" \
"scikit-learn>=1.0.0" \
"intake>=0.6.0" \
&& conda clean -ay
8 changes: 3 additions & 5 deletions docs/source/machine_learning.rst
@@ -125,8 +125,7 @@ following sql statements
Want to increase the performance of your model by tuning the
parameters? Use the hyperparameter tuning directly
in SQL using below SQL syntax, choose different tuners
from the dask_ml package based on memory and compute constraints and
for more details refer to the `dask ml documentation <https://ml.dask.org/hyper-parameter-search.html#incremental-hyperparameter-optimization>`_
based on memory and compute constraints.

..
TODO - add a GPU section to these examples once we have working CREATE EXPERIMENT tests for GPU
@@ -135,7 +134,7 @@ for more details refer to the `dask ml documentation <https://ml.dask.org/hyper-
CREATE EXPERIMENT my_exp WITH (
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
experiment_class = 'dask_ml.model_selection.GridSearchCV',
experiment_class = 'sklearn.model_selection.GridSearchCV',
tune_parameters = (n_estimators = ARRAY [16, 32, 2],
learning_rate = ARRAY [0.1,0.01,0.001],
max_depth = ARRAY [3,4,5,10]
@@ -258,7 +257,6 @@ and the boolean target ``label``.
SELECT * FROM training_data
-- We can now train a model from the sklearn package.
-- Make sure to install it together with dask-ml with conda or pip.
CREATE OR REPLACE MODEL my_model WITH (
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
wrap_predict = True,
@@ -282,7 +280,7 @@ and the boolean target ``label``.
-- experiment to tune different hyperparameters
CREATE EXPERIMENT my_exp WITH(
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
experiment_class = 'dask_ml.model_selection.GridSearchCV',
experiment_class = 'sklearn.model_selection.GridSearchCV',
tune_parameters = (n_estimators = ARRAY [16, 32, 2],
learning_rate = ARRAY [0.1,0.01,0.001],
max_depth = ARRAY [3,4,5,10]
3 changes: 1 addition & 2 deletions docs/source/sql/ml.rst
@@ -127,8 +127,7 @@ A model can be anything which has a ``predict`` function.
Please note however, that it will need to act on Dask dataframes. If you
are using a model not optimized for this, it might be that you run out of memory if
your data is larger than the RAM of a single machine.
To prevent this, have a look into the dask-ml package,
especially the `ParallelPostFit <https://ml.dask.org/meta-estimators.html>`_
To prevent this, have a look into the `dask_sql.physical.rel.custom.wrappers.ParallelPostFit`
meta-estimator. If you are using a model trained with ``CREATE MODEL``
and the ``wrap_predict`` flag set to true, this is done automatically.

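For the vendored meta-estimator referenced in the docs above, a rough sketch of using it outside of SQL might look like the following. The import path comes from the changed docs; the constructor signature is an assumption that the vendored wrapper keeps dask-ml's `ParallelPostFit(estimator=...)` interface, so treat this as unverified:

```python
import dask.dataframe as dd
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Assumption: the vendored wrapper mirrors dask-ml's ParallelPostFit API.
from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

pdf = pd.DataFrame({"a": range(100), "b": range(100), "label": [0, 1] * 50})
df = dd.from_pandas(pdf, npartitions=4)

# Fit in memory on pandas, then wrap the estimator so that predict()
# runs partition-wise over the Dask dataframe instead of pulling all
# of the data onto a single machine.
model = ParallelPostFit(estimator=LogisticRegression()).fit(
    pdf[["a", "b"]], pdf["label"]
)

prediction = model.predict(df[["a", "b"]])  # lazy Dask collection
print(prediction.compute()[:5])
```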
4 changes: 2 additions & 2 deletions notebooks/Feature Overview.ipynb
@@ -590,7 +590,7 @@
"metadata": {},
"source": [
"- Tune single model with different Hyperparameters \n",
" - install **dask_ml** for tunning\n",
" - install **sklearn** for tuning\n",
"- Tune multiple model with different Hyperparameters\n",
" - install **tpot** for Automl"
]
@@ -604,7 +604,7 @@
"%%sql\n",
"CREATE EXPERIMENT my_exp WITH (\n",
" model_class = 'sklearn.ensemble.GradientBoostingClassifier',\n",
" experiment_class = 'dask_ml.model_selection.GridSearchCV',\n",
" experiment_class = 'sklearn.model_selection.GridSearchCV',\n",
" tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],\n",
" max_depth = ARRAY [3,4,5,10]),\n",
" target_column = 'species'\n",
1 change: 0 additions & 1 deletion setup.py
@@ -59,7 +59,6 @@
"mock>=4.0.3",
"sphinx>=3.2.1",
"pyarrow>=6.0.1",
"dask-ml>=2022.1.22",
"scikit-learn>=1.0.0",
"intake>=0.6.0",
"pre-commit",