diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go new file mode 100644 index 00000000000..02ed6d149be --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go @@ -0,0 +1,205 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "sync" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +type dependencyAggregator struct { + config *Config + telset component.TelemetrySettings + dependencyWriter *memory.Store + traces map[model.TraceID]*TraceState + tracesLock sync.RWMutex + closeChan chan struct{} + beamPipeline *beam.Pipeline + beamScope beam.Scope +} + +type TraceState struct { + spans []*model.Span + spanMap map[model.SpanID]*model.Span + lastUpdateTime time.Time + timer *time.Timer +} + +func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator { + beam.Init() + p, s := beam.NewPipelineWithRoot() + return &dependencyAggregator{ + config: &cfg, + telset: telset, + dependencyWriter: dependencyWriter, + traces: make(map[model.TraceID]*TraceState), + beamPipeline: p, + beamScope: s, + } +} + +func (agg *dependencyAggregator) Start(closeChan chan struct{}) { + agg.closeChan = closeChan + go func() { + ticker := time.NewTicker(agg.config.AggregationInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + agg.processTraces(context.Background()) // Pass context + case <-agg.closeChan: + agg.processTraces(context.Background()) // Pass context + return + } + } + }() +} + +func (agg *dependencyAggregator) Close() error { + agg.tracesLock.Lock() + defer 
agg.tracesLock.Unlock() + for _, traceState := range agg.traces { + if traceState.timer != nil { + traceState.timer.Stop() + } + } + return nil +} + +func (agg *dependencyAggregator) HandleSpan(span *model.Span) { + agg.tracesLock.Lock() + defer agg.tracesLock.Unlock() + + traceState, ok := agg.traces[span.TraceID] + if !ok { + traceState = &TraceState{ + spans: []*model.Span{}, + spanMap: make(map[model.SpanID]*model.Span), + lastUpdateTime: time.Now(), + } + agg.traces[span.TraceID] = traceState + } + + traceState.spans = append(traceState.spans, span) + traceState.spanMap[span.SpanID] = span + traceState.lastUpdateTime = time.Now() + + if traceState.timer != nil { + traceState.timer.Stop() + } + traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() { + agg.processTraces(context.Background()) // NOTE(review): this flushes ALL buffered traces, not only the trace that went inactive — confirm intended + }) +} + +func (agg *dependencyAggregator) processTraces(ctx context.Context) { + agg.tracesLock.Lock() + if len(agg.traces) == 0 { + agg.tracesLock.Unlock() + return + } + traces := agg.traces + agg.traces = make(map[model.TraceID]*TraceState) + agg.tracesLock.Unlock() + for _, traceState := range traces { + if traceState.timer != nil { + traceState.timer.Stop() + } + } + + beamInput := agg.createBeamInput(traces) + if beamInput.IsValid() { + agg.calculateDependencies(ctx, beamInput) + } +} + +func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection { + var allSpans []*model.Span + for _, traceState := range traces { + allSpans = append(allSpans, traceState.spans...) 
+ } + if len(allSpans) == 0 { + return beam.PCollection{} + } + return beam.CreateList(agg.beamScope, allSpans) +} + +func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) { + keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) { + return s.TraceID, s + }, input) + + groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans) + depLinks := beam.ParDo(agg.beamScope, func(tid model.TraceID, spansIter func(**model.Span) bool) []*model.DependencyLink { + state := &TraceState{spanMap: make(map[model.SpanID]*model.Span)} + var span *model.Span + for spansIter(&span) { state.spans = append(state.spans, span); state.spanMap[span.SpanID] = span } + deps, byTrace := map[string]*model.DependencyLink{}, map[model.TraceID]*TraceState{tid: state} + for _, s := range state.spans { processSpan(deps, s, byTrace) } + return depMapToSlice(deps) + }, groupedSpans) + flattened := beam.Flatten(agg.beamScope, depLinks) + + beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) { + err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps) + if err != nil { + agg.telset.Logger.Error("Error writing dependencies", zap.Error(err)) + } + }, flattened) + + go func() { + err := beamx.Run(ctx, agg.beamPipeline) + if err != nil { + agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err)) + } + agg.beamPipeline = beam.NewPipeline() + agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"} + }() +} + +// processSpan is a copy from the memory storage plugin +func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) { + parentSpan := seekToSpan(s, traces) + if parentSpan != nil { + if parentSpan.Process.ServiceName == s.Process.ServiceName { + return + } + depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName + if _, ok := deps[depKey]; !ok { + deps[depKey] = &model.DependencyLink{ + Parent: parentSpan.Process.ServiceName, + Child: s.Process.ServiceName, + CallCount: 1, + } + } else { + deps[depKey].CallCount++ + } + } +} + +func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) 
*model.Span { + traceState, ok := traces[span.TraceID] + if !ok { + return nil + } + return traceState.spanMap[span.ParentSpanID()] +} + +// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin +func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { + retMe := make([]*model.DependencyLink, 0, len(deps)) + for _, dep := range deps { + retMe = append(retMe, dep) + } + return retMe +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go new file mode 100644 index 00000000000..929439c1c45 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -0,0 +1,15 @@ +package dependencyprocessor + +import "time" + +type Config struct { + AggregationInterval time.Duration `yaml:"aggregation_interval"` + InactivityTimeout time.Duration `yaml:"inactivity_timeout"` +} + +func DefaultConfig() Config { + return Config{ + AggregationInterval: 5 * time.Second, // default: aggregate dependencies every 5 seconds + InactivityTimeout: 2 * time.Second, // default: treat a trace as complete after 2 seconds of inactivity + } +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go new file mode 100644 index 00000000000..01d2185792b --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go @@ -0,0 +1,61 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" +) + +type dependencyProcessor struct { + config *Config + aggregator *dependencyAggregator // Define the aggregator below. 
+ telset component.TelemetrySettings + dependencyWriter *memory.Store + closeChan chan struct{} +} + +func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor { + return &dependencyProcessor{ + config: &cfg, + telset: telset, + dependencyWriter: dependencyWriter, + closeChan: make(chan struct{}), + } +} + +func (tp *dependencyProcessor) start(_ context.Context, host component.Host) error { + tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) + tp.aggregator.Start(tp.closeChan) + return nil +} + +func (tp *dependencyProcessor) close(ctx context.Context) error { + close(tp.closeChan) + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the dependency aggregator : %w", err) + } + } + return nil +} + +func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches := v1adapter.ProtoFromTraces(td) + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + tp.aggregator.HandleSpan(span) + } + } + return td, nil +} diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 02dff26a584..0ae0b70f979 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -24,20 +24,22 @@ type Store struct { sync.RWMutex // Each tenant gets a copy of default config. // In the future this can be extended to contain per-tenant configuration. 
- defaultConfig Configuration - perTenant map[string]*Tenant + defaultConfig Configuration + perTenant map[string]*Tenant + useNewDependencies bool } // Tenant is an in-memory store of traces for a single tenant type Tenant struct { sync.RWMutex - ids []*model.TraceID - traces map[model.TraceID]*model.Trace - services map[string]struct{} - operations map[string]map[spanstore.Operation]struct{} - deduper adjuster.Adjuster - config Configuration - index int + ids []*model.TraceID + traces map[model.TraceID]*model.Trace + services map[string]struct{} + operations map[string]map[spanstore.Operation]struct{} + deduper adjuster.Adjuster + config Configuration + index int + dependencyLinks map[string]*model.DependencyLink } // NewStore creates an unbounded in-memory store @@ -48,19 +50,22 @@ func NewStore() *Store { // WithConfiguration creates a new in memory storage based on the given configuration func WithConfiguration(cfg Configuration) *Store { return &Store{ - defaultConfig: cfg, - perTenant: make(map[string]*Tenant), + defaultConfig: cfg, + perTenant: make(map[string]*Tenant), + // useNewDependencies routes reads to the aggregation-based path (getDependenciesNew); off by default. + useNewDependencies: false, } func newTenant(cfg Configuration) *Tenant { return &Tenant{ - ids: make([]*model.TraceID, cfg.MaxTraces), - traces: map[model.TraceID]*model.Trace{}, - services: map[string]struct{}{}, - operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.ZipkinSpanIDUniquifier(), - config: cfg, + ids: make([]*model.TraceID, cfg.MaxTraces), + traces: map[model.TraceID]*model.Trace{}, + services: map[string]struct{}{}, + operations: map[string]map[spanstore.Operation]struct{}{}, + deduper: adjuster.ZipkinSpanIDUniquifier(), + config: cfg, + dependencyLinks: make(map[string]*model.DependencyLink), } } @@ -83,6 +88,9 @@ func (st *Store) getTenant(tenantID string) *Tenant { // GetDependencies returns dependencies between services func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) 
([]model.DependencyLink, error) { + if st.useNewDependencies { // serve results from the aggregation-based path when enabled + return st.getDependenciesNew(ctx) + } m := st.getTenant(tenancy.GetTenant(ctx)) // deduper used below can modify the spans, so we take an exclusive lock m.Lock() @@ -119,6 +127,36 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback return retMe, nil } +func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) { // NOTE(review): endTs and lookback are ignored on this path — confirm this is intended + m := st.getTenant(tenancy.GetTenant(ctx)) + m.RLock() + defer m.RUnlock() + retMe := make([]model.DependencyLink, 0, len(m.dependencyLinks)) + for _, dep := range m.dependencyLinks { + retMe = append(retMe, *dep) + } + return retMe, nil +} + +func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error { + m := st.getTenant(tenancy.GetTenant(ctx)) + m.Lock() + defer m.Unlock() + for _, dep := range dependencies { + key := dep.Parent + "&&&" + dep.Child + if _, ok := m.dependencyLinks[key]; !ok { + m.dependencyLinks[key] = &model.DependencyLink{ + Parent: dep.Parent, + Child: dep.Child, + CallCount: dep.CallCount, + } + } else { + m.dependencyLinks[key].CallCount += dep.CallCount + } + } + return nil +} + func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { for _, s := range trace.Spans { if s.SpanID == spanID {