From 3a0566e8dc27d8fbd13174d6a6aed4daddb2405b Mon Sep 17 00:00:00 2001 From: Googler Date: Thu, 13 Jun 2024 13:11:07 -0700 Subject: [PATCH] feat(components): Add Starry Net forecasting pipeline to public preview Signed-off-by: Googler PiperOrigin-RevId: 643098339 --- components/google-cloud/RELEASE.md | 1 + .../docs/source/api/preview/index.rst | 3 +- .../docs/source/api/preview/starry_net.rst | 4 + .../_implementation/starry_net/__init__.py | 41 ++ .../starry_net/dataprep/__init__.py | 13 + .../starry_net/dataprep/component.py | 159 +++++++ .../starry_net/evaluation/__init__.py | 13 + .../starry_net/evaluation/component.py | 23 + .../get_training_artifacts/__init__.py | 13 + .../get_training_artifacts/component.py | 62 +++ .../maybe_set_tfrecord_args/__init__.py | 13 + .../maybe_set_tfrecord_args/component.py | 77 +++ .../starry_net/set_dataprep_args/__init__.py | 13 + .../starry_net/set_dataprep_args/component.py | 97 ++++ .../starry_net/set_eval_args/__init__.py | 13 + .../starry_net/set_eval_args/component.py | 76 +++ .../starry_net/set_test_set/__init__.py | 13 + .../starry_net/set_test_set/component.py | 48 ++ .../starry_net/set_tfrecord_args/__init__.py | 13 + .../starry_net/set_tfrecord_args/component.py | 70 +++ .../starry_net/set_train_args/__init__.py | 13 + .../starry_net/set_train_args/component.py | 90 ++++ .../starry_net/train/__init__.py | 13 + .../starry_net/train/component.py | 209 +++++++++ .../upload_decomposition_plots/__init__.py | 13 + .../upload_decomposition_plots/component.py | 59 +++ .../starry_net/upload_model/__init__.py | 13 + .../starry_net/upload_model/component.py | 23 + .../_implementation/starry_net/version.py | 18 + .../preview/starry_net/__init__.py | 19 + .../preview/starry_net/component.py | 444 ++++++++++++++++++ 31 files changed, 1678 insertions(+), 1 deletion(-) create mode 100644 components/google-cloud/docs/source/api/preview/starry_net.rst create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/__init__.py create mode 100644 
components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/component.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/version.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/preview/starry_net/__init__.py create mode 100644 components/google-cloud/google_cloud_pipeline_components/preview/starry_net/component.py diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md index 190c27ed46c..82b4df957bc 100644 --- a/components/google-cloud/RELEASE.md +++ b/components/google-cloud/RELEASE.md @@ -1,5 +1,6 @@ ## Upcoming release * Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`. +* Add Starry Net forecasting pipeline to `preview.starry_net.starry_net_pipeline` ## Release 2.14.1 * Add staging and temp location parameters to prophet trainer component. diff --git a/components/google-cloud/docs/source/api/preview/index.rst b/components/google-cloud/docs/source/api/preview/index.rst index 8074a6c6b8d..468f11b3490 100644 --- a/components/google-cloud/docs/source/api/preview/index.rst +++ b/components/google-cloud/docs/source/api/preview/index.rst @@ -8,4 +8,5 @@ Preview Components custom_job dataflow llm - model_evaluation \ No newline at end of file + model_evaluation + starry_net \ No newline at end of file diff --git a/components/google-cloud/docs/source/api/preview/starry_net.rst b/components/google-cloud/docs/source/api/preview/starry_net.rst new file mode 100644 index 00000000000..d80140b8b66 --- /dev/null +++ b/components/google-cloud/docs/source/api/preview/starry_net.rst @@ -0,0 +1,4 @@ +STARRY_NET +========================== + +.. 
automodule:: preview.starry_net \ No newline at end of file diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/__init__.py new file mode 100644 index 00000000000..b6a2cb7a7e8 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/__init__.py @@ -0,0 +1,41 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from google_cloud_pipeline_components._implementation.starry_net.dataprep.component import dataprep as DataprepOp +from google_cloud_pipeline_components._implementation.starry_net.evaluation.component import evaluation as EvaluationOp +from google_cloud_pipeline_components._implementation.starry_net.get_training_artifacts.component import get_training_artifacts as GetTrainingArtifactsOp +from google_cloud_pipeline_components._implementation.starry_net.maybe_set_tfrecord_args.component import maybe_set_tfrecord_args as MaybeSetTfrecordArgsOp +from google_cloud_pipeline_components._implementation.starry_net.set_dataprep_args.component import set_dataprep_args as SetDataprepArgsOp +from google_cloud_pipeline_components._implementation.starry_net.set_eval_args.component import set_eval_args as SetEvalArgsOp +from google_cloud_pipeline_components._implementation.starry_net.set_test_set.component import set_test_set as SetTestSetOp +from google_cloud_pipeline_components._implementation.starry_net.set_tfrecord_args.component import set_tfrecord_args as SetTfrecordArgsOp +from google_cloud_pipeline_components._implementation.starry_net.set_train_args.component import set_train_args as SetTrainArgsOp +from google_cloud_pipeline_components._implementation.starry_net.train.component import train as TrainOp +from google_cloud_pipeline_components._implementation.starry_net.upload_decomposition_plots.component import upload_decomposition_plots as UploadDecompositionPlotsOp +from google_cloud_pipeline_components._implementation.starry_net.upload_model.component import upload_model as UploadModelOp + + +__all__ = [ + 'DataprepOp', + 'EvaluationOp', + 'GetTrainingArtifactsOp', + 'MaybeSetTfrecordArgsOp', + 'SetDataprepArgsOp', + 'SetEvalArgsOp', + 'SetTestSetOp', + 'SetTfrecordArgsOp', + 'SetTrainArgsOp', + 'TrainOp', + 'UploadDecompositionPlotsOp', + 'UploadModelOp', +] diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/component.py new file mode 100644 index 00000000000..d62ff0acdf1 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/dataprep/component.py @@ -0,0 +1,159 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Starry Net component for data preparation.""" + +from google_cloud_pipeline_components import utils +from google_cloud_pipeline_components._implementation.starry_net import version +from kfp import dsl + + +@dsl.container_component +def dataprep( + gcp_resources: dsl.OutputPath(str), + dataprep_dir: dsl.Output[dsl.Artifact], # pytype: disable=unsupported-operands + backcast_length: int, + forecast_length: int, + train_end_date: str, + n_val_windows: int, + n_test_windows: int, + test_set_stride: int, + model_blocks: str, + bigquery_source: str, + ts_identifier_columns: str, + time_column: str, + static_covariate_columns: str, + target_column: str, + machine_type: str, + docker_region: str, + location: str, + project: str, + job_id: str, + job_name_prefix: str, + num_workers: int, + max_num_workers: int, + disk_size_gb: int, + test_set_only: bool, + bigquery_output: str, + gcs_source: str, + gcs_static_covariate_source: str, + encryption_spec_key_name: str, +): + # fmt: off + """Runs Dataprep for training and evaluating a STARRY-Net model. + + Args: + gcp_resources: Serialized JSON of ``gcp_resources`` which tracks the + CustomJob. + dataprep_dir: The gcp bucket path where all dataprep artifacts + are saved. + backcast_length: The length of the input window to feed into the model. + forecast_length: The length of the forecast horizon. + train_end_date: The last date of data to use in the training set. All + subsequent dates are part of the test set. + n_val_windows: The number of windows to use for the val set. If 0, no + validation set is used. + n_test_windows: The number of windows to use for the test set. Must be >= 1. + test_set_stride: The number of timestamps to roll forward when + constructing the val and test sets. + model_blocks: The stringified tuple of blocks to use in the order + that they appear in the model. 
Possible values are `cleaning`, + `change_point`, `trend`, `hour_of_week-hybrid`, `day_of_week-hybrid`, + `day_of_year-hybrid`, `week_of_year-hybrid`, `month_of_year-hybrid`, + `residual`, `quantile`. + bigquery_source: The BigQuery source of the data. + ts_identifier_columns: The columns that identify unique time series in the BigQuery + data source. + time_column: The column with timestamps in the BigQuery source. + static_covariate_columns: The names of the static covariates. + target_column: The target column in the BigQuery data source. + machine_type: The machine type of the dataflow workers. + docker_region: The docker region, used to determine which image to use. + location: The location where the job is run. + project: The name of the project. + job_id: The pipeline job id. + job_name_prefix: The prefix of the dataflow job name. + num_workers: The initial number of workers in the dataflow job. + max_num_workers: The maximum number of workers in the dataflow job. + disk_size_gb: The disk size of each dataflow worker. + test_set_only: Whether to only create the test set BigQuery table or also + to create TFRecords for training and validation. + bigquery_output: The BigQuery dataset where the test set is written in the + form bq://project.dataset. + gcs_source: The path to the csv file of the data source. + gcs_static_covariate_source: The path to the csv file of static covariates. + encryption_spec_key_name: Customer-managed encryption key options for the + CustomJob. If this is set, then all resources created by the CustomJob + will be encrypted with the provided encryption key. + + Returns: + gcp_resources: Serialized JSON of ``gcp_resources`` which tracks the + CustomJob. + dataprep_dir: The gcp bucket path where all dataprep artifacts + are saved.
+ """ + job_name = f'{job_name_prefix}-{job_id}' + payload = { + 'display_name': job_name, + 'encryption_spec': { + 'kms_key_name': str(encryption_spec_key_name), + }, + 'job_spec': { + 'worker_pool_specs': [{ + 'replica_count': '1', + 'machine_spec': { + 'machine_type': str(machine_type), + }, + 'disk_spec': { + 'boot_disk_type': 'pd-ssd', + 'boot_disk_size_gb': 100, + }, + 'container_spec': { + 'image_uri': f'{docker_region}-docker.pkg.dev/vertex-ai-restricted/starryn/dataprep:captain_{version.DATAPREP_VERSION}', + 'args': [ + '--config=starryn/experiments/configs/vertex.py', + f'--config.datasets.backcast_length={backcast_length}', + f'--config.datasets.forecast_length={forecast_length}', + f'--config.datasets.train_end_date={train_end_date}', + f'--config.datasets.n_val_windows={n_val_windows}', + f'--config.datasets.val_rolling_window_size={test_set_stride}', + f'--config.datasets.n_test_windows={n_test_windows}', + f'--config.datasets.test_rolling_window_size={test_set_stride}', + f'--config.model.static_cov_names={static_covariate_columns}', + f'--config.model.blocks_list={model_blocks}', + f'--bigquery_source={bigquery_source}', + f'--bigquery_output={bigquery_output}', + f'--gcs_source={gcs_source}', + f'--gcs_static_covariate_source={gcs_static_covariate_source}', + f'--ts_identifier_columns={ts_identifier_columns}', + f'--time_column={time_column}', + f'--target_column={target_column}', + f'--job_id={job_name}', + f'--num_workers={num_workers}', + f'--max_num_workers={max_num_workers}', + f'--root_bucket={dataprep_dir.uri}', + f'--disk_size={disk_size_gb}', + f'--machine_type={machine_type}', + f'--test_set_only={test_set_only}', + f'--image_uri={docker_region}-docker.pkg.dev/vertex-ai-restricted/starryn/dataprep:replica_{version.DATAPREP_VERSION}', + ], + }, + }] + } + } + return utils.build_serverless_customjob_container_spec( + project=project, + location=location, + custom_job_payload=payload, + gcp_resources=gcp_resources, + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/component.py new file mode 100644 index 00000000000..88fa79e7708 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/evaluation/component.py @@ -0,0 +1,23 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""StarryNet Evaluation Component.""" + +import os + +from kfp import components + +# TODO(b/346580764) +evaluation = components.load_component_from_file( + os.path.join(os.path.dirname(__file__), 'evaluation.yaml') +) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/component.py new file mode 100644 index 00000000000..f1dc01b4008 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/get_training_artifacts/component.py @@ -0,0 +1,62 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""StarryNet get training artifacts component.""" + +from typing import NamedTuple + +from kfp import dsl + + +@dsl.component(packages_to_install=['tensorflow==2.11.0']) +def get_training_artifacts( + docker_region: str, + trainer_dir: dsl.InputPath(), +) -> NamedTuple( + 'TrainingArtifacts', + image_uri=str, + artifact_uri=str, + prediction_schema_uri=str, + instance_schema_uri=str, +): + # fmt: off + """Gets the artifact URIs from the training job. + + Args: + docker_region: The region from which the training docker image is pulled. + trainer_dir: The directory where training artifacts were stored.
+ + Returns: + A NamedTuple containing the image_uri for the prediction server, + the artifact_uri with model artifacts, the prediction_schema_uri, + and the instance_schema_uri. + """ + import os  # pylint: disable=g-import-not-at-top + import tensorflow as tf  # pylint: disable=g-import-not-at-top + + with tf.io.gfile.GFile(os.path.join(trainer_dir, 'trainer.txt')) as f: + private_dir = f.read().strip() + + outputs = NamedTuple( + 'TrainingArtifacts', + image_uri=str, + artifact_uri=str, + prediction_schema_uri=str, + instance_schema_uri=str, + ) + return outputs( + f'{docker_region}-docker.pkg.dev/vertex-ai/starryn/predictor:20240610_0542_RC00',  # pylint: disable=too-many-function-args + private_dir,  # pylint: disable=too-many-function-args + os.path.join(private_dir, 'predict_schema.yaml'),  # pylint: disable=too-many-function-args + os.path.join(private_dir, 'instance_schema.yaml'),  # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/component.py new file mode 100644 index 00000000000..325ecdc011a --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/maybe_set_tfrecord_args/component.py @@ -0,0 +1,77 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+"""Starry Net component to set TFRecord args if training with TF Records.""" + +from typing import List, NamedTuple + +from kfp import dsl + + +@dsl.component +def maybe_set_tfrecord_args( + dataprep_previous_run_dir: str, + static_covariates: List[str], +) -> NamedTuple( + 'TfrecordArgs', + static_covariates_vocab_path=str, + train_tf_record_patterns=str, + val_tf_record_patterns=str, + test_tf_record_patterns=str, +): + # fmt: off + """Creates Trainer TFRecord args if training with TF Records. + + Args: + dataprep_previous_run_dir: The dataprep dir from a previous run. Use this + to save time if you've already created TFRecords from your BigQuery + dataset with the same dataprep parameters as this run. + static_covariates: The static covariates to train the model with. + + Returns: + A NamedTuple containing the path to the static covariates covabulary, and + the tf record patterns for the train, validation, and test sets. + """ + + outputs = NamedTuple( + 'TfrecordArgs', + static_covariates_vocab_path=str, + train_tf_record_patterns=str, + val_tf_record_patterns=str, + test_tf_record_patterns=str, + ) + + if static_covariates and dataprep_previous_run_dir: + static_covariates_vocab_path = ( + f'{dataprep_previous_run_dir}/static_covariate_vocab.json' + ) + else: + static_covariates_vocab_path = '' + if dataprep_previous_run_dir: + train_tf_record_patterns = ( + f"('{dataprep_previous_run_dir}/tf_records/train*',)" + ) + val_tf_record_patterns = f"('{dataprep_previous_run_dir}/tf_records/val*',)" + test_tf_record_patterns = ( + f"('{dataprep_previous_run_dir}/tf_records/test_path_for_plot*',)" + ) + else: + train_tf_record_patterns = '()' + val_tf_record_patterns = '()' + test_tf_record_patterns = '()' + return outputs( + static_covariates_vocab_path, # pylint: disable=too-many-function-args + train_tf_record_patterns, # pylint: disable=too-many-function-args + val_tf_record_patterns, # pylint: disable=too-many-function-args + test_tf_record_patterns, # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/component.py new file mode 100644 index 00000000000..34bd19c468c --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_dataprep_args/component.py @@ -0,0 +1,97 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""StarryNet Set Dataprep Args Component.""" + +from typing import List, NamedTuple + +from kfp import dsl + + +@dsl.component +def set_dataprep_args( + model_blocks: List[str], + ts_identifier_columns: List[str], + static_covariate_columns: List[str], + csv_data_path: str, + previous_run_dir: str, + location: str, +) -> NamedTuple( + 'DataprepArgs', + model_blocks=str, + ts_identifier_columns=str, + static_covariate_columns=str, + create_tf_records=bool, + docker_region=str, +): + # fmt: off + """Creates Dataprep args. + + Args: + model_blocks: The list of model blocks to use in the order they will appear + in the model. Possible values are `cleaning`, `change_point`, `trend`, + `hour_of_week`, `day_of_week`, `day_of_year`, `week_of_year`, + `month_of_year`, `residual`. + ts_identifier_columns: The list of ts_identifier columns from the BigQuery + data source. + static_covariate_columns: The list of strings of static covariate names. + csv_data_path: The path to the training data csv in the format + gs://bucket_name/sub_dir/blob_name.csv. + previous_run_dir: The dataprep dir from a previous run. Use this + to save time if you've already created TFRecords from your BigQuery + dataset with the same dataprep parameters as this run. + location: The location where the pipeline is run. + + Returns: + A NamedTuple containing the model blocks formatted as expected by the + dataprep job, the ts_identifier_columns formatted as expected by the + dataprep job, the static_covariate_columns formatted as expected by the + dataprep job, a boolean indicating whether to create tf records, and the + region of the dataprep docker image. 
+ """ + outputs = NamedTuple( + 'DataprepArgs', + model_blocks=str, + ts_identifier_columns=str, + static_covariate_columns=str, + create_tf_records=bool, + docker_region=str, + ) + + def maybe_update_model_blocks(model_blocks: List[str]) -> List[str]: + return [f'{b}-hybrid' if '_of_' in b else b for b in model_blocks] + + def create_name_tuple_from_list(input_list: List[str]) -> str: + if len(input_list) == 1: + return str(input_list).replace('[', '(').replace(']', ',)') + return str(input_list).replace('[', '(').replace(']', ')') + + def set_docker_region(location: str) -> str: + if location.startswith('africa') or location.startswith('europe'): + return 'europe' + elif ( + location.startswith('asia') + or location.startswith('australia') + or location.startswith('me') + ): + return 'asia' + else: + return 'us' + + return outputs( + create_name_tuple_from_list(maybe_update_model_blocks(model_blocks)), # pylint: disable=too-many-function-args + ','.join(ts_identifier_columns), # pylint: disable=too-many-function-args + create_name_tuple_from_list(static_covariate_columns), # pylint: disable=too-many-function-args + False if csv_data_path or previous_run_dir else True, # pylint: disable=too-many-function-args + set_docker_region(location), # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/component.py new file mode 100644 index 00000000000..498913edaa8 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_eval_args/component.py @@ -0,0 +1,76 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Starry Net Set Eval Args Component.""" + +from typing import List, NamedTuple + +from kfp import dsl + + +@dsl.component +def set_eval_args( + big_query_source: dsl.Input[dsl.Artifact], quantiles: List[float] +) -> NamedTuple( + 'EvalArgs', + big_query_source=str, + forecasting_type=str, + quantiles=List[float], + prediction_score_column=str, +): + # fmt: off + """Creates Evaluation args. + + Args: + big_query_source: The BQ Table containing the test set. + quantiles: The quantiles the model was trained to output. + + Returns: + A NamedTuple containing big_query_source as a string, forecasting_type + used for evaluation step, quantiles in the format expected by the evaluation + job, and the prediction_score_column used to evaluate. + """ + outputs = NamedTuple( + 'EvalArgs', + big_query_source=str, + forecasting_type=str, + quantiles=List[float], + prediction_score_column=str) + + def set_forecasting_type_for_eval(quantiles: List[float]) -> str: + if quantiles and quantiles[-1] != 0.5: + return 'quantile' + return 'point' + + def set_quantiles_for_eval(quantiles: List[float]) -> List[float]: + updated_q = [q for q in quantiles if q != 0.5] + if updated_q: + updated_q = [0.5] + updated_q + return updated_q + + def set_prediction_score_column( + quantiles: List[float]) -> str: + updated_q = [q for q in quantiles if q != 0.5] + if updated_q: + return 'predicted_x.quantile_predictions' + return 'predicted_x.value' + + project_id = big_query_source.metadata['projectId'] + dataset_id = big_query_source.metadata['datasetId'] + table_id = big_query_source.metadata['tableId'] + return outputs( + f'bq://{project_id}.{dataset_id}.{table_id}', # pylint: disable=too-many-function-args + set_forecasting_type_for_eval(quantiles), # pylint: disable=too-many-function-args + set_quantiles_for_eval(quantiles), # pylint: disable=too-many-function-args + set_prediction_score_column(quantiles), # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/component.py new file mode 100644 index 00000000000..befe6b88d32 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_test_set/component.py @@ -0,0 +1,48 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Starry Net Set Test Set Component.""" + +from typing import NamedTuple + +from kfp import dsl + + +@dsl.component(packages_to_install=['tensorflow==2.11.0']) +def set_test_set( + dataprep_dir: dsl.InputPath(), +) -> NamedTuple('TestSetArtifact', uri=str, artifact=dsl.Artifact): + # fmt: off + """Creates test set artifact. + + Args: + dataprep_dir: The bucket where dataprep artifacts are stored. + + Returns: + The test set dsl.Artifact. + """ + import os # pylint: disable=g-import-not-at-top + import json # pylint: disable=g-import-not-at-top + import tensorflow as tf # pylint: disable=g-import-not-at-top + + with tf.io.gfile.GFile( + os.path.join(dataprep_dir, 'big_query_test_set.json') + ) as f: + metadata = json.load(f) + project = metadata['projectId'] + dataset = metadata['datasetId'] + table = metadata['tableId'] + output = NamedTuple('TestSetArtifact', uri=str, artifact=dsl.Artifact) + uri = f'bq://{project}.{dataset}.{table}' + artifact = dsl.Artifact(uri=uri, metadata=metadata) + return output(uri, artifact) # pylint: disable=too-many-function-args diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/component.py new file mode 100644 index 00000000000..3c83742db6f --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_tfrecord_args/component.py @@ -0,0 +1,70 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Starry Net component to set TFRecord args.""" + +from typing import List, NamedTuple + +from kfp import dsl + + +@dsl.component +def set_tfrecord_args( + dataprep_dir: dsl.InputPath(), + static_covariates: List[str], +) -> NamedTuple( + 'TfrecordArgs', + static_covariates_vocab_path=str, + train_tf_record_patterns=str, + val_tf_record_patterns=str, + test_tf_record_patterns=str, +): + # fmt: off + """Creates Trainer TFRecord args. + + Args: + dataprep_dir: The dataprep directory where dataprep artifacts are stored. + static_covariates: The static covariates to train the model with. + + Returns: + A NamedTuple containing the path to the static covariates vocabulary, and + the tf record patterns for the train, validation, and test sets. + """ + + outputs = NamedTuple( + 'TfrecordArgs', + static_covariates_vocab_path=str, + train_tf_record_patterns=str, + val_tf_record_patterns=str, + test_tf_record_patterns=str, + ) + + if static_covariates and dataprep_dir: + static_covariates_vocab_path = f'{dataprep_dir}/static_covariate_vocab.json' + else: + static_covariates_vocab_path = '' + if dataprep_dir: + train_tf_record_patterns = f"('{dataprep_dir}/tf_records/train*',)" + val_tf_record_patterns = f"('{dataprep_dir}/tf_records/val*',)" + test_tf_record_patterns = ( + f"('{dataprep_dir}/tf_records/test_path_for_plot*',)") + else: + train_tf_record_patterns = '()' + val_tf_record_patterns = '()' + test_tf_record_patterns = '()' + return outputs( + static_covariates_vocab_path,  # pylint: disable=too-many-function-args + train_tf_record_patterns,  # pylint: disable=too-many-function-args + val_tf_record_patterns,  # pylint: disable=too-many-function-args + test_tf_record_patterns,  # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
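One behavior worth calling out in `set_train_args/component.py` below: the quantile list is normalized so that 0.5 (the point forecast) always comes first before being serialized to a tuple-literal string. A sketch of that normalization, under the assumption that it mirrors `set_quantiles` in the component:

    from typing import List

    def set_quantiles(input_list: List[float]) -> str:
      # 0.5 is prepended when missing, so the point forecast is always first.
      if not input_list or input_list[0] != 0.5:
        input_list = [0.5] + input_list
      if len(input_list) == 1:
        return str(input_list).replace('[', '(').replace(']', ',)')
      return str(input_list).replace('[', '(').replace(']', ')')

    assert set_quantiles([]) == '(0.5,)'  # point forecasts only
    assert set_quantiles([0.9, 0.95]) == '(0.5, 0.9, 0.95)'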
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/component.py new file mode 100644 index 00000000000..c1ab69fbe26 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/set_train_args/component.py @@ -0,0 +1,90 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Starry Net component to set training args.""" + +from typing import List, NamedTuple + +from kfp import dsl + + +@dsl.component +def set_train_args( + quantiles: List[float], + model_blocks: List[str], + static_covariates: List[str], +) -> NamedTuple( + 'TrainArgs', + quantiles=str, + use_static_covariates=bool, + static_covariate_names=str, + model_blocks=str, + freeze_point_forecasts=bool, +): + # fmt: off + """Creates Trainer model args. + + Args: + quantiles: The list of floats representing quantiles. Leave blank if + only training to produce point forecasts. + model_blocks: The list of model blocks to use in the order they will appear + in the model. Possible values are `cleaning`, `change_point`, `trend`, + `hour_of_week`, `day_of_week`, `day_of_year`, `week_of_year`, + `month_of_year`, `residual`. + static_covariates: The list of strings of static covariate names. + + Returns: + A NamedTuple containing the quantiles formatted as expected by the train + job, a bool indicating whether the job should train with static covariates, + the static covariate names formatted as expected by the train job, + the model blocks formatted as expected by the train job, and a bool + indicating whether or not to do two-pass training, first training for point + forecasts and then quantiles.
+ """ + + outputs = NamedTuple( + 'TrainArgs', + quantiles=str, + use_static_covariates=bool, + static_covariate_names=str, + model_blocks=str, + freeze_point_forecasts=bool, + ) + + def set_quantiles(input_list: List[float]) -> str: + if not input_list or input_list[0] != 0.5: + input_list = [0.5] + input_list + if len(input_list) == 1: + return str(input_list).replace('[', '(').replace(']', ',)') + return str(input_list).replace('[', '(').replace(']', ')') + + def maybe_update_model_blocks( + quantiles: List[float], model_blocks: List[str]) -> List[str]: + updated_q = [q for q in quantiles if q != 0.5] + model_blocks = [b for b in model_blocks if b != 'quantile'] + if updated_q: + model_blocks.append('quantile') + return [f'{b}-hybrid' if '_of_' in b else b for b in model_blocks] + + def create_name_tuple_from_list(input_list: List[str]) -> str: + if len(input_list) == 1: + return str(input_list).replace('[', '(').replace(']', ',)') + return str(input_list).replace('[', '(').replace(']', ')') + + return outputs( + set_quantiles(quantiles), # pylint: disable=too-many-function-args + True if static_covariates else False, # pylint: disable=too-many-function-args + create_name_tuple_from_list(static_covariates), # pylint: disable=too-many-function-args + create_name_tuple_from_list( # pylint: disable=too-many-function-args + maybe_update_model_blocks(quantiles, model_blocks)), + True if quantiles and quantiles[-1] != 0.5 else False, # pylint: disable=too-many-function-args + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/component.py new file mode 100644 index 00000000000..c1f08e2e78d --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/train/component.py @@ -0,0 +1,209 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Container Component for training STARRY-Net.""" + +from google_cloud_pipeline_components import _placeholders +from google_cloud_pipeline_components import utils +from google_cloud_pipeline_components._implementation.starry_net import version + +from kfp import dsl + + +@dsl.container_component +def train( + gcp_resources: dsl.OutputPath(str), + trainer_dir: dsl.Output[dsl.Artifact], # pytype: disable=unsupported-operands + num_epochs: int, + backcast_length: int, + forecast_length: int, + train_end_date: str, + csv_data_path: str, + csv_static_covariates_path: str, + static_covariates_vocab_path: str, + train_tf_record_patterns: str, + val_tf_record_patterns: str, + test_tf_record_patterns: str, + n_decomposition_plots: int, + n_val_windows: int, + n_test_windows: int, + test_set_stride: int, + cleaning_activation_regularizer_coeff: float, + change_point_activation_regularizer_coeff: float, + change_point_output_regularizer_coeff: float, + alpha_upper_bound: float, + beta_upper_bound: float, + phi_lower_bound: float, + b_fixed_val: int, + b0_fixed_val: int, + phi_fixed_val: int, + quantiles: str, + use_static_covariates: bool, + static_covariate_names: str, + model_blocks: str, + freeze_point_forecasts: bool, + machine_type: str, + accelerator_type: str, + docker_region: str, + location: str, + job_id: str, + project: str, + encryption_spec_key_name: str, +): + # fmt: off + """Trains a STARRY-Net model. + + Args: + gcp_resources: Serialized JSON of ``gcp_resources`` which tracks the + CustomJob. + trainer_dir: The gcp bucket path where training artifacts are saved. + num_epochs: The number of epochs to train for. + backcast_length: The length of the input window to feed into the model. + forecast_length: The length of the forecast horizon. + train_end_date: The last date of data to use in the training set. All + subsequent dates are part of the test set. + csv_data_path: The path to the training data csv. + csv_static_covariates_path: The path to the static covariates csv. + static_covariates_vocab_path: The path to the master static covariates vocab + json. + train_tf_record_patterns: The glob patterns to the tf records to use for + training. + val_tf_record_patterns: The glob patterns to the tf records to use for + validation. + test_tf_record_patterns: The glob patterns to the tf records to use for + testing. + n_decomposition_plots: How many decomposition plots to save to tensorboard. + n_val_windows: The number of windows to use for the val set. If 0, no + validation set is used. + n_test_windows: The number of windows to use for the test set. Must be >= 1. + test_set_stride: The number of timestamps to roll forward when + constructing the val and test sets. + cleaning_activation_regularizer_coeff: The regularization coefficient for + the cleaning param estimator's final layer's activation in the cleaning + block. + change_point_activation_regularizer_coeff: The regularization coefficient + for the change point param estimator's final layer's activation in the + change_point block. + change_point_output_regularizer_coeff: The regularization coefficient + for the change point param estimator's output in the change_point block. + alpha_upper_bound: The upper bound for data smooth parameter alpha in the + trend block. + beta_upper_bound: The upper bound for data smooth parameter beta in the + trend block. + phi_lower_bound: The lower bound for damping param phi in the trend block. + b_fixed_val: The fixed value for b in the trend block. 
If set to anything + other than -1, the trend block will not learn to provide estimates + but use the fixed value directly. + b0_fixed_val: The fixed value for b0 in the trend block. If set to + anything other than -1, the trend block will not learn to provide + estimates but use the fixed value directly. + phi_fixed_val: The fixed value for phi in the trend block. If set to + anything other than -1, the trend block will not learn to provide + estimates but use the fixed value directly. + quantiles: The stringified tuple of quantiles to learn in the quantile + block, e.g., 0.5,0.9,0.95. This should always start with 0.5, + representing the point forecasts. + use_static_covariates: Whether to use static covariates. + static_covariate_names: The stringified tuple of names of the static + covariates. + model_blocks: The stringified tuple of blocks to use in the order + that they appear in the model. Possible values are `cleaning`, + `change_point`, `trend`, `hour_of_week-hybrid`, `day_of_week-hybrid`, + `day_of_year-hybrid`, `week_of_year-hybrid`, `month_of_year-hybrid`, + `residual`, `quantile`. + freeze_point_forecasts: Whether or not to do two-pass training, where + first the point forecast model is trained, then the quantile block is + added, all preceding blocks are frozen, and the quantile block is trained. + This should always be True if quantiles != [0.5]. + machine_type: The machine type. + accelerator_type: The accelerator type. + docker_region: The docker region, used to determine which image to use. + location: Location for creating the custom training job. If not set, + defaults to us-central1. + job_id: The pipeline job id. + project: Project to create the custom training job in. Defaults to + the project in which the PipelineJob is run. + encryption_spec_key_name: Customer-managed encryption key options for the + CustomJob. If this is set, then all resources created by the CustomJob + will be encrypted with the provided encryption key. + + Returns: + gcp_resources: Serialized JSON of ``gcp_resources`` which tracks the + CustomJob. + trainer_dir: The gcp bucket path where training artifacts are saved.
+ """ + job_name = f'trainer-{job_id}' + payload = { + 'display_name': job_name, + 'encryption_spec': { + 'kms_key_name': str(encryption_spec_key_name), + }, + 'job_spec': { + 'worker_pool_specs': [{ + 'replica_count': '1', + 'machine_spec': { + 'machine_type': str(machine_type), + 'accelerator_type': str(accelerator_type), + 'accelerator_count': 1, + }, + 'disk_spec': { + 'boot_disk_type': 'pd-ssd', + 'boot_disk_size_gb': 100, + }, + 'container_spec': { + 'image_uri': f'{docker_region}-docker.pkg.dev/vertex-ai-restricted/starryn/trainer:{version.TRAINER_VERSION}', + 'args': [ + f'--vertex_experiment_dir={trainer_dir.path}', + f'--vertex_job_id={job_id}', + '--config=analysis/trafficforecast/starryn/experiments/configs/vertex.py', + f'--config.num_epochs={num_epochs}', + f'--config.freeze_point_forecasts={freeze_point_forecasts}', + f'--config.callbacks.tensorboard.n_decomposition_plots={n_decomposition_plots}', + f'--config.datasets.backcast_length={backcast_length}', + f'--config.datasets.forecast_length={forecast_length}', + f'--config.datasets.train_end_date={train_end_date}', + f'--config.datasets.train_path={csv_data_path}', + f'--config.datasets.static_covariates_path={csv_static_covariates_path}', + f'--config.datasets.static_covariates_vocab_path={static_covariates_vocab_path}', + f'--config.datasets.train_tf_record_patterns={train_tf_record_patterns}', + f'--config.datasets.val_tf_record_patterns={val_tf_record_patterns}', + f'--config.datasets.test_tf_record_patterns={test_tf_record_patterns}', + f'--config.datasets.n_val_windows={n_val_windows}', + f'--config.datasets.val_rolling_window_size={test_set_stride}', + f'--config.datasets.n_test_windows={n_test_windows}', + f'--config.datasets.test_rolling_window_size={test_set_stride}', + f'--config.model.regularizer_coeff={cleaning_activation_regularizer_coeff}', + f'--config.model.activation_regularizer_coeff={change_point_activation_regularizer_coeff}', + f'--config.model.output_regularizer_coeff={change_point_output_regularizer_coeff}', + f'--config.model.alpha_upper_bound={alpha_upper_bound}', + f'--config.model.beta_upper_bound={beta_upper_bound}', + f'--config.model.phi_lower_bound={phi_lower_bound}', + f'--config.model.b_fixed_val={b_fixed_val}', + f'--config.model.b0_fixed_val={b0_fixed_val}', + f'--config.model.phi_fixed_val={phi_fixed_val}', + f'--config.model.quantiles={quantiles}', + f'--config.model.use_static_covariates_trend={use_static_covariates}', + f'--config.model.use_static_covariates_calendar={use_static_covariates}', + f'--config.model.static_cov_names={static_covariate_names}', + f'--config.model.blocks_list={model_blocks}', + ], + }, + }] + } + } + return utils.build_serverless_customjob_container_spec( + project=project, + location=location, + custom_job_payload=payload, + gcp_resources=gcp_resources, + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/__init__.py new file mode 100644 index 00000000000..c0b27fe2418 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/component.py
new file mode 100644
index 00000000000..fe958fa2bb7
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_decomposition_plots/component.py
@@ -0,0 +1,59 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Starry Net upload decomposition plots component."""
+
+from kfp import dsl
+
+
+@dsl.component(packages_to_install=['google-cloud-aiplatform[tensorboard]'])
+def upload_decomposition_plots(
+    project: str,
+    location: str,
+    tensorboard_id: str,
+    display_name: str,
+    trainer_dir: dsl.InputPath(),
+) -> dsl.Artifact:
+  # fmt: off
+  """Uploads decomposition plots to Tensorboard.
+
+  Args:
+    project: The project where the pipeline is run. Defaults to current project.
+    location: The location where the pipeline components are run.
+    tensorboard_id: The tensorboard instance ID.
+    display_name: The display name of the job.
+    trainer_dir: The directory where training artifacts were stored.
+
+  Returns:
+    A dsl.Artifact whose URI is the Tensorboard page where the decomposition
+      plots can be viewed.
+  """
+  import os  # pylint: disable=g-import-not-at-top
+  from google.cloud import aiplatform  # pylint: disable=g-import-not-at-top
+
+  log_dir = os.path.join(trainer_dir, 'tensorboard', 'r=1:gc=0')
+  project_number = os.environ['CLOUD_ML_PROJECT_ID']
+  aiplatform.init(project=project, location=location)
+  aiplatform.upload_tb_log(
+      tensorboard_id=tensorboard_id,
+      tensorboard_experiment_name=display_name,
+      logdir=log_dir,
+      experiment_display_name=display_name,
+      description=f'Tensorboard for {display_name}',
+  )
+  uri = (
+      f'https://{location}.tensorboard.googleusercontent.com/experiment/'
+      f'projects+{project_number}+locations+{location}+tensorboards+'
+      f'{tensorboard_id}+experiments+{display_name}/#images'
+  )
+  return dsl.Artifact(uri=uri)
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/__init__.py
new file mode 100644
index 00000000000..c0b27fe2418
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/component.py
new file mode 100644
index 00000000000..1ed008c78c0
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/upload_model/component.py
@@ -0,0 +1,23 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Starry Net Upload Model Component."""
+
+import os
+
+from kfp import components
+
+# TODO(b/346580764)
+upload_model = components.load_component_from_file(
+    os.path.join(os.path.dirname(__file__), 'upload_model.yaml')
+)
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/version.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/version.py
new file mode 100644
index 00000000000..ebb5b7be2de
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/starry_net/version.py
@@ -0,0 +1,18 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Version constants for starry net components."""
+
+DATAPREP_VERSION = '20240609_1425_RC00'
+PREDICTOR_VERSION = '20240610_0542_RC00'
+TRAINER_VERSION = '20240610_0542_RC00'
diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/__init__.py b/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/__init__.py
new file mode 100644
index 00000000000..f4c77af4557
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Starry Net Forecasting Pipeline."""
+
+from google_cloud_pipeline_components.preview.starry_net.component import starry_net  # pylint: disable=g-importing-member
+
+__all__ = ['starry_net']
diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/component.py b/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/component.py
new file mode 100644
index 00000000000..b50f6bf3076
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/preview/starry_net/component.py
@@ -0,0 +1,444 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Defines the pipeline for Starry Net."""
+
+from typing import List
+
+#pylint: disable=g-importing-member
+from google_cloud_pipeline_components import _placeholders
+from google_cloud_pipeline_components._implementation.starry_net import DataprepOp
+from google_cloud_pipeline_components._implementation.starry_net import EvaluationOp
+from google_cloud_pipeline_components._implementation.starry_net import GetTrainingArtifactsOp
+from google_cloud_pipeline_components._implementation.starry_net import MaybeSetTfrecordArgsOp
+from google_cloud_pipeline_components._implementation.starry_net import SetDataprepArgsOp
+from google_cloud_pipeline_components._implementation.starry_net import SetEvalArgsOp
+from google_cloud_pipeline_components._implementation.starry_net import SetTestSetOp
+from google_cloud_pipeline_components._implementation.starry_net import SetTfrecordArgsOp
+from google_cloud_pipeline_components._implementation.starry_net import SetTrainArgsOp
+from google_cloud_pipeline_components._implementation.starry_net import TrainOp
+from google_cloud_pipeline_components._implementation.starry_net import UploadDecompositionPlotsOp
+from google_cloud_pipeline_components._implementation.starry_net import UploadModelOp
+from google_cloud_pipeline_components.preview.model_evaluation import model_evaluation_import_component
+from google_cloud_pipeline_components.types import artifact_types
+from google_cloud_pipeline_components.v1 import batch_predict_job
+from kfp import dsl
+
+
+@dsl.pipeline
+def starry_net(  # pylint: disable=dangerous-default-value
+    tensorboard_instance_id: str,
+    dataprep_backcast_length: int,
+    dataprep_forecast_length: int,
+    dataprep_train_end_date: str,
+    dataprep_n_val_windows: int,
+    dataprep_n_test_windows: int,
+    dataprep_test_set_stride: int,
+    dataprep_test_set_bigquery_dataset: str,
+    dataflow_machine_type: str = 'n1-standard-16',
+    dataflow_max_replica_count: int = 50,
+    dataflow_starting_replica_count: int = 1,
+    dataflow_disk_size_gb: int = 50,
+    dataprep_csv_data_path: str = '',
+    dataprep_csv_static_covariates_path: str = '',
+    dataprep_bigquery_data_path: str = '',
+    dataprep_ts_identifier_columns: List[str] = [],
+    dataprep_time_column: str = '',
+    dataprep_target_column: str = '',
+    dataprep_static_covariate_columns: List[str] = [],
+    dataprep_previous_run_dir: str = '',
+    trainer_machine_type: str = 'n1-standard-4',
+    trainer_accelerator_type: str = 'NVIDIA_TESLA_V100',
+    trainer_num_epochs: int = 50,
+    trainer_cleaning_activation_regularizer_coeff: float = 1e3,
+    trainer_change_point_activation_regularizer_coeff: float = 1e3,
+    trainer_change_point_output_regularizer_coeff: float = 1e3,
+    trainer_trend_alpha_upper_bound: float = 0.5,
+    trainer_trend_beta_upper_bound: float = 0.2,
+    trainer_trend_phi_lower_bound: float = 0.99,
+    trainer_trend_b_fixed_val: int = -1,
+    trainer_trend_b0_fixed_val: int = -1,
+    trainer_trend_phi_fixed_val: int = -1,
+    trainer_quantiles: List[float] = [],
+    trainer_model_blocks: List[str] = [
+        'cleaning',
+        'change_point',
+        'trend',
+        'day_of_week',
+        'week_of_year',
+        'residual',
+    ],
+    tensorboard_n_decomposition_plots: int = 25,
+    encryption_spec_key_name: str = '',
+    location: str = _placeholders.LOCATION_PLACEHOLDER,
+    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
+):
+  # fmt: off
+  """Trains a STARRY-Net model.
+
+  Args:
+    tensorboard_instance_id: The tensorboard instance ID. This must be in the
+      same location as the pipeline job.
+    dataprep_backcast_length: The length of the context window to feed into
+      the model.
+    dataprep_forecast_length: The length of the forecast horizon used in the
+      loss function during training and during evaluation, so that the model
+      is optimized to produce forecasts from 0 to H.
+    dataprep_train_end_date: The last date of data to use in the training and
+      validation set. All dates after train_end_date are part of the test
+      set. The final day forecasted in the test set is last_forecasted_date =
+      train_end_date + forecast_length + (n_test_windows * test_set_stride);
+      last_forecasted_date must be included in the dataset. See the worked
+      example at the end of this argument list.
+    dataprep_n_val_windows: The number of windows to use for the val set. If
+      0, no validation set is used.
+    dataprep_n_test_windows: The number of windows to use for the test set.
+      Must be >= 1. See note in dataprep_train_end_date.
+    dataprep_test_set_stride: The number of timestamps to roll forward
+      when constructing each window of the val and test sets. See note in
+      dataprep_train_end_date.
+    dataprep_test_set_bigquery_dataset: The BigQuery dataset where the test
+      set is saved in the format bq://project.dataset. This must be in the
+      same region or multi-region as the output or staging bucket of the
+      pipeline and the dataprep_bigquery_data_path, if using a BigQuery data
+      source.
+    dataflow_machine_type: The type of machine to use for the dataprep,
+      batch prediction, and evaluation jobs.
+    dataflow_max_replica_count: The maximum number of replicas to scale the
+      dataprep, batch prediction, and evaluation jobs.
+    dataflow_starting_replica_count: The number of replicas to start the
+      dataprep, batch prediction, and evaluation jobs.
+    dataflow_disk_size_gb: The disk size of dataflow workers in GB for the
+      dataprep, batch prediction, and evaluation jobs.
+    dataprep_csv_data_path: The path to the training data csv in the format
+      gs://bucket_name/sub_dir/blob_name.csv. Each row of the csv represents
+      a time series, where the column names are the dates, and the index is
+      the unique time series names.
+    dataprep_csv_static_covariates_path: The path to the static covariates
+      csv. Each row of the csv represents the static covariate values for the
+      series, where the column names are the static covariate names, and the
+      index is the unique time series names. The index values must match the
+      index values of dataprep_csv_data_path. The column values must match
+      dataprep_static_covariate_columns.
+    dataprep_bigquery_data_path: The path to the training data on BigQuery in
+      the format bq://project.dataset.table_id. Set this or
+      dataprep_csv_data_path, but not both. This must be in the same region
+      or multi-region as the output or staging bucket of the pipeline and the
+      dataprep_test_set_bigquery_dataset.
+    dataprep_ts_identifier_columns: The list of ts_identifier columns from
+      the BigQuery data source. These columns are used to distinguish the
+      different time series, so that if multiple rows have identical
+      ts_identifier columns, the series is generated by summing the target
+      columns for each timestamp. This is only used if
+      dataprep_bigquery_data_path is set.
+    dataprep_time_column: The time column from the BigQuery data source. This
+      is only used if dataprep_bigquery_data_path is set.
+    dataprep_target_column: The column to be forecasted from the BigQuery
+      data source. This is only used if dataprep_bigquery_data_path is set.
+    dataprep_static_covariate_columns: The list of strings of static
+      covariate names. This needs to be set if training with static
+      covariates, regardless of whether you're using bigquery_data_path or
+      csv_static_covariates_path.
+    dataprep_previous_run_dir: The dataprep dir from a previous run. Use this
+      to save time if you've already created TFRecords from your BigQuery
+      dataset with the same dataprep parameters as this run.
+    trainer_machine_type: The machine type for training. Must be compatible
+      with trainer_accelerator_type.
+    trainer_accelerator_type: The accelerator type for training.
+    trainer_num_epochs: The number of epochs to train for.
+    trainer_cleaning_activation_regularizer_coeff: The L1 regularization
+      coefficient for the anomaly detection activation in the cleaning block.
+      The larger the value, the less aggressive the cleaning, so fewer and
+      only the most extreme anomalies are detected. A rule of thumb is that
+      this value should be on about the same scale as your series.
+    trainer_change_point_activation_regularizer_coeff: The L1 regularization
+      coefficient for the change point detection activation in the change
+      point block. The larger the value, the less aggressive the detection,
+      so fewer and only the most extreme change points are detected. A rule
+      of thumb is to set this value relative to
+      trainer_change_point_output_regularizer_coeff, which determines the
+      sparsity of the changes. If you want the model to detect many small
+      step changes, this number should be smaller than
+      trainer_change_point_output_regularizer_coeff. To detect fewer large
+      step changes, this number should be about equal to or larger than
+      trainer_change_point_output_regularizer_coeff.
+    trainer_change_point_output_regularizer_coeff: The L2 regularization
+      penalty applied to the mean lag-one difference of the cleaned output of
+      the change point block. Intuitively,
+      trainer_change_point_activation_regularizer_coeff determines how many
+      steps to detect in the series, while this parameter determines how
+      aggressively to clean the detected steps. The higher this value, the
+      more aggressive the cleaning. A rule of thumb is that this value should
+      be on about the same scale as your series.
+    trainer_trend_alpha_upper_bound: The upper bound for the data smoothing
+      parameter alpha in the trend block.
+    trainer_trend_beta_upper_bound: The upper bound for the trend smoothing
+      parameter beta in the trend block.
+    trainer_trend_phi_lower_bound: The lower bound for the damping parameter
+      phi in the trend block.
+    trainer_trend_b_fixed_val: The fixed value for the long-term trend
+      parameter b in the trend block. If set to anything other than -1, the
+      trend block will not learn to provide estimates but use the fixed value
+      directly.
+    trainer_trend_b0_fixed_val: The fixed value for the starting short-term
+      trend parameter b0 in the trend block. If set to anything other than
+      -1, the trend block will not learn to provide estimates but use the
+      fixed value directly.
+    trainer_trend_phi_fixed_val: The fixed value for the damping parameter
+      phi in the trend block. If set to anything other than -1, the trend
+      block will not learn to provide estimates but use the fixed value
+      directly.
+    trainer_quantiles: The list of floats representing quantiles. Leave blank
+      if only training to produce point forecasts.
+    trainer_model_blocks: The list of model blocks to use in the order they
+      will appear in the model. Possible values are `cleaning`,
+      `change_point`, `trend`, `hour_of_week`, `day_of_week`, `day_of_year`,
+      `week_of_year`, `month_of_year`, `residual`.
+    tensorboard_n_decomposition_plots: How many decomposition plots from the
+      test set to save to tensorboard.
+    encryption_spec_key_name: Customer-managed encryption key options for the
+      CustomJob. If this is set, then all resources created by the CustomJob
+      will be encrypted with the provided encryption key.
+    location: The location where the pipeline components are run.
+    project: The project where the pipeline is run. Defaults to current project.
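+
+  Example of the test set date arithmetic in dataprep_train_end_date
+  (illustrative values only): with daily data,
+  dataprep_train_end_date='2024-01-31', dataprep_forecast_length=7,
+  dataprep_n_test_windows=2, and dataprep_test_set_stride=7,
+  last_forecasted_date falls 7 + (2 * 7) = 21 days after the train end date,
+  i.e. on 2024-02-21, so the dataset must contain data through 2024-02-21.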
+ """ + job_id = dsl.PIPELINE_JOB_NAME_PLACEHOLDER + create_dataprep_args_task = SetDataprepArgsOp( + model_blocks=trainer_model_blocks, + ts_identifier_columns=dataprep_ts_identifier_columns, + static_covariate_columns=dataprep_static_covariate_columns, + csv_data_path=dataprep_csv_data_path, + previous_run_dir=dataprep_previous_run_dir, + location=location, + ) + create_trainer_args_task = SetTrainArgsOp( + quantiles=trainer_quantiles, + model_blocks=trainer_model_blocks, + static_covariates=dataprep_static_covariate_columns, + ) + test_set_task = DataprepOp( + backcast_length=dataprep_backcast_length, + forecast_length=dataprep_forecast_length, + train_end_date=dataprep_train_end_date, + n_val_windows=dataprep_n_val_windows, + n_test_windows=dataprep_n_test_windows, + test_set_stride=dataprep_test_set_stride, + model_blocks=create_dataprep_args_task.outputs['model_blocks'], + bigquery_source=dataprep_bigquery_data_path, + ts_identifier_columns=create_dataprep_args_task.outputs[ + 'ts_identifier_columns'], + time_column=dataprep_time_column, + static_covariate_columns=create_dataprep_args_task.outputs[ + 'static_covariate_columns'], + target_column=dataprep_target_column, + machine_type=dataflow_machine_type, + docker_region=create_dataprep_args_task.outputs['docker_region'], + location=location, + project=project, + job_id=job_id, + job_name_prefix='test-set', + num_workers=dataflow_starting_replica_count, + max_num_workers=dataflow_max_replica_count, + disk_size_gb=dataflow_disk_size_gb, + test_set_only=True, + bigquery_output=dataprep_test_set_bigquery_dataset, + gcs_source=dataprep_csv_data_path, + gcs_static_covariate_source=dataprep_csv_static_covariates_path, + encryption_spec_key_name=encryption_spec_key_name + ) + test_set_task.set_display_name('create-test-set') + set_test_set_task = SetTestSetOp( + dataprep_dir=test_set_task.outputs['dataprep_dir']) + with dsl.If(create_dataprep_args_task.outputs['create_tf_records'] == True, # pylint: disable=singleton-comparison + 'create-tf-records'): + create_tf_records_task = DataprepOp( + backcast_length=dataprep_backcast_length, + forecast_length=dataprep_forecast_length, + train_end_date=dataprep_train_end_date, + n_val_windows=dataprep_n_val_windows, + n_test_windows=dataprep_n_test_windows, + test_set_stride=dataprep_test_set_stride, + model_blocks=create_dataprep_args_task.outputs['model_blocks'], + bigquery_source=dataprep_bigquery_data_path, + ts_identifier_columns=create_dataprep_args_task.outputs[ + 'ts_identifier_columns'], + time_column=dataprep_time_column, + static_covariate_columns=create_dataprep_args_task.outputs[ + 'static_covariate_columns'], + target_column=dataprep_target_column, + machine_type=dataflow_machine_type, + docker_region=create_dataprep_args_task.outputs['docker_region'], + location=location, + project=project, + job_id=job_id, + job_name_prefix='tf-records', + num_workers=dataflow_starting_replica_count, + max_num_workers=dataflow_max_replica_count, + disk_size_gb=dataflow_disk_size_gb, + test_set_only=False, + bigquery_output=dataprep_test_set_bigquery_dataset, + gcs_source=dataprep_csv_data_path, + gcs_static_covariate_source=dataprep_csv_static_covariates_path, + encryption_spec_key_name=encryption_spec_key_name + ) + create_tf_records_task.set_display_name('create-tf-records') + set_tfrecord_args_this_run_task = ( + SetTfrecordArgsOp( + dataprep_dir=create_tf_records_task.outputs['dataprep_dir'], + static_covariates=dataprep_static_covariate_columns)) + with dsl.Else('skip-tf-record-generation'): + 
set_tfrecord_args_previous_run_task = ( + MaybeSetTfrecordArgsOp( + dataprep_previous_run_dir=dataprep_previous_run_dir, + static_covariates=dataprep_static_covariate_columns)) + set_tfrecord_args_previous_run_task.set_display_name( + 'set_tfrecord_args_previous_run') + static_covariates_vocab_path = dsl.OneOf( + set_tfrecord_args_previous_run_task.outputs[ + 'static_covariates_vocab_path'], + set_tfrecord_args_this_run_task.outputs['static_covariates_vocab_path'] + ) + train_tf_record_patterns = dsl.OneOf( + set_tfrecord_args_previous_run_task.outputs['train_tf_record_patterns'], + set_tfrecord_args_this_run_task.outputs['train_tf_record_patterns'] + ) + val_tf_record_patterns = dsl.OneOf( + set_tfrecord_args_previous_run_task.outputs['val_tf_record_patterns'], + set_tfrecord_args_this_run_task.outputs['val_tf_record_patterns'] + ) + test_tf_record_patterns = dsl.OneOf( + set_tfrecord_args_previous_run_task.outputs['test_tf_record_patterns'], + set_tfrecord_args_this_run_task.outputs['test_tf_record_patterns'] + ) + trainer_task = TrainOp( + num_epochs=trainer_num_epochs, + backcast_length=dataprep_backcast_length, + forecast_length=dataprep_forecast_length, + train_end_date=dataprep_train_end_date, + csv_data_path=dataprep_csv_data_path, + csv_static_covariates_path=dataprep_csv_static_covariates_path, + static_covariates_vocab_path=static_covariates_vocab_path, + train_tf_record_patterns=train_tf_record_patterns, + val_tf_record_patterns=val_tf_record_patterns, + test_tf_record_patterns=test_tf_record_patterns, + n_decomposition_plots=tensorboard_n_decomposition_plots, + n_val_windows=dataprep_n_val_windows, + n_test_windows=dataprep_n_test_windows, + test_set_stride=dataprep_test_set_stride, + cleaning_activation_regularizer_coeff=trainer_cleaning_activation_regularizer_coeff, + change_point_activation_regularizer_coeff=trainer_change_point_activation_regularizer_coeff, + change_point_output_regularizer_coeff=trainer_change_point_output_regularizer_coeff, + alpha_upper_bound=trainer_trend_alpha_upper_bound, + beta_upper_bound=trainer_trend_beta_upper_bound, + phi_lower_bound=trainer_trend_phi_lower_bound, + b_fixed_val=trainer_trend_b_fixed_val, + b0_fixed_val=trainer_trend_b0_fixed_val, + phi_fixed_val=trainer_trend_phi_fixed_val, + quantiles=create_trainer_args_task.outputs['quantiles'], + use_static_covariates=create_trainer_args_task.outputs[ + 'use_static_covariates'], + static_covariate_names=create_trainer_args_task.outputs[ + 'static_covariate_names'], + model_blocks=create_trainer_args_task.outputs['model_blocks'], + freeze_point_forecasts=create_trainer_args_task.outputs[ + 'freeze_point_forecasts'], + machine_type=trainer_machine_type, + accelerator_type=trainer_accelerator_type, + docker_region=create_dataprep_args_task.outputs['docker_region'], + location=location, + job_id=job_id, + project=project, + encryption_spec_key_name=encryption_spec_key_name + ) + _ = UploadDecompositionPlotsOp( + project=project, + location=location, + tensorboard_id=tensorboard_instance_id, + display_name=job_id, + trainer_dir=trainer_task.outputs['trainer_dir']) + training_artifacts_task = GetTrainingArtifactsOp( + docker_region=create_dataprep_args_task.outputs['docker_region'], + trainer_dir=trainer_task.outputs['trainer_dir']) + model = dsl.importer( + artifact_uri=training_artifacts_task.outputs['artifact_uri'], + artifact_class=artifact_types.UnmanagedContainerModel, + metadata={ + 'predictSchemata': { + 'instanceSchemaUri': training_artifacts_task.outputs[ + 'instance_schema_uri'], + 
'predictionSchemaUri': training_artifacts_task.outputs[ + 'prediction_schema_uri'], + }, + 'containerSpec': { + 'imageUri': training_artifacts_task.outputs['image_uri'], + 'healthRoute': '/health', + 'predictRoute': '/predict', + } + }, + ) + model.set_display_name('set-model') + upload_model_task = UploadModelOp( + project=project, + location=location, + display_name=job_id, + unmanaged_container_model=model.output, + encryption_spec_key_name=encryption_spec_key_name, + ) + upload_model_task.set_display_name('upload-model') + batch_predict_task = batch_predict_job.ModelBatchPredictOp( + project=project, + location=location, + unmanaged_container_model=model.output, + job_display_name=f'batch-predict-{job_id}', + instances_format='bigquery', + predictions_format='bigquery', + bigquery_source_input_uri=set_test_set_task.outputs['uri'], + bigquery_destination_output_uri=dataprep_test_set_bigquery_dataset, + machine_type=dataflow_machine_type, + starting_replica_count=dataflow_starting_replica_count, + max_replica_count=dataflow_max_replica_count, + encryption_spec_key_name=encryption_spec_key_name, + generate_explanation=False, + ) + batch_predict_task.set_display_name('run-batch-prediction') + set_eval_args_task = SetEvalArgsOp( + big_query_source=batch_predict_task.outputs['bigquery_output_table'], + quantiles=trainer_quantiles) + eval_task = EvaluationOp( + project=project, + location=location, + root_dir=test_set_task.outputs['dataprep_dir'], + target_field_name='HORIZON__x', + predictions_format='bigquery', + ground_truth_format='bigquery', + predictions_bigquery_source=batch_predict_task.outputs[ + 'bigquery_output_table'], + ground_truth_bigquery_source=set_eval_args_task.outputs[ + 'big_query_source'], + ground_truth_gcs_source=[], + forecasting_type=set_eval_args_task.outputs['forecasting_type'], + forecasting_quantiles=set_eval_args_task.outputs['quantiles'], + prediction_score_column=set_eval_args_task.outputs[ + 'prediction_score_column'], + dataflow_service_account=_placeholders.SERVICE_ACCOUNT_PLACEHOLDER, + dataflow_machine_type=dataflow_machine_type, + dataflow_max_workers_num=dataflow_max_replica_count, + dataflow_workers_num=dataflow_starting_replica_count, + dataflow_disk_size=dataflow_disk_size_gb, + dataflow_use_public_ips=True, + encryption_spec_key_name=encryption_spec_key_name, + ) + model_evaluation_import_component.model_evaluation_import( + forecasting_metrics=eval_task.outputs['evaluation_metrics'], + model=upload_model_task.outputs['model'], + dataset_type='bigquery', + dataset_path=set_test_set_task.outputs['uri'], + display_name=job_id, + problem_type='forecasting', + )
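+
+
+# Example usage (a minimal sketch, not part of the component contract; the
+# file name and parameter values shown are hypothetical):
+#
+#   from kfp import compiler
+#   from google_cloud_pipeline_components.preview.starry_net import starry_net
+#
+#   compiler.Compiler().compile(
+#       pipeline_func=starry_net,
+#       package_path='starry_net_pipeline.yaml',
+#   )
+#
+# The compiled template can then be submitted with
+# google.cloud.aiplatform.PipelineJob, supplying the required arguments
+# (tensorboard_instance_id, the dataprep_* window settings, and a data source
+# such as dataprep_csv_data_path or dataprep_bigquery_data_path).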