diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index c2c5b4a9272..a443f7e115e 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -95,7 +95,11 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { }) } - c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...) + spanProcessor, err := handlerBuilder.BuildSpanProcessor(additionalProcessors...) + if err != nil { + return fmt.Errorf("could not create span processor: %w", err) + } + c.spanProcessor = spanProcessor c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ Handler: c.spanHandlers.GRPCHandler, diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index d0fb4f7557e..e7d690288aa 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -36,7 +36,7 @@ type SpanHandlers struct { } // BuildSpanProcessor builds the span processor to be used with the handlers -func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) processor.SpanProcessor { +func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) (processor.SpanProcessor, error) { hostname, _ := os.Hostname() svcMetrics := b.metricsFactory() hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}}) diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index 4d0c6a1f06a..70cdc08fd87 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -44,7 +44,8 @@ func TestNewSpanHandlerBuilder(t *testing.T) { TenancyMgr: &tenancy.Manager{}, } - spanProcessor := builder.BuildSpanProcessor() + spanProcessor, err := builder.BuildSpanProcessor() + require.NoError(t, err) spanHandlers := builder.BuildHandlers(spanProcessor) assert.NotNil(t, spanHandlers.ZipkinSpansHandler) assert.NotNil(t, spanHandlers.JaegerBatchesHandler) diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 64832f0eaca..e040301dde2 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -60,8 +60,11 @@ func NewSpanProcessor( spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option, -) processor.SpanProcessor { - sp := newSpanProcessor(spanWriter, additional, opts...) +) (processor.SpanProcessor, error) { + sp, err := newSpanProcessor(spanWriter, additional, opts...) + if err != nil { + return nil, err + } sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) { sp.processItemFromQueue(item) @@ -73,10 +76,10 @@ func NewSpanProcessor( sp.background(1*time.Minute, sp.updateQueueSize) } - return sp + return sp, nil } -func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) *spanProcessor { +func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) (*spanProcessor, error) { options := Options.apply(opts...) handlerMetrics := NewSpanProcessorMetrics( options.serviceMetrics, @@ -125,7 +128,7 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt processSpanFuncs = append(processSpanFuncs, additional...) sp.processSpan = ChainedProcessSpan(processSpanFuncs...) - return &sp + return &sp, nil } func (sp *spanProcessor) Close() error { diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index f52494098e9..87f991b6b07 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -77,7 +77,7 @@ func TestBySvcMetrics(t *testing.T) { logger := zap.NewNop() serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) hostMetrics := mb.Namespace(metrics.NSOptions{Name: "host", Tags: nil}) - sp := newSpanProcessor( + sp, err := newSpanProcessor( &fakeSpanWriter{}, nil, Options.ServiceMetrics(serviceMetrics), @@ -88,6 +88,7 @@ func TestBySvcMetrics(t *testing.T) { Options.ReportBusy(false), Options.SpanFilter(isSpanAllowed), ) + require.NoError(t, err) var metricPrefix, format string switch test.format { case processor.ZipkinSpanFormat: @@ -235,7 +236,8 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S func TestSpanProcessor(t *testing.T) { w := &fakeSpanWriter{} - p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) + p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + require.NoError(t, err) res, err := p.ProcessSpans( processor.SpansV1{ @@ -260,12 +262,14 @@ func TestSpanProcessorErrors(t *testing.T) { mb := metricstest.NewFactory(time.Hour) defer mb.Backend.Stop() serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) - p := NewSpanProcessor(w, + pp, err := NewSpanProcessor(w, nil, Options.Logger(logger), Options.ServiceMetrics(serviceMetrics), Options.QueueSize(1), - ).(*spanProcessor) + ) + require.NoError(t, err) + p := pp.(*spanProcessor) res, err := p.ProcessSpans(processor.SpansV1{ Spans: []*model.Span{ @@ -311,12 +315,14 @@ func (w *blockingWriter) WriteSpan(context.Context, *model.Span) error { func TestSpanProcessorBusy(t *testing.T) { w := &blockingWriter{} - p := NewSpanProcessor(w, + pp, err := NewSpanProcessor(w, nil, Options.NumWorkers(1), Options.QueueSize(1), Options.ReportBusy(true), - ).(*spanProcessor) + ) + require.NoError(t, err) + p := pp.(*spanProcessor) defer require.NoError(t, p.Close()) // block the writer so that the first span is read from the queue and blocks the processor, @@ -357,7 +363,9 @@ func TestSpanProcessorWithNilProcess(t *testing.T) { serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) w := &fakeSpanWriter{} - p := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor) + pp, err := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)) + require.NoError(t, err) + p := pp.(*spanProcessor) defer require.NoError(t, p.Close()) p.saveSpan(&model.Span{}, "") @@ -376,7 +384,10 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) { } w := &fakeSpanWriter{} - p := NewSpanProcessor(w, nil, Options.CollectorTags(testCollectorTags)).(*spanProcessor) + + pp, err := NewSpanProcessor(w, nil, Options.CollectorTags(testCollectorTags)) + require.NoError(t, err) + p := pp.(*spanProcessor) defer require.NoError(t, p.Close()) span := &model.Span{ @@ -465,7 +476,9 @@ func TestSpanProcessorCountSpan(t *testing.T) { } else { opts = append(opts, Options.DynQueueSizeMemory(0)) } - p := NewSpanProcessor(w, nil, opts...).(*spanProcessor) + pp, err := NewSpanProcessor(w, nil, opts...) + require.NoError(t, err) + p := pp.(*spanProcessor) defer func() { require.NoError(t, p.Close()) }() @@ -578,7 +591,8 @@ func TestUpdateDynQueueSize(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { w := &fakeSpanWriter{} - p := newSpanProcessor(w, nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes)) + p, err := newSpanProcessor(w, nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes)) + require.NoError(t, err) assert.EqualValues(t, tt.initialCapacity, p.queue.Capacity()) p.spansProcessed.Store(tt.spansProcessed) @@ -592,14 +606,16 @@ func TestUpdateDynQueueSize(t *testing.T) { func TestUpdateQueueSizeNoActivityYet(t *testing.T) { w := &fakeSpanWriter{} - p := newSpanProcessor(w, nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1)) + p, err := newSpanProcessor(w, nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1)) + require.NoError(t, err) assert.NotPanics(t, p.updateQueueSize) } func TestStartDynQueueSizeUpdater(t *testing.T) { w := &fakeSpanWriter{} oneGiB := uint(1024 * 1024 * 1024) - p := newSpanProcessor(w, nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB)) + p, err := newSpanProcessor(w, nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB)) + require.NoError(t, err) assert.EqualValues(t, 100, p.queue.Capacity()) p.spansProcessed.Store(1000) @@ -625,7 +641,8 @@ func TestAdditionalProcessors(t *testing.T) { w := &fakeSpanWriter{} // nil doesn't fail - p := NewSpanProcessor(w, nil, Options.QueueSize(1)) + p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + require.NoError(t, err) res, err := p.ProcessSpans(processor.SpansV1{ Spans: []*model.Span{ { @@ -647,7 +664,8 @@ func TestAdditionalProcessors(t *testing.T) { f := func(_ *model.Span, _ string) { count++ } - p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) + p, err = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) + require.NoError(t, err) res, err = p.ProcessSpans(processor.SpansV1{ Spans: []*model.Span{ { @@ -668,7 +686,8 @@ func TestAdditionalProcessors(t *testing.T) { func TestSpanProcessorContextPropagation(t *testing.T) { w := &fakeSpanWriter{} - p := NewSpanProcessor(w, nil, Options.QueueSize(1)) + p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + require.NoError(t, err) dummyTenant := "context-prop-test-tenant" @@ -701,20 +720,22 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { } w := &blockingWriter{} - p := NewSpanProcessor(w, + pp, err := NewSpanProcessor(w, nil, Options.NumWorkers(1), Options.QueueSize(1), Options.OnDroppedSpan(customOnDroppedSpan), Options.ReportBusy(true), - ).(*spanProcessor) + ) + require.NoError(t, err) + p := pp.(*spanProcessor) defer p.Close() // Acquire the lock externally to force the writer to block. w.Lock() defer w.Unlock() - _, err := p.ProcessSpans(processor.SpansV1{ + _, err = p.ProcessSpans(processor.SpansV1{ Spans: []*model.Span{ {OperationName: "op1"}, },