stream-enrich-kafka: reinstate config example (close #646)
benjben committed Jun 27, 2022
1 parent f107606 commit 3886e44
Showing 1 changed file with 152 additions and 0 deletions.
config/stream-enrich-kafka.hocon
@@ -0,0 +1,152 @@
# Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
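
# Note on the configuration style used throughout this file: each setting is
# declared twice, e.g.
#   raw = {{streamsInRaw}}
#   raw = ${?ENRICH_STREAMS_IN_RAW}
# In HOCON the later assignment wins, but ${?VAR} resolves only when the
# environment variable VAR is set, so the env var overrides the templated
# value when present and is ignored otherwise.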

enrich {

streams {

in {
# Stream/topic where the raw events to be enriched are located
raw = {{streamsInRaw}}
raw = ${?ENRICH_STREAMS_IN_RAW}
}

out {
# Stream/topic where the events that were successfully enriched will end up
enriched = {{outEnriched}}
enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
# Stream/topic where the events that failed enrichment will be stored
bad = {{outBad}}
bad = ${?ENRICH_STREAMS_OUT_BAD}
# Stream/topic where the PII transformation events will end up
pii = {{outPii}}
pii = ${?ENRICH_STREAMS_OUT_PII}

# How the output stream/topic will be partitioned.
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
partitionKey = {{partitionKeyName}}
partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
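# For example, to spread enriched events across partitions by client IP
# (illustrative value, not part of the template):
# partitionKey = user_ipaddress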
}

sourceSink {
enabled = kafka

# Optional. Region where the streams are located
# region = {{region}}
# region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}

# AWS credentials
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
# aws {
# accessKey = iam
# accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
# secretKey = iam
# secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
# }

# GCP credentials
# Either provide the path to a service account file or set the environment variable GOOGLE_APPLICATION_CREDENTIALS
# gcp {
# creds = {{googleApplicationCredentials}}
# creds = ${?GOOGLE_APPLICATION_CREDENTIALS}
# }

brokers = "{{kafkaBrokers}}"
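
# The brokers string is the Kafka bootstrap.servers list: comma-separated
# host:port pairs, e.g. (hypothetical hosts):
# brokers = "kafka-broker-1:9092,kafka-broker-2:9092"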

# Number of retries to perform before giving up on sending a record
retries = 0

# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some producer settings default to values taken from this config:
# "bootstrap.servers" -> brokers
# retries -> retries
# "buffer.memory" -> buffer.byteLimit
# "linger.ms" -> buffer.timeLimit
#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}

# The kafka consumer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#consumerconfigs
# Some consumer settings default to values taken from this config:
# "bootstrap.servers" -> brokers
# "group.id" -> appName
#consumerConf {
# "enable.auto.commit" = true
# "auto.commit.interval.ms" = 1000
# "auto.offset.reset" = earliest
# "session.timeout.ms" = 30000
# "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
# "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
#}
}

# After enrichment, events are accumulated in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
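# For example (illustrative values): with byteLimit = 1000000 and
# timeLimit = 5000, a flush happens as soon as the buffer holds 1 MB of
# records, or when an event arrives more than 5 seconds after the last
# flush. recordLimit is ignored by the Kafka sink (see below).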
buffer {
byteLimit = {{bufferByteThreshold}}
byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = {{bufferRecordThreshold}} # Not supported by Kafka; will be ignored
recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = {{bufferTimeThreshold}}
timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
}

# Used as the Kafka consumer group ID.
appName = "{{appName}}"
appName = ${?ENRICH_STREAMS_APP_NAME}
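# Every instance sharing the same appName joins the same Kafka consumer
# group (appName becomes "group.id", see above) and the instances split
# the partitions of the raw topic between them. Illustrative value:
# appName = "snowplow-stream-enrich-kafka"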
}

# The setting below requires a remote adapter to be up and running, e.g.: https://github.com/snowplow-incubator/remote-adapter-example
# remoteAdapters = [
# {
# vendor: "com.globeandmail"
# version: "v1"
# url: "http://remote-adapter-example:8995/sampleRemoteAdapter"
# connectionTimeout: 1000
# readTimeout: 5000
# }
# ]

# Optional section for tracking endpoints
monitoring {
snowplow {
collectorUri = "{{collectorUri}}"
collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI}
collectorPort = 80
collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT}
appId = {{enrichAppName}}
appId = ${?ENRICH_MONITORING_APP_ID}
method = GET
method = ${?ENRICH_MONITORING_METHOD}
}
}

# Optional section for Sentry
sentry {
dsn = ${?SENTRY_DSN}
}
}
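
A config like this is passed to the enricher at startup. A minimal sketch of
the launch command, assuming the standard Stream Enrich CLI options and a
local resolver/enrichments layout (jar version illustrative):

  java -jar snowplow-stream-enrich-kafka-3.1.2.jar \
    --config config/stream-enrich-kafka.hocon \
    --resolver file:resolver.json \
    --enrichments file:enrichments/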
