From 3886e44849c2c0ec34775595a9d4e68ba89d2fe0 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Mon, 27 Jun 2022 15:01:34 +0200 Subject: [PATCH] stream-enrich-kafka: reinstate config example (close #646) --- config/stream-enrich-kafka.hocon | 152 +++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 config/stream-enrich-kafka.hocon diff --git a/config/stream-enrich-kafka.hocon b/config/stream-enrich-kafka.hocon new file mode 100644 index 000000000..e8ab3395f --- /dev/null +++ b/config/stream-enrich-kafka.hocon @@ -0,0 +1,152 @@ +# Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. +# +# This program is licensed to you under the Apache License Version 2.0, and +# you may not use this file except in compliance with the Apache License +# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +# http://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the Apache License Version 2.0 is distributed on an "AS +# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the Apache License Version 2.0 for the specific language +# governing permissions and limitations there under. + +enrich { + + streams { + + in { + # Stream/topic where the raw events to be enriched are located + raw = {{streamsInRaw}} + raw = ${?ENRICH_STREAMS_IN_RAW} + } + + out { + # Stream/topic where the events that were successfully enriched will end up + enriched = {{outEnriched}} + enriched = ${?ENRICH_STREAMS_OUT_ENRICHED} + # Stream/topic where the event that failed enrichment will be stored + bad = {{outBad}} + bad = ${?ENRICH_STREAMS_OUT_BAD} + # Stream/topic where the pii tranformation events will end up + pii = {{outPii}} + pii = ${?ENRICH_STREAMS_OUT_PII} + + # How the output stream/topic will be partitioned. + # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid, + # user_ipaddress, domain_sessionid, user_fingerprint. + # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the + # possible parittion keys correspond to. + # Otherwise, the partition key will be a random UUID. + partitionKey = {{partitionKeyName}} + partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY} + } + + sourceSink { + enabled = kafka + + # Optional. Region where the streams are located + # region = {{region}} + # region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION} + + # AWS credentials + # If both are set to 'default', use the default AWS credentials provider chain. + # If both are set to 'iam', use AWS IAM Roles to provision credentials. + # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + # aws { + # accessKey = iam + # accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY} + # secretKey = iam + # secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY} + # } + + # GCP credentials + # Either provide path to service account file or set environment variable GOOGLE_APPLICATION_CREDENTIALS + # gcp { + # creds = {{googleApplicationCredentials}} + # creds = ${?GOOGLE_APPLICATION_CREDENTIALS} + # } + + brokers = "{{kafkaBrokers}}" + + # Number of retries to perform before giving up on sending a record + retries = 0 + + # The kafka producer has a variety of possible configuration options defined at + # https://kafka.apache.org/documentation/#producerconfigs + # Some values are set to other values from this config by default: + # "bootstrap.servers" -> brokers + # retries -> retries + # "buffer.memory" -> buffer.byteLimit + # "linger.ms" -> buffer.timeLimit + #producerConf { + # acks = all + # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + # "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + #} + + # The kafka consumer has a variety of possible configuration options defined at + # https://kafka.apache.org/documentation/#consumerconfigs + # Some values are set to other values from this config by default: + # "bootstrap.servers" -> brokers + # "group.id" -> appName + #consumerConf { + # "enable.auto.commit" = true + # "auto.commit.interval.ms" = 1000 + # "auto.offset.reset" = earliest + # "session.timeout.ms" = 30000 + # "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer" + # "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer" + #} + } + + # After enrichment, events are accumulated in a buffer before being sent to Kafka. + # The buffer is emptied whenever: + # - the number of stored records reaches recordLimit or + # - the combined size of the stored records reaches byteLimit or + # - the time in milliseconds since it was last emptied exceeds timeLimit when + # a new event enters the buffer + buffer { + byteLimit = {{bufferByteThreshold}} + byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT} + recordLimit = {{bufferRecordThreshold}} # Not supported by Kafka; will be ignored + recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT} + timeLimit = {{bufferTimeThreshold}} + timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT} + } + + # Used as the Kafka consumer group ID. + appName = "{{appName}}" + appName = ${?ENRICH_STREAMS_APP_NAME} + } + + # The setting below requires an adapter being ready, i.e.: https://github.com/snowplow-incubator/remote-adapter-example + # remoteAdapters = [ + # { + # vendor: "com.globeandmail" + # version: "v1" + # url: "http://remote-adapter-example:8995/sampleRemoteAdapter" + # connectionTimeout: 1000 + # readTimeout: 5000 + # } + # ] + + # Optional section for tracking endpoints + monitoring { + snowplow { + collectorUri = "{{collectorUri}}" + collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI} + collectorPort = 80 + collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT} + appId = {{enrichAppName}} + appId = ${?ENRICH_MONITORING_APP_ID} + method = GET + method = ${?ENRICH_MONITORING_METHOD} + } + } + + # Optional section for Sentry + sentry { + dsn = ${?SENTRY_DSN} + } +}