From c8957f40856a1b04dab61ee7b16eb27489abf83a Mon Sep 17 00:00:00 2001 From: marcopeix Date: Fri, 10 Jan 2025 13:14:36 -0500 Subject: [PATCH] Insample predictions with series of varying lengths --- nbs/core.ipynb | 825 +++++++++++++++++++++++++++++++++++++---- neuralforecast/core.py | 171 +++++---- 2 files changed, 853 insertions(+), 143 deletions(-) diff --git a/nbs/core.ipynb b/nbs/core.ipynb index 5138b5e6..8ea67969 100644 --- a/nbs/core.ipynb +++ b/nbs/core.ipynb @@ -1,5 +1,23 @@ { "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0ebe1274", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "env: PYTORCH_ENABLE_MPS_FALLBACK=1\n" + ] + } + ], + "source": [ + "%set_env PYTORCH_ENABLE_MPS_FALLBACK=1" + ] + }, { "cell_type": "code", "execution_count": null, @@ -37,7 +55,18 @@ "execution_count": null, "id": "515672ca", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/marcopeix/miniconda3/envs/neuralforecast/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n", + "2025-01-10 10:22:44,112\tINFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n", + "2025-01-10 10:22:44,153\tINFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n" + ] + } + ], "source": [ "#| hide\n", "import os\n", @@ -1283,7 +1312,7 @@ " ]\n", " cols_order = first_out_cols + remaining_cols + [target_col]\n", " return ufp.sort(out[cols_order], by=[id_col, 'cutoff', time_col])\n", - "\n", + " \n", " def predict_insample(self, step_size: int = 1):\n", " \"\"\"Predict insample with core.NeuralForecast.\n", "\n", @@ -1306,87 +1335,116 @@ " for model in self.models:\n", " if model.SAMPLING_TYPE == 'recurrent':\n", " warnings.warn(f'Predict insample might not provide accurate predictions for \\\n", - " recurrent model {repr(model)} class yet due to scaling.')\n", + " recurrent model {repr(model)} class yet due to scaling.')\n", " print(f'WARNING: Predict insample might not provide accurate predictions for \\\n", - " recurrent model {repr(model)} class yet due to scaling.')\n", - " \n", - " cols = []\n", - " count_names = {'model': 0}\n", - " for model in self.models:\n", - " model_name = repr(model)\n", - " count_names[model_name] = count_names.get(model_name, -1) + 1\n", - " if count_names[model_name] > 0:\n", - " model_name += str(count_names[model_name])\n", - " cols += [model_name + n for n in model.loss.output_names]\n", + " recurrent model {repr(model)} class yet due to scaling.')\n", "\n", - " # Remove test set from dataset and last dates\n", " test_size = self.models[0].get_test_size()\n", - "\n", - " # trim the forefront period to ensure `test_size - h` should be module `step_size\n", - " # Note: current constraint imposes that all series lengths are equal, so we can take the first series length as sample\n", - " series_length = self.dataset.indptr[1] - self.dataset.indptr[0]\n", - " _, forefront_offset = np.divmod((series_length - test_size - self.h), step_size)\n", - "\n", - " if test_size>0 or forefront_offset>0:\n", - " trimmed_dataset = TimeSeriesDataset.trim_dataset(dataset=self.dataset,\n", - " right_trim=test_size,\n", - " left_trim=forefront_offset)\n", - " new_idxs = np.hstack(\n", - " [\n", - " np.arange(self.dataset.indptr[i] + forefront_offset, self.dataset.indptr[i + 1] - test_size)\n", - " for i in range(self.dataset.n_groups)\n", - " ]\n", + " \n", + " # Process each series separately\n", + " fcsts_dfs = []\n", + " trimmed_datasets = []\n", + " \n", + " for i in range(self.dataset.n_groups):\n", + " # Calculate series-specific length and offset\n", + " series_length = self.dataset.indptr[i + 1] - self.dataset.indptr[i]\n", + " _, forefront_offset = np.divmod((series_length - test_size - self.h), step_size)\n", + " \n", + " if test_size > 0 or forefront_offset > 0:\n", + " # Create single-series dataset\n", + " series_dataset = TimeSeriesDataset(\n", + " temporal=self.dataset.temporal[self.dataset.indptr[i]:self.dataset.indptr[i + 1]],\n", + " temporal_cols=self.dataset.temporal_cols,\n", + " indptr=np.array([0, series_length]),\n", + " y_idx=self.dataset.y_idx\n", + " )\n", + " \n", + " # Trim the series\n", + " trimmed_series = TimeSeriesDataset.trim_dataset(\n", + " dataset=series_dataset,\n", + " right_trim=test_size,\n", + " left_trim=forefront_offset\n", + " )\n", + " \n", + " new_idxs = np.arange(\n", + " self.dataset.indptr[i] + forefront_offset,\n", + " self.dataset.indptr[i + 1] - test_size\n", + " )\n", + " times = self.ds[new_idxs]\n", + " else:\n", + " trimmed_series = TimeSeriesDataset(\n", + " temporal=self.dataset.temporal[self.dataset.indptr[i]:self.dataset.indptr[i + 1]],\n", + " temporal_cols=self.dataset.temporal_cols,\n", + " indptr=np.array([0, series_length]),\n", + " y_idx=self.dataset.y_idx\n", + " )\n", + " times = self.ds[self.dataset.indptr[i]:self.dataset.indptr[i + 1]]\n", + " \n", + " series_fcsts_df = _insample_times(\n", + " times=times,\n", + " uids=self.uids[i:i+1],\n", + " indptr=trimmed_series.indptr,\n", + " h=self.h,\n", + " freq=self.freq,\n", + " step_size=step_size,\n", + " id_col=self.id_col,\n", + " time_col=self.time_col,\n", " )\n", - " times = self.ds[new_idxs]\n", - " else:\n", - " trimmed_dataset = self.dataset\n", - " times = self.ds\n", - "\n", - " # Generate dates\n", - " fcsts_df = _insample_times(\n", - " times=times,\n", - " uids=self.uids,\n", - " indptr=trimmed_dataset.indptr,\n", - " h=self.h,\n", - " freq=self.freq,\n", - " step_size=step_size,\n", - " id_col=self.id_col,\n", - " time_col=self.time_col,\n", - " )\n", - "\n", - " col_idx = 0\n", - " fcsts = np.full((len(fcsts_df), len(cols)), np.nan, dtype=np.float32)\n", + " \n", + " fcsts_dfs.append(series_fcsts_df)\n", + " trimmed_datasets.append(trimmed_series)\n", "\n", + " # Combine all series forecasts DataFrames\n", + " fcsts_df = pd.concat(fcsts_dfs, axis=0, ignore_index=True)\n", + " \n", + " # Generate predictions for each model\n", + " fcsts_list = []\n", " for model in self.models:\n", - " # Test size is the number of periods to forecast (full size of trimmed dataset)\n", - " model.set_test_size(test_size=trimmed_dataset.max_size)\n", - "\n", - " # Predict\n", - " model_fcsts = model.predict(trimmed_dataset, step_size=step_size)\n", - " # Append predictions in memory placeholder\n", - " output_length = len(model.loss.output_names)\n", - " fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts\n", - " col_idx += output_length \n", - " model.set_test_size(test_size=test_size) # Set original test_size\n", - "\n", - " # original y\n", + " model_series_preds = []\n", + " for i, trimmed_dataset in enumerate(trimmed_datasets):\n", + " # Set test size to current series length\n", + " model.set_test_size(test_size=trimmed_dataset.max_size)\n", + " # Generate predictions\n", + " model_fcsts = model.predict(trimmed_dataset, step_size=step_size)\n", + " # Handle distributional forecasts; take only median\n", + " if len(model_fcsts.shape) > 1 and model_fcsts.shape[1] == 3:\n", + " model_fcsts = model_fcsts[:, 0] # Take first column (median)\n", + " # Ensure consistent 2D shape\n", + " if len(model_fcsts.shape) == 1:\n", + " model_fcsts = model_fcsts.reshape(-1, 1)\n", + " model_series_preds.append(model_fcsts)\n", + " model_preds = np.concatenate(model_series_preds, axis=0)\n", + " fcsts_list.append(model_preds)\n", + " # Reset test size to original\n", + " model.set_test_size(test_size=test_size)\n", + " \n", + " # Combine all predictions\n", + " fcsts = np.hstack(fcsts_list)\n", + " \n", + " # Add original y values\n", " original_y = {\n", " self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),\n", " self.time_col: self.ds,\n", " self.target_col: self.dataset.temporal[:, 0].numpy(),\n", " }\n", "\n", - " # Add predictions to forecasts DataFrame\n", + " # Create forecasts DataFrame\n", + " cols = self._get_model_names()\n", + " selected_cols = [col for col in cols if not col.endswith(('-lo', '-hi')) and (not '-' in col or col.endswith('-median'))]\n", " if isinstance(self.uids, pl_Series):\n", - " fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))\n", + " fcsts = pl_DataFrame(dict(zip(selected_cols, fcsts.T)))\n", " Y_df = pl_DataFrame(original_y)\n", " else:\n", - " fcsts = pd.DataFrame(fcsts, columns=cols)\n", + " fcsts = pd.DataFrame(fcsts, columns=selected_cols)\n", " Y_df = pd.DataFrame(original_y).reset_index(drop=True)\n", - " fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])\n", "\n", - " # Add original input df's y to forecasts DataFrame\n", + " # Combine forecasts with dates\n", + " fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])\n", + " \n", + " # Add original values\n", " fcsts_df = ufp.join(fcsts_df, Y_df, how='left', on=[self.id_col, self.time_col])\n", + " \n", + " # Apply scaling if needed\n", " if self.scalers_:\n", " sizes = ufp.counts_by_id(fcsts_df, self.id_col)['counts'].to_numpy()\n", " indptr = np.append(0, sizes.cumsum())\n", @@ -1394,9 +1452,11 @@ " fcsts_df[invert_cols] = self._scalers_target_inverse_transform(\n", " fcsts_df[invert_cols].to_numpy(),\n", " indptr\n", - " ) \n", + " )\n", + " # Drop duplicates when step_size < h\n", + " fcsts_df = fcsts_df.drop_duplicates(subset=[self.id_col, self.time_col], keep='first')\n", " return fcsts_df\n", - " \n", + "\n", " # Save list of models with pytorch lightning save_checkpoint function\n", " def save(self, path: str, model_index: Optional[List]=None, save_dataset: bool=True, overwrite: bool=False):\n", " \"\"\"Save NeuralForecast core class.\n", @@ -1687,7 +1747,93 @@ "execution_count": null, "id": "4bede563-78c0-40ee-ba76-f06f329cd772", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L408){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.fit\n", + "\n", + "> NeuralForecast.fit (df:Union[pandas.core.frame.DataFrame,polars.dataframe\n", + "> .frame.DataFrame,neuralforecast.compat.SparkDataFrame\n", + "> ,Sequence[str],NoneType]=None, static_df:Union[pandas\n", + "> .core.frame.DataFrame,polars.dataframe.frame.DataFram\n", + "> e,neuralforecast.compat.SparkDataFrame,NoneType]=None\n", + "> , val_size:Optional[int]=0,\n", + "> use_init_models:bool=False, verbose:bool=False,\n", + "> id_col:str='unique_id', time_col:str='ds',\n", + "> target_col:str='y', distributed_config:Optional[neura\n", + "> lforecast.common._base_model.DistributedConfig]=None,\n", + "> prediction_intervals:Optional[neuralforecast.utils.Pr\n", + "> edictionIntervals]=None)\n", + "\n", + "*Fit the core.NeuralForecast.\n", + "\n", + "Fit `models` to a large set of time series from DataFrame `df`.\n", + "and store fitted models for later inspection.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If None, a previously stored dataset is required. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| val_size | Optional | 0 | Size of validation set. |\n", + "| use_init_models | bool | False | Use initial model passed when NeuralForecast object was instantiated. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| id_col | str | unique_id | Column that identifies each serie. |\n", + "| time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |\n", + "| target_col | str | y | Column that contains the target. |\n", + "| distributed_config | Optional | None | Configuration to use for DDP training. Currently only spark is supported. |\n", + "| prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |\n", + "| **Returns** | **NeuralForecast** | | **Returns `NeuralForecast` class with fitted `models`.** |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L408){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.fit\n", + "\n", + "> NeuralForecast.fit (df:Union[pandas.core.frame.DataFrame,polars.dataframe\n", + "> .frame.DataFrame,neuralforecast.compat.SparkDataFrame\n", + "> ,Sequence[str],NoneType]=None, static_df:Union[pandas\n", + "> .core.frame.DataFrame,polars.dataframe.frame.DataFram\n", + "> e,neuralforecast.compat.SparkDataFrame,NoneType]=None\n", + "> , val_size:Optional[int]=0,\n", + "> use_init_models:bool=False, verbose:bool=False,\n", + "> id_col:str='unique_id', time_col:str='ds',\n", + "> target_col:str='y', distributed_config:Optional[neura\n", + "> lforecast.common._base_model.DistributedConfig]=None,\n", + "> prediction_intervals:Optional[neuralforecast.utils.Pr\n", + "> edictionIntervals]=None)\n", + "\n", + "*Fit the core.NeuralForecast.\n", + "\n", + "Fit `models` to a large set of time series from DataFrame `df`.\n", + "and store fitted models for later inspection.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If None, a previously stored dataset is required. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| val_size | Optional | 0 | Size of validation set. |\n", + "| use_init_models | bool | False | Use initial model passed when NeuralForecast object was instantiated. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| id_col | str | unique_id | Column that identifies each serie. |\n", + "| time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |\n", + "| target_col | str | y | Column that contains the target. |\n", + "| distributed_config | Optional | None | Configuration to use for DDP training. Currently only spark is supported. |\n", + "| prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |\n", + "| **Returns** | **NeuralForecast** | | **Returns `NeuralForecast` class with fitted `models`.** |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.fit, title_level=3)" ] @@ -1697,7 +1843,83 @@ "execution_count": null, "id": "f90209f6-16da-40a6-8302-1c5c2f66c619", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L777){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.predict\n", + "\n", + "> NeuralForecast.predict (df:Union[pandas.core.frame.DataFrame,polars.dataf\n", + "> rame.frame.DataFrame,neuralforecast.compat.SparkD\n", + "> ataFrame,NoneType]=None, static_df:Union[pandas.c\n", + "> ore.frame.DataFrame,polars.dataframe.frame.DataFr\n", + "> ame,neuralforecast.compat.SparkDataFrame,NoneType\n", + "> ]=None, futr_df:Union[pandas.core.frame.DataFrame\n", + "> ,polars.dataframe.frame.DataFrame,neuralforecast.\n", + "> compat.SparkDataFrame,NoneType]=None,\n", + "> verbose:bool=False, engine=None,\n", + "> level:Optional[List[Union[int,float]]]=None,\n", + "> **data_kwargs)\n", + "\n", + "*Predict with core.NeuralForecast.\n", + "\n", + "Use stored fitted `models` to predict large set of time series from DataFrame `df`.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If a DataFrame is passed, it is used to generate forecasts. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| futr_df | Union | None | DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| engine | NoneType | None | Distributed engine for inference. Only used if df is a spark dataframe or if fit was called on a spark dataframe. |\n", + "| level | Optional | None | Confidence levels between 0 and 100. |\n", + "| data_kwargs | kwargs | | Extra arguments to be passed to the dataset within each model. |\n", + "| **Returns** | **pandas or polars DataFrame** | | **DataFrame with insample `models` columns for point predictions and probabilistic
predictions for all fitted `models`. ** |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L777){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.predict\n", + "\n", + "> NeuralForecast.predict (df:Union[pandas.core.frame.DataFrame,polars.dataf\n", + "> rame.frame.DataFrame,neuralforecast.compat.SparkD\n", + "> ataFrame,NoneType]=None, static_df:Union[pandas.c\n", + "> ore.frame.DataFrame,polars.dataframe.frame.DataFr\n", + "> ame,neuralforecast.compat.SparkDataFrame,NoneType\n", + "> ]=None, futr_df:Union[pandas.core.frame.DataFrame\n", + "> ,polars.dataframe.frame.DataFrame,neuralforecast.\n", + "> compat.SparkDataFrame,NoneType]=None,\n", + "> verbose:bool=False, engine=None,\n", + "> level:Optional[List[Union[int,float]]]=None,\n", + "> **data_kwargs)\n", + "\n", + "*Predict with core.NeuralForecast.\n", + "\n", + "Use stored fitted `models` to predict large set of time series from DataFrame `df`.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If a DataFrame is passed, it is used to generate forecasts. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| futr_df | Union | None | DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| engine | NoneType | None | Distributed engine for inference. Only used if df is a spark dataframe or if fit was called on a spark dataframe. |\n", + "| level | Optional | None | Confidence levels between 0 and 100. |\n", + "| data_kwargs | kwargs | | Extra arguments to be passed to the dataset within each model. |\n", + "| **Returns** | **pandas or polars DataFrame** | | **DataFrame with insample `models` columns for point predictions and probabilistic
predictions for all fitted `models`. ** |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.predict, title_level=3)" ] @@ -1707,7 +1929,113 @@ "execution_count": null, "id": "19a8923a-f4f3-4e60-b9b9-a7088fc9bff5", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1091){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.cross_validation\n", + "\n", + "> NeuralForecast.cross_validation (df:Union[pandas.core.frame.DataFrame,pol\n", + "> ars.dataframe.frame.DataFrame,NoneType]=\n", + "> None, static_df:Union[pandas.core.frame.\n", + "> DataFrame,polars.dataframe.frame.DataFra\n", + "> me,NoneType]=None, n_windows:int=1,\n", + "> step_size:int=1,\n", + "> val_size:Optional[int]=0,\n", + "> test_size:Optional[int]=None,\n", + "> use_init_models:bool=False,\n", + "> verbose:bool=False,\n", + "> refit:Union[bool,int]=False,\n", + "> id_col:str='unique_id',\n", + "> time_col:str='ds', target_col:str='y', p\n", + "> rediction_intervals:Optional[neuralforec\n", + "> ast.utils.PredictionIntervals]=None, lev\n", + "> el:Optional[List[Union[int,float]]]=None\n", + "> , **data_kwargs)\n", + "\n", + "*Temporal Cross-Validation with core.NeuralForecast.\n", + "\n", + "`core.NeuralForecast`'s cross-validation efficiently fits a list of NeuralForecast \n", + "models through multiple windows, in either chained or rolled manner.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If None, a previously stored dataset is required. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| n_windows | int | 1 | Number of windows used for cross validation. |\n", + "| step_size | int | 1 | Step size between each window. |\n", + "| val_size | Optional | 0 | Length of validation size. If passed, set `n_windows=None`. |\n", + "| test_size | Optional | None | Length of test size. If passed, set `n_windows=None`. |\n", + "| use_init_models | bool | False | Use initial model passed when object was instantiated. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| refit | Union | False | Retrain model for each cross validation window.
If False, the models are trained at the beginning and then used to predict each window.
If positive int, the models are retrained every `refit` windows. |\n", + "| id_col | str | unique_id | Column that identifies each serie. |\n", + "| time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |\n", + "| target_col | str | y | Column that contains the target. |\n", + "| prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |\n", + "| level | Optional | None | Confidence levels between 0 and 100. Use with prediction_intervals. |\n", + "| data_kwargs | kwargs | | Extra arguments to be passed to the dataset within each model. |\n", + "| **Returns** | **Union** | | **DataFrame with insample `models` columns for point predictions and probabilistic
predictions for all fitted `models`. ** |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1091){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.cross_validation\n", + "\n", + "> NeuralForecast.cross_validation (df:Union[pandas.core.frame.DataFrame,pol\n", + "> ars.dataframe.frame.DataFrame,NoneType]=\n", + "> None, static_df:Union[pandas.core.frame.\n", + "> DataFrame,polars.dataframe.frame.DataFra\n", + "> me,NoneType]=None, n_windows:int=1,\n", + "> step_size:int=1,\n", + "> val_size:Optional[int]=0,\n", + "> test_size:Optional[int]=None,\n", + "> use_init_models:bool=False,\n", + "> verbose:bool=False,\n", + "> refit:Union[bool,int]=False,\n", + "> id_col:str='unique_id',\n", + "> time_col:str='ds', target_col:str='y', p\n", + "> rediction_intervals:Optional[neuralforec\n", + "> ast.utils.PredictionIntervals]=None, lev\n", + "> el:Optional[List[Union[int,float]]]=None\n", + "> , **data_kwargs)\n", + "\n", + "*Temporal Cross-Validation with core.NeuralForecast.\n", + "\n", + "`core.NeuralForecast`'s cross-validation efficiently fits a list of NeuralForecast \n", + "models through multiple windows, in either chained or rolled manner.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| df | Union | None | DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
If None, a previously stored dataset is required. |\n", + "| static_df | Union | None | DataFrame with columns [`unique_id`] and static exogenous. |\n", + "| n_windows | int | 1 | Number of windows used for cross validation. |\n", + "| step_size | int | 1 | Step size between each window. |\n", + "| val_size | Optional | 0 | Length of validation size. If passed, set `n_windows=None`. |\n", + "| test_size | Optional | None | Length of test size. If passed, set `n_windows=None`. |\n", + "| use_init_models | bool | False | Use initial model passed when object was instantiated. |\n", + "| verbose | bool | False | Print processing steps. |\n", + "| refit | Union | False | Retrain model for each cross validation window.
If False, the models are trained at the beginning and then used to predict each window.
If positive int, the models are retrained every `refit` windows. |\n", + "| id_col | str | unique_id | Column that identifies each serie. |\n", + "| time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |\n", + "| target_col | str | y | Column that contains the target. |\n", + "| prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |\n", + "| level | Optional | None | Confidence levels between 0 and 100. Use with prediction_intervals. |\n", + "| data_kwargs | kwargs | | Extra arguments to be passed to the dataset within each model. |\n", + "| **Returns** | **Union** | | **DataFrame with insample `models` columns for point predictions and probabilistic
predictions for all fitted `models`. ** |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.cross_validation, title_level=3)" ] @@ -1717,7 +2045,53 @@ "execution_count": null, "id": "355df52b", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1258){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.predict_insample\n", + "\n", + "> NeuralForecast.predict_insample (step_size:int=1)\n", + "\n", + "*Predict insample with core.NeuralForecast.\n", + "\n", + "`core.NeuralForecast`'s `predict_insample` uses stored fitted `models`\n", + "to predict historic values of a time series from the stored dataframe.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| step_size | int | 1 | Step size between each window. |\n", + "| **Returns** | **pandas.DataFrame** | | **DataFrame with insample predictions for all fitted `models`. ** |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1258){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.predict_insample\n", + "\n", + "> NeuralForecast.predict_insample (step_size:int=1)\n", + "\n", + "*Predict insample with core.NeuralForecast.\n", + "\n", + "`core.NeuralForecast`'s `predict_insample` uses stored fitted `models`\n", + "to predict historic values of a time series from the stored dataframe.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| step_size | int | 1 | Step size between each window. |\n", + "| **Returns** | **pandas.DataFrame** | | **DataFrame with insample predictions for all fitted `models`. ** |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.predict_insample, title_level=3)" ] @@ -1727,7 +2101,61 @@ "execution_count": null, "id": "93155738-b40f-43d3-ba76-d345bf2583d5", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1403){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.save\n", + "\n", + "> NeuralForecast.save (path:str, model_index:Optional[List]=None,\n", + "> save_dataset:bool=True, overwrite:bool=False)\n", + "\n", + "*Save NeuralForecast core class.\n", + "\n", + "`core.NeuralForecast`'s method to save current status of models, dataset, and configuration.\n", + "Note that by default the `models` are not saving training checkpoints to save disk memory,\n", + "to get them change the individual model `**trainer_kwargs` to include `enable_checkpointing=True`.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| path | str | | Directory to save current status. |\n", + "| model_index | Optional | None | List to specify which models from list of self.models to save. |\n", + "| save_dataset | bool | True | Whether to save dataset or not. |\n", + "| overwrite | bool | False | Whether to overwrite files or not. |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1403){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.save\n", + "\n", + "> NeuralForecast.save (path:str, model_index:Optional[List]=None,\n", + "> save_dataset:bool=True, overwrite:bool=False)\n", + "\n", + "*Save NeuralForecast core class.\n", + "\n", + "`core.NeuralForecast`'s method to save current status of models, dataset, and configuration.\n", + "Note that by default the `models` are not saving training checkpoints to save disk memory,\n", + "to get them change the individual model `**trainer_kwargs` to include `enable_checkpointing=True`.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| path | str | | Directory to save current status. |\n", + "| model_index | Optional | None | List to specify which models from list of self.models to save. |\n", + "| save_dataset | bool | True | Whether to save dataset or not. |\n", + "| overwrite | bool | False | Whether to overwrite files or not. |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.save, title_level=3)" ] @@ -1737,7 +2165,55 @@ "execution_count": null, "id": "0e915796-173c-4400-812f-c6351d5df3be", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/markdown": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1512){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.load\n", + "\n", + "> NeuralForecast.load (path, verbose=False, **kwargs)\n", + "\n", + "*Load NeuralForecast\n", + "\n", + "`core.NeuralForecast`'s method to load checkpoint from path.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| path | str | | Directory with stored artifacts. |\n", + "| verbose | bool | False | |\n", + "| kwargs | | | Additional keyword arguments to be passed to the function
`load_from_checkpoint`. |\n", + "| **Returns** | **NeuralForecast** | | **Instantiated `NeuralForecast` class.** |" + ], + "text/plain": [ + "---\n", + "\n", + "[source](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/core.py#L1512){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n", + "\n", + "### NeuralForecast.load\n", + "\n", + "> NeuralForecast.load (path, verbose=False, **kwargs)\n", + "\n", + "*Load NeuralForecast\n", + "\n", + "`core.NeuralForecast`'s method to load checkpoint from path.*\n", + "\n", + "| | **Type** | **Default** | **Details** |\n", + "| -- | -------- | ----------- | ----------- |\n", + "| path | str | | Directory with stored artifacts. |\n", + "| verbose | bool | False | |\n", + "| kwargs | | | Additional keyword arguments to be passed to the function
`load_from_checkpoint`. |\n", + "| **Returns** | **NeuralForecast** | | **Instantiated `NeuralForecast` class.** |" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "show_doc(NeuralForecast.load, title_level=3)" ] @@ -1808,7 +2284,26 @@ "execution_count": null, "id": "e2e35f8f", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Seed set to 1\n" + ] + }, + { + "ename": "NameError", + "evalue": "name 'NeuralForecast' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[10], line 6\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m#| hide\u001b[39;00m\n\u001b[1;32m 2\u001b[0m \u001b[38;5;66;03m# Unitest for early stopping without val_size protection\u001b[39;00m\n\u001b[1;32m 3\u001b[0m models \u001b[38;5;241m=\u001b[39m [\n\u001b[1;32m 4\u001b[0m NHITS(h\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m12\u001b[39m, input_size\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m12\u001b[39m, max_steps\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m1\u001b[39m, early_stop_patience_steps\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m5\u001b[39m)\n\u001b[1;32m 5\u001b[0m ]\n\u001b[0;32m----> 6\u001b[0m nf \u001b[38;5;241m=\u001b[39m \u001b[43mNeuralForecast\u001b[49m(models\u001b[38;5;241m=\u001b[39mmodels, freq\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mM\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 7\u001b[0m test_fail(nf\u001b[38;5;241m.\u001b[39mfit,\n\u001b[1;32m 8\u001b[0m contains\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mSet val_size>0 if early stopping is enabled.\u001b[39m\u001b[38;5;124m'\u001b[39m,\n\u001b[1;32m 9\u001b[0m args\u001b[38;5;241m=\u001b[39m(AirPassengersPanel_train,))\n", + "\u001b[0;31mNameError\u001b[0m: name 'NeuralForecast' is not defined" + ] + } + ], "source": [ "#| hide\n", "# Unitest for early stopping without val_size protection\n", @@ -2070,7 +2565,102 @@ "execution_count": null, "id": "340dd8a9", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001b[36m(_train_tune pid=3565)\u001b[0m /Users/marcopeix/miniconda3/envs/neuralforecast/lib/python3.10/site-packages/ray/tune/integration/pytorch_lightning.py:198: `ray.tune.integration.pytorch_lightning.TuneReportCallback` is deprecated. Use `ray.tune.integration.pytorch_lightning.TuneReportCheckpointCallback` instead.\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m Seed set to 1\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m GPU available: True (mps), used: True\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m TPU available: False, using: 0 TPU cores\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m HPU available: False, using: 0 HPUs\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m `Trainer(val_check_interval=1)` was configured so validation will run after every batch.\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m \n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m | Name | Type | Params | Mode \n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m -------------------------------------------------------\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 0 | loss | MAE | 0 | train\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 1 | padder_train | ConstantPad1d | 0 | train\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 2 | scaler | TemporalNorm | 0 | train\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 3 | mlp | ModuleList | 18.2 K | train\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 4 | out | Linear | 1.5 K | train\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m -------------------------------------------------------\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 19.7 K Trainable params\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 0 Non-trainable params\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 19.7 K Total params\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 0.079 Total estimated model params size (MB)\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 7 Modules in train mode\n", + "\u001b[36m(_train_tune pid=3565)\u001b[0m 0 Modules in eval mode\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sanity Checking DataLoader 0: 0%| | 0/1 [00:00 0: - model_name += str(count_names[model_name]) - cols += [model_name + n for n in model.loss.output_names] - - # Remove test set from dataset and last dates test_size = self.models[0].get_test_size() - # trim the forefront period to ensure `test_size - h` should be module `step_size - # Note: current constraint imposes that all series lengths are equal, so we can take the first series length as sample - series_length = self.dataset.indptr[1] - self.dataset.indptr[0] - _, forefront_offset = np.divmod((series_length - test_size - self.h), step_size) + # Process each series separately + fcsts_dfs = [] + trimmed_datasets = [] - if test_size > 0 or forefront_offset > 0: - trimmed_dataset = TimeSeriesDataset.trim_dataset( - dataset=self.dataset, right_trim=test_size, left_trim=forefront_offset - ) - new_idxs = np.hstack( - [ - np.arange( - self.dataset.indptr[i] + forefront_offset, - self.dataset.indptr[i + 1] - test_size, - ) - for i in range(self.dataset.n_groups) - ] + for i in range(self.dataset.n_groups): + # Calculate series-specific length and offset + series_length = self.dataset.indptr[i + 1] - self.dataset.indptr[i] + _, forefront_offset = np.divmod( + (series_length - test_size - self.h), step_size ) - times = self.ds[new_idxs] - else: - trimmed_dataset = self.dataset - times = self.ds - # Generate dates - fcsts_df = _insample_times( - times=times, - uids=self.uids, - indptr=trimmed_dataset.indptr, - h=self.h, - freq=self.freq, - step_size=step_size, - id_col=self.id_col, - time_col=self.time_col, - ) + if test_size > 0 or forefront_offset > 0: + # Create single-series dataset + series_dataset = TimeSeriesDataset( + temporal=self.dataset.temporal[ + self.dataset.indptr[i] : self.dataset.indptr[i + 1] + ], + temporal_cols=self.dataset.temporal_cols, + indptr=np.array([0, series_length]), + y_idx=self.dataset.y_idx, + ) - col_idx = 0 - fcsts = np.full((len(fcsts_df), len(cols)), np.nan, dtype=np.float32) + # Trim the series + trimmed_series = TimeSeriesDataset.trim_dataset( + dataset=series_dataset, + right_trim=test_size, + left_trim=forefront_offset, + ) - for model in self.models: - # Test size is the number of periods to forecast (full size of trimmed dataset) - model.set_test_size(test_size=trimmed_dataset.max_size) + new_idxs = np.arange( + self.dataset.indptr[i] + forefront_offset, + self.dataset.indptr[i + 1] - test_size, + ) + times = self.ds[new_idxs] + else: + trimmed_series = TimeSeriesDataset( + temporal=self.dataset.temporal[ + self.dataset.indptr[i] : self.dataset.indptr[i + 1] + ], + temporal_cols=self.dataset.temporal_cols, + indptr=np.array([0, series_length]), + y_idx=self.dataset.y_idx, + ) + times = self.ds[self.dataset.indptr[i] : self.dataset.indptr[i + 1]] - # Predict - model_fcsts = model.predict(trimmed_dataset, step_size=step_size) - # Append predictions in memory placeholder - output_length = len(model.loss.output_names) - fcsts[:, col_idx : (col_idx + output_length)] = model_fcsts - col_idx += output_length - model.set_test_size(test_size=test_size) # Set original test_size + series_fcsts_df = _insample_times( + times=times, + uids=self.uids[i : i + 1], + indptr=trimmed_series.indptr, + h=self.h, + freq=self.freq, + step_size=step_size, + id_col=self.id_col, + time_col=self.time_col, + ) + + fcsts_dfs.append(series_fcsts_df) + trimmed_datasets.append(trimmed_series) - # original y + # Combine all series forecasts DataFrames + fcsts_df = pd.concat(fcsts_dfs, axis=0, ignore_index=True) + + # Generate predictions for each model + fcsts_list = [] + for model in self.models: + model_series_preds = [] + for i, trimmed_dataset in enumerate(trimmed_datasets): + # Set test size to current series length + model.set_test_size(test_size=trimmed_dataset.max_size) + # Generate predictions + model_fcsts = model.predict(trimmed_dataset, step_size=step_size) + # Handle distributional forecasts; take only median + if len(model_fcsts.shape) > 1 and model_fcsts.shape[1] == 3: + model_fcsts = model_fcsts[:, 0] # Take first column (median) + # Ensure consistent 2D shape + if len(model_fcsts.shape) == 1: + model_fcsts = model_fcsts.reshape(-1, 1) + model_series_preds.append(model_fcsts) + model_preds = np.concatenate(model_series_preds, axis=0) + fcsts_list.append(model_preds) + # Reset test size to original + model.set_test_size(test_size=test_size) + + # Combine all predictions + fcsts = np.hstack(fcsts_list) + + # Add original y values original_y = { self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)), self.time_col: self.ds, self.target_col: self.dataset.temporal[:, 0].numpy(), } - # Add predictions to forecasts DataFrame + # Create forecasts DataFrame + cols = self._get_model_names() + selected_cols = [ + col + for col in cols + if not col.endswith(("-lo", "-hi")) + and (not "-" in col or col.endswith("-median")) + ] if isinstance(self.uids, pl_Series): - fcsts = pl_DataFrame(dict(zip(cols, fcsts.T))) + fcsts = pl_DataFrame(dict(zip(selected_cols, fcsts.T))) Y_df = pl_DataFrame(original_y) else: - fcsts = pd.DataFrame(fcsts, columns=cols) + fcsts = pd.DataFrame(fcsts, columns=selected_cols) Y_df = pd.DataFrame(original_y).reset_index(drop=True) + + # Combine forecasts with dates fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts]) - # Add original input df's y to forecasts DataFrame + # Add original values fcsts_df = ufp.join(fcsts_df, Y_df, how="left", on=[self.id_col, self.time_col]) + + # Apply scaling if needed if self.scalers_: sizes = ufp.counts_by_id(fcsts_df, self.id_col)["counts"].to_numpy() indptr = np.append(0, sizes.cumsum()) @@ -1374,6 +1411,10 @@ def predict_insample(self, step_size: int = 1): fcsts_df[invert_cols] = self._scalers_target_inverse_transform( fcsts_df[invert_cols].to_numpy(), indptr ) + # Drop duplicates when step_size < h + fcsts_df = fcsts_df.drop_duplicates( + subset=[self.id_col, self.time_col], keep="first" + ) return fcsts_df # Save list of models with pytorch lightning save_checkpoint function