feat: add dependency processor using Apache Beam #6560

Open
yunmaoQu wants to merge 3 commits into main from add-dependency-processor

Conversation

yunmaoQu

Which problem is this PR solving?

Resolves #5911

Description of the changes

  • add dependency processor using Apache Beam

How was this change tested?

  • e2e tests

Checklist

yunmaoQu requested a review from a team as a code owner January 17, 2025 17:37
yunmaoQu requested a review from joe-elliott January 17, 2025 17:37
yunmaoQu force-pushed the add-dependency-processor branch from af5f794 to 60fb334 January 17, 2025 17:42
perTenant: make(map[string]*Tenant),
defaultConfig: cfg,
perTenant: make(map[string]*Tenant),
useNewDependencies: false, // 添加初始化
Member

please use English in comments

Author

ok

// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor
Member

If this is placed under /processors/, it needs to follow the OTEL Processor framework patterns, like having a factory, etc.

Author

ok

Member

yurishkuro left a comment

  • where is it hooked up to anything?
  • what would be the e2e testing for this component?

Author

yunmaoQu commented Jan 18, 2025

>   • where is it hooked up to anything?
>   • what would be the e2e testing for this component?

@yurishkuro I have fixed it

Member

yurishkuro left a comment

@mahadzaryab1 interesting direction here

type Config struct {
AggregationInterval time.Duration `yaml:"aggregation_interval"`
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
Store *memory.Store `yaml:"-"`
Member

Storage is not part of the config. The processor itself can retrieve storage from the storage extension; see the adaptive sampling processor for an example.
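
A minimal sketch of the pattern described in this comment, assuming nothing about the real APIs: the config carries only a storage name, and the processor resolves the store from its host at startup. `Host`, `DependencyWriter`, `StorageName`, `fakeHost`, and `memStore` are hypothetical stand-ins invented for illustration; the real collector host and Jaeger's storage extension have their own types, which this does not reproduce.

```go
package main

import (
	"errors"
	"fmt"
)

// DependencyWriter is a hypothetical stand-in for the storage interface
// that the processor writes dependency links to.
type DependencyWriter interface {
	WriteDependencies(links []string) error
}

// Host is a hypothetical stand-in for the collector host, which exposes
// the extensions running alongside the processor.
type Host interface {
	GetExtensions() map[string]any
}

// Config carries only the name of the storage backend, never the store.
type Config struct {
	// StorageName identifies a backend declared in the storage extension.
	StorageName string
}

type processor struct {
	cfg   Config
	store DependencyWriter
}

// Start looks up the store by name on the host instead of reading it
// from the config.
func (p *processor) Start(host Host) error {
	ext, ok := host.GetExtensions()[p.cfg.StorageName]
	if !ok {
		return fmt.Errorf("storage %q not found", p.cfg.StorageName)
	}
	w, ok := ext.(DependencyWriter)
	if !ok {
		return errors.New("extension does not implement DependencyWriter")
	}
	p.store = w
	return nil
}

// memStore and fakeHost exist only to exercise the sketch.
type memStore struct{ links []string }

func (m *memStore) WriteDependencies(links []string) error {
	m.links = append(m.links, links...)
	return nil
}

type fakeHost map[string]any

func (h fakeHost) GetExtensions() map[string]any { return h }

func main() {
	host := fakeHost{"memstore": &memStore{}}
	p := &processor{cfg: Config{StorageName: "memstore"}}
	if err := p.Start(host); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	_ = p.store.WriteDependencies([]string{"frontend->checkout"})
	fmt.Println("store resolved at startup")
}
```

Keeping the store out of `Config` means the struct stays plain data that can be loaded from YAML and validated without a running backend.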

)

type Config struct {
AggregationInterval time.Duration `yaml:"aggregation_interval"`
Member

Please document the fields
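
As an illustration of what documented fields could look like, here is a sketch that reuses the two field names and YAML tags from the diff. The field descriptions are assumptions inferred from the names, not taken from the PR, and the `Validate` method is a hypothetical addition.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config holds the settings of the dependency processor.
type Config struct {
	// AggregationInterval is how often the accumulated dependency links
	// are flushed to storage. (Assumed meaning.)
	AggregationInterval time.Duration `yaml:"aggregation_interval"`

	// InactivityTimeout is how long a trace may receive no new spans
	// before it is treated as complete. (Assumed meaning.)
	InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
}

// Validate rejects non-positive durations, which would make the
// periodic flush meaningless.
func (c Config) Validate() error {
	if c.AggregationInterval <= 0 {
		return errors.New("aggregation_interval must be positive")
	}
	if c.InactivityTimeout <= 0 {
		return errors.New("inactivity_timeout must be positive")
	}
	return nil
}

func main() {
	cfg := Config{
		AggregationInterval: 10 * time.Minute,
		InactivityTimeout:   2 * time.Second,
	}
	fmt.Println("valid:", cfg.Validate() == nil)
}
```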

require.NoError(t, err)

// Wait for the processor to process the trace
time.Sleep(cfg.AggregationInterval + 100*time.Millisecond)
Member

We don't use sleep in tests; use assert.Eventually instead.
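
To show the idea behind polling instead of a fixed sleep, here is a stdlib-only sketch. The `eventually` helper is hypothetical and only mimics the shape of testify's `assert.Eventually(t, condition, waitFor, tick)`; in the actual test the testify function would be used directly. The delayed "flush" is simulated with a timer.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// eventually polls condition every tick until it returns true or
// waitFor has elapsed. It returns whether the condition was met.
func eventually(condition func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for {
		if condition() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}

func main() {
	var flushed int32

	// Simulate a processor that flushes after a short aggregation interval.
	time.AfterFunc(20*time.Millisecond, func() {
		atomic.StoreInt32(&flushed, 1)
	})

	// The check returns as soon as the flush is observed, rather than
	// always waiting for the full interval plus a safety margin.
	ok := eventually(func() bool {
		return atomic.LoadInt32(&flushed) == 1
	}, time.Second, 5*time.Millisecond)
	fmt.Println("flushed:", ok)
}
```

Compared with `time.Sleep(interval + margin)`, polling finishes early on fast machines and tolerates slow ones up to the `waitFor` bound.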


func DefaultConfig() Config {
return Config{
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
Member

This is way too frequent. We used to flush every 15min at Uber, since the topology does not change that often, but I can see a need for more frequent flushing if we want to capture movement of metrics.

Suggested change
- AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
+ AggregationInterval: 10 * time.Minute,

The comment is redundant.

Member

It would be nice to flush every minute to support investigating metrics movement, but it will create too much data. We may have to consider integrating with Prometheus instead, and only focus on capturing topology here, which definitely does not need 1min frequency.

}

// createDefaultConfig returns the default configuration for the dependency processor.
func createDefaultConfig() component.Config {
Member

Why is it repeated vs config? Please keep in one place.

"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type dependencyProcessor struct {
Member

This is not using Apache Beam.

I think that's not a bad thing in the initial phase. When running with all-in-one, Beam is not needed (unless we want to support checkpoints). But in a multi-collector deployment this solution won't work. We can address it in the following steps.
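
For context, a rough sketch of what single-process, in-memory dependency aggregation amounts to. This is not the PR's code: `Span`, `Link`, and `aggregate` are hypothetical, simplified types. It derives parent-to-child service links from the spans of one trace, which presumably only works when every span of the trace reaches the same process.

```go
package main

import "fmt"

// Span is a simplified, hypothetical span; real spans carry far more data.
type Span struct {
	SpanID   string
	ParentID string // empty for root spans
	Service  string
}

// Link identifies a call edge from a parent service to a child service.
type Link struct {
	Parent, Child string
}

// aggregate adds the service-to-service links found in one trace's spans
// to counts. Spans whose parent is unknown or in the same service are
// skipped.
func aggregate(spans []Span, counts map[Link]int) {
	serviceBySpan := make(map[string]string, len(spans))
	for _, s := range spans {
		serviceBySpan[s.SpanID] = s.Service
	}
	for _, s := range spans {
		parentSvc, ok := serviceBySpan[s.ParentID]
		if !ok || parentSvc == s.Service {
			continue // root span, missing parent, or in-process call
		}
		counts[Link{Parent: parentSvc, Child: s.Service}]++
	}
}

func main() {
	counts := map[Link]int{}
	aggregate([]Span{
		{SpanID: "a", Service: "frontend"},
		{SpanID: "b", ParentID: "a", Service: "checkout"},
		{SpanID: "c", ParentID: "b", Service: "checkout"},
		{SpanID: "d", ParentID: "b", Service: "payments"},
	}, counts)
	for link, n := range counts {
		fmt.Printf("%s -> %s: %d\n", link.Parent, link.Child, n)
	}
}
```

If a parent span and its child are delivered to different collectors, neither process can resolve the `ParentID` lookup, and the link is silently dropped. That is the gap a distributed pipeline would need to close.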

Author

> This is not using Apache Beam.
>
> I think that's not a bad thing in the initial phase. When running with all-in-one, Beam is not needed (unless we want to support checkpoints). But in a multi-collector deployment this solution won't work. We can address it in the following steps.

Except for this, I have updated everything based on your review.

func TestDependencyProcessorEndToEnd(t *testing.T) {
// Create a mock receiver, processor, and exporter
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
Member

Should use a very short aggregation interval here.

Signed-off-by: yunmaoQu <[email protected]>
Author

yunmaoQu commented Jan 20, 2025

@yurishkuro Except for this, I have updated everything based on your review.

Development

Successfully merging this pull request may close these issues.

Implement in-memory Service Dependency Graph using Apache Beam
2 participants