Skip to content

Commit

Permalink
Return errors from span processor creation (jaegertracing#6488)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of jaegertracing#6487

## Description of the changes
- Creation of exporterhelper may result in an error, so extend the
processor construction signature to return the error.

## How was this change tested?
- CI

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 6, 2025
1 parent 232d805 commit 27af7b0
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 26 deletions.
6 changes: 5 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
})
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
spanProcessor, err := handlerBuilder.BuildSpanProcessor(additionalProcessors...)
if err != nil {
return fmt.Errorf("could not create span processor: %w", err)
}
c.spanProcessor = spanProcessor
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Handler: c.spanHandlers.GRPCHandler,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type SpanHandlers struct {
}

// BuildSpanProcessor builds the span processor to be used with the handlers
func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) processor.SpanProcessor {
func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) (processor.SpanProcessor, error) {
hostname, _ := os.Hostname()
svcMetrics := b.metricsFactory()
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/span_handler_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func TestNewSpanHandlerBuilder(t *testing.T) {
TenancyMgr: &tenancy.Manager{},
}

spanProcessor := builder.BuildSpanProcessor()
spanProcessor, err := builder.BuildSpanProcessor()
require.NoError(t, err)
spanHandlers := builder.BuildHandlers(spanProcessor)
assert.NotNil(t, spanHandlers.ZipkinSpansHandler)
assert.NotNil(t, spanHandlers.JaegerBatchesHandler)
Expand Down
13 changes: 8 additions & 5 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ func NewSpanProcessor(
spanWriter spanstore.Writer,
additional []ProcessSpan,
opts ...Option,
) processor.SpanProcessor {
sp := newSpanProcessor(spanWriter, additional, opts...)
) (processor.SpanProcessor, error) {
sp, err := newSpanProcessor(spanWriter, additional, opts...)
if err != nil {
return nil, err
}

sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) {
sp.processItemFromQueue(item)
Expand All @@ -73,10 +76,10 @@ func NewSpanProcessor(
sp.background(1*time.Minute, sp.updateQueueSize)
}

return sp
return sp, nil
}

func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) *spanProcessor {
func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) (*spanProcessor, error) {
options := Options.apply(opts...)
handlerMetrics := NewSpanProcessorMetrics(
options.serviceMetrics,
Expand Down Expand Up @@ -125,7 +128,7 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt
processSpanFuncs = append(processSpanFuncs, additional...)

sp.processSpan = ChainedProcessSpan(processSpanFuncs...)
return &sp
return &sp, nil
}

func (sp *spanProcessor) Close() error {
Expand Down
57 changes: 39 additions & 18 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestBySvcMetrics(t *testing.T) {
logger := zap.NewNop()
serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil})
hostMetrics := mb.Namespace(metrics.NSOptions{Name: "host", Tags: nil})
sp := newSpanProcessor(
sp, err := newSpanProcessor(
&fakeSpanWriter{},
nil,
Options.ServiceMetrics(serviceMetrics),
Expand All @@ -88,6 +88,7 @@ func TestBySvcMetrics(t *testing.T) {
Options.ReportBusy(false),
Options.SpanFilter(isSpanAllowed),
)
require.NoError(t, err)
var metricPrefix, format string
switch test.format {
case processor.ZipkinSpanFormat:
Expand Down Expand Up @@ -235,7 +236,8 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S

func TestSpanProcessor(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor)
p, err := NewSpanProcessor(w, nil, Options.QueueSize(1))
require.NoError(t, err)

res, err := p.ProcessSpans(
processor.SpansV1{
Expand All @@ -260,12 +262,14 @@ func TestSpanProcessorErrors(t *testing.T) {
mb := metricstest.NewFactory(time.Hour)
defer mb.Backend.Stop()
serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil})
p := NewSpanProcessor(w,
pp, err := NewSpanProcessor(w,
nil,
Options.Logger(logger),
Options.ServiceMetrics(serviceMetrics),
Options.QueueSize(1),
).(*spanProcessor)
)
require.NoError(t, err)
p := pp.(*spanProcessor)

res, err := p.ProcessSpans(processor.SpansV1{
Spans: []*model.Span{
Expand Down Expand Up @@ -311,12 +315,14 @@ func (w *blockingWriter) WriteSpan(context.Context, *model.Span) error {

func TestSpanProcessorBusy(t *testing.T) {
w := &blockingWriter{}
p := NewSpanProcessor(w,
pp, err := NewSpanProcessor(w,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.ReportBusy(true),
).(*spanProcessor)
)
require.NoError(t, err)
p := pp.(*spanProcessor)
defer require.NoError(t, p.Close())

// block the writer so that the first span is read from the queue and blocks the processor,
Expand Down Expand Up @@ -357,7 +363,9 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {
serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil})

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor)
pp, err := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics))
require.NoError(t, err)
p := pp.(*spanProcessor)
defer require.NoError(t, p.Close())

p.saveSpan(&model.Span{}, "")
Expand All @@ -376,7 +384,10 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {
}

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.CollectorTags(testCollectorTags)).(*spanProcessor)

pp, err := NewSpanProcessor(w, nil, Options.CollectorTags(testCollectorTags))
require.NoError(t, err)
p := pp.(*spanProcessor)

defer require.NoError(t, p.Close())
span := &model.Span{
Expand Down Expand Up @@ -465,7 +476,9 @@ func TestSpanProcessorCountSpan(t *testing.T) {
} else {
opts = append(opts, Options.DynQueueSizeMemory(0))
}
p := NewSpanProcessor(w, nil, opts...).(*spanProcessor)
pp, err := NewSpanProcessor(w, nil, opts...)
require.NoError(t, err)
p := pp.(*spanProcessor)
defer func() {
require.NoError(t, p.Close())
}()
Expand Down Expand Up @@ -578,7 +591,8 @@ func TestUpdateDynQueueSize(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &fakeSpanWriter{}
p := newSpanProcessor(w, nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes))
p, err := newSpanProcessor(w, nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes))
require.NoError(t, err)
assert.EqualValues(t, tt.initialCapacity, p.queue.Capacity())

p.spansProcessed.Store(tt.spansProcessed)
Expand All @@ -592,14 +606,16 @@ func TestUpdateDynQueueSize(t *testing.T) {

func TestUpdateQueueSizeNoActivityYet(t *testing.T) {
w := &fakeSpanWriter{}
p := newSpanProcessor(w, nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1))
p, err := newSpanProcessor(w, nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1))
require.NoError(t, err)
assert.NotPanics(t, p.updateQueueSize)
}

func TestStartDynQueueSizeUpdater(t *testing.T) {
w := &fakeSpanWriter{}
oneGiB := uint(1024 * 1024 * 1024)
p := newSpanProcessor(w, nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB))
p, err := newSpanProcessor(w, nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB))
require.NoError(t, err)
assert.EqualValues(t, 100, p.queue.Capacity())

p.spansProcessed.Store(1000)
Expand All @@ -625,7 +641,8 @@ func TestAdditionalProcessors(t *testing.T) {
w := &fakeSpanWriter{}

// nil doesn't fail
p := NewSpanProcessor(w, nil, Options.QueueSize(1))
p, err := NewSpanProcessor(w, nil, Options.QueueSize(1))
require.NoError(t, err)
res, err := p.ProcessSpans(processor.SpansV1{
Spans: []*model.Span{
{
Expand All @@ -647,7 +664,8 @@ func TestAdditionalProcessors(t *testing.T) {
f := func(_ *model.Span, _ string) {
count++
}
p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1))
p, err = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1))
require.NoError(t, err)
res, err = p.ProcessSpans(processor.SpansV1{
Spans: []*model.Span{
{
Expand All @@ -668,7 +686,8 @@ func TestAdditionalProcessors(t *testing.T) {

func TestSpanProcessorContextPropagation(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.QueueSize(1))
p, err := NewSpanProcessor(w, nil, Options.QueueSize(1))
require.NoError(t, err)

dummyTenant := "context-prop-test-tenant"

Expand Down Expand Up @@ -701,20 +720,22 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
}

w := &blockingWriter{}
p := NewSpanProcessor(w,
pp, err := NewSpanProcessor(w,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.OnDroppedSpan(customOnDroppedSpan),
Options.ReportBusy(true),
).(*spanProcessor)
)
require.NoError(t, err)
p := pp.(*spanProcessor)
defer p.Close()

// Acquire the lock externally to force the writer to block.
w.Lock()
defer w.Unlock()

_, err := p.ProcessSpans(processor.SpansV1{
_, err = p.ProcessSpans(processor.SpansV1{
Spans: []*model.Span{
{OperationName: "op1"},
},
Expand Down

0 comments on commit 27af7b0

Please sign in to comment.