diff --git a/src/sensai/catboost.py b/src/sensai/catboost.py index f15461581..f3e1e6bc5 100644 --- a/src/sensai/catboost.py +++ b/src/sensai/catboost.py @@ -44,6 +44,9 @@ def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): self.log.info(f"Updating model parameters with {args}") self.modelArgs.update(args) + def is_sample_weight_supported(self) -> bool: + return True + # noinspection DuplicatedCode class CatBoostVectorClassificationModel(AbstractSkLearnVectorClassificationModel): @@ -79,3 +82,6 @@ def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): args = {"cat_features": col_indices} self.log.info(f"Updating model parameters with {args}") self.modelArgs.update(args) + + def is_sample_weight_supported(self) -> bool: + return True diff --git a/src/sensai/data.py b/src/sensai/data.py index 25f11d80e..af5996288 100644 --- a/src/sensai/data.py +++ b/src/sensai/data.py @@ -1,13 +1,15 @@ import logging +import math import random from abc import ABC, abstractmethod -from typing import Tuple, Sequence, TypeVar, Generic +from typing import Tuple, Sequence, TypeVar, Generic, Optional, Union import numpy as np import pandas as pd import scipy.stats from sklearn.model_selection import StratifiedShuffleSplit +from .util.pickle import setstate from .util.string import ToStringMixin log = logging.getLogger(__name__) @@ -57,8 +59,14 @@ class InputOutputData(BaseInputOutputData[pd.DataFrame], ToStringMixin): """ Holds input and output data for learning problems """ - def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame): + def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[Union[pd.Series, "DataPointWeighting"]] = None): super().__init__(inputs, outputs) + if isinstance(weights, DataPointWeighting): + weights = weights.compute_weights(inputs, outputs) + self.weights = weights + + def __setstate__(self, state): + setstate(InputOutputData, self, state, new_optional_properties=["weights"]) def _tostring_object_info(self) -> str: return f"N={len(self.inputs)}, numInputColumns={len(self.inputs.columns)}, numOutputColumns={len(self.outputs.columns)}" @@ -74,15 +82,32 @@ def from_data_frame(cls, df: pd.DataFrame, *output_columns: str) -> "InputOutput outputs = df[list(output_columns)] return cls(inputs, outputs) + def to_data_frame(self, add_weights: bool = False, weights_col_name: str = "weights") -> pd.DataFrame: + """ + :param add_weights: whether to add the weights as a column (provided that weights are present) + :param weights_col_name: the column name to use for weights if `add_weights` is True + :return: a data frame containing both the inputs and outputs (and optionally the weights) + """ + df = pd.concat([self.inputs, self.outputs], axis=1) + if add_weights and self.weights is not None: + df[weights_col_name] = self.weights + return df + def filter_indices(self, indices: Sequence[int]) -> __qualname__: inputs = self.inputs.iloc[indices] outputs = self.outputs.iloc[indices] - return InputOutputData(inputs, outputs) + weights = None + if self.weights is not None: + weights = self.weights.iloc[indices] + return InputOutputData(inputs, outputs, weights) def filter_index(self, index_elements: Sequence[any]) -> __qualname__: inputs = self.inputs.loc[index_elements] outputs = self.outputs.loc[index_elements] - return InputOutputData(inputs, outputs) + weights = None + if self.weights is not None: + weights = self.weights + return InputOutputData(inputs, outputs, weights) @property def input_dim(self): @@ -103,6 +128,9 @@ def compute_input_output_correlation(self): correlations[outputCol][inputCol] = pcc return correlations + def apply_weighting(self, weighting: "DataPointWeighting"): + self.weights = weighting.compute_weights(self.inputs, self.outputs) + TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData) @@ -264,4 +292,78 @@ def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: first_set_indices.append(i) else: second_set_indices.append(i) - return first_set_indices, second_set_indices \ No newline at end of file + return first_set_indices, second_set_indices + + +class DataPointWeighting(ABC): + @abstractmethod + def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series: + pass + + +class DataPointWeightingRegressionTargetIntervalTotalWeight(DataPointWeighting): + """ + Based on relative weights specified for intervals of the regression target, + will weight individual data point weights such that the sum of weights of data points within each interval + satisfies the user-specified relative weight, while ensuring that the total weight of all data points + is still equal to the number of data points. + + For example, if one specifies `interval_weights` as [(0.5, 1), (inf, 2)], then the data points with target values + up to 0.5 will get 1/3 of the weight and the remaining data points will get 2/3 of the weight. + So if there are 100 data points and 50 of them are in the first interval (up to 0.5), then these 50 data points + will each get weight 1/3*100/50=2/3 and the remaining 50 data points will each get weight 2/3*100/50=4/3. + The sum of all weights is the number of data points, i.e. 100. + + Example: + + >>> targets = [0.1, 0.2, 0.5, 0.7, 0.8, 0.6] + >>> x = pd.DataFrame({"foo": np.zeros(len(targets))}) + >>> y = pd.DataFrame({"target": targets}) + >>> weighting = DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 2)]) + >>> weights = weighting.compute_weights(x, y) + >>> assert(np.isclose(weights.sum(), len(y))) + >>> weights.tolist() + [0.6666666666666666, + 0.6666666666666666, + 0.6666666666666666, + 1.3333333333333333, + 1.3333333333333333, + 1.3333333333333333] + """ + def __init__(self, intervals_weights: Sequence[Tuple[float, float]]): + """ + :param intervals_weights: a sequence of tuples (upper_bound, rel_total_weight) where upper_bound is the upper bound + of the interval, `(lower_bound, upper_bound]`; `lower_bound` is the upper bound of the preceding interval + or -inf for the first interval. `rel_total_weight` specifies the relative weight of all data points within + the interval. + """ + a = -math.inf + sum_rel_weights = sum(t[1] for t in intervals_weights) + self.intervals = [] + for b, rel_weight in intervals_weights: + self.intervals.append(self.Interval(a, b, rel_weight / sum_rel_weights)) + a = b + + class Interval: + def __init__(self, a: float, b: float, weight_fraction: float): + self.a = a + self.b = b + self.weight_fraction = weight_fraction + + def contains(self, x: float): + return self.a < x <= self.b + + def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series: + assert len(y.columns) == 1, f"Only a single regression target is supported {self.__class__.__name__}" + targets = y.iloc[:, 0] + n = len(x) + weights = np.zeros(n) + num_weighted = 0 + for interval in self.intervals: + mask = np.array([interval.contains(x) for x in targets]) + subset_size = mask.sum() + num_weighted += subset_size + weights[mask] = interval.weight_fraction * n / subset_size + if num_weighted != n: + raise Exception("Not all data points were weighted. Most likely, the intervals do not cover the entire range of targets") + return pd.Series(weights, index=x.index) diff --git a/src/sensai/ensemble/ensemble_base.py b/src/sensai/ensemble/ensemble_base.py index 6eb8d2994..2df84b38a 100644 --- a/src/sensai/ensemble/ensemble_base.py +++ b/src/sensai/ensemble/ensemble_base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from concurrent.futures.process import ProcessPoolExecutor -from typing import Sequence, List +from typing import Sequence, List, Optional from inspect import currentframe, getframeinfo import pandas as pd @@ -20,7 +20,9 @@ def __init__(self, models: Sequence[VectorModel], num_processes=1): self.models = list(models) super().__init__(check_input_columns=False) - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) + if self.num_processes == 1 or len(self.models) == 1: for model in self.models: model.fit(x, y) diff --git a/src/sensai/evaluation/eval_stats/eval_stats_base.py b/src/sensai/evaluation/eval_stats/eval_stats_base.py index bfc5a3a50..520857be0 100644 --- a/src/sensai/evaluation/eval_stats/eval_stats_base.py +++ b/src/sensai/evaluation/eval_stats/eval_stats_base.py @@ -5,6 +5,7 @@ import pandas as pd from matplotlib import pyplot as plt +from ...util.pickle import setstate from ...util.plot import ScatterPlot, HistogramPlot, Plot, HeatMapPlot from ...util.string import ToStringMixin, dict_string from ...vector_model import VectorModel @@ -18,6 +19,7 @@ TMetric = TypeVar("TMetric", bound="Metric") TVectorModel = TypeVar("TVectorModel", bound=VectorModel) +Array = Union[np.ndarray, pd.Series, list] PredictionArray = Union[np.ndarray, pd.Series, pd.DataFrame, list] @@ -242,23 +244,29 @@ class PredictionEvalStats(EvalStats[TMetric], ABC): and computes corresponding metrics """ def __init__(self, y_predicted: Optional[PredictionArray], y_true: Optional[PredictionArray], - metrics: List[TMetric], additional_metrics: List[TMetric] = None): + metrics: List[TMetric], additional_metrics: List[TMetric] = None, + weights: Optional[Array] = None): """ :param y_predicted: sequence of predicted values, or, in case of multi-dimensional predictions, either a data frame with one column per dimension or a nested sequence of values :param y_true: sequence of ground truth labels of same shape as y_predicted :param metrics: list of metrics to be computed on the provided data :param additional_metrics: the metrics to additionally compute. This should only be provided if metrics is None + :param weights: weights for each data point contained in `y_predicted` and `y_true` """ self.y_true = [] self.y_predicted = [] + self.weights: Optional[List[float]] = None self.y_true_multidim = None self.y_predicted_multidim = None if y_predicted is not None: - self.add_all(y_predicted, y_true) + self.add_all(y_predicted=y_predicted, y_true=y_true, weights=weights) super().__init__(metrics, additional_metrics=additional_metrics) - def add(self, y_predicted, y_true): + def __setstate__(self, state): + return setstate(PredictionEvalStats, self, state, new_optional_properties=["weights"]) + + def add(self, y_predicted, y_true, weight: Optional[float] = None): """ Adds a single pair of values to the evaluation Parameters: @@ -267,12 +275,17 @@ def add(self, y_predicted, y_true): """ self.y_true.append(y_true) self.y_predicted.append(y_predicted) + if weight is not None: + if self.weights is None: + self.weights = [] + self.weights.append(weight) - def add_all(self, y_predicted: PredictionArray, y_true: PredictionArray): + def add_all(self, y_predicted: PredictionArray, y_true: PredictionArray, weights: Optional[Array] = None): """ :param y_predicted: sequence of predicted values, or, in case of multi-dimensional predictions, either a data frame with one column per dimension or a nested sequence of values :param y_true: sequence of ground truth labels of same shape as y_predicted + :param weights: optional weights of data points """ def is_sequence(x): return isinstance(x, pd.Series) or isinstance(x, list) or isinstance(x, np.ndarray) @@ -313,6 +326,12 @@ def is_sequence(x): else: raise Exception(f"Unhandled data types: {type(y_predicted)}, {type(y_true)}") + if weights is not None: + if self.weights is None: + self.weights = [] + assert len(weights) == len(self.y_predicted) - len(self.weights), "Length of weights does not match" + self.weights.extend(weights) + def _tostring_object_info(self) -> str: return f"{super()._tostring_object_info()}, N={len(self.y_predicted)}" @@ -336,3 +355,6 @@ def create_figure(self, eval_stats: TEvalStats, subtitle: str) -> Optional[plt.F :return: the figure or None if this plot is not applicable/cannot be created """ pass + + def is_applicable(self, eval_stats: TEvalStats) -> bool: + return True diff --git a/src/sensai/evaluation/eval_stats/eval_stats_regression.py b/src/sensai/evaluation/eval_stats/eval_stats_regression.py index 29b2e26ec..73c5d0948 100644 --- a/src/sensai/evaluation/eval_stats/eval_stats_regression.py +++ b/src/sensai/evaluation/eval_stats/eval_stats_regression.py @@ -5,23 +5,30 @@ import numpy as np from matplotlib import pyplot as plt from matplotlib.colors import LinearSegmentedColormap +from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score -from .eval_stats_base import PredictionEvalStats, Metric, EvalStatsCollection, PredictionArray, EvalStatsPlot -from ...vector_model import VectorRegressionModel, InputOutputData +from . import BinaryClassificationMetric +from .eval_stats_base import PredictionEvalStats, Metric, EvalStatsCollection, PredictionArray, EvalStatsPlot, Array +from ...util import kwarg_if_not_none from ...util.plot import HistogramPlot +from ...vector_model import VectorRegressionModel, InputOutputData log = logging.getLogger(__name__) class RegressionMetric(Metric["RegressionEvalStats"], ABC): - def compute_value_for_eval_stats(self, eval_stats: "RegressionEvalStats", model: VectorRegressionModel = None, - io_data: InputOutputData = None): - return self.compute_value(np.array(eval_stats.y_true), np.array(eval_stats.y_predicted), model=model, io_data=io_data) + def compute_value_for_eval_stats(self, eval_stats: "RegressionEvalStats"): + weights = np.array(eval_stats.weights) if eval_stats.weights is not None else None + return self.compute_value(np.array(eval_stats.y_true), np.array(eval_stats.y_predicted), + model=eval_stats.model, + io_data=eval_stats.ioData, + **kwarg_if_not_none("weights", weights)) - @classmethod @abstractmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, + model: VectorRegressionModel = None, + io_data: InputOutputData = None, + weights: Optional[np.ndarray] = None): pass @classmethod @@ -36,80 +43,152 @@ def compute_abs_errors(cls, y_true: np.ndarray, y_predicted: np.ndarray): class RegressionMetricMAE(RegressionMetric): name = "MAE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - return np.mean(cls.compute_abs_errors(y_true, y_predicted)) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return mean_absolute_error(y_true, y_predicted, sample_weight=weights) class RegressionMetricMSE(RegressionMetric): name = "MSE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - residuals = y_predicted - y_true - return np.sum(residuals * residuals) / len(residuals) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return mean_squared_error(y_true, y_predicted, sample_weight=weights) class RegressionMetricRMSE(RegressionMetric): name = "RMSE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - errors = cls.compute_errors(y_true, y_predicted) - return np.sqrt(np.mean(errors * errors)) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return np.sqrt(mean_squared_error(y_true, y_predicted, sample_weight=weights)) class RegressionMetricRRSE(RegressionMetric): name = "RRSE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - mean_y = np.mean(y_true) - residuals = y_predicted - y_true - mean_deviation = y_true - mean_y - return np.sqrt(np.sum(residuals * residuals) / np.sum(mean_deviation * mean_deviation)) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + r2 = r2_score(y_true, y_predicted, sample_weight=weights) + return np.sqrt(1 - r2) class RegressionMetricR2(RegressionMetric): name = "R2" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - rrse = RegressionMetricRRSE.compute_value(y_true, y_predicted) - return 1.0 - rrse*rrse + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return r2_score(y_true, y_predicted, sample_weight=weights) class RegressionMetricPCC(RegressionMetric): + """ + Pearson's correlation coefficient, aka Pearson's R. + This metric does not consider sample weights. + """ name = "PCC" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): cov = np.cov([y_true, y_predicted]) return cov[0][1] / np.sqrt(cov[0][0] * cov[1][1]) class RegressionMetricStdDevAE(RegressionMetric): + """ + The standard deviation of the absolute error. + This metric does not consider sample weights. + """ + name = "StdDevAE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - return np.std(cls.compute_abs_errors(y_true, y_predicted)) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return np.std(self.compute_abs_errors(y_true, y_predicted)) class RegressionMetricMedianAE(RegressionMetric): + """ + The median absolute error. + This metric does not consider sample weights. + """ name = "MedianAE" - @classmethod - def compute_value(cls, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, - io_data: InputOutputData = None): - return np.median(cls.compute_abs_errors(y_true, y_predicted)) + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + return np.median(self.compute_abs_errors(y_true, y_predicted)) + + +class RegressionMetricFromBinaryClassificationMetric(RegressionMetric): + """ + Supports the computation of binary classification metrics by converting predicted/target values to class labels. + This metric does not consider sample weights. + """ + + class ClassGenerator(ABC): + @abstractmethod + def compute_class(self, predicted_value: float) -> bool: + """ + Computes the class from the given value + + :param predicted_value: the value predicted by the regressor or regressor target value + :return: the class + """ + pass + + @abstractmethod + def get_metric_qualifier(self) -> str: + """ + :return: A (short) string which will be added to the original classification metric's name to + represent the class conversion logic + """ + pass + + class ClassGeneratorPositiveBeyond(ClassGenerator): + def __init__(self, min_value_for_positive: float): + self.min_value_for_positive = min_value_for_positive + + def compute_class(self, predicted_value: float) -> bool: + return predicted_value >= self.min_value_for_positive + + def get_metric_qualifier(self) -> str: + return f">={self.min_value_for_positive}" + + def __init__(self, classification_metric: BinaryClassificationMetric, + class_generator: ClassGenerator): + """ + :param classification_metric: the classification metric (which shall consider `True` as the positive label) + :param class_generator: the class generator, which generates `True` and `False` labels from regression values + """ + super().__init__(name=classification_metric.name + f"[{class_generator.get_metric_qualifier()}]", + bounds=classification_metric.bounds) + self.classification_metric = classification_metric + self.class_generator = class_generator + + def _apply_class_generator(self, y: np.ndarray) -> np.ndarray: + return np.array([self.class_generator.compute_class(v) for v in y]) + + def compute_value(self, y_true: np.ndarray, y_predicted: np.ndarray, model: VectorRegressionModel = None, + io_data: InputOutputData = None, weights: Optional[np.ndarray] = None): + y_true = self._apply_class_generator(y_true) + y_predicted = self._apply_class_generator(y_predicted) + return self.classification_metric.compute_value(y_true=y_true, y_predicted=y_predicted) + + +class HeatMapColorMapFactory(ABC): + @abstractmethod + def create_color_map(self, min_sample_weight: float, total_weight: float, num_quantization_levels: int): + pass + + +class HeatMapColorMapFactoryWhiteToRed(HeatMapColorMapFactory): + def create_color_map(self, min_sample_weight: float, total_weight: float, num_quantization_levels: int): + color_nothing = (1, 1, 1) # white + color_min_sample = (1, 0.96, 0.96) # very slightly red + color_everything = (0.7, 0, 0) # dark red + return LinearSegmentedColormap.from_list("whiteToRed", + ((0, color_nothing), (min_sample_weight/total_weight, color_min_sample), (1, color_everything)), + num_quantization_levels) DEFAULT_REGRESSION_METRICS = (RegressionMetricRRSE(), RegressionMetricR2(), RegressionMetricMAE(), @@ -122,8 +201,7 @@ class RegressionEvalStats(PredictionEvalStats["RegressionMetric"]): """ # class members controlling plot appearance, which can be centrally overridden by a user if necessary - HEATMAP_COLORMAP_FACTORY = lambda self: LinearSegmentedColormap.from_list("whiteToRed", - ((0, (1, 1, 1)), (1/len(self.y_predicted), (1, 0.96, 0.96)), (1, (0.7, 0, 0))), len(self.y_predicted)) + HEATMAP_COLORMAP_FACTORY = HeatMapColorMapFactoryWhiteToRed() HEATMAP_DIAGONAL_COLOR = "green" HEATMAP_ERROR_BOUNDARY_VALUE = None HEATMAP_ERROR_BOUNDARY_COLOR = (0.8, 0.8, 0.8) @@ -131,12 +209,15 @@ class RegressionEvalStats(PredictionEvalStats["RegressionMetric"]): def __init__(self, y_predicted: Optional[PredictionArray] = None, y_true: Optional[PredictionArray] = None, metrics: Optional[Sequence["RegressionMetric"]] = None, additional_metrics: Sequence["RegressionMetric"] = None, - model: VectorRegressionModel = None, io_data: InputOutputData = None): + model: VectorRegressionModel = None, + io_data: InputOutputData = None, + weights: Optional[Array] = None): """ :param y_predicted: the predicted values :param y_true: the true values :param metrics: the metrics to compute for evaluation; if None, will use DEFAULT_REGRESSION_METRICS :param additional_metrics: the metrics to additionally compute + :param weights: optional data point weights """ self.model = model self.ioData = io_data @@ -145,10 +226,10 @@ def __init__(self, y_predicted: Optional[PredictionArray] = None, y_true: Option metrics = DEFAULT_REGRESSION_METRICS metrics = list(metrics) - super().__init__(y_predicted, y_true, metrics, additional_metrics=additional_metrics) + super().__init__(y_predicted, y_true, metrics, additional_metrics=additional_metrics, weights=weights) def compute_metric_value(self, metric: RegressionMetric) -> float: - return metric.compute_value_for_eval_stats(self, model=self.model, io_data=self.ioData) + return metric.compute_value_for_eval_stats(self) def compute_mse(self): """Computes the mean squared error (MSE)""" @@ -234,9 +315,10 @@ def plot_scatter_ground_truth_predictions(self, figure=True, title_add=None, **k return fig def plot_heatmap_ground_truth_predictions(self, figure=True, cmap=None, bins=60, title_add=None, error_boundary: Optional[float] = None, + weighted: bool = False, ax: Optional[plt.Axes] = None, **kwargs) -> Optional[plt.Figure]: """ - :param figure: whether to plot in a separate figure and return that figure + :param figure: whether to create a new figure and return that figure (only applies if ax is None) :param cmap: the colour map to use (see corresponding parameter of plt.imshow for further information); if None, use factory defined in HEATMAP_COLORMAP_FACTORY (which can be centrally set to achieve custom behaviour throughout an application) :param bins: how many bins to use for constructing the heatmap @@ -244,20 +326,25 @@ def plot_heatmap_ground_truth_predictions(self, figure=True, cmap=None, bins=60, :param error_boundary: if not None, add two lines (above and below the diagonal) indicating this absolute regression error boundary; if None (default), use static member HEATMAP_ERROR_BOUNDARY_VALUE (which is also None by default, but can be centrally set to achieve custom behaviour throughout an application) + :param weighted: whether to consider data point weights + :param ax: the axis to plot in. If None, use the current axes (which will be the axis of the newly created figure if figure=True). :param kwargs: will be passed to plt.imshow() - :return: the resulting figure object or None + :return: the newly created figure object (if figure=True) or None """ fig = None title = "Heat Map of Predicted Values vs. Ground Truth" if title_add: title += "\n" + title_add - if figure: + if figure and ax is None: fig = plt.figure(title.replace("\n", " ")) + if ax is None: + ax = plt.gca() + y_range = [min(min(self.y_true), min(self.y_predicted)), max(max(self.y_true), max(self.y_predicted))] # diagonal - plt.plot(y_range, y_range, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_DIAGONAL_COLOR, zorder=2) + ax.plot(y_range, y_range, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_DIAGONAL_COLOR, zorder=2) # error boundaries if error_boundary is None: @@ -265,19 +352,27 @@ def plot_heatmap_ground_truth_predictions(self, figure=True, cmap=None, bins=60, if error_boundary is not None: d = np.array(y_range) offs = np.array([error_boundary, error_boundary]) - plt.plot(d, d + offs, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_ERROR_BOUNDARY_COLOR, zorder=2) - plt.plot(d, d - offs, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_ERROR_BOUNDARY_COLOR, zorder=2) + ax.plot(d, d + offs, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_ERROR_BOUNDARY_COLOR, zorder=2) + ax.plot(d, d - offs, '-', lw=0.75, label="_not in legend", color=self.HEATMAP_ERROR_BOUNDARY_COLOR, zorder=2) # heat map - heatmap, _, _ = np.histogram2d(self.y_true, self.y_predicted, range=[y_range, y_range], bins=bins, density=False) - extent = [y_range[0], y_range[1], y_range[0], y_range[1]] + weights = None if not weighted else self.weights + heatmap, _, _ = np.histogram2d(self.y_true, self.y_predicted, range=(y_range, y_range), bins=bins, density=False, weights=weights) + extent = (y_range[0], y_range[1], y_range[0], y_range[1]) if cmap is None: - cmap = self.HEATMAP_COLORMAP_FACTORY() - plt.imshow(heatmap.T, extent=extent, origin='lower', interpolation="none", cmap=cmap, zorder=1, **kwargs) - - plt.xlabel("ground truth") - plt.ylabel("prediction") - plt.title(title) + num_quantization_levels = min(1000, len(self.y_predicted)) + if not weighted: + min_sample_weight = 1.0 + total_weight = len(self.y_predicted) + else: + min_sample_weight = np.min(self.weights) + total_weight = np.sum(self.weights) + cmap = self.HEATMAP_COLORMAP_FACTORY.create_color_map(min_sample_weight, total_weight, num_quantization_levels) + ax.imshow(heatmap.T, extent=extent, origin='lower', interpolation="none", cmap=cmap, zorder=1, **kwargs) + + ax.set_xlabel("ground truth") + ax.set_ylabel("prediction") + ax.set_title(title) return fig @@ -305,8 +400,17 @@ def create_figure(self, eval_stats: RegressionEvalStats, subtitle: str) -> plt.F class RegressionEvalStatsPlotHeatmapGroundTruthPredictions(RegressionEvalStatsPlot): + def __init__(self, weighted: bool = False): + self.weighted = weighted + + def is_applicable(self, eval_stats: RegressionEvalStats) -> bool: + if self.weighted: + return eval_stats.weights is not None + else: + return True + def create_figure(self, eval_stats: RegressionEvalStats, subtitle: str) -> plt.Figure: - return eval_stats.plot_heatmap_ground_truth_predictions(title_add=subtitle) + return eval_stats.plot_heatmap_ground_truth_predictions(title_add=subtitle, weighted=self.weighted) class RegressionEvalStatsPlotScatterGroundTruthPredictions(RegressionEvalStatsPlot): diff --git a/src/sensai/evaluation/eval_util.py b/src/sensai/evaluation/eval_util.py index 8063d1653..93f9f9b43 100644 --- a/src/sensai/evaluation/eval_util.py +++ b/src/sensai/evaluation/eval_util.py @@ -185,7 +185,7 @@ def create_plots(self, eval_stats: EvalStats, subtitle: str, result_collector: E if len(unknown_disabled_plots) > 0: log.warning(f"Plots were disabled which are not registered: {unknown_disabled_plots}; known plots: {known_plots}") for name, plot in self.plots.items(): - if name not in self.disabled_plots: + if name not in self.disabled_plots and plot.is_applicable(eval_stats): fig = plot.create_figure(eval_stats, subtitle) if fig is not None: result_collector.add_figure(name, fig) @@ -195,7 +195,8 @@ class RegressionEvalStatsPlotCollector(EvalStatsPlotCollector[RegressionEvalStat def __init__(self): super().__init__() self.add_plot("error-dist", RegressionEvalStatsPlotErrorDistribution()) - self.add_plot("heatmap-gt-pred", RegressionEvalStatsPlotHeatmapGroundTruthPredictions()) + self.add_plot("heatmap-gt-pred", RegressionEvalStatsPlotHeatmapGroundTruthPredictions(weighted=False)) + self.add_plot("heatmap-gt-pred-weighted", RegressionEvalStatsPlotHeatmapGroundTruthPredictions(weighted=True)) self.add_plot("scatter-gt-pred", RegressionEvalStatsPlotScatterGroundTruthPredictions()) diff --git a/src/sensai/evaluation/evaluator.py b/src/sensai/evaluation/evaluator.py index 3989aabdf..04c1a5274 100644 --- a/src/sensai/evaluation/evaluator.py +++ b/src/sensai/evaluation/evaluator.py @@ -3,19 +3,21 @@ from abc import ABC, abstractmethod from typing import Tuple, Dict, Any, Generator, Generic, TypeVar, Sequence, Optional, List, Union, Callable +import numpy as np import pandas as pd from .eval_stats import GUESS from .eval_stats.eval_stats_base import EvalStats, EvalStatsCollection from .eval_stats.eval_stats_classification import ClassificationEvalStats, ClassificationMetric from .eval_stats.eval_stats_regression import RegressionEvalStats, RegressionEvalStatsCollection, RegressionMetric +from .result_set import RegressionResultSet from ..data import DataSplitter, DataSplitterFractional, InputOutputData from ..data_transformation import DataFrameTransformer from ..tracking import TrackingMixin, TrackedExperiment from ..util.deprecation import deprecated from ..util.string import ToStringMixin from ..util.typing import PandasNamedTuple -from ..vector_model import VectorClassificationModel, VectorModel, VectorModelBase, VectorModelFittableBase, VectorRegressionModel +from ..vector_model import VectorClassificationModel, VectorModel, VectorModelBase, VectorRegressionModel log = logging.getLogger(__name__) @@ -118,6 +120,60 @@ class VectorRegressionModelEvaluationData(VectorModelEvaluationData[RegressionEv def get_eval_stats_collection(self): return RegressionEvalStatsCollection(list(self.eval_stats_by_var_name.values())) + def to_data_frame(self, modify_input_df: bool = False, output_col_name_override: Optional[str] = None): + """ + Creates a data frame with all inputs, predictions and prediction errors. + For each predicted variable "y", there will be columns "y_predicted", "y_true", "y_error" and + "y_abs_error". + If there is only a single predicted variable, the variable can be renamed for convenience. + + The resulting data frame can be conveniently queried and analysed using class ResultSet. + + :param modify_input_df: whether to modify the input data frame in-place to generate the data frame + (instead of copying it). This can be reasonable in cases where the data is very large. + :param output_col_name_override: overrides the output column name. For example, if this is set to "y", + then the columns named in the description above will be present in the data frame. + :return: a data frame containing all inputs, outputs and prediction errors + """ + df = self.io_data.inputs + if not modify_input_df: + df = df.copy() + for predicted_var_name, eval_stats in self.eval_stats_by_var_name.items(): + y_predicted = np.array(eval_stats.y_predicted) + y_true = np.array(eval_stats.y_true) + if output_col_name_override is not None: + assert(len(self.eval_stats_by_var_name)) == 1, "Column name override is only valid for a single output variable" + predicted_var_name = output_col_name_override + df[RegressionResultSet.col_name_predicted(predicted_var_name)] = y_predicted + df[RegressionResultSet.col_name_ground_truth(predicted_var_name)] = y_true + error = y_predicted - y_true + df[RegressionResultSet.col_name_error(predicted_var_name)] = error + df[RegressionResultSet.col_name_abs_error(predicted_var_name)] = np.abs(error) + return df + + def create_result_set(self, modify_input_df: bool = False, output_col_name_override: Optional[str] = None, + regression_result_set_factory: Callable[[pd.DataFrame, List[str]], RegressionResultSet] = RegressionResultSet) \ + -> RegressionResultSet: + """ + Creates a queryable result set from the prediction results which can be used, in particular, for interactive analyses. + + The result set will contain a data frame, and for each predicted variable "y", + there will be columns "y_predicted", "y_true", "y_error" and "y_abs_error" in this data frame. + If there is only a single predicted variable, the variable can be renamed for convenience. + + The resulting data frame can be conveniently queried and analysed using class ResultSet. + + :param modify_input_df: whether to modify the input data frame in-place to generate the data frame + (instead of copying it). This can be reasonable in cases where the data is very large. + :param output_col_name_override: overrides the output column name. For example, if this is set to "y", + then the columns named in the description above will be present in the data frame. + :return: a data frame containing all inputs, outputs and prediction errors + + :return: the result set + """ + return RegressionResultSet.from_regression_eval_data(self, modify_input_df=modify_input_df, + output_col_name_override=output_col_name_override) + TEvalData = TypeVar("TEvalData", bound=VectorModelEvaluationData) @@ -195,7 +251,7 @@ def set_tracked_experiment(self, tracked_experiment: TrackedExperiment): """ super().set_tracked_experiment(tracked_experiment) - def eval_model(self, model: Union[VectorModelBase, VectorModelFittableBase], on_training_data=False, track=True, + def eval_model(self, model: Union[VectorModelBase, VectorModel], on_training_data=False, track=True, fit=False) -> TEvalData: """ Evaluates the given model @@ -241,11 +297,11 @@ def create_metrics_dict_provider(self, predicted_var_name: Optional[str]) -> Met """ return MetricsDictProviderFromFunction(functools.partial(self._compute_metrics_for_var_name, predictedVarName=predicted_var_name)) - def fit_model(self, model: VectorModelFittableBase): + def fit_model(self, model: VectorModel): """Fits the given model's parameters using this evaluator's training data""" if self.training_data is None: raise Exception(f"Cannot fit model with evaluator {self.__class__.__name__}: no training data provided") - model.fit(self.training_data.inputs, self.training_data.outputs) + model.fit_input_output_data(self.training_data) class RegressionEvaluatorParams(EvaluatorParams): @@ -327,6 +383,7 @@ def _eval_model(self, model: VectorRegressionModel, data: InputOutputData) -> Ve else: raise Exception(f"Model output column '{predictedVarName}' not found in ground truth columns {ground_truth.columns}") eval_stats = RegressionEvalStats(y_predicted=predictions[predictedVarName], y_true=y_true, + weights=data.weights, metrics=self.params.metrics, additional_metrics=self.params.additional_metrics, model=model, @@ -502,7 +559,7 @@ class RuleBasedVectorRegressionModelEvaluator(VectorRegressionModelEvaluator): def __init__(self, data: InputOutputData): super().__init__(data, test_data=data) - def eval_model(self, model: Union[VectorModelBase, VectorModelFittableBase], on_training_data=False, track=True, + def eval_model(self, model: Union[VectorModelBase, VectorModel], on_training_data=False, track=True, fit=False) -> VectorRegressionModelEvaluationData: """ Evaluate the rule based model. The training data and test data coincide, thus fitting the model diff --git a/src/sensai/evaluation/result_set.py b/src/sensai/evaluation/result_set.py new file mode 100644 index 000000000..076a53dcd --- /dev/null +++ b/src/sensai/evaluation/result_set.py @@ -0,0 +1,118 @@ +from typing import Optional, List, TYPE_CHECKING, Callable + +import pandas as pd + +from sensai.evaluation.eval_stats import RegressionEvalStats +from sensai.util.pandas import query_data_frame +from sensai.vector_model import get_predicted_var_name + +if TYPE_CHECKING: + from sensai.evaluation import VectorRegressionModelEvaluationData + + +class ResultSet: + """ + A result set which is designed for interactive result inspection (e.g. in an iPython notebook). + An instance can, for example, be created with a data frame as returned by VectorRegressionModelEvaluationData.to_data_frame + and subsequently be applied to interactively analyse the results. + + The class is designed to be subclassed, such that, in particular, method `_show_df` can be + overridden to display meaningful information (use case-specific) in the notebook environment. + """ + def __init__(self, df: pd.DataFrame): + self.df = df + + def _create_result_set(self, df: pd.DataFrame, parent: "ResultSet"): + """ + Creates a new result set for the given data frame + + :param df: the data frame + :return: the result set + """ + return ResultSet(df) + + def query(self, sql: str) -> "ResultSet": + """ + Queries the result set with the given condition specified in SQL syntax. + + NOTE: Requires duckdb to be installed. + + :param sql: an SQL query starting with the WHERE clause (excluding the 'where' keyword itself) + :return: the result set corresponding to the query + """ + result_df = query_data_frame(self.df, sql) + return self._create_result_set(result_df, self) + + def show(self, first: Optional[int] = None, sample: Optional[int] = None) -> None: + """ + Shows all or some of the result set's contents. + + :param first: if not None, show this many rows from the start of the result set + :param sample: if not None, sample this many rows from the result set to be shown + """ + df = self.df + if first is not None: + df = df.iloc[:first] + if sample is not None: + df = df.sample(sample) + self._show_df(df) + + def _show_df(self, df: pd.DataFrame): + print(df.to_string()) + + +class RegressionResultSet(ResultSet): + def __init__(self, df: pd.DataFrame, predicted_var_names: List[str]): + super().__init__(df) + self.predicted_var_names = predicted_var_names + + @classmethod + def from_regression_eval_data(cls, eval_data: "VectorRegressionModelEvaluationData", modify_input_df: bool = False, + output_col_name_override: Optional[str] = None, + regression_result_set_factory: Callable[[pd.DataFrame, List[str]], "RegressionResultSet"] = None) \ + -> "RegressionResultSet": + df = eval_data.to_data_frame(modify_input_df=modify_input_df, output_col_name_override=output_col_name_override) + if output_col_name_override: + predicted_var_names = [output_col_name_override] + else: + predicted_var_names = eval_data.predicted_var_names + + def default_factory(data_frame: pd.DataFrame, var_names: List[str]): + return cls(data_frame, var_names) + + if regression_result_set_factory is None: + regression_result_set_factory = default_factory + + return regression_result_set_factory(df, predicted_var_names) + + def _create_result_set(self, df: pd.DataFrame, parent: "RegressionResultSet"): + return self.__class__(df, parent.predicted_var_names) + + @staticmethod + def col_name_predicted(predicted_var_name: str): + return f"{predicted_var_name}_predicted" + + @staticmethod + def col_name_ground_truth(predicted_var_name: str): + return f"{predicted_var_name}_true" + + @staticmethod + def col_name_error(predicted_var_name: str): + return f"{predicted_var_name}_error" + + @staticmethod + def col_name_abs_error(predicted_var_name: str): + return f"{predicted_var_name}_abs_error" + + def eval_stats(self, predicted_var_name: Optional[str] = None): + """ + Creates the evaluation stats object for this result object, which can be used to compute metrics + or to create plots. + + :param predicted_var_name: the name of the predicted variable for which to create the object; + can be None if there is but a single variable + :return: the evaluation stats object + """ + predicted_var_name = get_predicted_var_name(predicted_var_name, self.predicted_var_names) + return RegressionEvalStats(y_predicted=self.df[self.col_name_predicted(predicted_var_name)], + y_true=self.df[self.col_name_ground_truth(predicted_var_name)]) diff --git a/src/sensai/lightgbm.py b/src/sensai/lightgbm.py index 287f82c44..d79b7bc73 100644 --- a/src/sensai/lightgbm.py +++ b/src/sensai/lightgbm.py @@ -63,6 +63,9 @@ def __init__(self, categorical_feature_names: Optional[Union[Sequence[str], str] def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): _update_fit_args(self.fitArgs, inputs, outputs, self._categoricalFeatureNameRegex) + def is_sample_weight_supported(self) -> bool: + return True + class LightGBMVectorClassificationModel(AbstractSkLearnVectorClassificationModel, FeatureImportanceProviderSkLearnClassification): log = log.getChild(__qualname__) @@ -112,3 +115,6 @@ def _predict_class_probabilities(self, x: pd.DataFrame): return pd.DataFrame(y, columns=self._labels) else: return super()._predict_class_probabilities(x) + + def is_sample_weight_supported(self) -> bool: + return True diff --git a/src/sensai/naive_bayes.py b/src/sensai/naive_bayes.py index d940d5177..9eff3a8db 100644 --- a/src/sensai/naive_bayes.py +++ b/src/sensai/naive_bayes.py @@ -1,5 +1,6 @@ import collections from math import log, exp +from typing import Optional import numpy as np import pandas as pd @@ -20,7 +21,8 @@ def __init__(self, pseudo_count=0.1): self.conditionals = None self.pseudoCount = pseudo_count - def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) self.prior = collections.defaultdict(lambda: 0) self.conditionals = collections.defaultdict(lambda: [collections.defaultdict(lambda: 0) for _ in range(x.shape[1])]) increment = 1 diff --git a/src/sensai/nearest_neighbors.py b/src/sensai/nearest_neighbors.py index 12f5238e9..d3852807c 100644 --- a/src/sensai/nearest_neighbors.py +++ b/src/sensai/nearest_neighbors.py @@ -233,7 +233,8 @@ def _tostring_excludes(self) -> List[str]: return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] # noinspection DuplicatedCode - def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) assert len(y.columns) == 1, "Expected exactly one column in label set Y" self.df = x.merge(y, how="inner", left_index=True, right_index=True) self.y = y @@ -302,7 +303,8 @@ def _tostring_excludes(self) -> List[str]: return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] # noinspection DuplicatedCode - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) assert len(y.columns) == 1, "Expected exactly one column in label set Y" self.df = x.merge(y, how="inner", left_index=True, right_index=True) self.y = y diff --git a/src/sensai/sklearn/sklearn_base.py b/src/sensai/sklearn/sklearn_base.py index 414510a93..f2726eca8 100644 --- a/src/sensai/sklearn/sklearn_base.py +++ b/src/sensai/sklearn/sklearn_base.py @@ -2,6 +2,7 @@ import logging import re from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import List, Any, Dict, Optional import numpy as np @@ -49,6 +50,13 @@ def _apply_sklearn_input_transformer(inputs: pd.DataFrame, sklearn_input_transfo return pd.DataFrame(input_values, index=inputs.index, columns=inputs.columns) +class ActualFitParams: + def __init__(self, inputs, outputs, kwargs: Dict[str, Any]): + self.inputs = inputs + self.outputs = outputs + self.kwargs = kwargs + + class AbstractSkLearnVectorRegressionModel(VectorRegressionModel, ABC): """ Base class for models built upon scikit-learn's model implementations @@ -101,21 +109,42 @@ def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): """ Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the - underlying model's fit method) + underlying model's `fit` method) + + :param inputs: the training input data + :param outputs: the training output data + """ + pass + + def _compute_actual_fit_params(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> ActualFitParams: + """ + Computes additional arguments to be passed to the model's `fit` method, which are transient and shall not be saved + along with the model as metadata, e.g. larger data structures such as validation data or sample weights. :param inputs: the training input data :param outputs: the training output data + :return: a dictionary of parameters to be passed to `fit`. """ + fit_params = ActualFitParams(inputs, outputs, dict(self.fitArgs)) + if weights is not None: + self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights) + if self.is_sample_weight_supported(): + fit_params.kwargs["sample_weight"] = weights + return fit_params + + @abstractmethod + def is_sample_weight_supported(self) -> bool: pass - def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame): + def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None): inputs = self._transform_input(inputs, fit=True) self._update_model_args(inputs, outputs) self._update_fit_args(inputs, outputs) - self._fit_sklearn(inputs, outputs) + actual_fit_params = self._compute_actual_fit_params(inputs, outputs, weights=weights) + self._fit_sklearn(actual_fit_params) @abstractmethod - def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame): + def _fit_sklearn(self, params: ActualFitParams): pass def _predict(self, x: pd.DataFrame): @@ -147,13 +176,13 @@ def _tostring_additional_entries(self) -> Dict[str, Any]: d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})" return d - def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame): - for predictedVarName in outputs.columns: + def _fit_sklearn(self, params: ActualFitParams): + for predictedVarName in params.outputs.columns: log.info(f"Fitting model for output variable '{predictedVarName}'") model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=copy.deepcopy(self.sklearnOutputTransformer)) - model.fit(inputs, outputs[predictedVarName], **self.fitArgs) + model.fit(params.inputs, params.outputs[predictedVarName], **params.kwargs) self.models[predictedVarName] = model def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame: @@ -192,14 +221,14 @@ def _tostring_additional_entries(self) -> Dict[str, Any]: d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})" return d - def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame): - if len(outputs.columns) > 1: - log.info(f"Fitting a single multi-dimensional model for all {len(outputs.columns)} output dimensions") + def _fit_sklearn(self, params: ActualFitParams): + if len(params.outputs.columns) > 1: + log.info(f"Fitting a single multi-dimensional model for all {len(params.outputs.columns)} output dimensions") self.model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=self.sklearnOutputTransformer) - output_values = outputs.values + output_values = params.outputs.values if output_values.shape[1] == 1: # for 1D output, shape must be (numSamples,) rather than (numSamples, 1) output_values = np.ravel(output_values) - self.model.fit(inputs, output_values, **self.fitArgs) + self.model.fit(params.inputs, output_values, **params.kwargs) def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame: y = self.model.predict(inputs) @@ -273,13 +302,21 @@ def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame): """ pass - def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame): + @abstractmethod + def is_sample_weight_supported(self) -> bool: + pass + + def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None): inputs = self._transform_input(inputs, fit=True) self._update_model_args(inputs, outputs) self._update_fit_args(inputs, outputs) self.model = create_sklearn_model(self.modelConstructor, self.modelArgs) log.info(f"Fitting sklearn classifier of type {self.model.__class__.__name__}") kwargs = dict(self.fitArgs) + + if self.useBalancedClassWeights and weights is not None: + raise ValueError("Balanced class weights cannot be used in conjunction with user-specified weights") + if self.useBalancedClassWeights: class2weight = self._compute_class_weights(outputs) classes = outputs.iloc[:, 0] @@ -287,6 +324,11 @@ def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame): weights = weights / np.min(weights) kwargs["sample_weight"] = weights + elif weights is not None: + self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights) + if self.is_sample_weight_supported(): + kwargs["sample_weight"] = weights + output_values = np.ravel(outputs.values) if self.useLabelEncoding: output_values = self._encode_labels(output_values) diff --git a/src/sensai/sklearn/sklearn_classification.py b/src/sensai/sklearn/sklearn_classification.py index 9a1d85afe..6283c6df1 100644 --- a/src/sensai/sklearn/sklearn_classification.py +++ b/src/sensai/sklearn/sklearn_classification.py @@ -19,6 +19,9 @@ def __init__(self, min_samples_leaf=1, random_state=42, **model_args): super().__init__(DecisionTreeClassifier, min_samples_leaf=min_samples_leaf, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnRandomForestVectorClassificationModel(AbstractSkLearnVectorClassificationModel, FeatureImportanceProviderSkLearnClassification): @@ -28,6 +31,9 @@ def __init__(self, n_estimators=100, min_samples_leaf=1, random_state=42, use_ba use_balanced_class_weights=use_balanced_class_weights, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnMLPVectorClassificationModel(AbstractSkLearnVectorClassificationModel): def __init__(self, hidden_layer_sizes=(100,), activation: str = "relu", @@ -50,21 +56,33 @@ def __init__(self, hidden_layer_sizes=(100,), activation: str = "relu", random_state=random_state, solver=solver, batch_size=batch_size, max_iter=max_iter, early_stopping=early_stopping, n_iter_no_change=n_iter_no_change, **model_args) + def is_sample_weight_supported(self) -> bool: + return False + class SkLearnMultinomialNBVectorClassificationModel(AbstractSkLearnVectorClassificationModel): def __init__(self, **model_args): super().__init__(sklearn.naive_bayes.MultinomialNB, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnSVCVectorClassificationModel(AbstractSkLearnVectorClassificationModel): def __init__(self, random_state=42, **model_args): super().__init__(sklearn.svm.SVC, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnLogisticRegressionVectorClassificationModel(AbstractSkLearnVectorClassificationModel): def __init__(self, random_state=42, **model_args): super().__init__(sklearn.linear_model.LogisticRegression, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnKNeighborsVectorClassificationModel(AbstractSkLearnVectorClassificationModel): def __init__(self, **model_args): @@ -76,3 +94,6 @@ def _predict_sklearn(self, input_values): inputs = np.ascontiguousarray(input_values) return super()._predict_sklearn(inputs) + + def is_sample_weight_supported(self) -> bool: + return False diff --git a/src/sensai/sklearn/sklearn_regression.py b/src/sensai/sklearn/sklearn_regression.py index 5d8707d87..ebd46311f 100644 --- a/src/sensai/sklearn/sklearn_regression.py +++ b/src/sensai/sklearn/sklearn_regression.py @@ -20,6 +20,9 @@ def __init__(self, n_estimators=100, min_samples_leaf=10, random_state=42, **mod super().__init__(sklearn.ensemble.RandomForestRegressor, n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnLinearRegressionVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel, FeatureImportanceProviderSkLearnRegressionMultiDim): @@ -31,6 +34,9 @@ def __init__(self, fit_intercept=True, **model_args): """ super().__init__(sklearn.linear_model.LinearRegression, fit_intercept=fit_intercept, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnLinearRidgeRegressionVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel, FeatureImportanceProviderSkLearnRegressionMultiDim): @@ -47,6 +53,9 @@ def __init__(self, alpha=1.0, fit_intercept=True, solver="auto", max_iter=None, super().__init__(sklearn.linear_model.Ridge, alpha=alpha, fit_intercept=fit_intercept, max_iter=max_iter, tol=tol, solver=solver, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnLinearLassoRegressionVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel, FeatureImportanceProviderSkLearnRegressionMultiDim): @@ -62,6 +71,9 @@ def __init__(self, alpha=1.0, fit_intercept=True, max_iter=1000, tol=0.0001, **m """ super().__init__(sklearn.linear_model.Lasso, alpha=alpha, fit_intercept=fit_intercept, max_iter=max_iter, tol=tol, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnMultiLayerPerceptronVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel): def __init__(self, @@ -85,38 +97,59 @@ def __init__(self, random_state=random_state, hidden_layer_sizes=hidden_layer_sizes, activation=activation, solver=solver, batch_size=batch_size, max_iter=max_iter, early_stopping=early_stopping, n_iter_no_change=n_iter_no_change, **model_args) + def is_sample_weight_supported(self) -> bool: + return False + class SkLearnSVRVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel): def __init__(self, **model_args): super().__init__(sklearn.svm.SVR, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnLinearSVRVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel): def __init__(self, **model_args): super().__init__(sklearn.svm.LinearSVR, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnGradientBoostingVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel): def __init__(self, random_state=42, **model_args): super().__init__(sklearn.ensemble.GradientBoostingRegressor, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnKNeighborsVectorRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel): def __init__(self, **model_args): super().__init__(sklearn.neighbors.KNeighborsRegressor, **model_args) + def is_sample_weight_supported(self) -> bool: + return False + class SkLearnExtraTreesVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel): def __init__(self, n_estimators=100, min_samples_leaf=10, random_state=42, **model_args): super().__init__(sklearn.ensemble.ExtraTreesRegressor, n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnDummyVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel): def __init__(self, strategy='mean', constant=None, quantile=None): super().__init__(sklearn.dummy.DummyRegressor, strategy=strategy, constant=constant, quantile=quantile) + def is_sample_weight_supported(self) -> bool: + return True + class SkLearnDecisionTreeVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel): def __init__(self, random_state=42, **model_args): @@ -140,3 +173,5 @@ def plot_graphviz_pdf(self, dot_path, predicted_var_name=None): feature_names=self.get_model_input_variable_names(), filled=True) graphviz.Source(dot).render(dot_path) + def is_sample_weight_supported(self) -> bool: + return True diff --git a/src/sensai/tensor_model.py b/src/sensai/tensor_model.py index b40c9f599..9b705153e 100644 --- a/src/sensai/tensor_model.py +++ b/src/sensai/tensor_model.py @@ -135,7 +135,8 @@ def __init__(self, check_input_shape=True, check_input_columns=True): TensorModel.__init__(self) self.check_input_shape = check_input_shape - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) self._fit_tensor_model(x, y) def _predict(self, x: pd.DataFrame) -> pd.DataFrame: @@ -165,7 +166,8 @@ def __init__(self, check_input_shape=True, check_input_columns=True): def _predict_class_probabilities(self, x: pd.DataFrame) -> pd.DataFrame: return self._predict_df_through_array(x, self.get_class_labels()) - def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) self._fit_tensor_model(x, y) def predict(self, x: pd.DataFrame) -> pd.DataFrame: @@ -215,7 +217,8 @@ def __init__(self, check_input_shape=True, check_output_shape=True, check_input_ self.checkInputShape = check_input_shape self.checkOutputShape = check_output_shape - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) self._fit_tensor_model(x, y) def _predict(self, x: pd.DataFrame) -> pd.DataFrame: @@ -254,7 +257,8 @@ def __init__(self, check_input_shape=True, check_output_shape=True, check_input_ self.check_output_shape = check_output_shape self._numPredictedClasses: Optional[int] = None - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): + self._warn_sample_weights_unsupported(False, weights) self._fit_tensor_model(x, y) def is_regression_model(self) -> bool: @@ -263,12 +267,13 @@ def is_regression_model(self) -> bool: def get_num_predicted_classes(self): return self._numPredictedClasses - def fit(self, x: pd.DataFrame, y: pd.DataFrame, fit_preprocessors=True, fit_model=True): + def fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None, fit_preprocessors=True, fit_model=True): """ :param x: data frame containing input tensors on which to train :param y: ground truth has to be an array containing only zeroes and ones (one-hot-encoded labels) of the shape `(*prediction_shape, numLabels)` + :param weights: data point weights (must be None; not supported by this model!) :param fit_preprocessors: whether the model's preprocessors (feature generators and data frame transformers) shall be fitted :param fit_model: whether the model itself shall be fitted @@ -291,7 +296,7 @@ def fit(self, x: pd.DataFrame, y: pd.DataFrame, fit_preprocessors=True, fit_mode f"prediction_shape. If the predictions are scalars, a TensorToScalarClassificationModel " f"should be used instead of {self.__class__.__name__}") self._numPredictedClasses = df_y_to_check.shape[-1] - super().fit(x, y, fit_preprocessors=fit_preprocessors, fit_model=True) + super().fit(x, y, weights=weights, fit_preprocessors=fit_preprocessors, fit_model=True) def get_model_output_shape(self): # The ground truth contains one-hot-encoded labels in the last dimension diff --git a/src/sensai/tensorflow/tf_base.py b/src/sensai/tensorflow/tf_base.py index 88a7451a8..cadb90461 100644 --- a/src/sensai/tensorflow/tf_base.py +++ b/src/sensai/tensorflow/tf_base.py @@ -2,6 +2,7 @@ import logging import os import tempfile +from typing import Optional import pandas as pd import tensorflow as tf @@ -89,7 +90,8 @@ def _create_model(self, input_dim, output_dim): """ pass - def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame): + def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series]): + self._warn_sample_weights_unsupported(False, weights) # normalise data self.input_scaler = normalisation.VectorDataScaler(inputs, self.normalisation_mode) self.output_scaler = normalisation.VectorDataScaler(outputs, self.normalisation_mode) diff --git a/src/sensai/torch/torch_base.py b/src/sensai/torch/torch_base.py index 5365a0bdb..05137393d 100644 --- a/src/sensai/torch/torch_base.py +++ b/src/sensai/torch/torch_base.py @@ -608,7 +608,8 @@ def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser, output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter) - def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None: + def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None: + self._warn_sample_weights_unsupported(False, weights) if self.inputTensoriser is not None: log.info(f"Fitting {self.inputTensoriser} ...") self.inputTensoriser.fit(inputs, model=self) @@ -772,7 +773,8 @@ def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser, output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter) - def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None: + def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None: + self._warn_sample_weights_unsupported(False, weights) if len(outputs.columns) != 1: raise ValueError("Expected one output dimension: the class labels") diff --git a/src/sensai/util/cache.py b/src/sensai/util/cache.py index 593277cc8..d7dc42ea4 100644 --- a/src/sensai/util/cache.py +++ b/src/sensai/util/cache.py @@ -9,9 +9,10 @@ import threading import time from abc import abstractmethod, ABC +from collections import OrderedDict from functools import wraps from pathlib import Path -from typing import Any, Callable, Iterator, List, Optional, TypeVar, Generic, Union +from typing import Any, Callable, Iterator, List, Optional, TypeVar, Generic, Union, Hashable from .hash import pickle_hash from .pickle import load_pickle, dump_pickle, setstate @@ -20,6 +21,7 @@ T = TypeVar("T") TKey = TypeVar("TKey") +THashableKey = TypeVar("THashableKey", bound=Hashable) TValue = TypeVar("TValue") TData = TypeVar("TData") @@ -788,3 +790,28 @@ def load(cls, path: Union[str, Path], backend="pickle"): if not isinstance(result, cls): raise Exception(f"Excepted instance of {cls}, instead got: {result.__class__.__name__}") return result + + +class LRUCache(KeyValueCache[THashableKey, TValue], Generic[THashableKey, TValue]): + def __init__(self, capacity: int) -> None: + self._cache = OrderedDict() + self._capacity = capacity + + def get(self, key: THashableKey) -> TValue: + if key not in self._cache: + return None + self._cache.move_to_end(key) + return self._cache[key] + + def set(self, key: THashableKey, value: TValue): + if key in self._cache: + self._cache.move_to_end(key) + self._cache[key] = value + if len(self._cache) > self._capacity: + self._cache.popitem(last=False) + + def __len__(self) -> int: + return len(self._cache) + + def clear(self) -> None: + self._cache.clear() diff --git a/src/sensai/util/helper.py b/src/sensai/util/helper.py index f5b52b1ca..49860c0eb 100644 --- a/src/sensai/util/helper.py +++ b/src/sensai/util/helper.py @@ -2,7 +2,7 @@ This module contains various helper functions. """ import math -from typing import Any, Sequence, Union, TypeVar, List +from typing import Any, Sequence, Union, TypeVar, List, Optional, Dict, Container, Iterable T = TypeVar("T") @@ -58,6 +58,13 @@ def check_not_nan_dict(d: dict): raise ValueError(f"Got one or more NaN values: {invalid_keys}") +def contains_any(container: Union[Container, Iterable], items: Sequence) -> bool: + for item in items: + if item in container: + return True + return False + + # noinspection PyUnusedLocal def mark_used(*args): """ @@ -83,4 +90,23 @@ def flatten_arguments(args: Sequence[Union[T, Sequence[T]]]) -> List[T]: result.extend(arg) else: result.append(arg) - return result \ No newline at end of file + return result + + +def kwarg_if_not_none(arg_name: str, arg_value: Any) -> Dict[str, Any]: + """ + Supports the optional passing of a kwarg, returning a non-empty dictionary with the kwarg only + if the value is not None. + + This can be helpful to improve backward compatibility for cases where a kwarg was added later + but old implementations weren't updated to have it, such that an exception will be raised only + if the kwarg is actually used. + + :param arg_name: the argument name + :param arg_value: the value + :return: a dictionary containing the kwarg or, if the value is None, an empty dictionary + """ + if arg_value is None: + return {} + else: + return {arg_name: arg_value} diff --git a/src/sensai/util/io.py b/src/sensai/util/io.py index 30c99eae7..9b8ddc86e 100644 --- a/src/sensai/util/io.py +++ b/src/sensai/util/io.py @@ -52,6 +52,9 @@ def path(self, filename_suffix: str, extension_to_add=None, valid_other_extensio relevant if extensionToAdd is specified :return: the full path """ + # replace forbidden characters + filename_suffix = filename_suffix.replace(">=", "gte").replace(">", "gt") + if extension_to_add is not None: add_ext = True valid_extensions = set(valid_other_extensions) if valid_other_extensions is not None else set() @@ -200,3 +203,19 @@ def _get_s3_object(self): session = boto3.session.Session(profile_name=os.getenv("AWS_PROFILE")) s3 = session.resource("s3") return s3.Bucket(self.bucket).Object(self.object) + + +def create_path(root: str, *path_elems: str, is_dir: bool, make_dirs: bool = False) -> str: + path = os.path.join(root, *path_elems) + if make_dirs: + dir_path = path if is_dir else os.path.dirname(path) + os.makedirs(dir_path, exist_ok=True) + return path + + +def create_file_path(root, *path_elems, make_dirs: bool = False) -> str: + return create_path(root, *path_elems, is_dir=False, make_dirs=make_dirs) + + +def create_dir_path(root, *path_elems, make_dirs: bool = False) -> str: + return create_path(root, *path_elems, is_dir=True, make_dirs=make_dirs) diff --git a/src/sensai/util/logging.py b/src/sensai/util/logging.py index 901ac4dbc..ea29864b1 100644 --- a/src/sensai/util/logging.py +++ b/src/sensai/util/logging.py @@ -141,10 +141,11 @@ def _at_exit_report_file_logger(): print(f"A log file was saved to {path}") -def add_file_logger(path, register_atexit=True): +def add_file_logger(path, append=True, register_atexit=True): global _isAtExitReportFileLoggerRegistered log.info(f"Logging to {path} ...") - handler = FileHandler(path) + mode = "a" if append else "w" + handler = FileHandler(path, mode=mode) handler.setFormatter(Formatter(_logFormat)) Logger.root.addHandler(handler) _fileLoggerPaths.append(path) @@ -325,19 +326,21 @@ class FileLoggerContext: """ A context handler to be used in conjunction with Python's `with` statement which enables file-based logging. """ - def __init__(self, path: str, enabled=True): + def __init__(self, path: str, append=True, enabled=True): """ :param path: the path to the log file + :param append: whether to append in case the file already exists; if False, always create a new file. :param enabled: whether to actually perform any logging. This switch allows the with statement to be applied regardless of whether logging shall be enabled. """ self.enabled = enabled self.path = path + self.append = append self._log_handler = None def __enter__(self): if self.enabled: - self._log_handler = add_file_logger(self.path, register_atexit=False) + self._log_handler = add_file_logger(self.path, append=self.append, register_atexit=False) def __exit__(self, exc_type, exc_value, traceback): if self._log_handler is not None: diff --git a/src/sensai/util/pandas.py b/src/sensai/util/pandas.py index f4ead1a32..0091d32b4 100644 --- a/src/sensai/util/pandas.py +++ b/src/sensai/util/pandas.py @@ -1,9 +1,13 @@ import logging +from abc import ABC, abstractmethod from copy import copy +from typing import List import numpy as np import pandas as pd +from sensai.util import mark_used + log = logging.getLogger(__name__) @@ -140,3 +144,134 @@ def remove_duplicate_index_entries(df: pd.DataFrame): keep.append(item != prev_item) prev_item = item return df[keep] + + +def query_data_frame(df: pd.DataFrame, sql: str): + """ + Queries the given data frame with the given condition specified in SQL syntax. + + NOTE: Requires duckdb to be installed. + + :param df: the data frame to query + :param sql: an SQL query starting with the WHERE clause (excluding the 'where' keyword itself) + :return: the filtered/transformed data frame + """ + import duckdb + + NUM_TYPE_INFERENCE_ROWS = 100 + + def is_supported_object_col(col_name: str): + supported_type_set = set() + contains_unsupported_types = False + # check the first N values + for value in df[col_name].iloc[:NUM_TYPE_INFERENCE_ROWS]: + if isinstance(value, str): + supported_type_set.add(str) + elif value is None: + pass + else: + contains_unsupported_types = True + return not contains_unsupported_types and len(supported_type_set) == 1 + + # determine which columns are object columns that are unsupported by duckdb and would raise errors + # if they remained in the data frame that is queried + added_index_col = "__sensai_resultset_index__" + original_columns = df.columns + object_columns = list(df.dtypes[df.dtypes == object].index) + object_columns = [c for c in object_columns if not is_supported_object_col(c)] + + # add an artificial index which we will use to identify the rows for object column reconstruction + df[added_index_col] = np.arange(len(df)) + + try: + # remove the object columns from the data frame but save them for subsequent reconstruction + objects_df = df[object_columns + [added_index_col]] + query_df = df.drop(columns=object_columns) + mark_used(query_df) + + # apply query with reduced df + result_df = duckdb.query(f"select * from query_df where {sql}").to_df() + + # restore object columns in result + objects_df.set_index(added_index_col, drop=True, inplace=True) + result_df.set_index(added_index_col, drop=True, inplace=True) + result_objects_df = objects_df.loc[result_df.index] + assert len(result_df) == len(result_objects_df) + full_result_df = pd.concat([result_df, result_objects_df], axis=1) + full_result_df = full_result_df[original_columns] + + finally: + # clean up + df.drop(columns=added_index_col, inplace=True) + + return full_result_df + + +class SeriesInterpolation(ABC): + def interpolate(self, series: pd.Series, inplace: bool = False) -> pd.Series | None: + if not inplace: + series = series.copy() + self._interpolate_in_place(series) + return series if not inplace else None + + @abstractmethod + def _interpolate_in_place(self, series: pd.Series) -> None: + pass + + def interpolate_all_with_combined_index(self, series_list: List[pd.Series]) -> List[pd.Series]: + """ + Interpolates the given series using the combined index of all series. + + :param series_list: the list of series to interpolate + :return: a list of corresponding interpolated series, each having the same index + """ + # determine common index and + index_elements = set() + for series in series_list: + index_elements.update(series.index) + common_index = sorted(index_elements) + + # reindex, filling the gaps via interpolation + interpolated_series_list = [] + for series in series_list: + series = series.copy() + series = series.reindex(common_index, method=None) + self.interpolate(series, inplace=True) + interpolated_series_list.append(series) + + return interpolated_series_list + + +class SeriesInterpolationLinearIndex(SeriesInterpolation): + def __init__(self, ffill: bool = False, bfill: bool = False): + """ + :param ffill: whether to fill any N/A values at the end of the series with the last valid observation + :param bfill: whether to fill any N/A values at the start of the series with the first valid observation + """ + self.ffill = ffill + self.bfill = bfill + + def _interpolate_in_place(self, series: pd.Series) -> pd.Series | None: + series.interpolate(method="index", inplace=True) + if self.ffill: + series.interpolate(method="ffill", limit_direction="forward") + if self.bfill: + series.interpolate(method="bfill", limit_direction="backward") + + +class SeriesInterpolationRepeatPreceding(SeriesInterpolation): + def __init__(self, bfill: bool = False): + """ + :param bfill: whether to fill any N/A values at the start of the series with the first valid observation + """ + self.bfill = bfill + + def _interpolate_in_place(self, series: pd.Series) -> pd.Series | None: + series.interpolate(method="pad", limit_direction="forward", inplace=True) + if self.bfill: + series.interpolate(method="bfill", limit_direction="backward") + + +def average_series(series_list: List[pd.Series], interpolation: SeriesInterpolation) -> pd.Series: + interpolated_series_list = interpolation.interpolate_all_with_combined_index(series_list) + return sum(interpolated_series_list) / len(interpolated_series_list) # type: ignore diff --git a/src/sensai/util/pickle.py b/src/sensai/util/pickle.py index 554115f66..2552788d2 100644 --- a/src/sensai/util/pickle.py +++ b/src/sensai/util/pickle.py @@ -1,8 +1,9 @@ import logging import os import pickle +from copy import copy from pathlib import Path -from typing import Any, Callable, Dict, Iterable, List, Union +from typing import Any, Callable, Dict, Iterable, List, Tuple, Union import cloudpickle import joblib @@ -150,14 +151,32 @@ def log_failure_if_enabled(cls, obj, context_info: str = None): log.info(f"{prefix}: is picklable") +class PersistableObject: + """ + Base class which can be used for objects that shall support being persisted via pickle. + + IMPORTANT: + The implementations correspond to the default behaviour of pickle for the case where an object has a non-empty + set of attributes. However, for the case where the set of attributes can be empty adding the explicit + implementation of `__getstate__` is crucial in ensuring that `__setstate__` will be called upon unpickling. + So if an object initially has no attributes and is persisted in that state, then any future refactorings + cannot be handled via `__setstate__` by default, but they can when using this class. + """ + def __getstate__(self): + return self.__dict__ + + def __setstate__(self, state): + self.__dict__ = state + + def setstate( cls, obj, state: Dict[str, Any], - renamed_properties: Dict[str, str] = None, + renamed_properties: Dict[str, Union[str, Tuple[str, Callable[[Dict[str, Any]], Any]]]] = None, new_optional_properties: List[str] = None, new_default_properties: Dict[str, Any] = None, - removed_properties: List[str] = None + removed_properties: List[str] = None, ) -> None: """ Helper function for safe implementations of __setstate__ in classes, which appropriately handles the cases where @@ -168,17 +187,26 @@ def setstate( :param cls: the class in which you are implementing __setstate__ :param obj: the instance of cls :param state: the state dictionary - :param renamed_properties: a mapping from old property names to new property names + :param renamed_properties: can be used for renaming as well as for assigning new values. + If passed must map an old property name to either a new property name or + to tuple of a new property name and a function that computes the new value from the state dictionary. :param new_optional_properties: a list of names of new property names, which, if not present, shall be initialised with None :param new_default_properties: a dictionary mapping property names to their default values, which shall be added if they are not present :param removed_properties: a list of names of properties that are no longer being used """ # handle new/changed properties if renamed_properties is not None: - for mOld, mNew in renamed_properties.items(): - if mOld in state: - state[mNew] = state[mOld] - del state[mOld] + # `new` can either be a string or a tuple of a string and a function + for old_name, new in renamed_properties.items(): + if old_name in state: + if isinstance(new, str): + new_name, new_value = new, state[old_name] + else: + new_name, new_value = new[0], new[1](state) + + del state[old_name] + state[new_name] = new_value + if new_optional_properties is not None: for mNew in new_optional_properties: if mNew not in state: @@ -227,7 +255,8 @@ def getstate( if hasattr(s, '__getstate__'): d = s.__getstate__() else: - d = obj.__dict__.copy() + d = obj.__dict__ + d = copy(d) if transient_properties is not None: for p in transient_properties: if p in d: diff --git a/src/sensai/util/plot.py b/src/sensai/util/plot.py index 365851d42..3dd2a9a43 100644 --- a/src/sensai/util/plot.py +++ b/src/sensai/util/plot.py @@ -1,13 +1,17 @@ +import collections import logging -from typing import Sequence, Callable, TypeVar, Tuple, Optional, List, Any +from typing import Sequence, Callable, TypeVar, Tuple, Optional, List, Any, Union, Dict import matplotlib.figure import matplotlib.ticker as plticker import numpy as np +import pandas as pd import seaborn as sns from matplotlib import pyplot as plt from matplotlib.colors import LinearSegmentedColormap +from sensai.util.pandas import SeriesInterpolation + log = logging.getLogger(__name__) MATPLOTLIB_DEFAULT_FIGURE_SIZE = (6.4, 4.8) @@ -136,18 +140,22 @@ def plot_matrix(matrix: np.ndarray, title: str, xtick_labels: Sequence[str], yti class Plot: - def __init__(self, draw: Callable[[], None] = None, name=None): + def __init__(self, draw: Callable[[plt.Axes], None] = None, name=None, ax: Optional[plt.Axes] = None): """ :param draw: function which returns a matplotlib.Axes object to show :param name: name/number of the figure, which determines the window caption; it should be unique, as any plot with the same name will have its contents rendered in the same window. By default, figures are number sequentially. + :param ax: the axes to draw to """ - fig, ax = plt.subplots(num=name) + if ax is not None: + fig = None + else: + fig, ax = plt.subplots(num=name) self.fig: plt.Figure = fig self.ax: plt.Axes = ax if draw is not None: - draw() + draw(ax) def xlabel(self: TPlot, label) -> TPlot: self.ax.set_xlabel(label) @@ -207,7 +215,8 @@ class ScatterPlot(Plot): MAX_OPACITY = 0.5 MIN_OPACITY = 0.05 - def __init__(self, x, y, c=None, c_base: Tuple[float, float, float] = (0, 0, 1), c_opacity=None, x_label=None, y_label=None, **kwargs): + def __init__(self, x, y, c=None, c_base: Tuple[float, float, float] = (0, 0, 1), c_opacity=None, x_label=None, + y_label=None, add_diagonal=False, **kwargs): """ :param x: the x values; if has name (e.g. pd.Series), will be used as axis label :param y: the y values; if has name (e.g. pd.Series), will be used as axis label @@ -238,12 +247,15 @@ def __init__(self, x, y, c=None, c_base: Tuple[float, float, float] = (0, 0, 1), if y_label is None and hasattr(y, "name"): y_label = y.name - def draw(): + def draw(ax): if x_label is not None: plt.xlabel(x_label) if x_label is not None: plt.ylabel(y_label) + value_range = [min(min(x), min(y)), max(max(x), max(y))] plt.scatter(x, y, c=c, **kwargs) + if add_diagonal: + plt.plot(value_range, value_range, '-', lw=1, label="_not in legend", color="green", zorder=1) super().__init__(draw) @@ -273,7 +285,7 @@ def __init__(self, x, y, x_label=None, y_label=None, bins=60, cmap=None, common_ if y_label is None and hasattr(y, "name"): y_label = y.name - def draw(): + def draw(ax): nonlocal cmap x_range = [min(x), max(x)] y_range = [min(y), max(y)] @@ -312,7 +324,7 @@ def __init__(self, values, bins="auto", kde=False, cdf=False, cdf_complementary= :param kwargs: arguments to pass on to sns.histplot """ - def draw(): + def draw(ax): nonlocal cdf_secondary_axis sns.histplot(values, bins=bins, kde=kde, binwidth=binwidth, stat=stat, **kwargs) plt.ylabel(stat) @@ -344,3 +356,60 @@ def draw(): self.xlabel(xlabel) super().__init__(draw) + + +class AverageSeriesLinePlot(Plot): + """ + Plots the average of a collection of series or the averages of several collections of series, + establishing a common index (the unification of all indices) for each collection via interpolation. + The standard deviation is additionally shown as a shaded area around each line. + """ + def __init__(self, + series_collection: Union[List[pd.Series], Dict[str, List[pd.Series]]], + interpolation: SeriesInterpolation, + collection_name="collection", + ax: Optional[plt.Axes] = None, + hue_order=None, palette=None): + """ + :param series_collection: either a list of series to average or a dictionary mapping the name of a collection + to a list of series to average + :param interpolation: the interpolation with which to obtain series values for the unified index of a collection of series + :param collection_name: a name indicating what a key in `series_collection` refers to, which will appear in the legend + for the case where more than one collection is passed + :param ax: the axis to plot to; if None, create a new figure and axis + :param hue_order: the hue order (for the case where there is more than one collection of series) + :param palette: the colour palette to use + """ + if isinstance(series_collection, dict): + series_dict = series_collection + else: + series_dict = {"_": series_collection} + + series_list = next(iter(series_dict.values())) + x_name = series_list[0].index.name or "x" + y_name = series_list[0].name or "y" + + # build data frame with all series, interpolating each sub-collection + dfs = [] + for name, series_list in series_dict.items(): + interpolated_series_list = interpolation.interpolate_all_with_combined_index(series_list) + for series in interpolated_series_list: + df = pd.DataFrame({y_name: series, x_name: series.index}) + df["series_id"] = id(series) + df[collection_name] = name + dfs.append(df) + full_df = pd.concat(dfs, axis=0).reset_index(drop=True) + + def draw(ax): + sns.lineplot( + data=full_df, + x=x_name, + y=y_name, + estimator="mean", + hue=collection_name if len(series_dict) > 1 else None, + hue_order=hue_order, + palette=palette, + ax=ax, + ) + + super().__init__(draw, ax=ax) diff --git a/src/sensai/vector_model.py b/src/sensai/vector_model.py index 7df58ed78..b9b9d6c52 100644 --- a/src/sensai/vector_model.py +++ b/src/sensai/vector_model.py @@ -16,7 +16,7 @@ from .data import InputOutputData from .data_transformation import DataFrameTransformer, DataFrameTransformerChain, InvertibleDataFrameTransformer from .featuregen import FeatureGenerator, FeatureCollector -from .util import mark_used +from .util import mark_used, kwarg_if_not_none from .util.cache import PickleLoadSaveMixin from .util.logging import StopWatch from .util.pickle import setstate, getstate @@ -70,20 +70,6 @@ def get_name(self): return self._name -class VectorModelFittableBase(VectorModelBase, ABC): - """ - Base class for vector models, which encompasses the fundamental prediction and fitting interfaces. - A vector model takes data frames as input, where each row represents a vector of information. - """ - @abstractmethod - def fit(self, x: pd.DataFrame, y: pd.DataFrame): - pass - - @abstractmethod - def is_fitted(self) -> bool: - pass - - class TrainingContext: """ Contains context information for an ongoing training process @@ -93,7 +79,7 @@ def __init__(self, original_input: pd.DataFrame, original_output: pd.DataFrame): self.original_output = original_output -class VectorModel(VectorModelFittableBase, PickleLoadSaveMixin, ABC): +class VectorModel(VectorModelBase, PickleLoadSaveMixin, ABC): """ Represents a model which uses data frames as inputs and outputs whose rows define individual data points. Every data frame row represents a vector of information (one-dimensional array), hence the name of the model. @@ -354,15 +340,17 @@ def fit_input_output_data(self, io_data: InputOutputData, fit_preprocessors=True :param fit_preprocessors: whether the model's preprocessors (feature generators and data frame transformers) shall be fitted :param fit_model: whether the model itself shall be fitted """ - self.fit(io_data.inputs, io_data.outputs, fit_preprocessors=fit_preprocessors, fit_model=fit_model) + self.fit(io_data.inputs, io_data.outputs, weights=io_data.weights, fit_preprocessors=fit_preprocessors, fit_model=fit_model) - def fit(self, x: pd.DataFrame, y: Optional[pd.DataFrame], fit_preprocessors=True, fit_model=True): + def fit(self, x: pd.DataFrame, y: Optional[pd.DataFrame], weights: Optional[pd.Series] = None, fit_preprocessors=True, fit_model=True): """ Fits the model using the given data :param x: a data frame containing input data :param y: a data frame containing output data; may be None if the underlying model does not actually require fitting, e.g. in the case of a rule-based models, but fitting is still necessary for preprocessors + :param weights: an optional series (with the same index as `x` and `y`) containing data point weights. + Added in v1.2.0. :param fit_preprocessors: whether the model's preprocessors (feature generators and data frame transformers) shall be fitted :param fit_model: whether the model itself shall be fitted """ @@ -393,7 +381,7 @@ def fit(self, x: pd.DataFrame, y: Optional[pd.DataFrame], fit_preprocessors=True inputs_with_types = ', '.join([n + '/' + x[n].dtype.name for n in self._modelInputVariableNames]) log.debug(f"Fitting with outputs[{len(y.columns)}]={list(y.columns)}, " f"inputs[{len(self._modelInputVariableNames)}]=[{inputs_with_types}]; N={len(x)} data points") - self._fit(x, y) + self._fit(x, y, **kwarg_if_not_none("weights", weights)) self._isFitted = True else: log.info("Fitting of underlying model skipped") @@ -408,9 +396,13 @@ def is_being_fitted(self) -> bool: return self._trainingContext is not None @abstractmethod - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.DataFrame] = None): pass + def _warn_sample_weights_unsupported(self, is_weighting_supported: bool, weights: Optional[pd.Series]): + if weights is not None and not is_weighting_supported: + log.warning(f"Data point weighting not supported by {self.__class__.__name__}; ignoring weights") + def get_predicted_variable_names(self): """ :return: the list of variable names that are ultimately output by this model (i.e. the columns of the data frame output @@ -633,6 +625,16 @@ def get_model_output_variable_names(self): return self._modelOutputVariableNames +def get_predicted_var_name(specified_var_name: Optional[str], predicted_var_names: List[str]): + if specified_var_name is not None: + return specified_var_name + else: + if len(predicted_var_names) > 1: + raise ValueError("Must explicitly specify the predicted variable name for a model with multiple output variables " + f"({predicted_var_names})") + return predicted_var_names[0] + + class VectorClassificationModel(VectorModel, ABC): def __init__(self, check_input_columns=True): """ @@ -647,17 +649,17 @@ def __init__(self, check_input_columns=True): def is_regression_model(self) -> bool: return False - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): if len(y.columns) != 1: raise ValueError("Classification requires exactly one output column with class labels") self._labels = sorted([label for label in y.iloc[:, 0].unique()]) - self._fit_classifier(x, y) + self._fit_classifier(x, y, **kwarg_if_not_none("weights", weights)) def get_class_labels(self) -> List[Any]: return self._labels @abstractmethod - def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): pass def convert_class_probabilities_to_predictions(self, df: pd.DataFrame): @@ -744,7 +746,7 @@ def __init__(self, predicted_variable_names: list): def _underlying_model_requires_fitting(self): return False - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): pass @@ -765,8 +767,8 @@ def __init__(self, labels: list, predicted_variable_name="predictedLabel"): def _underlying_model_requires_fitting(self): return False - def _fit(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): pass - def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): + def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): pass diff --git a/src/sensai/xgboost.py b/src/sensai/xgboost.py index dde553d38..bb4b9b43c 100644 --- a/src/sensai/xgboost.py +++ b/src/sensai/xgboost.py @@ -1,9 +1,16 @@ +import logging from typing import Optional +import pandas as pd import xgboost +from . import InputOutputData +from .data import DataSplitter from .sklearn.sklearn_base import AbstractSkLearnMultipleOneDimVectorRegressionModel, AbstractSkLearnVectorClassificationModel, \ - FeatureImportanceProviderSkLearnRegressionMultipleOneDim, FeatureImportanceProviderSkLearnClassification + FeatureImportanceProviderSkLearnRegressionMultipleOneDim, FeatureImportanceProviderSkLearnClassification, ActualFitParams +from .util.pickle import setstate + +log = logging.getLogger(__name__) def is_xgboost_version_at_least(major: int, minor: Optional[int] = None, patch: Optional[int] = None): @@ -23,11 +30,42 @@ class XGBGradientBoostedVectorRegressionModel(AbstractSkLearnMultipleOneDimVecto """ XGBoost's regression model using gradient boosted trees """ - def __init__(self, random_state=42, **model_args): + + def __init__(self, random_state=42, + early_stopping_rounds: Optional[int] = None, + early_stopping_data_splitter: Optional[DataSplitter] = None, + **model_args): """ :param model_args: See https://xgboost.readthedocs.io/en/latest/python/python_api.html#xgboost.XGBRegressor """ - super().__init__(xgboost.XGBRegressor, random_state=random_state, **model_args) + super().__init__(xgboost.XGBRegressor, random_state=random_state, early_stopping_rounds=early_stopping_rounds, + **model_args) + self.is_early_stopping_enabled = early_stopping_rounds is not None + self.early_stopping_data_splitter = early_stopping_data_splitter + + def __setstate__(self, state): + setstate(XGBGradientBoostedVectorRegressionModel, self, state, + new_default_properties=dict( + is_early_stopping_enabled=False, + early_stopping_data_splitter=None)) + + def is_sample_weight_supported(self) -> bool: + return True + + def _compute_actual_fit_params(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> ActualFitParams: + kwargs = {} + if self.is_early_stopping_enabled: + data = InputOutputData(inputs, outputs, weights=weights) + train_data, val_data = self.early_stopping_data_splitter.split(data) + train_data: InputOutputData + kwargs["eval_set"] = [(val_data.inputs, val_data.outputs)] + inputs = train_data.inputs + outputs = train_data.outputs + weights = train_data.weights + log.info(f"Early stopping enabled with validation set of size {len(val_data)}") + params = super()._compute_actual_fit_params(inputs, outputs, weights=weights) + params.kwargs.update(kwargs) + return params class XGBRandomForestVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel, @@ -41,6 +79,9 @@ def __init__(self, random_state=42, **model_args): """ super().__init__(xgboost.XGBRFRegressor, random_state=random_state, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class XGBGradientBoostedVectorClassificationModel(AbstractSkLearnVectorClassificationModel, FeatureImportanceProviderSkLearnClassification): """ @@ -54,6 +95,9 @@ def __init__(self, random_state=42, use_balanced_class_weights=False, **model_ar super().__init__(xgboost.XGBClassifier, random_state=random_state, use_balanced_class_weights=use_balanced_class_weights, use_label_encoding=use_label_encoding, **model_args) + def is_sample_weight_supported(self) -> bool: + return True + class XGBRandomForestVectorClassificationModel(AbstractSkLearnVectorClassificationModel, FeatureImportanceProviderSkLearnClassification): """ @@ -66,3 +110,6 @@ def __init__(self, random_state=42, use_balanced_class_weights=False, **model_ar use_label_encoding = is_xgboost_version_at_least(1, 6) super().__init__(xgboost.XGBRFClassifier, random_state=random_state, use_balanced_class_weights=use_balanced_class_weights, use_label_encoding=use_label_encoding, **model_args) + + def is_sample_weight_supported(self) -> bool: + return True