Skip to content

Commit

Permalink
pipeline: pass context with span through to processMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Nov 3, 2023
1 parent ab018dc commit debc525
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (fr *FileReceiver) RegisterAdminRoutes(r *admin.Server) {
sub.HandleFunc("/pipeline/{isolatedDirectory}/file-uploaded", fr.manuallyProduceFileUploaded())
}

type processableMessage struct {
ctx context.Context
msg *pubsub.Message
}

// handleMessage will listen for an incoming.ACHFile to pass off to an aggregator for the shard
// responsible. It does so with a database lookup and the fixed set of Shards from the file config.
func (fr *FileReceiver) handleMessage(ctx context.Context, sub stream.Subscription) chan error {
Expand All @@ -210,12 +215,11 @@ func (fr *FileReceiver) handleMessage(ctx context.Context, sub stream.Subscripti
out <- nil
}
go func() {
receiver := make(chan *pubsub.Message)
receiver := make(chan *processableMessage)
go func() {
msg, err := sub.Receive(ctx)

var span trace.Span
ctx, span = telemetry.StartSpan(ctx, "file-receiver-handle-message")
ctx, span := telemetry.StartSpan(ctx, "file-receiver-handle-message")
defer span.End()

if err != nil {
Expand All @@ -234,13 +238,16 @@ func (fr *FileReceiver) handleMessage(ctx context.Context, sub stream.Subscripti
}
fr.logger.LogErrorf("ERROR receiving message: %v", err)
}
receiver <- msg
receiver <- &processableMessage{
ctx: ctx,
msg: msg,
}
}()

select {
case msg := <-receiver:
if msg != nil {
out <- fr.processMessage(ctx, msg)
case m := <-receiver:
if m != nil {
out <- fr.processMessage(m.ctx, m.msg)
return
} else {
cleanup()
Expand Down

0 comments on commit debc525

Please sign in to comment.