From 85f2d7aca4916cc645f699787678c0fcbb0c1eca Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 21 Sep 2022 10:11:26 -0700
Subject: [PATCH 1/2] initial changes

---
 dask_sql/physical/rel/custom/create_model.py |  9 ++++-
 dask_sql/physical/rel/custom/predict.py      | 38 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 2e6cdeb0a..8b14f51fc 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,4 +1,5 @@
 import logging
+import numpy as np
 from typing import TYPE_CHECKING
 
 from dask import delayed
@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
 
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            output_meta = np.array([])
+            model = ParallelPostFit(
+                estimator=model,
+                predict_meta=output_meta,
+                predict_proba_meta=output_meta,
+                transform_meta=output_meta,
+            )
 
         else:
             model.fit(X, y, **fit_kwargs)
diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index eb5e4b69f..ebd4bfeb5 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -1,4 +1,5 @@
 import logging
+import numpy as np
 import uuid
 from typing import TYPE_CHECKING
 
@@ -59,7 +60,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
 
         model, training_columns = context.schema[schema_name].models[model_name]
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
+        part = df[training_columns]
+        output_meta = model.predict_meta
+        if part.shape[0].compute() == 0 and output_meta is not None:
+            empty_output = self.handle_empty_partitions(output_meta)
+            if empty_output is not None:
+                return empty_output
+        prediction = model.predict(part)
         predicted_df = df.assign(target=prediction)
 
         # Create a temporary context, which includes the
@@ -79,3 +86,32 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         dc = DataContainer(predicted_df, cc)
 
         return dc
+
+    def handle_empty_partitions(self, output_meta):
+        if hasattr(output_meta, "__array_function__"):
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+            ar = np.zeros(
+                shape=shape,
+                dtype=output_meta.dtype,
+                like=output_meta,
+            )
+            return ar
+        elif "scipy.sparse" in type(output_meta).__module__:
+            # sparse matrices don't support
+            # `like` due to non-implemented __array_function__
+            # Refer https://github.com/scipy/scipy/issues/10362
+            # Note below works for both cupy and scipy sparse matrices
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+
+            ar = type(output_meta)(shape, dtype=output_meta.dtype)
+            return ar
+        elif hasattr(output_meta, "iloc"):
+            return output_meta.iloc[:0, :]

From c56a1555032d96e15f35d91ed8e77a67953edef9 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 21 Sep 2022 11:08:41 -0700
Subject: [PATCH 2/2] fix failures

---
 dask_sql/physical/rel/custom/create_model.py |  2 +-
 dask_sql/physical/rel/custom/predict.py      | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 8b14f51fc..d75c56ba9 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,7 +1,7 @@
 import logging
-import numpy as np
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index ebd4bfeb5..9e03902a8 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -1,8 +1,9 @@
 import logging
-import numpy as np
 import uuid
 from typing import TYPE_CHECKING
 
+import numpy as np
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 
@@ -61,7 +62,10 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
         df = context.sql(sql_select)
         part = df[training_columns]
-        output_meta = model.predict_meta
+        try:
+            output_meta = model.predict_meta
+        except AttributeError:
+            output_meta = None
         if part.shape[0].compute() == 0 and output_meta is not None:
             empty_output = self.handle_empty_partitions(output_meta)
             if empty_output is not None:
                 return empty_output
@@ -87,7 +91,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
 
         return dc
 
-    def handle_empty_partitions(self, output_meta):
+    def handle_empty_partitions(self, output_meta):
         if hasattr(output_meta, "__array_function__"):
             if len(output_meta.shape) == 1:
                 shape = 0
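
For context, a minimal standalone sketch (not part of the patches above) of the empty-partition path they introduce. It mirrors only the numpy and pandas branches of the new handle_empty_partitions helper and assumes NumPy >= 1.20 for the like= keyword; the scipy.sparse branch is omitted.

# Sketch: what handle_empty_partitions returns for two common meta types.
import numpy as np
import pandas as pd


def handle_empty_partitions(output_meta):
    if hasattr(output_meta, "__array_function__"):
        # numpy-like meta: build a zero-row array with the same dtype/array type
        shape = 0 if len(output_meta.shape) == 1 else [0, *output_meta.shape[1:]]
        return np.zeros(shape=shape, dtype=output_meta.dtype, like=output_meta)
    elif hasattr(output_meta, "iloc"):
        # DataFrame-like meta: keep the columns, drop all rows
        return output_meta.iloc[:0, :]
    return None


# CREATE MODEL now passes np.array([]) as predict_meta, so an empty partition
# yields an empty float64 array rather than a failing predict call.
print(handle_empty_partitions(np.array([])).shape)               # (0,)
print(handle_empty_partitions(pd.DataFrame({"target": [1.0]})))  # empty frame, column kept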