Skip to content

Commit

Permalink
[refactor] move root span handler into aggregator (#5478)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
-
#5389 (comment)

## Description of the changes
- Refactored `handleRootSpan` logic into a helper method in
aggregator.go.
  • Loading branch information
Pushkarm029 authored May 26, 2024
1 parent 584b6ff commit a194bd9
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 113 deletions.
5 changes: 4 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand Down Expand Up @@ -103,7 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {

var additionalProcessors []ProcessSpan
if c.aggregator != nil {
additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger))
additionalProcessors = append(additionalProcessors, func(span *model.Span, tenant string) {
c.aggregator.HandleRootSpan(span, c.logger)
})
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
Expand Down
21 changes: 21 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package app
import (
"context"
"io"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -47,6 +48,26 @@ func optionsForEphemeralPorts() *flags.CollectorOptions {
return collectorOpts
}

type mockAggregator struct {
callCount atomic.Int32
closeCount atomic.Int32
}

func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) {
t.callCount.Add(1)
}

func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) {
t.callCount.Add(1)
}

func (t *mockAggregator) Start() {}

func (t *mockAggregator) Close() error {
t.closeCount.Add(1)
return nil
}

func TestNewCollector(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand Down
42 changes: 0 additions & 42 deletions cmd/collector/app/root_span_handler.go

This file was deleted.

70 changes: 0 additions & 70 deletions cmd/collector/app/root_span_handler_test.go

This file was deleted.

6 changes: 6 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"io"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand All @@ -36,6 +38,10 @@ type Aggregator interface {
// Close() from io.Closer stops the aggregator from aggregating throughput.
io.Closer

// The HandleRootSpan function processes a span, checking if it's a root span.
// If it is, it extracts sampler parameters, then calls RecordThroughput.
HandleRootSpan(span *model.Span, logger *zap.Logger)

// RecordThroughput records throughput for an operation for aggregation.
RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64)

Expand Down
17 changes: 17 additions & 0 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,20 @@ func (a *aggregator) Close() error {
a.bgFinished.Wait()
return nil
}

func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) {
// simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
if span.ParentSpanID() != span_model.NewSpanID(0) {
return
}
service := span.Process.ServiceName
if service == "" || span.OperationName == "" {
return
}
samplerType, samplerParam := span.GetSamplerParams(logger)
if samplerType == span_model.SamplerTypeUnrecognized {
return
}
a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
}
40 changes: 40 additions & 0 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,43 @@ func TestLowerboundThroughput(t *testing.T) {
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
}

func TestRecordThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "A",
}
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span, logger)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
}

0 comments on commit a194bd9

Please sign in to comment.