-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add dependency processor using Apache Beam #6560
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: yunmaoQu <[email protected]>
af5f794
to
60fb334
Compare
plugin/storage/memory/memory.go
Outdated
perTenant: make(map[string]*Tenant), | ||
defaultConfig: cfg, | ||
perTenant: make(map[string]*Tenant), | ||
useNewDependencies: false, // 添加初始化 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use English in comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
// Copyright (c) 2025 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package dependencyprocessor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is placed under /processors/ is needs to follow the OTEL Processor framework patterns, like having a factory, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- where is it hooked up to anything?
- what would be the e2e testing for this component?
@yurishkuro I have fixed it |
Signed-off-by: yunmaoQu <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mahadzaryab1 interesting direction here
type Config struct { | ||
AggregationInterval time.Duration `yaml:"aggregation_interval"` | ||
InactivityTimeout time.Duration `yaml:"inactivity_timeout"` | ||
Store *memory.Store `yaml:"-"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storage is not part of the config. The processor itself can retrieve storage from storage extension - see adaptive sampling processor for example.
) | ||
|
||
type Config struct { | ||
AggregationInterval time.Duration `yaml:"aggregation_interval"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document the fields
require.NoError(t, err) | ||
|
||
// Wait for the processor to process the trace | ||
time.Sleep(cfg.AggregationInterval + 100*time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use sleep in tests, use assert.Eventually
|
||
func DefaultConfig() Config { | ||
return Config{ | ||
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is way too frequent. We used to flush every 15min at Uber, since the topology does not change that often, but I can see a need for more frequent flushing if we want to capture movement of metrics.
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds | |
AggregationInterval: 10 * time.Minute, |
The comment is redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to flush every minute to support investigating metrics movement, but it will create too much data. We may have to consider integrating with Prometheus instead, and only focus on capturing topology here, which definitely does not need 1min frequency.
} | ||
|
||
// createDefaultConfig returns the default configuration for the dependency processor. | ||
func createDefaultConfig() component.Config { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it repeated vs config? Please keep in one place.
"github.com/jaegertracing/jaeger/storage_v2/v1adapter" | ||
) | ||
|
||
type dependencyProcessor struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not using Apache beam.
I think that's not a bad thing in the initial phase. When running with all in one, beam is not needed (unless we want to support checkpoints). But in multi-collector deployment this solution won't work. We can address it in the following steps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not using Apache beam.
I think that's not a bad thing in the initial phase. When running with all in one, beam is not needed (unless we want to support checkpoints). But in multi-collector deployment this solution won't work. We can address it in the following steps.
Except this ,I update all based on your review.
func TestDependencyProcessorEndToEnd(t *testing.T) { | ||
// Create a mock receiver, processor, and exporter | ||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig().(*Config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use very short aggregation interval here
Signed-off-by: yunmaoQu <[email protected]>
@yurishkuro Except this ,I update all based on your review. |
Which problem is this PR solving?
Resolves #5911
Description of the changes
How was this change tested?
Checklist
jaeger
:make lint test
jaeger-ui
:npm run lint
andnpm run test