From 05dc78fba88df0bf8cd2abcae8fc4242d001157e Mon Sep 17 00:00:00 2001 From: yunmaoQu <2643354262@qq.com> Date: Mon, 20 Jan 2025 07:45:02 +0000 Subject: [PATCH] update Signed-off-by: yunmaoQu <2643354262@qq.com> --- .../processors/dependencyprocessor/config.go | 17 +++++++---------- .../processors/dependencyprocessor/e2e_test.go | 6 +++++- .../processors/dependencyprocessor/factory.go | 4 +--- plugin/storage/memory/memory.go | 5 ++--- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go index 8f828f87073..ed6750cded1 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/config.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -10,15 +10,12 @@ import ( ) type Config struct { + // AggregationInterval defines how often the processor aggregates dependencies. + // This controls the frequency of flushing dependency data to storage. + // Default dependency aggregation interval: 10 seconds AggregationInterval time.Duration `yaml:"aggregation_interval"` - InactivityTimeout time.Duration `yaml:"inactivity_timeout"` - Store *memory.Store `yaml:"-"` -} - -func DefaultConfig() Config { - return Config{ - AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds - InactivityTimeout: 2 * time.Second, // Default trace completion timeout: 2 seconds of inactivity - Store: memory.NewStore(), - } + // InactivityTimeout specifies the duration of inactivity after which a trace + // is considered complete and ready for dependency aggregation. + // Default trace completion timeout: 2 seconds of inactivity + InactivityTimeout time.Duration `yaml:"inactivity_timeout"` } diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go index 57f6bb0ec3f..b5398d123cd 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go @@ -25,6 +25,7 @@ func TestDependencyProcessorEndToEnd(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) + cfg.AggregationInterval = 1 * time.Second // Create a mock next consumer (exporter) sink := new(consumertest.TracesSink) @@ -55,7 +56,10 @@ func TestDependencyProcessorEndToEnd(t *testing.T) { require.NoError(t, err) // Wait for the processor to process the trace - time.Sleep(cfg.AggregationInterval + 100*time.Millisecond) + assert.Eventually(t, func() bool { + deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) + return err == nil && len(deps) > 0 + }, cfg.AggregationInterval+time.Second, 100*time.Millisecond) // Verify dependency links deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go index 083be248d6b..ee063431f83 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go @@ -26,12 +26,10 @@ func NewFactory() processor.Factory { ) } -// createDefaultConfig returns the default configuration for the dependency processor. func createDefaultConfig() component.Config { return &Config{ - AggregationInterval: 5 * time.Second, + AggregationInterval: 10 * time.Second, InactivityTimeout: 2 * time.Second, - Store: memory.NewStore(), } } diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 0ae0b70f979..43c0478c60b 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -52,8 +52,7 @@ func WithConfiguration(cfg Configuration) *Store { return &Store{ defaultConfig: cfg, perTenant: make(map[string]*Tenant), - useNewDependencies: false, // 添加初始化 - + useNewDependencies: false, } } @@ -88,7 +87,7 @@ func (st *Store) getTenant(tenantID string) *Tenant { // GetDependencies returns dependencies between services func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - if st.useNewDependencies { // 添加条件判断 + if st.useNewDependencies { return st.getDependenciesNew(ctx) } m := st.getTenant(tenancy.GetTenant(ctx))