diff --git a/CHANGELOG.md b/CHANGELOG.md index d6c71d3241..4a6127fabb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,9 +12,10 @@ but cannot always guarantee backwards compatibility. Changes that may **break co **Improved** - Improvements to `TorchForecastingModel`: + - 🚀🚀 Optimized `historical_forecasts()` for pre-trained `TorchForecastingModel`, now running up to 20 times faster than before! [#2013](https://github.com/unit8co/darts/pull/2013) by [Dennis Bader](https://github.com/dennisbader). - Added callback `darts.utils.callbacks.TFMProgressBar` to customize at which model stages to display the progress bar. [#2020](https://github.com/unit8co/darts/pull/2020) by [Dennis Bader](https://github.com/dennisbader). - Improvements to documentation: - - Adapted the example notebooks to properly apply data transformers and avoid look-ahead bias. [#2020](https://github.com/unit8co/darts/pull/2020) by [Samriddhi Singh](https://github.com/SimTheGreat). + - Adapted the example notebooks to properly apply data transformers and avoid look-ahead bias. [#2020](https://github.com/unit8co/darts/pull/2020) by [Samriddhi Singh](https://github.com/SimTheGreat). **Fixed** - Fixed a bug when calling optimized `historical_forecasts()` for a `RegressionModel` trained with unequal component-specific lags. [#2040](https://github.com/unit8co/darts/pull/2040) by [Antoine Madrona](https://github.com/madtoinou). 
diff --git a/darts/models/forecasting/ensemble_model.py b/darts/models/forecasting/ensemble_model.py index 2d2312d387..65e0595faf 100644 --- a/darts/models/forecasting/ensemble_model.py +++ b/darts/models/forecasting/ensemble_model.py @@ -496,6 +496,13 @@ def supports_future_covariates(self) -> bool: [model.supports_future_covariates for model in self.forecasting_models] ) + @property + def supports_optimized_historical_forecasts(self) -> bool: + """ + Whether the model supports optimized historical forecasts + """ + return False + @property def _supports_non_retrainable_historical_forecasts(self) -> bool: return self.is_global_ensemble diff --git a/darts/models/forecasting/forecasting_model.py b/darts/models/forecasting/forecasting_model.py index ce8a682ac2..1d3bf14eac 100644 --- a/darts/models/forecasting/forecasting_model.py +++ b/darts/models/forecasting/forecasting_model.py @@ -487,6 +487,7 @@ def _build_forecast_series( custom_components: Union[List[str], None] = None, with_static_covs: bool = True, with_hierarchy: bool = True, + pred_start: Optional[Union[pd.Timestamp, int]] = None, ) -> TimeSeries: """ Builds a forecast time series starting after the end of the training time series, with the @@ -504,6 +505,9 @@ def _build_forecast_series( If set to False, do not copy the input_series `static_covariates` attribute with_hierarchy If set to False, do not copy the input_series `hierarchy` attribute + pred_start + Optionally, give a custom prediction start point. 
+ Returns ------- TimeSeries @@ -518,6 +522,7 @@ def _build_forecast_series( custom_components, with_static_covs, with_hierarchy, + pred_start, ) def _historical_forecasts_sanity_checks(self, *args: Any, **kwargs: Any) -> None: @@ -546,40 +551,18 @@ def _get_last_prediction_time( overlap_end, latest_possible_prediction_start, ): - # when overlap_end=True, we can simply use the precomputed last possible prediction start point + # if `overlap_end` is True, we can use the pre-computed latest possible first prediction point if overlap_end: return latest_possible_prediction_start - # (1) otherwise, we have to step `forecast_horizon` steps back. - # (2) additionally, we check whether the `latest_possible_prediction_start` was shifted back - # from the overall theoretical latest possible prediction start point (which is by definition - # the first time step after the end of the target series) due to too short covariates. - theoretical_latest_prediction_start = series.end_time() + series.freq - if latest_possible_prediction_start == theoretical_latest_prediction_start: - # (1) - last_valid_pred_time = series.time_index[-forecast_horizon] - else: - # (2) - covariates_shift = ( - len( - generate_index( - start=latest_possible_prediction_start, - end=theoretical_latest_prediction_start, - freq=series.freq, - ) - ) - - 2 - ) - last_valid_pred_time = series.time_index[ - -(forecast_horizon + covariates_shift) - ] - return last_valid_pred_time + # otherwise, the upper bound for the last time step of the last prediction is the end of the target series + return series.time_index[-forecast_horizon] def _check_optimizable_historical_forecasts( self, forecast_horizon: int, retrain: Union[bool, int, Callable[..., bool]], - show_warnings=bool, + show_warnings: bool, ) -> bool: """By default, historical forecasts cannot be optimized""" return False @@ -863,7 +846,13 @@ def retrain_func( # predictable time indexes (assuming model is already trained) 
historical_forecasts_time_index_predict = ( _get_historical_forecast_predict_index( - model, series_, idx, past_covariates_, future_covariates_ + model, + series_, + idx, + past_covariates_, + future_covariates_, + forecast_horizon, + overlap_end, ) ) @@ -876,6 +865,8 @@ def retrain_func( idx, past_covariates_, future_covariates_, + forecast_horizon, + overlap_end, ) ) @@ -910,12 +901,9 @@ def retrain_func( # based on `forecast_horizon` and `overlap_end`, historical_forecasts_time_index is shortened historical_forecasts_time_index = _adjust_historical_forecasts_time_index( - model=model, series=series_, series_idx=idx, historical_forecasts_time_index=historical_forecasts_time_index, - forecast_horizon=forecast_horizon, - overlap_end=overlap_end, start=start, start_format=start_format, show_warnings=show_warnings, @@ -1041,7 +1029,7 @@ def retrain_func( else: forecasts.append(forecast) - if last_points_only: + if last_points_only and last_points_values: forecasts_list.append( TimeSeries.from_times_and_values( generate_index( @@ -2265,6 +2253,13 @@ def _supports_non_retrainable_historical_forecasts(self) -> bool: """GlobalForecastingModel supports historical forecasts without retraining the model""" return True + @property + def supports_optimized_historical_forecasts(self) -> bool: + """ + Whether the model supports optimized historical forecasts + """ + return True + def _sanity_check_predict_likelihood_parameters( self, n: int, output_chunk_length: Union[int, None], num_samples: int ): diff --git a/darts/models/forecasting/pl_forecasting_module.py b/darts/models/forecasting/pl_forecasting_module.py index 821e35745c..79c1902a81 100644 --- a/darts/models/forecasting/pl_forecasting_module.py +++ b/darts/models/forecasting/pl_forecasting_module.py @@ -184,6 +184,7 @@ def __init__( self.pred_roll_size: Optional[int] = None self.pred_batch_size: Optional[int] = None self.pred_n_jobs: Optional[int] = None + self.predict_likelihood_parameters: Optional[bool] = None 
@property def first_prediction_index(self) -> int: @@ -241,7 +242,11 @@ def predict_step( dataloader_idx the dataloader index """ - input_data_tuple, batch_input_series = batch[:-1], batch[-1] + input_data_tuple, batch_input_series, batch_pred_starts = ( + batch[:-2], + batch[-2], + batch[-1], + ) # number of individual series to be predicted in current batch num_series = input_data_tuple[0].shape[0] @@ -303,8 +308,11 @@ def predict_step( else None, with_static_covs=False if self.predict_likelihood_parameters else True, with_hierarchy=False if self.predict_likelihood_parameters else True, + pred_start=pred_start, + ) + for batch_idx, (input_series, pred_start) in enumerate( + zip(batch_input_series, batch_pred_starts) ) - for batch_idx, input_series in enumerate(batch_input_series) ) return ts_forecasts diff --git a/darts/models/forecasting/regression_model.py b/darts/models/forecasting/regression_model.py index 3c10680c1f..68a7b08d38 100644 --- a/darts/models/forecasting/regression_model.py +++ b/darts/models/forecasting/regression_model.py @@ -47,8 +47,10 @@ create_lagged_training_data, ) from darts.utils.historical_forecasts import ( - _optimized_historical_forecasts_regression_all_points, - _optimized_historical_forecasts_regression_last_points_only, + _check_optimizable_historical_forecasts_global_models, + _optimized_historical_forecasts_all_points, + _optimized_historical_forecasts_last_points_only, + _process_historical_forecast_input, ) from darts.utils.multioutput import MultiOutputRegressor from darts.utils.utils import ( @@ -1068,40 +1070,23 @@ def supports_future_covariates(self) -> bool: def supports_static_covariates(self) -> bool: return True - @property - def supports_optimized_historical_forecasts(self) -> bool: - return True - def _check_optimizable_historical_forecasts( self, forecast_horizon: int, retrain: Union[bool, int, Callable[..., bool]], - show_warnings=bool, + show_warnings: bool, ) -> bool: """ - Historical forecast can be optimized 
only if `retrain=False` and `forecast_horizon <= self.output_chunk_length` + Historical forecast can be optimized only if `retrain=False` and `forecast_horizon <= model.output_chunk_length` (no auto-regression required). """ - - supported_retrain = (retrain is False) or (retrain == 0) - supported_forecast_horizon = forecast_horizon <= self.output_chunk_length - if supported_retrain and supported_forecast_horizon: - return True - - if show_warnings: - if not supported_retrain: - logger.warning( - "`enable_optimization=True` is ignored because `retrain` is not `False`" - "To hide this warning, set `show_warnings=False` or `enable_optimization=False`." - ) - if not supported_forecast_horizon: - logger.warning( - "`enable_optimization=True` is ignored because " - "`forecast_horizon > self.output_chunk_length`." - "To hide this warning, set `show_warnings=False` or `enable_optimization=False`." - ) - - return False + return _check_optimizable_historical_forecasts_global_models( + model=self, + forecast_horizon=forecast_horizon, + retrain=retrain, + show_warnings=show_warnings, + allow_autoregression=False, + ) def _optimized_historical_forecasts( self, @@ -1122,41 +1107,25 @@ def _optimized_historical_forecasts( TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]] ]: """ + For RegressionModels we create the lagged prediction data once per series using a moving window. + With this, we can avoid having to recreate the tabular input data and call `model.predict()` for each + forecastable index and series. + Additionally, there are dedicated subroutines for `last_points_only=True` and `last_points_only=False`. 
+ TODO: support forecast_horizon > output_chunk_length (auto-regression) """ - if not self._fit_called: - raise_log( - ValueError("Model has not been fit yet."), - logger, - ) - if forecast_horizon > self.output_chunk_length: - raise_log( - ValueError( - "`forecast_horizon > model.output_chunk_length` requires auto-regression which is not " - "supported in this optimized routine." - ), - logger, - ) - - # manage covariates, usually handled by RegressionModel.predict() - if past_covariates is None and self.past_covariate_series is not None: - past_covariates = [self.past_covariate_series] * len(series) - if future_covariates is None and self.future_covariate_series is not None: - future_covariates = [self.future_covariate_series] * len(series) - - self._verify_static_covariates(series[0].static_covariates) - - if self.encoders.encoding_available: - past_covariates, future_covariates = self.generate_fit_predict_encodings( - n=forecast_horizon, - series=series, - past_covariates=past_covariates, - future_covariates=future_covariates, - ) + series, past_covariates, future_covariates = _process_historical_forecast_input( + model=self, + series=series, + past_covariates=past_covariates, + future_covariates=future_covariates, + forecast_horizon=forecast_horizon, + allow_autoregression=False, + ) # TODO: move the loop here instead of duplicated code in each sub-routine? 
if last_points_only: - return _optimized_historical_forecasts_regression_last_points_only( + return _optimized_historical_forecasts_last_points_only( model=self, series=series, past_covariates=past_covariates, @@ -1171,7 +1140,7 @@ def _optimized_historical_forecasts( predict_likelihood_parameters=predict_likelihood_parameters, ) else: - return _optimized_historical_forecasts_regression_all_points( + return _optimized_historical_forecasts_all_points( model=self, series=series, past_covariates=past_covariates, diff --git a/darts/models/forecasting/theta.py b/darts/models/forecasting/theta.py index 40f7425431..3df149b525 100644 --- a/darts/models/forecasting/theta.py +++ b/darts/models/forecasting/theta.py @@ -4,7 +4,7 @@ """ import math -from typing import List, Optional, Tuple +from typing import List, Optional import numpy as np import statsmodels.tsa.holtwinters as hw @@ -200,19 +200,6 @@ def min_train_series_length(self) -> int: else: return 3 - @property - def extreme_lags( - self, - ) -> Tuple[ - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[int], - ]: - return -self.min_train_series_length, 0, None, None, None, None - class FourTheta(LocalForecastingModel): def __init__( diff --git a/darts/models/forecasting/torch_forecasting_model.py b/darts/models/forecasting/torch_forecasting_model.py index c0b76c1513..b6643c1db9 100644 --- a/darts/models/forecasting/torch_forecasting_model.py +++ b/darts/models/forecasting/torch_forecasting_model.py @@ -29,9 +29,15 @@ import sys from abc import ABC, abstractmethod from glob import glob -from typing import Any, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal import numpy as np +import pandas as pd import pytorch_lightning as pl import torch from pytorch_lightning import loggers as pl_loggers @@ -76,6 
+82,13 @@ SplitCovariatesTrainingDataset, TrainingDataset, ) +from darts.utils.historical_forecasts import ( + _check_optimizable_historical_forecasts_global_models, + _process_historical_forecast_input, +) +from darts.utils.historical_forecasts.optimized_historical_forecasts_torch import ( + _optimized_historical_forecasts, +) from darts.utils.likelihood_models import Likelihood from darts.utils.torch import random_method from darts.utils.utils import get_single_series, seq2series, series2seq @@ -567,6 +580,8 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> InferenceDataset: """ Each model must specify the default training dataset to use. @@ -1337,6 +1352,8 @@ def predict( n=n, past_covariates=past_covariates, future_covariates=future_covariates, + stride=0, + bounds=None, ) predictions = self.predict_from_dataset( @@ -1518,7 +1535,7 @@ def _batch_collate_fn(batch: List[Tuple]) -> Tuple: ) elif elem is None: aggregated.append(None) - elif isinstance(elem, TimeSeries): + else: aggregated.append([sample[i] for sample in batch]) return tuple(aggregated) @@ -1988,6 +2005,72 @@ def _is_probabilistic(self) -> bool: else True # all torch models can be probabilistic (via Dropout) ) + def _check_optimizable_historical_forecasts( + self, + forecast_horizon: int, + retrain: Union[bool, int, Callable[..., bool]], + show_warnings: bool, + ) -> bool: + """ + Historical forecast can be optimized only if `retrain=False`. Auto-regression + (`forecast_horizon > model.output_chunk_length`) is allowed (see `allow_autoregression=True`). 
+ """ + return _check_optimizable_historical_forecasts_global_models( + model=self, + forecast_horizon=forecast_horizon, + retrain=retrain, + show_warnings=show_warnings, + allow_autoregression=True, + ) + + def _optimized_historical_forecasts( + self, + series: Optional[Sequence[TimeSeries]], + past_covariates: Optional[Sequence[TimeSeries]] = None, + future_covariates: Optional[Sequence[TimeSeries]] = None, + num_samples: int = 1, + start: Optional[Union[pd.Timestamp, float, int]] = None, + start_format: Literal["position", "value"] = "value", + forecast_horizon: int = 1, + stride: int = 1, + overlap_end: bool = False, + last_points_only: bool = True, + verbose: bool = False, + show_warnings: bool = True, + predict_likelihood_parameters: bool = False, + ) -> Union[ + TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]] + ]: + """ + For TorchForecastingModels we use a strided inference dataset to avoid having to recreate trainers and + datasets for each forecastable index and series. 
+ """ + series, past_covariates, future_covariates = _process_historical_forecast_input( + model=self, + series=series, + past_covariates=past_covariates, + future_covariates=future_covariates, + forecast_horizon=forecast_horizon, + allow_autoregression=True, + ) + forecasts_list = _optimized_historical_forecasts( + model=self, + series=series, + past_covariates=past_covariates, + future_covariates=future_covariates, + num_samples=num_samples, + start=start, + start_format=start_format, + forecast_horizon=forecast_horizon, + stride=stride, + overlap_end=overlap_end, + last_points_only=last_points_only, + show_warnings=show_warnings, + predict_likelihood_parameters=predict_likelihood_parameters, + verbose=verbose, + ) + return forecasts_list + def _load_encoders( self, tfm_save: "TorchForecastingModel", load_encoders: bool ) -> Tuple[SequentialEncoder, Dict]: @@ -2176,9 +2259,13 @@ def _basic_compare_sample(train_sample: Tuple, predict_sample: Tuple): """ For all models relying on one type of covariates only (Past, Future, Dual), we can rely on the fact that training/inference datasets have target and covariates in first and second position to do the checks. 
+ + - `train_sample` comes with last dimension (static covs, target TimeSeries) + - `predict_sample` comes with last dimensions (..., static covs, target TimeSeries, first prediction time stamp) + """ tgt_train, cov_train, static_train = train_sample[:2] + (train_sample[-2],) - tgt_pred, cov_pred, static_pred = predict_sample[:2] + (predict_sample[-2],) + tgt_pred, cov_pred, static_pred = predict_sample[:2] + (predict_sample[-3],) raise_if_not( tgt_train.shape[-1] == tgt_pred.shape[-1], "The provided target has a dimension (width) that does not match the dimension " @@ -2299,6 +2386,8 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> PastCovariatesInferenceDataset: raise_if_not( future_covariates is None, @@ -2309,6 +2398,8 @@ def _build_inference_dataset( target_series=target, covariates=past_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, use_static_covariates=self.uses_static_covariates, @@ -2398,6 +2489,8 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> FutureCovariatesInferenceDataset: raise_if_not( past_covariates is None, @@ -2408,6 +2501,8 @@ def _build_inference_dataset( target_series=target, covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=self.input_chunk_length, use_static_covariates=self.uses_static_covariates, ) @@ -2491,11 +2586,15 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> DualCovariatesInferenceDataset: return DualCovariatesInferenceDataset( 
target_series=target, covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, use_static_covariates=self.uses_static_covariates, @@ -2579,12 +2678,16 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> MixedCovariatesInferenceDataset: return MixedCovariatesInferenceDataset( target_series=target, past_covariates=past_covariates, future_covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, use_static_covariates=self.uses_static_covariates, @@ -2665,12 +2768,16 @@ def _build_inference_dataset( n: int, past_covariates: Optional[Sequence[TimeSeries]], future_covariates: Optional[Sequence[TimeSeries]], + stride: int = 0, + bounds: Optional[np.ndarray] = None, ) -> SplitCovariatesInferenceDataset: return SplitCovariatesInferenceDataset( target_series=target, past_covariates=past_covariates, future_covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=self.input_chunk_length, output_chunk_length=self.output_chunk_length, use_static_covariates=self.uses_static_covariates, diff --git a/darts/tests/models/forecasting/test_ensemble_models.py b/darts/tests/models/forecasting/test_ensemble_models.py index 719e972694..b8dc2f0c3d 100644 --- a/darts/tests/models/forecasting/test_ensemble_models.py +++ b/darts/tests/models/forecasting/test_ensemble_models.py @@ -151,7 +151,7 @@ def test_call_predict_local_models(self): def test_call_backtest_naive_ensemble_local_models(self): ensemble = NaiveEnsembleModel([NaiveSeasonal(5), Theta(2, 5)]) ensemble.fit(self.series1) - assert ensemble.extreme_lags == (-10, 0, None, None, None, None) + assert ensemble.extreme_lags == (-10, -1, None, None, None, None) 
ensemble.backtest(self.series1) def test_predict_univariate_ensemble_local_models(self): diff --git a/darts/tests/models/forecasting/test_historical_forecasts.py b/darts/tests/models/forecasting/test_historical_forecasts.py index f161cd9120..93ff1e18d6 100644 --- a/darts/tests/models/forecasting/test_historical_forecasts.py +++ b/darts/tests/models/forecasting/test_historical_forecasts.py @@ -5,7 +5,7 @@ import pytest import darts -from darts import TimeSeries +from darts import TimeSeries, concatenate from darts.dataprocessing.transformers import Scaler from darts.datasets import AirPassengersDataset from darts.logging import get_logger @@ -1003,6 +1003,172 @@ def test_optimized_historical_forecasts_regression_with_component_specific_lags( assert (hfc.time_index == ohfc.time_index).all() np.testing.assert_array_almost_equal(hfc.all_values(), ohfc.all_values()) + @pytest.mark.slow + @pytest.mark.skipif(not TORCH_AVAILABLE, reason="requires torch") + @pytest.mark.parametrize( + "config", + list( + itertools.product( + [False, True], # use covariates + [True, False], # last points only + [False, True], # overlap end + [1, 3], # stride + [ + 3, # horizon < ocl + 5, # horizon == ocl + 7, # horizon > ocl -> autoregression + ], + [False, True], # use integer indexed series + [False, True], # use multi-series + ) + ), + ) + def test_optimized_historical_forecasts_torch_with_encoders(self, config): + ( + use_covs, + last_points_only, + overlap_end, + stride, + horizon, + use_int_idx, + use_multi_series, + ) = config + icl = 3 + ocl = 5 + len_val_series = 10 + series_train, series_val = ( + self.ts_pass_train[:10], + self.ts_pass_val[:len_val_series], + ) + if use_int_idx: + series_train = TimeSeries.from_values( + series_train.all_values(), columns=series_train.columns + ) + series_val = TimeSeries.from_times_and_values( + values=series_val.all_values(), + times=pd.RangeIndex( + start=series_train.end_time() + series_train.freq, + stop=series_train.end_time() + + 
(len(series_val) + 1) * series_train.freq, + step=series_train.freq, + ), + columns=series_train.columns, + ) + + def f_encoder(idx): + return idx.month if not use_int_idx else idx + + model = NLinearModel( + input_chunk_length=icl, + add_encoders={ + "custom": {"past": [f_encoder], "future": [f_encoder]}, + }, + output_chunk_length=ocl, + n_epochs=1, + **tfm_kwargs, + ) + if use_covs: + pc = tg.gaussian_timeseries( + start=series_train.start_time(), + end=series_val.end_time() + max(0, horizon - ocl) * series_train.freq, + freq=series_train.freq, + ) + fc = tg.gaussian_timeseries( + start=series_train.start_time(), + end=series_val.end_time() + max(ocl, horizon) * series_train.freq, + freq=series_train.freq, + ) + else: + pc, fc = None, None + + model.fit(series_train, past_covariates=pc, future_covariates=fc) + + if use_multi_series: + series_val = [ + series_val, + (series_val + 10) + .shift(1) + .with_columns_renamed(series_val.columns, "test_col"), + ] + pc = [pc, pc.shift(1)] if pc is not None else None + fc = [fc, fc.shift(1)] if fc is not None else None + + hist_fct = model.historical_forecasts( + series=series_val, + past_covariates=pc, + future_covariates=fc, + retrain=False, + last_points_only=last_points_only, + overlap_end=overlap_end, + stride=stride, + forecast_horizon=horizon, + enable_optimization=False, + ) + + opti_hist_fct = model._optimized_historical_forecasts( + series=series_val if isinstance(series_val, list) else [series_val], + past_covariates=pc if (isinstance(pc, list) or pc is None) else [pc], + future_covariates=fc if (isinstance(fc, list) or fc is None) else [fc], + last_points_only=last_points_only, + overlap_end=overlap_end, + stride=stride, + forecast_horizon=horizon, + ) + + if not isinstance(series_val, list): + series_val = [series_val] + hist_fct = [hist_fct] + opti_hist_fct = [opti_hist_fct] + + for series, hfc, ohfc in zip(series_val, hist_fct, opti_hist_fct): + if not isinstance(hfc, list): + hfc = [hfc] + ohfc = [ohfc] + + 
if not last_points_only and overlap_end: + n_pred_series_expected = 8 + n_pred_points_expected = horizon + first_ts_expected = series.time_index[icl] + last_ts_expected = series.end_time() + series.freq * horizon + elif not last_points_only: # overlap_end = False + n_pred_series_expected = len(series) - icl - horizon + 1 + n_pred_points_expected = horizon + first_ts_expected = series.time_index[icl] + last_ts_expected = series.end_time() + elif overlap_end: # last_points_only = True + n_pred_series_expected = 1 + n_pred_points_expected = 8 + first_ts_expected = series.time_index[icl] + (horizon - 1) * series.freq + last_ts_expected = series.end_time() + series.freq * horizon + else: # last_points_only = True, overlap_end = False + n_pred_series_expected = 1 + n_pred_points_expected = len(series) - icl - horizon + 1 + first_ts_expected = series.time_index[icl] + (horizon - 1) * series.freq + last_ts_expected = series.end_time() + + # to make it simple in case of stride, we assume that non-optimized hist fc returns correct results + if stride > 1: + n_pred_series_expected = len(hfc) + n_pred_points_expected = len(hfc[0]) + first_ts_expected = hfc[0].start_time() + last_ts_expected = hfc[-1].end_time() + + # check length match between optimized and default hist fc + assert len(ohfc) == n_pred_series_expected + assert len(hfc) == len(ohfc) + # check hist fc start + assert ohfc[0].start_time() == first_ts_expected + # check hist fc end + assert ohfc[-1].end_time() == last_ts_expected + for hfc, ohfc in zip(hfc, ohfc): + assert hfc.columns.equals(series.columns) + assert ohfc.columns.equals(series.columns) + assert len(ohfc) == n_pred_points_expected + assert (hfc.time_index == ohfc.time_index).all() + np.testing.assert_array_almost_equal( + hfc.all_values(), ohfc.all_values() + ) + @pytest.mark.slow @pytest.mark.skipif(not TORCH_AVAILABLE, reason="requires torch") @pytest.mark.parametrize("model_config", models_torch_cls_kwargs) @@ -1134,6 +1300,42 @@ def 
test_torch_auto_start_multiple_no_cov(self, model_config): == self.ts_pass_val.end_time() + forecast_hrz * self.ts_pass_val.freq ) + def test_hist_fc_end_exact_with_covs(self): + model = LinearRegressionModel( + lags=2, + lags_past_covariates=2, + lags_future_covariates=(2, 1), + output_chunk_length=2, + ) + series = tg.sine_timeseries(length=10) + model.fit(series, past_covariates=series, future_covariates=series) + fc = model.historical_forecasts( + series, + past_covariates=series[:-2], + future_covariates=series, + forecast_horizon=2, + stride=2, + overlap_end=False, + last_points_only=True, + retrain=False, + ) + assert len(fc) == 4 + assert fc.end_time() == series.end_time() + + fc = model.historical_forecasts( + series, + past_covariates=series[:-2], + future_covariates=series, + forecast_horizon=2, + stride=2, + overlap_end=False, + last_points_only=False, + retrain=False, + ) + fc = concatenate(fc) + assert len(fc) == 8 + assert fc.end_time() == series.end_time() + @pytest.mark.parametrize("model_config", models_reg_cov_cls_kwargs) def test_regression_auto_start_multiple_with_cov_retrain(self, model_config): forecast_hrz = 10 @@ -1190,11 +1392,12 @@ def test_regression_auto_start_multiple_with_cov_retrain(self, model_config): if max_future_cov_lag is not None and max_future_cov_lag > 0 else 0 ) - # length input - biggest past lag - biggest future lag - forecast horizon - output_chunk_length + # length input - largest past lag - forecast horizon - max(largest future lag, output_chunk_length) theorical_retrain_forecast_length = len(self.ts_pass_val) - ( - -past_lag + future_lag + forecast_hrz + kwargs.get("output_chunk_length", 1) + -past_lag + + forecast_hrz + + max(future_lag + 1, kwargs.get("output_chunk_length", 1)) ) - assert ( len(forecasts_retrain[0]) == len(forecasts_retrain[1]) @@ -1216,7 +1419,11 @@ def test_regression_auto_start_multiple_with_cov_retrain(self, model_config): assert forecasts_retrain[0].start_time() == expected_start # end is 
shifted back by the biggest future lag - expected_end = self.ts_pass_val.end_time() - future_lag * self.ts_pass_val.freq + if model.output_chunk_length - 1 > future_lag: + shift = 0 + else: + shift = future_lag + expected_end = self.ts_pass_val.end_time() - shift * self.ts_pass_val.freq assert forecasts_retrain[0].end_time() == expected_end @pytest.mark.parametrize("model_config", models_reg_cov_cls_kwargs) @@ -1292,8 +1499,9 @@ def test_regression_auto_start_multiple_with_cov_no_retrain(self, model_config): ) assert forecasts_no_retrain[0].start_time() == expected_start - # end is shifted by the biggest future lag - expected_end = self.ts_pass_val.end_time() - future_lag * self.ts_pass_val.freq + # end is shifted by the biggest future lag if future lag >= output_chunk_length + shift_back = future_lag if future_lag + 1 > model.output_chunk_length else 0 + expected_end = self.ts_pass_val.end_time() - shift_back * self.ts_pass_val.freq assert forecasts_no_retrain[0].end_time() == expected_end @pytest.mark.slow @@ -1712,7 +1920,9 @@ def create_model(ocl, use_ll=True, model_type="regression"): ) ) preds = darts.concatenate(preds) - assert np.all(preds.values() == hist_fc.values()) + np.testing.assert_array_almost_equal( + preds.all_values(copy=False), hist_fc.all_values(copy=False) + ) # check equal results between predict and hist fc with higher output_chunk_length and horizon, # and last_points_only=False @@ -1742,5 +1952,10 @@ def create_model(ocl, use_ll=True, model_type="regression"): predict_likelihood_parameters=True, ) ) - assert preds == hist_fc - assert len(hist_fc) == n + 1 + for p, hfc in zip(preds, hist_fc): + assert p.columns.equals(hfc.columns) + assert p.time_index.equals(hfc.time_index) + np.testing.assert_array_almost_equal( + p.all_values(copy=False), hfc.all_values(copy=False) + ) + assert len(hist_fc) == n + 1 diff --git a/darts/tests/models/forecasting/test_regression_ensemble_model.py 
b/darts/tests/models/forecasting/test_regression_ensemble_model.py index 300688b56d..6d6e3b700e 100644 --- a/darts/tests/models/forecasting/test_regression_ensemble_model.py +++ b/darts/tests/models/forecasting/test_regression_ensemble_model.py @@ -360,7 +360,7 @@ def test_train_with_historical_forecasts_with_covs(self, config): with pytest.raises(ValueError): # covariates are too short (ends too early) - ensemble.fit(ts, future_covariates=future_covs[:-1]) + ensemble.fit(ts, future_covariates=future_covs[: -min(ocl1, ocl2)]) @pytest.mark.skipif(not TORCH_AVAILABLE, reason="requires torch") def test_train_predict_global_models_univar(self): @@ -551,7 +551,7 @@ def test_call_backtest_regression_ensemble_local_models(self): max(m_.min_train_series_length for m_ in ensemble.forecasting_models) == 10 ) # -10 comes from the maximum minimum train series length of all models - assert ensemble.extreme_lags == (-10 - regr_train_n, 0, None, None, None, None) + assert ensemble.extreme_lags == (-10 - regr_train_n, -1, None, None, None, None) ensemble.backtest(self.sine_series) def test_extreme_lags(self): diff --git a/darts/utils/data/inference_dataset.py b/darts/utils/data/inference_dataset.py index 1c75dde120..c60f1f22c0 100644 --- a/darts/utils/data/inference_dataset.py +++ b/darts/utils/data/inference_dataset.py @@ -3,17 +3,22 @@ ----------------- """ +import bisect from abc import ABC, abstractmethod from typing import Optional, Sequence, Tuple, Union import numpy as np +import pandas as pd from torch.utils.data import Dataset from darts import TimeSeries -from darts.logging import raise_if_not +from darts.logging import get_logger, raise_log +from darts.utils.historical_forecasts.utils import _process_predict_start_points_bounds from .utils import CovariateType +logger = get_logger(__name__) + class InferenceDataset(ABC, Dataset): def __init__(self): @@ -39,7 +44,8 @@ def __getitem__(self, idx: int): @staticmethod def _covariate_indexer( target_idx: int, - target_series: 
TimeSeries, + past_start: Union[pd.Timestamp, int], + past_end: Union[pd.Timestamp, int], covariate_series: TimeSeries, covariate_type: CovariateType, input_chunk_length: int, @@ -54,21 +60,24 @@ def _covariate_indexer( else CovariateType.FUTURE ) - raise_if_not( - main_covariate_type in [CovariateType.PAST, CovariateType.FUTURE], - "`main_covariate_type` must be one of `(CovariateType.PAST, CovariateType.FUTURE)`", - ) + if main_covariate_type not in [CovariateType.PAST, CovariateType.FUTURE]: + raise_log( + ValueError( + "`main_covariate_type` must be one of `(CovariateType.PAST, CovariateType.FUTURE)`" + ), + logger=logger, + ) # we need to use the time index (datetime or integer) here to match the index with the covariate series - past_start = target_series.time_index[-input_chunk_length] - past_end = target_series.time_index[-1] if main_covariate_type is CovariateType.PAST: - future_end = past_end + max(0, n - output_chunk_length) * target_series.freq + future_end = ( + past_end + max(0, n - output_chunk_length) * covariate_series.freq + ) else: # CovariateType.FUTURE - future_end = past_end + max(n, output_chunk_length) * target_series.freq + future_end = past_end + max(n, output_chunk_length) * covariate_series.freq future_start = ( - past_end + target_series.freq if future_end != past_end else future_end + past_end + covariate_series.freq if future_end != past_end else future_end ) if input_chunk_length == 0: # for regression ensemble models @@ -78,21 +87,27 @@ def _covariate_indexer( case_start = ( future_start if covariate_type is CovariateType.FUTURE else past_start ) - raise_if_not( - covariate_series.start_time() <= case_start, - f"For the given forecasting case, the provided {main_covariate_type.value} covariates at dataset index " - f"`{target_idx}` do not extend far enough into the past. 
The {main_covariate_type.value} covariates " - f"must start at time step `{case_start}`, whereas now they start at time step " - f"`{covariate_series.start_time()}`.", - ) - raise_if_not( - covariate_series.end_time() >= future_end, - f"For the given forecasting horizon `n={n}`, the provided {main_covariate_type.value} covariates " - f"at dataset index `{target_idx}` do not extend far enough into the future. As `" - f"{'n > output_chunk_length' if n > output_chunk_length else 'n <= output_chunk_length'}" - f"` the {main_covariate_type.value} covariates must end at time step `{future_end}`, " - f"whereas now they end at time step `{covariate_series.end_time()}`.", - ) + if not covariate_series.start_time() <= case_start: + raise_log( + ValueError( + f"For the given forecasting case, the provided {main_covariate_type.value} covariates at " + f"dataset index `{target_idx}` do not extend far enough into the past. The " + f"{main_covariate_type.value} covariates must start at time step `{case_start}`, whereas now " + f"they start at time step `{covariate_series.start_time()}`." + ), + logger=logger, + ) + if not covariate_series.end_time() >= future_end: + raise_log( + ValueError( + f"For the given forecasting horizon `n={n}`, the provided {main_covariate_type.value} covariates " + f"at dataset index `{target_idx}` do not extend far enough into the future. As `" + f"{'n > output_chunk_length' if n > output_chunk_length else 'n <= output_chunk_length'}" + f"` the {main_covariate_type.value} covariates must end at time step `{future_end}`, " + f"whereas now they end at time step `{covariate_series.end_time()}`." 
+ ), + logger=logger, + ) # extract the index position (index) from time_index value covariate_start = covariate_series.time_index.get_loc(past_start) @@ -106,6 +121,8 @@ def __init__( target_series: Union[TimeSeries, Sequence[TimeSeries]], covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, covariate_type: CovariateType = CovariateType.PAST, @@ -130,6 +147,13 @@ def __init__( were used during training, the same type of cavariates must be supplied at prediction. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). + If provided, `stride` must be `>=1`. input_chunk_length The length of the target series the model takes as input. output_chunk_length @@ -153,13 +177,47 @@ def __init__( self.output_chunk_length = output_chunk_length self.use_static_covariates = use_static_covariates - raise_if_not( - (covariates is None or len(self.target_series) == len(self.covariates)), - "The number of target series must be equal to the number of covariates.", - ) + if not (covariates is None or len(self.target_series) == len(self.covariates)): + raise_log( + ValueError( + "The number of target series must be equal to the number of covariates." + ), + logger=logger, + ) + + if (bounds is not None and stride == 0) or (bounds is None and stride > 0): + raise_log( + ValueError( + "Must supply either both `stride` and `bounds`, or none of them." 
+ ), + logger=logger, + ) + + self.stride = stride + if bounds is None: + self.bounds = bounds + self.cum_lengths = None + self.len_preds = len(self.target_series) + else: + self.bounds, self.cum_lengths = _process_predict_start_points_bounds( + series=target_series, + bounds=bounds, + stride=stride, + ) + self.len_preds = self.cum_lengths[-1] def __len__(self): - return len(self.target_series) + return self.len_preds + + @staticmethod + def find_list_index(index, cumulative_lengths, bounds, stride): + list_index = bisect.bisect_right(cumulative_lengths, index) + bound_left = bounds[list_index, 0] + if list_index == 0: + stride_idx = index * stride + else: + stride_idx = (index - cumulative_lengths[list_index - 1]) * stride + return list_index, bound_left + stride_idx def __getitem__( self, idx: int @@ -169,26 +227,51 @@ def __getitem__( Optional[np.ndarray], Optional[np.ndarray], TimeSeries, + Union[pd.Timestamp, int], ]: - target_series = self.target_series[idx] - raise_if_not( - len(target_series) >= self.input_chunk_length, - f"All input series must have length >= `input_chunk_length` ({self.input_chunk_length}).", - ) + if self.bounds is None: + series_idx, target_start_idx, target_end_idx = ( + idx, + -self.input_chunk_length, + None, + ) + else: + series_idx, target_end_idx = self.find_list_index( + idx, + self.cum_lengths, + self.bounds, + self.stride, + ) + target_start_idx = target_end_idx - self.input_chunk_length + + target_series = self.target_series[series_idx] + if not len(target_series) >= self.input_chunk_length: + raise_log( + ValueError( + f"All input series must have length >= `input_chunk_length` ({self.input_chunk_length})." 
+ ), + logger=logger, + ) # extract past target values + past_end = target_series.time_index[ + target_end_idx - 1 if target_end_idx is not None else -1 + ] past_target = target_series.random_component_values(copy=False)[ - -self.input_chunk_length : + target_start_idx:target_end_idx ] # optionally, extract covariates past_covariate, future_covariate = None, None - covariate_series = None if self.covariates is None else self.covariates[idx] + covariate_series = ( + None if self.covariates is None else self.covariates[series_idx] + ) if covariate_series is not None: # get start and end indices (integer) of the covariates including historic and future parts covariate_start, covariate_end = self._covariate_indexer( - target_idx=idx, - target_series=target_series, + target_idx=series_idx, + past_start=target_series.time_index[target_start_idx], + past_end=past_end, covariate_series=covariate_series, covariate_type=self.covariate_type, input_chunk_length=self.input_chunk_length, @@ -231,6 +314,7 @@ def __getitem__( future_covariate, static_covariate, target_series, + past_end + target_series.freq, ) @@ -240,6 +324,8 @@ def __init__( target_series: Union[TimeSeries, Sequence[TimeSeries]], covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, covariate_type: CovariateType = CovariateType.PAST, @@ -262,6 +348,13 @@ def __init__( if the model was trained with past-observed covariates. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). 
+ If provided, `stride` must be `>=1`. input_chunk_length The length of the target series the model takes as input. output_chunk_length @@ -276,6 +369,8 @@ def __init__( target_series=target_series, covariates=covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, covariate_type=covariate_type, @@ -293,6 +388,7 @@ def __getitem__( Optional[np.ndarray], Optional[np.ndarray], TimeSeries, + Union[pd.Timestamp, int], ]: return self.ds[idx] @@ -303,6 +399,8 @@ def __init__( target_series: Union[TimeSeries, Sequence[TimeSeries]], covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, covariate_type: CovariateType = CovariateType.FUTURE, use_static_covariates: bool = True, @@ -319,6 +417,13 @@ def __init__( if the model was trained with future-known covariates. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). + If provided, `stride` must be `>=1`. input_chunk_length The length of the target series the model takes as input. 
use_static_covariates @@ -330,6 +435,8 @@ def __init__( target_series=target_series, covariates=covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=n, covariate_type=covariate_type, @@ -341,9 +448,28 @@ def __len__(self): def __getitem__( self, idx: int - ) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray], TimeSeries]: - past_target, _, future_covariate, static_covariate, target_series = self.ds[idx] - return past_target, future_covariate, static_covariate, target_series + ) -> Tuple[ + np.ndarray, + Optional[np.ndarray], + Optional[np.ndarray], + TimeSeries, + Union[pd.Timestamp, int], + ]: + ( + past_target, + _, + future_covariate, + static_covariate, + target_series, + pred_point, + ) = self.ds[idx] + return ( + past_target, + future_covariate, + static_covariate, + target_series, + pred_point, + ) class DualCovariatesInferenceDataset(InferenceDataset): @@ -352,6 +478,8 @@ def __init__( target_series: Union[TimeSeries, Sequence[TimeSeries]], covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, use_static_covariates: bool = True, @@ -368,6 +496,13 @@ def __init__( if the model was trained with future-known covariates. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). + If provided, `stride` must be `>=1`. input_chunk_length The length of the target series the model takes as input. 
output_chunk_length @@ -382,6 +517,8 @@ def __init__( target_series=target_series, covariates=covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, covariate_type=CovariateType.HISTORIC_FUTURE, @@ -393,6 +530,8 @@ def __init__( target_series=target_series, covariates=covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -409,6 +548,7 @@ def __getitem__( Optional[np.ndarray], Optional[np.ndarray], TimeSeries, + Union[pd.Timestamp, int], ]: ( past_target, @@ -416,14 +556,16 @@ def __getitem__( _, static_covariate, ts_target, + pred_point, ) = self.ds_past[idx] - _, future_covariate, _, _ = self.ds_future[idx] + _, future_covariate, _, _, _ = self.ds_future[idx] return ( past_target, historic_future_covariate, future_covariate, static_covariate, ts_target, + pred_point, ) @@ -434,6 +576,8 @@ def __init__( past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, use_static_covariates: bool = True, @@ -456,6 +600,13 @@ def __init__( if the model was trained with future-known covariates. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). + If provided, `stride` must be `>=1`. input_chunk_length The length of the target series the model takes as input. 
output_chunk_length @@ -470,6 +621,8 @@ def __init__( target_series=target_series, covariates=past_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, covariate_type=CovariateType.PAST, @@ -481,6 +634,8 @@ def __init__( target_series=target_series, covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, use_static_covariates=use_static_covariates, @@ -499,6 +654,7 @@ def __getitem__( Optional[np.ndarray], Optional[np.ndarray], TimeSeries, + Union[pd.Timestamp, int], ]: ( @@ -507,8 +663,9 @@ def __getitem__( future_past_covariate, static_covariate, ts_target, + pred_point, ) = self.ds_past[idx] - _, historic_future_covariate, future_covariate, _, _ = self.ds_future[idx] + _, historic_future_covariate, future_covariate, _, _, _ = self.ds_future[idx] return ( past_target, past_covariate, @@ -517,6 +674,7 @@ def __getitem__( future_past_covariate, static_covariate, ts_target, + pred_point, ) @@ -527,6 +685,8 @@ def __init__( past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, n: int = 1, + stride: int = 0, + bounds: Optional[np.ndarray] = None, input_chunk_length: int = 12, output_chunk_length: int = 1, use_static_covariates: bool = True, @@ -548,6 +708,13 @@ def __init__( if the model was trained with future-known covariates. n Forecast horizon: The number of time steps to predict after the end of the target series. + stride + Optionally, the number of time steps between two consecutive predictions. Can only be used together + with `bounds`. + bounds + Optionally, an array of shape `(n series, 2)`, with the left and right prediction start point boundaries + per series. The boundaries must represent the positional index of the series (0, len(series)). + If provided, `stride` must be `>=1`. 
input_chunk_length The length of the target series the model takes as input. output_chunk_length @@ -562,6 +729,8 @@ def __init__( target_series=target_series, covariates=past_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, output_chunk_length=output_chunk_length, covariate_type=CovariateType.PAST, @@ -573,6 +742,8 @@ def __init__( target_series=target_series, covariates=future_covariates, n=n, + stride=stride, + bounds=bounds, input_chunk_length=input_chunk_length, covariate_type=CovariateType.FUTURE, use_static_covariates=use_static_covariates, @@ -590,6 +761,7 @@ def __getitem__( Optional[np.ndarray], Optional[np.ndarray], TimeSeries, + Union[pd.Timestamp, int], ]: ( @@ -598,8 +770,9 @@ def __getitem__( future_past_covariate, static_covariate, ts_target, + pred_point, ) = self.ds_past[idx] - _, future_covariate, _, _ = self.ds_future[idx] + _, future_covariate, _, _, _ = self.ds_future[idx] return ( past_target, past_covariate, @@ -607,4 +780,5 @@ def __getitem__( future_past_covariate, static_covariate, ts_target, + pred_point, ) diff --git a/darts/utils/historical_forecasts/__init__.py b/darts/utils/historical_forecasts/__init__.py index 26037e4f29..2edf85ebd4 100644 --- a/darts/utils/historical_forecasts/__init__.py +++ b/darts/utils/historical_forecasts/__init__.py @@ -1,9 +1,11 @@ -from .optimized_historical_forecasts import ( - _optimized_historical_forecasts_regression_all_points, - _optimized_historical_forecasts_regression_last_points_only, +from .optimized_historical_forecasts_regression import ( + _optimized_historical_forecasts_all_points, + _optimized_historical_forecasts_last_points_only, ) from .utils import ( + _check_optimizable_historical_forecasts_global_models, _get_historical_forecast_boundaries, _historical_forecasts_general_checks, _historical_forecasts_start_warnings, + _process_historical_forecast_input, ) diff --git a/darts/utils/historical_forecasts/optimized_historical_forecasts.py 
b/darts/utils/historical_forecasts/optimized_historical_forecasts_regression.py similarity index 89% rename from darts/utils/historical_forecasts/optimized_historical_forecasts.py rename to darts/utils/historical_forecasts/optimized_historical_forecasts_regression.py index 634dbf3b5b..8d64bbbd45 100644 --- a/darts/utils/historical_forecasts/optimized_historical_forecasts.py +++ b/darts/utils/historical_forecasts/optimized_historical_forecasts_regression.py @@ -18,7 +18,7 @@ logger = get_logger(__name__) -def _optimized_historical_forecasts_regression_last_points_only( +def _optimized_historical_forecasts_last_points_only( model, series: Sequence[TimeSeries], past_covariates: Optional[Sequence[TimeSeries]] = None, @@ -84,12 +84,20 @@ def _optimized_historical_forecasts_regression_last_points_only( shift = model.output_chunk_length - forecast_horizon hist_fct_tgt_start -= shift * unit - hist_fct_pc_start -= shift * unit - hist_fct_fc_start -= shift * unit + hist_fct_pc_start -= ( + shift * unit if hist_fct_pc_start is not None else hist_fct_pc_start + ) + hist_fct_fc_start -= ( + shift * unit if hist_fct_fc_start is not None else hist_fct_fc_start + ) hist_fct_tgt_end -= shift * unit - hist_fct_pc_end -= shift * unit - hist_fct_fc_end -= shift * unit + hist_fct_pc_end -= ( + shift * unit if hist_fct_pc_end is not None else hist_fct_pc_end + ) + hist_fct_fc_end -= ( + shift * unit if hist_fct_fc_end is not None else hist_fct_fc_end + ) X, times = create_lagged_prediction_data( target_series=None @@ -156,7 +164,7 @@ def _optimized_historical_forecasts_regression_last_points_only( return forecasts_list if len(series) > 1 else forecasts_list[0] -def _optimized_historical_forecasts_regression_all_points( +def _optimized_historical_forecasts_all_points( model, series: Sequence[TimeSeries], past_covariates: Optional[Sequence[TimeSeries]] = None, @@ -216,7 +224,7 @@ def _optimized_historical_forecasts_regression_all_points( # Additional shift, to account for the model 
output_chunk_length shift_start = 0 - shift_end = 0 + # shift_end = 0 if model.output_chunk_length > 1: # used to convert the shift into the appropriate unit unit = freq if series_.has_datetime_index else 1 @@ -225,16 +233,16 @@ def _optimized_historical_forecasts_regression_all_points( shift_start = model.output_chunk_length - 1 hist_fct_tgt_start -= shift_start * unit - hist_fct_pc_start -= shift_start * unit - hist_fct_fc_start -= shift_start * unit - - # last lagged inputs are removed as the last prediction of length output_chunk_length will include them - if model.output_chunk_length == forecast_horizon: - shift_end = model.output_chunk_length - 1 - - hist_fct_tgt_end += shift_end * unit - hist_fct_pc_end += shift_end * unit - hist_fct_fc_end += shift_end * unit + hist_fct_pc_start -= ( + shift_start * unit + if hist_fct_pc_start is not None + else hist_fct_pc_start + ) + hist_fct_fc_start -= ( + shift_start * unit + if hist_fct_fc_start is not None + else hist_fct_fc_start + ) X, _ = create_lagged_prediction_data( target_series=None @@ -277,16 +285,7 @@ def _optimized_historical_forecasts_regression_all_points( len(forecast_components), num_samples, ) - - if ( - forecast_horizon == model.output_chunk_length - and forecast_horizon > 1 - and not overlap_end - ): - forecast = forecast[:-shift_end:stride] - # only keep the prediction of the first forecast_horizon sub-models - else: - forecast = forecast[::stride, :forecast_horizon] + forecast = forecast[::stride, :forecast_horizon] else: # forecast has shape ((forecastable_index_length-1)*num_samples, 1, n_component) # and the components are interleaved @@ -307,9 +306,6 @@ def _optimized_historical_forecasts_regression_all_points( :, :, ] - # apply stride, remove the last windows - elif forecast_horizon > 1 and not overlap_end: - forecast = forecast[: -forecast_horizon + 1 : stride, 0, 0, :, :, :] # apply stride else: forecast = forecast[::stride, 0, 0, :, :, :] diff --git 
a/darts/utils/historical_forecasts/optimized_historical_forecasts_torch.py b/darts/utils/historical_forecasts/optimized_historical_forecasts_torch.py new file mode 100644 index 0000000000..085c5efa64 --- /dev/null +++ b/darts/utils/historical_forecasts/optimized_historical_forecasts_torch.py @@ -0,0 +1,143 @@ +from typing import List, Optional, Sequence, Union + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +import numpy as np +import pandas as pd + +from darts.logging import get_logger +from darts.timeseries import TimeSeries +from darts.utils.historical_forecasts.utils import ( + _get_historical_forecast_boundaries, + _process_predict_start_points_bounds, +) +from darts.utils.timeseries_generation import generate_index + +logger = get_logger(__name__) + + +def _optimized_historical_forecasts( + model, + series: Sequence[TimeSeries], + past_covariates: Optional[Sequence[TimeSeries]] = None, + future_covariates: Optional[Sequence[TimeSeries]] = None, + num_samples: int = 1, + start: Optional[Union[pd.Timestamp, float, int]] = None, + start_format: Literal["position", "value"] = "value", + forecast_horizon: int = 1, + stride: int = 1, + overlap_end: bool = False, + last_points_only: bool = True, + show_warnings: bool = True, + predict_likelihood_parameters: bool = False, + verbose: bool = False, +) -> Union[ + TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]] +]: + """ + Optimized historical forecasts for TorchForecastingModels + + Rely on _check_optimizable_historical_forecasts() to check that the assumptions are verified. 
+ """ + bounds = [] + for idx, series_ in enumerate(series): + past_covariates_ = past_covariates[idx] if past_covariates is not None else None + future_covariates_ = ( + future_covariates[idx] if future_covariates is not None else None + ) + # obtain forecastable indexes boundaries, adjust target & covariates boundaries accordingly + ( + hist_fct_start, + hist_fct_end, + _, + _, + _, + _, + _, + _, + ) = _get_historical_forecast_boundaries( + model=model, + series=series_, + series_idx=idx, + past_covariates=past_covariates_, + future_covariates=future_covariates_, + start=start, + start_format=start_format, + forecast_horizon=forecast_horizon, + overlap_end=overlap_end, + freq=series_.freq, + show_warnings=show_warnings, + ) + left_bound = series_.get_index_at_point(hist_fct_start) + # latest possible prediction start is one time step after end of target series + if hist_fct_end > series_.end_time(): + right_bound = len(series_) + else: + right_bound = series_.get_index_at_point(hist_fct_end) + bounds.append((left_bound, right_bound)) + + bounds, cum_lengths = _process_predict_start_points_bounds( + series=series, + bounds=np.array(bounds), + stride=stride, + ) + + # TODO: is there a better way to call the super().predict() from TorchForecastingModel, without having to + # import it? 
(avoid circular imports) + tfm_cls = [ + cls + for cls in model.__class__.__mro__ + if cls.__name__ == "TorchForecastingModel" + ][0] + super(tfm_cls, model).predict( + forecast_horizon, + series, + past_covariates, + future_covariates, + num_samples=num_samples, + predict_likelihood_parameters=predict_likelihood_parameters, + ) + + dataset = model._build_inference_dataset( + target=series, + n=forecast_horizon, + past_covariates=past_covariates, + future_covariates=future_covariates, + stride=stride, + bounds=bounds, + ) + + predictions = model.predict_from_dataset( + forecast_horizon, + dataset, + trainer=None, + verbose=verbose, + predict_likelihood_parameters=predict_likelihood_parameters, + ) + + # torch models return list of time series in order of historical forecasts: we reorder per time series + forecasts_list = [] + for series_idx in range(len(series)): + pred_idx_start = 0 if not series_idx else cum_lengths[series_idx - 1] + pred_idx_end = cum_lengths[series_idx] + preds = predictions[pred_idx_start:pred_idx_end] + if last_points_only: + # torch predictions come with the entire horizon: we extract last values + preds = TimeSeries.from_times_and_values( + times=generate_index( + start=preds[0].end_time(), + length=len(preds), + freq=preds[0].freq * stride, + ), + values=np.concatenate( + [p.all_values(copy=False)[-1:, :, :] for p in preds], axis=0 + ), + columns=preds[0].columns, + static_covariates=preds[0].static_covariates, + hierarchy=preds[0].hierarchy, + ) + forecasts_list.append(preds) + return forecasts_list if len(forecasts_list) > 1 else forecasts_list[0] diff --git a/darts/utils/historical_forecasts/utils.py b/darts/utils/historical_forecasts/utils.py index 75a8f01836..fac3d4029e 100644 --- a/darts/utils/historical_forecasts/utils.py +++ b/darts/utils/historical_forecasts/utils.py @@ -1,5 +1,5 @@ from types import SimpleNamespace -from typing import Any, Callable, Optional, Tuple, Union +from typing import Any, Callable, Optional, Sequence, 
Tuple, Union try: from typing import Literal @@ -8,6 +8,7 @@ import numpy as np import pandas as pd +from numpy.typing import ArrayLike from darts.logging import get_logger, raise_if_not, raise_log from darts.timeseries import TimeSeries @@ -27,6 +28,7 @@ def _historical_forecasts_general_checks(model, series, kwargs): """ Performs checks common to ForecastingModel and RegressionModel backtest() methods + Parameters ---------- model @@ -241,6 +243,8 @@ def _historical_forecasts_start_warnings( def _get_historical_forecastable_time_index( model, series: TimeSeries, + forecast_horizon: int, + overlap_end: bool, past_covariates: Optional[TimeSeries] = None, future_covariates: Optional[TimeSeries] = None, is_training: Optional[bool] = False, @@ -264,11 +268,21 @@ def _get_historical_forecastable_time_index( timestamp is a timestamp that has a training sample of length at least ``self.training_sample_length`` preceding it. + Additionally, it accounts for auto-regression (forecast_horizon > model.output_chunk_length), and overlap_end. + - In case of auto-regression, we have to step back the last possible predictable/trainable time step if the + covariates are too short + - In case overlap_end=False, the latest possible predictable/trainable time step is shifted back if a + prediction starting from that point would go beyond the end of `series`. + Parameters ---------- series A target series. + forecast_horizon + The forecast horizon for the predictions. + overlap_end + Whether the returned forecasts can go beyond the series' end or not. past_covariates Optionally, a past covariates. 
future_covariates @@ -288,26 +302,35 @@ def _get_historical_forecastable_time_index( >>> model = LinearRegressionModel(lags=3, output_chunk_length=2) >>> model.fit(train_series) >>> series = TimeSeries.from_times_and_values(pd.date_range('2000-01-01', '2000-01-10'), np.arange(10)) - >>> model._get_historical_forecastable_time_index(series=series, is_training=False) - DatetimeIndex(['2000-01-04', '2000-01-05', '2000-01-06', '2000-01-07', - '2000-01-08', '2000-01-09', '2000-01-10'], - dtype='datetime64[ns]', freq='D') + >>> model._get_historical_forecastable_time_index(series=series, is_training=False, forecast_horizon=1) + DatetimeIndex( + ['2000-01-04', '2000-01-05', '2000-01-06', '2000-01-07', '2000-01-08', '2000-01-09', '2000-01-10'], + dtype='datetime64[ns]', freq='D' + ) >>> model._get_historical_forecastable_time_index(series=series, is_training=True) - DatetimeIndex(['2000-01-06', '2000-01-08', '2000-01-09', '2000-01-10'], - dtype='datetime64[ns]', freq='D') - + DatetimeIndex(['2000-01-06', '2000-01-08', '2000-01-09', '2000-01-10'], dtype='datetime64[ns]', freq='D') >>> model = NBEATSModel(input_chunk_length=3, output_chunk_length=3) >>> model.fit(train_series, train_past_covariates) >>> series = TimeSeries.from_times_and_values(pd.date_range('2000-10-01', '2000-10-09'), np.arange(8)) - >>> past_covariates = TimeSeries.from_times_and_values(pd.date_range('2000-10-03', '2000-10-20'), - np.arange(18)) - >>> model._get_historical_forecastable_time_index(series=series, past_covariates=past_covariates, - is_training=False) + >>> past_covariates = TimeSeries.from_times_and_values( + >>> pd.date_range('2000-10-03', '2000-10-20'), + >>> np.arange(18) + >>> ) + >>> model._get_historical_forecastable_time_index( + >>> series=series, + >>> past_covariates=past_covariates, + >>> is_training=False, + >>> forecast_horizon=1, + >>> ) DatetimeIndex(['2000-10-06', '2000-10-07', '2000-10-08', '2000-10-09'], dtype='datetime64[ns]', freq='D') - >>> 
model._get_historical_forecastable_time_index(series=series, past_covariates=past_covariates, - is_training=True) - DatetimeIndex(['2000-10-09'], dtype='datetime64[ns]', freq='D') # Only one point is trainable, and - # corresponds to the first point after we reach a common subset of timestamps of training_sample_length length. + >>> # Only one point is trainable; it corresponds to the first point after we reach a common subset of + >>> # timestamps of training_sample_length length. + >>> model._get_historical_forecastable_time_index( + >>> series=series, + >>> past_covariates=past_covariates, + >>> is_training=True, + >>> ) + DatetimeIndex(['2000-10-09'], dtype='datetime64[ns]', freq='D') """ ( @@ -319,59 +342,76 @@ def _get_historical_forecastable_time_index( max_future_cov_lag, ) = model.extreme_lags + # max_target_lag < 0 are local models which can predict for n (horizon) -> infinity (no auto-regression) + is_autoregression = max_target_lag >= 0 and forecast_horizon > max_target_lag + 1 + if min_target_lag is None: min_target_lag = 0 # longest possible time index for target - start = ( - series.start_time() + (max_target_lag - min_target_lag + 1) * series.freq - if is_training - else series.start_time() - min_target_lag * series.freq - ) + if is_training: + start = ( + series.start_time() + (max_target_lag - min_target_lag + 1) * series.freq + ) + else: + start = series.start_time() - min_target_lag * series.freq end = series.end_time() + 1 * series.freq intersect_ = (start, end) # longest possible time index for past covariates if (min_past_cov_lag is not None) and (past_covariates is not None): - start_pc = ( - past_covariates.start_time() - - (min_past_cov_lag - max_target_lag - 1) * past_covariates.freq - if is_training - else past_covariates.start_time() - min_past_cov_lag * past_covariates.freq - ) - end_pc = past_covariates.end_time() - max_past_cov_lag * past_covariates.freq - tmp_ = (start_pc, end_pc) - - if intersect_ is not None: - intersect_ = ( - 
max([intersect_[0], tmp_[0]]), - min([intersect_[1], tmp_[1]]), + if is_training: + start_pc = ( + past_covariates.start_time() + - (min_past_cov_lag - max_target_lag - 1) * past_covariates.freq ) else: - intersect_ = tmp_ + start_pc = ( + past_covariates.start_time() - min_past_cov_lag * past_covariates.freq + ) - # longest possible time index for future covariates - if (min_future_cov_lag is not None) and (future_covariates is not None): - start_fc = ( - future_covariates.start_time() - - (min_future_cov_lag - max_target_lag - 1) * future_covariates.freq - if is_training - else future_covariates.start_time() - - min_future_cov_lag * future_covariates.freq - ) - end_fc = ( - future_covariates.end_time() - max_future_cov_lag * future_covariates.freq + shift_pc_end = max_past_cov_lag + if is_autoregression: + # we step back in case of auto-regression + shift_pc_end += forecast_horizon - (max_target_lag + 1) + end_pc = past_covariates.end_time() - shift_pc_end * past_covariates.freq + + intersect_ = ( + max([intersect_[0], start_pc]), + min([intersect_[1], end_pc]), ) - tmp_ = (start_fc, end_fc) - if intersect_ is not None: - intersect_ = ( - max([intersect_[0], tmp_[0]]), - min([intersect_[1], tmp_[1]]), + # longest possible time index for future covariates + if (min_future_cov_lag is not None) and (future_covariates is not None): + if is_training: + start_fc = ( + future_covariates.start_time() + - (min_future_cov_lag - max_target_lag - 1) * future_covariates.freq ) else: - intersect_ = tmp_ + start_fc = ( + future_covariates.start_time() + - min_future_cov_lag * future_covariates.freq + ) + + shift_fc_end = max_future_cov_lag + if is_autoregression: + # we step back in case of auto-regression + shift_fc_end += forecast_horizon - (max_target_lag + 1) + end_fc = future_covariates.end_time() - shift_fc_end * future_covariates.freq + + intersect_ = ( + max([intersect_[0], start_fc]), + min([intersect_[1], end_fc]), + ) + + # overlap_end = False -> predictions must not 
go beyond end of target series + if ( + not overlap_end + and intersect_[1] + (forecast_horizon - 1) * series.freq > series.end_time() + ): + intersect_ = (intersect_[0], end - forecast_horizon * series.freq) # end comes before the start if intersect_[1] < intersect_[0]: @@ -395,12 +435,9 @@ def _get_historical_forecastable_time_index( def _adjust_historical_forecasts_time_index( - model, series: TimeSeries, series_idx: int, historical_forecasts_time_index: TimeIndex, - forecast_horizon: int, - overlap_end: bool, start: Optional[Union[pd.Timestamp, float, int]], start_format: Literal["position", "value"], show_warnings: bool, @@ -409,19 +446,6 @@ def _adjust_historical_forecasts_time_index( Shrink the beginning and end of the historical forecasts time index based on the values of `start`, `forecast_horizon` and `overlap_end`. """ - # shift the end of the forecastable index based on `overlap_end`` and `forecast_horizon`` - last_valid_pred_time = model._get_last_prediction_time( - series, - forecast_horizon, - overlap_end, - latest_possible_prediction_start=historical_forecasts_time_index[-1], - ) - - historical_forecasts_time_index = ( - historical_forecasts_time_index[0], - min(historical_forecasts_time_index[-1], last_valid_pred_time), - ) - # when applicable, shift the start of the forecastable index based on `start` if start is not None: if start_format == "value": @@ -456,13 +480,17 @@ def _get_historical_forecast_predict_index( series_idx: int, past_covariates: Optional[TimeSeries], future_covariates: Optional[TimeSeries], + forecast_horizon: int, + overlap_end: bool, ) -> TimeIndex: """Obtain the boundaries of the predictable time indices, raise an exception if None""" historical_forecasts_time_index = _get_historical_forecastable_time_index( - model, - series, - past_covariates, - future_covariates, + model=model, + series=series, + forecast_horizon=forecast_horizon, + overlap_end=overlap_end, + past_covariates=past_covariates, + 
future_covariates=future_covariates, is_training=False, reduce_to_bounds=True, ) @@ -486,16 +514,20 @@ def _get_historical_forecast_train_index( series_idx: int, past_covariates: Optional[TimeSeries], future_covariates: Optional[TimeSeries], + forecast_horizon: int, + overlap_end: bool, ) -> TimeIndex: """ Obtain the boundaries of the time indices usable for training, raise an exception if training is required and no indices are available. """ historical_forecasts_time_index = _get_historical_forecastable_time_index( - model, - series, - past_covariates, - future_covariates, + model=model, + series=series, + forecast_horizon=forecast_horizon, + overlap_end=overlap_end, + past_covariates=past_covariates, + future_covariates=future_covariates, is_training=True, reduce_to_bounds=True, ) @@ -598,17 +630,20 @@ def _get_historical_forecast_boundaries( """ # obtain forecastable indexes boundaries, as values from the time index historical_forecasts_time_index = _get_historical_forecast_predict_index( - model, series, series_idx, past_covariates, future_covariates + model, + series, + series_idx, + past_covariates, + future_covariates, + forecast_horizon, + overlap_end, ) # adjust boundaries based on start, forecast_horizon and overlap_end historical_forecasts_time_index = _adjust_historical_forecasts_time_index( - model=model, series=series, series_idx=series_idx, historical_forecasts_time_index=historical_forecasts_time_index, - forecast_horizon=forecast_horizon, - overlap_end=overlap_end, start=start, start_format=start_format, show_warnings=show_warnings, @@ -646,10 +681,16 @@ def _get_historical_forecast_boundaries( if series.has_range_index: hist_fct_tgt_start = series.get_index_at_point(hist_fct_tgt_start) hist_fct_tgt_end = series.get_index_at_point(hist_fct_tgt_end) + 1 - hist_fct_pc_start = series.get_index_at_point(hist_fct_pc_start) - hist_fct_pc_end = series.get_index_at_point(hist_fct_pc_end) + 1 - hist_fct_fc_start = 
series.get_index_at_point(hist_fct_fc_start) - hist_fct_fc_end = series.get_index_at_point(hist_fct_fc_end) + 1 + if past_covariates is not None: + hist_fct_pc_start = past_covariates.get_index_at_point(hist_fct_pc_start) + hist_fct_pc_end = past_covariates.get_index_at_point(hist_fct_pc_end) + 1 + else: + hist_fct_pc_start, hist_fct_pc_end = None, None + if future_covariates is not None: + hist_fct_fc_start = future_covariates.get_index_at_point(hist_fct_fc_start) + hist_fct_fc_end = future_covariates.get_index_at_point(hist_fct_fc_end) + 1 + else: + hist_fct_fc_start, hist_fct_fc_end = None, None return ( historical_forecasts_time_index[0], @@ -661,3 +702,113 @@ def _get_historical_forecast_boundaries( hist_fct_fc_start, hist_fct_fc_end, ) + + +def _check_optimizable_historical_forecasts_global_models( + model, + forecast_horizon: int, + retrain: Union[bool, int, Callable[..., bool]], + show_warnings: bool, + allow_autoregression: bool, +) -> bool: + """ + Historical forecast can be optimized only if `retrain=False`. If `allow_autoregression=False`, historical forecasts + can be optimized only if `forecast_horizon <= model.output_chunk_length` (no auto-regression required). + """ + + retrain_off = (retrain is False) or (retrain == 0) + is_autoregressive = forecast_horizon > model.output_chunk_length + if retrain_off and ( + not is_autoregressive or (is_autoregressive and allow_autoregression) + ): + return True + + if show_warnings: + if not retrain_off: + logger.warning( + "`enable_optimization=True` is ignored because `retrain` is not `False` or `0`." + "To hide this warning, set `show_warnings=False` or `enable_optimization=False`." + ) + if is_autoregressive: + logger.warning( + "`enable_optimization=True` is ignored because `forecast_horizon > model.output_chunk_length`." + "To hide this warning, set `show_warnings=False` or `enable_optimization=False`." 
+ ) + + return False + + +def _process_historical_forecast_input( + model, + series: Optional[Sequence[TimeSeries]], + past_covariates: Optional[Sequence[TimeSeries]] = None, + future_covariates: Optional[Sequence[TimeSeries]] = None, + forecast_horizon: int = 1, + allow_autoregression: bool = False, +): + if not model._fit_called: + raise_log( + ValueError("Model has not been fit yet."), + logger, + ) + + if not allow_autoregression and forecast_horizon > model.output_chunk_length: + raise_log( + ValueError( + "`forecast_horizon > model.output_chunk_length` requires auto-regression which is not " + "supported in this optimized routine." + ), + logger, + ) + + # manage covariates, usually handled by RegressionModel.predict() + if past_covariates is None and model.past_covariate_series is not None: + past_covariates = [model.past_covariate_series] * len(series) + if future_covariates is None and model.future_covariate_series is not None: + future_covariates = [model.future_covariate_series] * len(series) + + model._verify_static_covariates(series[0].static_covariates) + + if model.encoders.encoding_available: + past_covariates, future_covariates = model.generate_fit_predict_encodings( + n=forecast_horizon, + series=series, + past_covariates=past_covariates, + future_covariates=future_covariates, + ) + return series, past_covariates, future_covariates + + +def _process_predict_start_points_bounds( + series: Sequence[TimeSeries], bounds: ArrayLike, stride: int +) -> Tuple[np.ndarray, np.ndarray]: + """Processes the historical forecastable time index bounds (earliest, and latest possible prediction + start points). + + Parameters + ---------- + bounds + An array of shape (n series, 2), with the left and right prediction start point bounds per series. + stride + The number of time steps between two consecutive predictions. 
+ + Returns + ------- + (np.ndarray, np.ndarray) + The adjusted bounds: the right bounds are adjusted to be a multiple of 'stride' ahead of the left bounds. + The cumulative number of resulting predicted series per input series, respecting stride and bounds. + """ + bounds = bounds if isinstance(bounds, np.ndarray) else np.array(bounds) + if not bounds.shape == (len(series), 2): + raise_log( + ValueError( + "`bounds` must be an array like with shape `(n target series, 2)`, " + "with the start and end bounds of each series" + ), + logger=logger, + ) + # we might have some steps that are too long considering stride + steps_too_long = (bounds[:, 1] - bounds[:, 0]) % stride + bounds[:, 1] -= steps_too_long + cum_lengths = np.cumsum(np.diff(bounds) // stride + 1) + return bounds, cum_lengths diff --git a/darts/utils/timeseries_generation.py b/darts/utils/timeseries_generation.py index f4a5be5dbc..d07aaf164a 100644 --- a/darts/utils/timeseries_generation.py +++ b/darts/utils/timeseries_generation.py @@ -753,6 +753,7 @@ def _build_forecast_series( custom_columns: List[str] = None, with_static_covs: bool = True, with_hierarchy: bool = True, + pred_start: Optional[Union[pd.Timestamp, int]] = None, ) -> TimeSeries: """ Builds a forecast time series starting after the end of an input time series, with the @@ -770,6 +771,9 @@ def _build_forecast_series( If set to False, do not copy the input_series `static_covariates` attribute with_hierarchy If set to False, do not copy the input_series `hierarchy` attribute + pred_start + Optionally, give a custom prediction start point. 
+ Returns ------- TimeSeries @@ -780,7 +784,12 @@ def _build_forecast_series( if isinstance(points_preds, np.ndarray) else len(points_preds[0]) ) - time_index = _generate_new_dates(time_index_length, input_series=input_series) + + time_index = _generate_new_dates( + time_index_length, + input_series=input_series, + start=pred_start, + ) values = ( points_preds if isinstance(points_preds, np.ndarray) @@ -798,13 +807,14 @@ def _build_forecast_series( def _generate_new_dates( - n: int, input_series: TimeSeries + n: int, input_series: TimeSeries, start: Optional[Union[pd.Timestamp, int]] = None ) -> Union[pd.DatetimeIndex, pd.RangeIndex]: """ Generates `n` new dates after the end of the specified series """ - last = input_series.end_time() - start = last + input_series.freq + if start is None: + last = input_series.end_time() + start = last + input_series.freq return generate_index( start=start, freq=input_series.freq, length=n, name=input_series.time_dim )