From b93349e1ba7b6db4a836ba35db542f2ba2def8da Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Thu, 9 Nov 2023 10:48:22 -0600 Subject: [PATCH] stream: create telemetry span in kafka produce --- internal/incoming/stream/publisher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/incoming/stream/publisher.go b/internal/incoming/stream/publisher.go index 843fc90..d55f303 100644 --- a/internal/incoming/stream/publisher.go +++ b/internal/incoming/stream/publisher.go @@ -18,11 +18,12 @@ import ( "fmt" "net/url" - "github.com/Shopify/sarama" "github.com/moov-io/achgateway/internal/kafka" "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/base/log" + "github.com/moov-io/base/telemetry" + "github.com/Shopify/sarama" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/mempubsub" ) @@ -58,6 +59,9 @@ type kafkaProducer struct { } func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error { + _, span := telemetry.StartSpan(ctx, "producer-kafka-send") + defer span.End() + err := kp.topic.Send(ctx, m) if err != nil { var producerError sarama.ProducerError