Skip to content

Commit

Permalink
fix: ActiveJob Propagate baggage information properly when performing (
Browse files Browse the repository at this point in the history
  • Loading branch information
yoheyk authored Oct 24, 2024
1 parent fcb40da commit 5b1c09d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ def start_span(name, _id, payload)
span = tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links)
end

{ span: span, ctx_token: attach_consumer_context(span) }
{ span: span, ctx_token: attach_consumer_context(span, parent_context) }
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
# 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK.
# @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
# @param parent_context [Context] The context to use as the parent for the consumer context
# @return [Numeric] Context token that must be detached when finished
def attach_consumer_context(span)
consumer_context = OpenTelemetry::Trace.context_with_span(span)
def attach_consumer_context(span, parent_context)
consumer_context = OpenTelemetry::Trace.context_with_span(span, parent_context: parent_context)
internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context)

OpenTelemetry::Context.attach(internal_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,6 @@
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)

_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
Expand All @@ -225,6 +210,21 @@
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
perform_enqueued_jobs

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
end

Expand All @@ -251,18 +251,6 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
Expand All @@ -284,6 +272,20 @@
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
perform_enqueued_jobs

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
end

Expand Down

0 comments on commit 5b1c09d

Please sign in to comment.