Added CloudWatch decompression and data message extraction #12

Merged 4 commits on Feb 28, 2024
4 changes: 3 additions & 1 deletion README.md
@@ -56,7 +56,6 @@ Supports all destinations and all Kinesis Firehose Features.
* [Version 3.1.0](#version-310)
* [License](#license)


## Module versioning rule

| Module version | AWS Provider version |
@@ -95,6 +94,7 @@ Supports all destinations and all Kinesis Firehose Features.
- Original Data Backup in S3
- Logging and Encryption
- Application Role to Direct Put Sources
- Turn on/off CloudWatch Logs decompression and data message extraction (see the usage sketch below)
- Permissions
- IAM Roles
- Opensearch / Opensearch Serverless Service Role
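
A minimal usage sketch for the two new flags, assuming the module is referenced locally; every argument other than the two new flags is an illustrative placeholder rather than a documented input:

```hcl
module "firehose" {
  source = "../../" # illustrative module reference

  # Illustrative placeholders; see the inputs table below for real arguments
  name          = "firehose-cloudwatch-logs"
  destination   = "s3"
  s3_bucket_arn = aws_s3_bucket.logs.arn

  # New flags added in this PR; extraction only applies when decompression is on
  enable_cloudwatch_logs_decompression           = true
  enable_cloudwatch_logs_data_message_extraction = true
}
```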
@@ -981,6 +981,8 @@ No modules.
| <a name="input_elasticsearch_index_rotation_period"></a> [elasticsearch\_index\_rotation\_period](#input\_elasticsearch\_index\_rotation\_period) | The Elasticsearch index rotation period. Index rotation appends a timestamp to the IndexName to facilitate expiration of old data | `string` | `"OneDay"` | no |
| <a name="input_elasticsearch_retry_duration"></a> [elasticsearch\_retry\_duration](#input\_elasticsearch\_retry\_duration) | The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt | `string` | `300` | no |
| <a name="input_elasticsearch_type_name"></a> [elasticsearch\_type\_name](#input\_elasticsearch\_type\_name) | The Elasticsearch type name with maximum length of 100 characters | `string` | `null` | no |
| <a name="input_enable_cloudwatch_logs_data_message_extraction"></a> [enable\_cloudwatch\_logs\_data\_message\_extraction](#input\_enable\_cloudwatch\_logs\_data\_message\_extraction) | Cloudwatch Logs data message extraction | `bool` | `false` | no |
| <a name="input_enable_cloudwatch_logs_decompression"></a> [enable\_cloudwatch\_logs\_decompression](#input\_enable\_cloudwatch\_logs\_decompression) | Enables or disables Cloudwatch Logs decompression | `bool` | `false` | no |
| <a name="input_enable_data_format_conversion"></a> [enable\_data\_format\_conversion](#input\_enable\_data\_format\_conversion) | Set it to true if you want to disable format conversion. | `bool` | `false` | no |
| <a name="input_enable_destination_log"></a> [enable\_destination\_log](#input\_enable\_destination\_log) | The CloudWatch Logging Options for the delivery stream | `bool` | `true` | no |
| <a name="input_enable_dynamic_partitioning"></a> [enable\_dynamic\_partitioning](#input\_enable\_dynamic\_partitioning) | Enables or disables dynamic partitioning | `bool` | `false` | no |
24 changes: 22 additions & 2 deletions locals.tf
@@ -29,7 +29,7 @@ locals {
  is_search_destination = contains(["elasticsearch", "opensearch", "opensearchserverless"], local.destination) ? true : false

  # Data Transformation
  enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning
  enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning || var.enable_cloudwatch_logs_decompression
  lambda_processor = var.enable_lambda_transform ? {
    type = "Lambda"
    parameters = [
@@ -97,11 +97,31 @@
  record_deaggregation_processor = (var.enable_dynamic_partitioning && var.dynamic_partition_enable_record_deaggregation ?
    (var.dynamic_partition_record_deaggregation_type == "JSON" ? local.record_deaggregation_processor_json : local.record_deaggregation_processor_delimiter)
  : null)
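  # CloudWatch Logs subscription filters deliver records gzip-compressed, so this
  # processor has Firehose decompress them before any further processing.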
  cloudwatch_logs_decompression_processor = var.enable_cloudwatch_logs_decompression ? {
    type = "Decompression"
    parameters = [
      {
        name  = "CompressionFormat"
        value = "GZIP"
      }
    ]
  } : null
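  # Message extraction unwraps the individual log events from the CloudWatch
  # envelope; per the condition below, it only takes effect when decompression
  # is also enabled.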
  cloudwatch_logs_data_message_extraction_processor = var.enable_cloudwatch_logs_decompression && var.enable_cloudwatch_logs_data_message_extraction ? {
    type = "CloudWatchLogProcessing"
    parameters = [
      {
        name  = "DataMessageExtraction"
        value = tostring(var.enable_cloudwatch_logs_data_message_extraction)
      },
    ]
  } : null
  processors = [for each in [
    local.lambda_processor,
    local.metadata_extractor_processor,
    local.append_delimiter_processor,
    local.record_deaggregation_processor
    local.record_deaggregation_processor,
    local.cloudwatch_logs_decompression_processor,
    local.cloudwatch_logs_data_message_extraction_processor
  ] : each if local.enable_processing && each != null]

  # Data Format conversion
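For context, a sketch of how a processors list shaped like `local.processors` maps onto the AWS provider's `processing_configuration` schema. The block and attribute names under `processing_configuration` follow the provider's documented schema for `aws_kinesis_firehose_delivery_stream`; the surrounding resource wiring is illustrative and not necessarily how this module structures it:

```hcl
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "illustrative-stream"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn # illustrative
    bucket_arn = aws_s3_bucket.logs.arn    # illustrative

    processing_configuration {
      enabled = local.enable_processing

      # One processors block per non-null entry assembled in locals.tf above
      dynamic "processors" {
        for_each = local.processors
        content {
          type = processors.value.type

          dynamic "parameters" {
            for_each = processors.value.parameters
            content {
              parameter_name  = parameters.value.name
              parameter_value = parameters.value.value
            }
          }
        }
      }
    }
  }
}
```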
12 changes: 12 additions & 0 deletions variables.tf
@@ -433,6 +433,18 @@ variable "msk_source_connectivity_type" {
######
# S3 Destination Configurations
######
variable "enable_cloudwatch_logs_decompression" {
description = "Enables or disables Cloudwatch Logs decompression"
type = bool
default = false
}

variable "enable_cloudwatch_logs_data_message_extraction" {
description = "Cloudwatch Logs data message extraction"
type = bool
default = false
}
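
Per the guard in `locals.tf`, the extraction processor is only built when decompression is also enabled, so the two flags are meant to be set together. An illustrative `terraform.tfvars`:

```hcl
# Extraction alone has no effect; decompression must be enabled as well
enable_cloudwatch_logs_decompression           = true
enable_cloudwatch_logs_data_message_extraction = true
```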

variable "enable_dynamic_partitioning" {
description = "Enables or disables dynamic partitioning"
type = bool