Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: yunmaoQu <[email protected]>
  • Loading branch information
yunmaoQu committed Jan 20, 2025
1 parent 5ccf68d commit 05dc78f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
17 changes: 7 additions & 10 deletions cmd/jaeger/internal/processors/dependencyprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ import (
)

type Config struct {
// AggregationInterval defines how often the processor aggregates dependencies.
// This controls the frequency of flushing dependency data to storage.
// Default dependency aggregation interval: 10 seconds
AggregationInterval time.Duration `yaml:"aggregation_interval"`
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
Store *memory.Store `yaml:"-"`
}

func DefaultConfig() Config {
return Config{
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
InactivityTimeout: 2 * time.Second, // Default trace completion timeout: 2 seconds of inactivity
Store: memory.NewStore(),
}
// InactivityTimeout specifies the duration of inactivity after which a trace
// is considered complete and ready for dependency aggregation.
// Default trace completion timeout: 2 seconds of inactivity
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestDependencyProcessorEndToEnd(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

cfg.AggregationInterval = 1 * time.Second
// Create a mock next consumer (exporter)
sink := new(consumertest.TracesSink)

Expand Down Expand Up @@ -55,7 +56,10 @@ func TestDependencyProcessorEndToEnd(t *testing.T) {
require.NoError(t, err)

// Wait for the processor to process the trace
time.Sleep(cfg.AggregationInterval + 100*time.Millisecond)
assert.Eventually(t, func() bool {
deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval)
return err == nil && len(deps) > 0
}, cfg.AggregationInterval+time.Second, 100*time.Millisecond)

// Verify dependency links
deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ func NewFactory() processor.Factory {
)
}

// createDefaultConfig returns the default configuration for the dependency processor.
func createDefaultConfig() component.Config {
return &Config{
AggregationInterval: 5 * time.Second,
AggregationInterval: 10 * time.Second,
InactivityTimeout: 2 * time.Second,
Store: memory.NewStore(),
}
}

Expand Down
5 changes: 2 additions & 3 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func WithConfiguration(cfg Configuration) *Store {
return &Store{
defaultConfig: cfg,
perTenant: make(map[string]*Tenant),
useNewDependencies: false, // 添加初始化

useNewDependencies: false,
}
}

Expand Down Expand Up @@ -88,7 +87,7 @@ func (st *Store) getTenant(tenantID string) *Tenant {

// GetDependencies returns dependencies between services
func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
if st.useNewDependencies { // 添加条件判断
if st.useNewDependencies {
return st.getDependenciesNew(ctx)
}
m := st.getTenant(tenancy.GetTenant(ctx))
Expand Down

0 comments on commit 05dc78f

Please sign in to comment.