Skip to content

Commit

Permalink
feat: add dependency processor using Apache Beam
Browse files Browse the repository at this point in the history
Signed-off-by: yunmaoQu <[email protected]>
  • Loading branch information
yunmaoQu committed Jan 17, 2025
1 parent 7f16f49 commit 60fb334
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 17 deletions.
205 changes: 205 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

// dependencyAggregator buffers spans per trace and periodically runs an
// Apache Beam pipeline that derives service dependency links from them,
// writing the results to the in-memory dependency store.
type dependencyAggregator struct {
	config           *Config
	telset           component.TelemetrySettings
	dependencyWriter *memory.Store // destination for the computed dependency links
	traces           map[model.TraceID]*TraceState // spans buffered per trace; guarded by tracesLock
	tracesLock       sync.RWMutex
	closeChan        chan struct{} // closed by the owning processor to stop the aggregation loop
	beamPipeline     *beam.Pipeline
	beamScope        beam.Scope
}

// TraceState holds the spans collected so far for a single trace, together
// with the bookkeeping needed to detect trace inactivity.
type TraceState struct {
	spans          []*model.Span
	spanMap        map[model.SpanID]*model.Span // span ID -> span, used for parent lookups
	lastUpdateTime time.Time                    // when the most recent span for this trace arrived
	timer          *time.Timer                  // inactivity timer; triggers processTraces after InactivityTimeout
}

// newDependencyAggregator builds an aggregator that buffers spans per trace
// and periodically computes service dependency links via a Beam pipeline.
func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
	beam.Init()
	pipeline, scope := beam.NewPipelineWithRoot()
	agg := &dependencyAggregator{
		config:           &cfg,
		telset:           telset,
		dependencyWriter: dependencyWriter,
		traces:           make(map[model.TraceID]*TraceState),
		beamPipeline:     pipeline,
		beamScope:        scope,
	}
	return agg
}

// Start launches the background aggregation loop. The loop flushes buffered
// traces every AggregationInterval and performs one final flush when
// closeChan is closed.
func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
	agg.closeChan = closeChan
	go func() {
		ticker := time.NewTicker(agg.config.AggregationInterval)
		defer ticker.Stop()
		for {
			select {
			case <-agg.closeChan:
				// Final flush before shutting down.
				agg.processTraces(context.Background())
				return
			case <-ticker.C:
				agg.processTraces(context.Background())
			}
		}
	}()
}

// Close stops every per-trace inactivity timer. It does not flush pending
// traces itself; the shutdown flush is performed by the Start loop when
// closeChan is closed.
func (agg *dependencyAggregator) Close() error {
	agg.tracesLock.Lock()
	defer agg.tracesLock.Unlock()
	for _, state := range agg.traces {
		if t := state.timer; t != nil {
			t.Stop()
		}
	}
	return nil
}

// HandleSpan records a span under its trace and (re)arms the trace's
// inactivity timer. When the trace stays quiet for InactivityTimeout, the
// timer triggers a flush of all buffered traces via processTraces.
func (agg *dependencyAggregator) HandleSpan(span *model.Span) {
	agg.tracesLock.Lock()
	defer agg.tracesLock.Unlock()

	state, exists := agg.traces[span.TraceID]
	if !exists {
		state = &TraceState{
			spans:          make([]*model.Span, 0),
			spanMap:        make(map[model.SpanID]*model.Span),
			lastUpdateTime: time.Now(),
		}
		agg.traces[span.TraceID] = state
	}

	state.spans = append(state.spans, span)
	state.spanMap[span.SpanID] = span
	state.lastUpdateTime = time.Now()

	// Restart the inactivity timer for this trace.
	if t := state.timer; t != nil {
		t.Stop()
	}
	state.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
		agg.processTraces(context.Background())
	})
}

// processTraces atomically swaps out the buffered traces and, if any spans
// were collected, feeds them into the Beam pipeline to compute dependencies.
func (agg *dependencyAggregator) processTraces(ctx context.Context) {
	agg.tracesLock.Lock()
	if len(agg.traces) == 0 {
		agg.tracesLock.Unlock()
		return
	}
	snapshot := agg.traces
	agg.traces = make(map[model.TraceID]*TraceState)
	agg.tracesLock.Unlock()

	// The swapped-out traces no longer need their inactivity timers.
	for _, state := range snapshot {
		if t := state.timer; t != nil {
			t.Stop()
		}
	}

	if input := agg.createBeamInput(snapshot); input.IsValid() {
		agg.calculateDependencies(ctx, input)
	}
}

// createBeamInput flattens the spans of all given traces into a single
// PCollection. An invalid (zero) PCollection is returned when there are no
// spans at all.
func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection {
	var spans []*model.Span
	for _, state := range traces {
		spans = append(spans, state.spans...)
	}
	if len(spans) == 0 {
		return beam.PCollection{}
	}
	return beam.CreateList(agg.beamScope, spans)
}

// calculateDependencies wires the Beam pipeline stages that turn the
// collected spans into dependency links and writes them to storage.
//
// NOTE(review): agg.beamPipeline/agg.beamScope are replaced from the
// goroutine below while HandleSpan/processTraces may run concurrently —
// confirm the intended threading model and add synchronization if needed.
func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
	// Key every span by its trace ID so GroupByKey collects whole traces.
	keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) {
		return s.TraceID, s
	}, input)

	groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)

	// For each trace, materialize its spans, index them by span ID, and
	// derive parent->child dependency links. The previous code iterated
	// with spansIter(span) on a nil *model.Span (so values were never
	// read) and resolved parents via agg.traces, which processTraces had
	// already replaced with a fresh map; both are fixed by building a
	// local index from the grouped spans themselves.
	depLinks := beam.ParDo(agg.beamScope, func(traceID model.TraceID, spansIter func(**model.Span) bool) []*model.DependencyLink {
		var spans []*model.Span
		var span *model.Span
		for spansIter(&span) {
			spans = append(spans, span)
		}
		spanMap := make(map[model.SpanID]*model.Span, len(spans))
		for _, s := range spans {
			spanMap[s.SpanID] = s
		}
		localTraces := map[model.TraceID]*TraceState{
			traceID: {spanMap: spanMap},
		}
		deps := map[string]*model.DependencyLink{}
		for _, s := range spans {
			processSpan(deps, s, localTraces)
		}
		return depMapToSlice(deps)
	}, groupedSpans)
	flattened := beam.Flatten(agg.beamScope, depLinks)

	beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) {
		if err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps); err != nil {
			agg.telset.Logger.Error("Error writing dependencies", zap.Error(err))
		}
	}, flattened)

	go func() {
		if err := beamx.Run(ctx, agg.beamPipeline); err != nil {
			agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err))
		}
		// A Beam pipeline cannot be re-run; build a fresh pipeline with a
		// matching root scope for the next aggregation round (the original
		// hand-assembled beam.Scope did not belong to the new pipeline).
		p, s := beam.NewPipelineWithRoot()
		agg.beamPipeline = p
		agg.beamScope = s
	}()
}

// processSpan is a copy from the memory storage plugin: it records one
// parent->child service dependency for s when its parent span belongs to a
// different service, accumulating call counts in deps.
func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) {
	parent := seekToSpan(s, traces)
	if parent == nil {
		return
	}
	// Calls within the same service are not dependencies.
	if parent.Process.ServiceName == s.Process.ServiceName {
		return
	}
	key := parent.Process.ServiceName + "&&&" + s.Process.ServiceName
	if link, ok := deps[key]; ok {
		link.CallCount++
		return
	}
	deps[key] = &model.DependencyLink{
		Parent:    parent.Process.ServiceName,
		Child:     s.Process.ServiceName,
		CallCount: 1,
	}
}

// seekToSpan resolves the parent span of span within its own trace, or
// returns nil when the trace or the parent span is unknown.
func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span {
	if state, ok := traces[span.TraceID]; ok {
		return state.spanMap[span.ParentSpanID()]
	}
	return nil
}

// depMapToSlice modifies the spans to DependencyLink in the same way as the
// memory storage plugin: it collects the accumulated links into a slice.
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
	links := make([]*model.DependencyLink, 0, len(deps))
	for _, link := range deps {
		links = append(links, link)
	}
	return links
}
15 changes: 15 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dependencyprocessor

import "time"

type Config struct {
AggregationInterval time.Duration `yaml:"aggregation_interval"`
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
}

func DefaultConfig() Config {
return Config{
AggregationInterval: 5 * time.Second, // 默认每5秒聚合一次依赖
InactivityTimeout: 2 * time.Second, // 默认trace不活跃2秒后视为完成
}
}
61 changes: 61 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

// dependencyProcessor derives service dependency links from the spans
// flowing through the collector pipeline and stores them via the in-memory
// dependency writer.
type dependencyProcessor struct {
	config           *Config
	aggregator       *dependencyAggregator // Define the aggregator below. Created in start().
	telset           component.TelemetrySettings
	dependencyWriter *memory.Store
	closeChan        chan struct{} // closed in close() to stop the aggregator loop
}

// newDependencyProcessor wires a processor around the given configuration,
// telemetry settings and dependency store. The aggregator itself is created
// later, in start().
func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor {
	dp := &dependencyProcessor{
		config:           &cfg,
		telset:           telset,
		dependencyWriter: dependencyWriter,
		closeChan:        make(chan struct{}),
	}
	return dp
}

// start creates the span aggregator and launches its background loop.
func (tp *dependencyProcessor) start(_ context.Context, _ component.Host) error {
	tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter)
	tp.aggregator.Start(tp.closeChan)
	return nil
}

// close signals the aggregation loop to stop and shuts the aggregator down.
// NOTE(review): closeChan is closed unconditionally, so close must be called
// at most once — a second call would panic; confirm the component lifecycle
// guarantees this.
func (tp *dependencyProcessor) close(ctx context.Context) error {
	close(tp.closeChan)
	if tp.aggregator != nil {
		if err := tp.aggregator.Close(); err != nil {
			// Fixed: removed the stray space before the colon in the message.
			return fmt.Errorf("failed to stop the dependency aggregator: %w", err)
		}
	}
	return nil
}

// processTraces converts the incoming OTLP traces to the Jaeger model and
// feeds every span to the aggregator. The traces are passed through
// unmodified.
func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
	for _, batch := range v1adapter.ProtoFromTraces(td) {
		for _, span := range batch.Spans {
			// Converted spans may carry the process on the batch rather
			// than on each individual span.
			if span.Process == nil {
				span.Process = batch.Process
			}
			tp.aggregator.HandleSpan(span)
		}
	}
	return td, nil
}
72 changes: 55 additions & 17 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ type Store struct {
sync.RWMutex
// Each tenant gets a copy of default config.
// In the future this can be extended to contain per-tenant configuration.
defaultConfig Configuration
perTenant map[string]*Tenant
defaultConfig Configuration
perTenant map[string]*Tenant
useNewDependencies bool
}

// Tenant is an in-memory store of traces for a single tenant
// (Reconstructed from the commit diff: the scrape interleaved the pre- and
// post-change field lists; this is the post-change struct.)
type Tenant struct {
	sync.RWMutex
	ids             []*model.TraceID
	traces          map[model.TraceID]*model.Trace
	services        map[string]struct{}
	operations      map[string]map[spanstore.Operation]struct{}
	deduper         adjuster.Adjuster
	config          Configuration
	index           int
	dependencyLinks map[string]*model.DependencyLink // "parent&&&child" -> accumulated link, filled by WriteDependencies
}

// NewStore creates an unbounded in-memory store
Expand All @@ -48,19 +50,22 @@ func NewStore() *Store {
// WithConfiguration creates a new in memory storage based on the given configuration
// (Reconstructed from the commit diff; the scrape interleaved old and new
// lines. Comment translated from Chinese to English.)
func WithConfiguration(cfg Configuration) *Store {
	return &Store{
		defaultConfig:      cfg,
		perTenant:          make(map[string]*Tenant),
		useNewDependencies: false, // explicit init: legacy trace-scan dependency computation by default
	}
}

// newTenant creates the per-tenant storage with empty indexes sized by the
// given configuration. (Reconstructed from the commit diff; the scrape
// interleaved old and new lines.)
func newTenant(cfg Configuration) *Tenant {
	return &Tenant{
		ids:             make([]*model.TraceID, cfg.MaxTraces),
		traces:          map[model.TraceID]*model.Trace{},
		services:        map[string]struct{}{},
		operations:      map[string]map[spanstore.Operation]struct{}{},
		deduper:         adjuster.ZipkinSpanIDUniquifier(),
		config:          cfg,
		dependencyLinks: make(map[string]*model.DependencyLink),
	}
}

Expand All @@ -83,6 +88,9 @@ func (st *Store) getTenant(tenantID string) *Tenant {

// GetDependencies returns dependencies between services
func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
if st.useNewDependencies { // 添加条件判断
return st.getDependenciesNew(ctx)
}
m := st.getTenant(tenancy.GetTenant(ctx))
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
Expand Down Expand Up @@ -119,6 +127,36 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
return retMe, nil
}

// getDependenciesNew serves the dependency links that were pre-aggregated by
// the dependency processor (via WriteDependencies) for the caller's tenant.
func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) {
	tenant := st.getTenant(tenancy.GetTenant(ctx))
	tenant.RLock()
	defer tenant.RUnlock()
	links := make([]model.DependencyLink, 0, len(tenant.dependencyLinks))
	for _, link := range tenant.dependencyLinks {
		links = append(links, *link)
	}
	return links, nil
}

// WriteDependencies merges the given dependency links into the tenant's
// accumulated link table, summing call counts for existing parent/child
// pairs. The ts argument is not used by this implementation.
func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error {
	tenant := st.getTenant(tenancy.GetTenant(ctx))
	tenant.Lock()
	defer tenant.Unlock()
	for _, dep := range dependencies {
		key := dep.Parent + "&&&" + dep.Child
		if existing, ok := tenant.dependencyLinks[key]; ok {
			existing.CallCount += dep.CallCount
			continue
		}
		tenant.dependencyLinks[key] = &model.DependencyLink{
			Parent:    dep.Parent,
			Child:     dep.Child,
			CallCount: dep.CallCount,
		}
	}
	return nil
}

func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
Expand Down

0 comments on commit 60fb334

Please sign in to comment.