From 1fc95880d6ed87ffc27f25a6df1e68b0ae25738c Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 27 Mar 2024 14:59:29 -0400 Subject: [PATCH] Fix `iso_date_regex_pattern` config in `file_batcher` module and allow override (#1580) - Update `file_batcher`module to use documented module config key `iso_date_regex_pattern` instead of `batch_iso_date_regex_pattern`. - Allow override of `iso_date_regex_pattern` with control message. - Add tests for `file_batcher` module Closes #1576 Closes #1577 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - Eli Fajardo (https://github.com/efajardo-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1580 --- morpheus/modules/file_batcher.py | 22 ++- tests/modules/test_file_batcher.py | 300 +++++++++++++++++++++++++++++ 2 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 tests/modules/test_file_batcher.py diff --git a/morpheus/modules/file_batcher.py b/morpheus/modules/file_batcher.py index ac63474710..214f6c0e5b 100644 --- a/morpheus/modules/file_batcher.py +++ b/morpheus/modules/file_batcher.py @@ -80,7 +80,7 @@ def file_batcher(builder: mrc.Builder): sampling = config.get("sampling", None) sampling_rate_s = config.get("sampling_rate_s", None) - iso_date_regex_pattern = config.get("batch_iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN) + iso_date_regex_pattern = config.get("iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN) iso_date_regex = re.compile(iso_date_regex_pattern) if (sampling_rate_s is not None and sampling_rate_s > 0): @@ -99,6 +99,7 @@ def file_batcher(builder: mrc.Builder): "sampling": sampling, "start_time": 
config.get("start_time"), "end_time": config.get("end_time"), + "iso_date_regex_pattern": iso_date_regex_pattern } default_file_to_df_opts = { @@ -123,11 +124,19 @@ def build_period_batches(files: typing.List[str], params: typing.Dict[any, any]) -> typing.List[typing.Tuple[typing.List[str], int]]: file_objects: fsspec.core.OpenFiles = fsspec.open_files(files) + nonlocal iso_date_regex_pattern + nonlocal iso_date_regex + + if params["iso_date_regex_pattern"] != iso_date_regex_pattern: + iso_date_regex_pattern = params["iso_date_regex_pattern"] + iso_date_regex = re.compile(iso_date_regex_pattern) + try: start_time = params["start_time"] end_time = params["end_time"] period = params["period"] sampling_rate_s = params["sampling_rate_s"] + sampling = params["sampling"] if not isinstance(start_time, (str, type(None))) or (start_time is not None and not re.match(r"\d{4}-\d{2}-\d{2}", start_time)): @@ -137,8 +146,15 @@ def build_period_batches(files: typing.List[str], and not re.match(r"\d{4}-\d{2}-\d{2}", end_time)): raise ValueError(f"Invalid 'end_time' value: {end_time}") - if not isinstance(sampling_rate_s, int) or sampling_rate_s < 0: - raise ValueError(f"Invalid 'sampling_rate_s' value: {sampling_rate_s}") + if (sampling_rate_s is not None and sampling_rate_s > 0): + assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time" + + # Show the deprecation message + warnings.warn(("The `sampling_rate_s` argument has been deprecated. 
" + "Please use `sampling={sampling_rate_s}S` instead"), + DeprecationWarning) + + sampling = f"{sampling_rate_s}S" if (start_time is not None): start_time = datetime.datetime.strptime(start_time, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) diff --git a/tests/modules/test_file_batcher.py b/tests/modules/test_file_batcher.py new file mode 100644 index 0000000000..463b2fef21 --- /dev/null +++ b/tests/modules/test_file_batcher.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +import cudf + +import morpheus.modules # noqa: F401 # pylint: disable=unused-import +from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages.message_meta import MessageMeta +from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.stage_decorator import source +from morpheus.stages.general.linear_modules_stage import LinearModulesStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from morpheus.utils.module_ids import FILE_BATCHER +from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE + +# pylint: disable=redundant-keyword-arg + + +@source +def source_test_stage(filenames: list[str], cm_batching_options: dict) -> ControlMessage: + + df = cudf.DataFrame(filenames, columns=['files']) + + control_message = ControlMessage() + + control_message.set_metadata("batching_options", cm_batching_options) + control_message.set_metadata("data_type", "payload") + + control_message.payload(MessageMeta(df=df)) + + yield control_message + + +@pytest.fixture(name="default_module_config") +def default_module_config_fixture(): + yield { + "module_id": FILE_BATCHER, + "module_name": "file_batcher", + "namespace": MORPHEUS_MODULE_NAMESPACE, + "sampling_rate_s": 0, + "start_time": "2022-08-01", + "end_time": "2022-08-31", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + +@pytest.fixture(name="default_file_list") +def default_file_list_fixture(): + yield [ + "DUO_2022-08-01T00_05_06.806Z.json", + "DUO_2022-08-01T03_02_04.418Z.json", + "DUO_2022-08-01T06_05_05.064Z.json", + "DUO_2022-08-02T00_05_06.806Z.json", + "DUO_2022-08-02T03_02_04.418Z.json", + "DUO_2022-08-02T06_05_05.064Z.json" + ] + + +def test_no_overrides(config: Config, default_module_config, default_file_list): + pipeline = LinearPipeline(config) + + cm_batching_opts = { + "sampling_rate_s": 0, + "start_time": "2022-08-01", + "end_time": "2022-08-31", + 
"parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + assert len(sink_messages) == 2 + assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 3 + assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 2 + assert len(sink_messages[1].get_tasks()["load"][0]["files"]) == 3 + assert sink_messages[1].get_tasks()["load"][0]["n_groups"] == 2 + + +def test_no_date_matches(config: Config, default_module_config, default_file_list): + pipeline = LinearPipeline(config) + + cm_batching_opts = { + "sampling_rate_s": 0, + "start_time": "2022-09-01", + "end_time": "2022-09-30", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + assert len(sink_messages) == 0 + + +def test_partial_date_matches(config: Config, default_module_config, default_file_list): + pipeline = LinearPipeline(config) + + cm_batching_opts = { + "sampling_rate_s": 0, + "start_time": "2022-07-30", + "end_time": "2022-08-02", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, 
default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + sink_messages = sink_stage.get_messages() + assert len(sink_messages) == 1 + assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 3 + assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 1 + + +def test_override_date_regex(config: Config, default_module_config): + pipeline = LinearPipeline(config) + + filenames = [ + "DUO_2022-08-01_00_05_06.806Z.json", + "DUO_2022-08-01_03_02_04.418Z.json", + "DUO_2022-08-01_06_05_05.064Z.json", + "DUO_2022-08-02_00_05_06.806Z.json", + "DUO_2022-08-02_03_02_04.418Z.json", + "DUO_2022-08-02_06_05_05.064Z.json" + ] + + cm_date_regex_pattern = ( + r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})" + r"_(?P<hour>\d{1,2})(:|_)(?P<minute>\d{1,2})(:|_)(?P<second>\d{1,2})(?P<microsecond>\.\d{1,6})?Z") + + cm_batching_opts = { + "sampling_rate_s": 0, + "start_time": "2022-08-01", + "end_time": "2022-08-31", + "iso_date_regex_pattern": cm_date_regex_pattern, + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=filenames, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + assert len(sink_messages) == 2 + assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 3 + assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 2 + assert len(sink_messages[1].get_tasks()["load"][0]["files"]) == 3 + assert sink_messages[1].get_tasks()["load"][0]["n_groups"] == 2 + + +def test_sampling_freq(config: Config, default_module_config): + pipeline = LinearPipeline(config) + + filenames = [ + "DUO_2022-08-01T00_05_06.806Z.json", + 
"DUO_2022-08-01T00_05_08.418Z.json", + "DUO_2022-08-01T00_05_12.064Z.json", + "DUO_2022-08-02T03_02_06.806Z.json", + "DUO_2022-08-02T03_02_14.418Z.json", + "DUO_2022-08-02T03_02_17.064Z.json" + ] + + cm_batching_opts = { + "sampling_rate_s": None, + "sampling": "30S", + "start_time": "2022-08-01", + "end_time": "2022-08-31", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=filenames, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + assert len(sink_messages) == 2 + assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 1 + assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 2 + assert len(sink_messages[1].get_tasks()["load"][0]["files"]) == 1 + assert sink_messages[1].get_tasks()["load"][0]["n_groups"] == 2 + + +def test_sampling_pct(config: Config, default_module_config, default_file_list): + pipeline = LinearPipeline(config) + + cm_batching_opts = { + "sampling_rate_s": None, + "sampling": 0.5, + "start_time": "2022-08-01", + "end_time": "2022-08-31", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + msg_counts = [len(m.get_tasks()["load"][0]["files"]) for m in sink_messages] + assert sum(msg_counts) == 3 + + +def test_sampling_fixed(config: Config, default_module_config, default_file_list): + 
pipeline = LinearPipeline(config) + + cm_batching_opts = { + "sampling_rate_s": None, + "sampling": 5, + "start_time": "2022-08-01", + "end_time": "2022-08-31", + "parser_kwargs": None, + "schema": { + "schema_str": None, "encoding": None + } + } + + pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts)) + + pipeline.add_stage( + LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output")) + + sink_stage = pipeline.add_stage(InMemorySinkStage(config)) + + pipeline.run() + + sink_messages = sink_stage.get_messages() + msg_counts = [len(m.get_tasks()["load"][0]["files"]) for m in sink_messages] + assert sum(msg_counts) == 5