feat: rename DisableTraceLocality to TraceCache
VinozzZ committed Nov 27, 2024
1 parent b357020 commit 1d046ee
Showing 8 changed files with 53 additions and 40 deletions.
2 changes: 1 addition & 1 deletion app/app_test.go
@@ -766,7 +766,7 @@ func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {
 		redisDB := 12 + i
 		cfg := defaultConfig(basePort, redisDB)
 		collectionCfg := cfg.GetCollectionConfig()
-		collectionCfg.DisableTraceLocality = true
+		collectionCfg.TraceCache = "distributed"
 		cfg.GetCollectionConfigVal = collectionCfg

 		apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
40 changes: 20 additions & 20 deletions collect/collect.go
@@ -210,7 +210,7 @@ func (i *InMemCollector) Start() error {
 		i.Peers.RegisterUpdatedPeersCallback(i.redistributeTimer.Reset)
 	}

-	if i.Config.GetCollectionConfig().DisableTraceLocality {
+	if !i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		i.keptDecisionMessages = make(chan string, decisionMessageBufferSize)
 		i.dropDecisionMessages = make(chan string, decisionMessageBufferSize)
 		i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
@@ -524,7 +524,7 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) {

 		for _, sp := range trace.GetSpans() {

-			if i.Config.GetCollectionConfig().DisableTraceLocality {
+			if !i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 				dc := i.createDecisionSpan(sp, trace, newTarget)
 				i.PeerTransmission.EnqueueEvent(dc)
 				continue
@@ -669,21 +669,21 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
 		isMyTrace bool
 	)
 	// if trace locality is enabled, we should forward all spans to its correct peer
-	if i.Config.GetCollectionConfig().DisableTraceLocality {
-		targetShard, isMyTrace = i.IsMyTrace(sp.TraceID)
-		// if the span is a decision span and the trace no longer belong to us, we should not forward it to the peer
-		if !isMyTrace && sp.IsDecisionSpan() {
-			return
-		}
-
-	} else {
+	if i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		targetShard = i.Sharder.WhichShard(sp.TraceID)
 		isMyTrace = true
 		if !targetShard.Equals(i.Sharder.MyShard()) {
 			sp.APIHost = targetShard.GetAddress()
 			i.PeerTransmission.EnqueueSpan(sp)
 			return
 		}
+	} else {
+		targetShard, isMyTrace = i.IsMyTrace(sp.TraceID)
+		// if the span is a decision span and the trace no longer belong to us, we should not forward it to the peer
+		if !isMyTrace && sp.IsDecisionSpan() {
+			return
+		}
+
 	}

 	tcfg := i.Config.GetTracesConfig()
@@ -1069,7 +1069,7 @@ func (i *InMemCollector) Stop() error {
 	close(i.fromPeer)
 	close(i.outgoingTraces)

-	if i.Config.GetCollectionConfig().DisableTraceLocality {
+	if !i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		close(i.dropDecisionBuffer)
 		close(i.keptDecisionBuffer)
 	}
Expand Down Expand Up @@ -1527,23 +1527,23 @@ func (i *InMemCollector) makeDecision(ctx context.Context, trace *types.Trace, s
HasRoot: hasRoot,
}

if i.Config.GetCollectionConfig().DisableTraceLocality {
if !i.Config.GetCollectionConfig().TraceLocalityEnabled() {
i.publishTraceDecision(ctx, td)
}

return &td, nil
}

func (i *InMemCollector) IsMyTrace(traceID string) (sharder.Shard, bool) {
if i.Config.GetCollectionConfig().TraceLocalityEnabled() {
return i.Sharder.MyShard(), true
}

// if trace locality is disabled, we should only process
// traces that belong to the current refinery
if i.Config.GetCollectionConfig().DisableTraceLocality {
targeShard := i.Sharder.WhichShard(traceID)

return targeShard, i.Sharder.MyShard().Equals(targeShard)
}
targeShard := i.Sharder.WhichShard(traceID)

return i.Sharder.MyShard(), true
return targeShard, i.Sharder.MyShard().Equals(targeShard)

}

@@ -1578,7 +1578,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
 }

 func (i *InMemCollector) sendKeptDecisions() {
-	if !i.Config.GetCollectionConfig().DisableTraceLocality {
+	if i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		return
 	}
 	interval := time.Duration(i.Config.GetCollectionConfig().KeptDecisionSendInterval)
@@ -1589,7 +1589,7 @@ func (i *InMemCollector) sendKeptDecisions() {
 }

 func (i *InMemCollector) sendDropDecisions() {
-	if !i.Config.GetCollectionConfig().DisableTraceLocality {
+	if i.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		return
 	}
 	interval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
14 changes: 7 additions & 7 deletions collect/collect_test.go
@@ -98,7 +98,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
 		redistributeTimer: redistributeNotifier,
 	}

-	if conf.GetCollectionConfig().DisableTraceLocality {
+	if !conf.GetCollectionConfig().TraceLocalityEnabled() {
 		localPubSub.Subscribe(context.Background(), keptTraceDecisionTopic, c.signalKeptTraceDecisions)
 	}

@@ -196,7 +196,7 @@ func TestAddRootSpan(t *testing.T) {

 	conf.Mux.Lock()
 	collectionCfg := conf.GetCollectionConfigVal
-	collectionCfg.DisableTraceLocality = true
+	collectionCfg.TraceCache = "distributed"
 	conf.GetCollectionConfigVal = collectionCfg
 	conf.Mux.Unlock()

@@ -527,8 +527,8 @@ func TestDryRunMode(t *testing.T) {
 		DryRun:             true,
 		ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
 		GetCollectionConfigVal: config.CollectionConfig{
-			ShutdownDelay:        config.Duration(1 * time.Millisecond),
-			DisableTraceLocality: true,
+			ShutdownDelay: config.Duration(1 * time.Millisecond),
+			TraceCache:    "distributed",
 		},
 	}
 	transmission := &transmit.MockTransmission{}
@@ -2233,8 +2233,8 @@ func TestSendDropDecisions(t *testing.T) {
 		GetSamplerTypeVal:  &config.DeterministicSamplerConfig{SampleRate: 1},
 		ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
 		GetCollectionConfigVal: config.CollectionConfig{
-			ShutdownDelay:        config.Duration(1 * time.Millisecond),
-			DisableTraceLocality: true,
+			ShutdownDelay: config.Duration(1 * time.Millisecond),
+			TraceCache:    "distributed",
 		},
 	}
 	transmission := &transmit.MockTransmission{}
@@ -2318,7 +2318,7 @@ func TestExpiredTracesCleanup(t *testing.T) {
 			MaxBatchSize: 1500,
 		},
 		GetCollectionConfigVal: config.CollectionConfig{
-			DisableTraceLocality: true,
+			TraceCache: "distributed",
 		},
 		GetSamplerTypeVal:  &config.DeterministicSamplerConfig{SampleRate: 1},
 		AddSpanCountToRoot: true,
2 changes: 1 addition & 1 deletion collect/stressRelief.go
@@ -428,7 +428,7 @@ func (s *StressRelief) Recalc() uint {
 	// If a single node is under significant stress, it can activate stress relief mode
 	overallStressLevel := uint(math.Max(float64(clusterStressLevel), float64(localLevel)))

-	if s.Config.GetCollectionConfig().DisableTraceLocality {
+	if !s.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		overallStressLevel = clusterStressLevel
 	}
 	s.overallStressLevel = overallStressLevel
2 changes: 1 addition & 1 deletion collect/stress_relief_test.go
@@ -158,7 +158,7 @@ func TestStressRelief_OverallStressLevel_DisableTraceLocality(t *testing.T) {
 			MinimumActivationDuration: config.Duration(5 * time.Second),
 		},
 		GetCollectionConfigVal: config.CollectionConfig{
-			DisableTraceLocality: true,
+			TraceCache: "distributed",
 		},
 	}
 	// On startup, the stress relief should not be active
17 changes: 15 additions & 2 deletions config/file_config.go
@@ -317,8 +317,8 @@ type CollectionConfig struct {
 	DisableRedistribution bool     `yaml:"DisableRedistribution"`
 	RedistributionDelay   Duration `yaml:"RedistributionDelay" default:"30s"`

-	ShutdownDelay        Duration `yaml:"ShutdownDelay" default:"15s"`
-	DisableTraceLocality bool     `yaml:"DisableTraceLocality"`
+	ShutdownDelay Duration `yaml:"ShutdownDelay" default:"15s"`
+	TraceCache    string   `yaml:"TraceCache" default:"concentrated"`

 	MaxDropDecisionBatchSize int      `yaml:"MaxDropDecisionBatchSize" default:"1000"`
 	DropDecisionSendInterval Duration `yaml:"DropDecisionSendInterval" default:"1s"`
@@ -355,6 +355,19 @@ func (c CollectionConfig) GetIncomingQueueSize() int {
 	return c.IncomingQueueSize
 }

+// TraceLocalityEnabled returns whether trace locality is enabled.
+func (c CollectionConfig) TraceLocalityEnabled() bool {
+	switch c.TraceCache {
+	case "concentrated":
+		return true
+	case "distributed":
+		return false
+	default:
+		// Default to true for backwards compatibility
+		return true
+	}
+}
+
 type BufferSizeConfig struct {
 	UpstreamBufferSize int `yaml:"UpstreamBufferSize" default:"10_000"`
 	PeerBufferSize     int `yaml:"PeerBufferSize" default:"100_000"`
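For reference, here is a minimal, hypothetical sketch (not part of this commit; the import path is assumed from the repository's module name) showing how the new TraceCache string maps onto the old DisableTraceLocality boolean via the TraceLocalityEnabled helper added above:

package main

import (
	"fmt"

	"github.com/honeycombio/refinery/config" // assumed module path
)

func main() {
	// "concentrated" is the default and preserves Trace Locality
	// (equivalent to the old DisableTraceLocality=false).
	cfg := config.CollectionConfig{TraceCache: "concentrated"}
	fmt.Println(cfg.TraceLocalityEnabled()) // true

	// "distributed" disables Trace Locality
	// (equivalent to the old DisableTraceLocality=true).
	cfg.TraceCache = "distributed"
	fmt.Println(cfg.TraceLocalityEnabled()) // false

	// Unset or unrecognized values fall back to true for backwards compatibility.
	cfg.TraceCache = ""
	fmt.Println(cfg.TraceLocalityEnabled()) // true
}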
14 changes: 7 additions & 7 deletions config/metadata/configMeta.yaml
@@ -489,7 +489,7 @@ groups:
          `meta.refinery.dryrun.sample_rate` will be set to the sample rate
          that would have been used.

-          NOTE: This setting is not compatible with `DisableTraceLocality=true`,
+          NOTE: This setting is not compatible with `TraceCache=distributed`,
           because drop trace decisions shared among peers do not contain all
           the relevant information needed to send traces to Honeycomb.
@@ -1340,22 +1340,22 @@ groups:
           This value should be set to a bit less than the normal timeout period
           for shutting down without forcibly terminating the process.

-      - name: DisableTraceLocality
-        type: bool
+      - name: TraceCache
+        type: string
         valuetype: nondefault
         firstversion: v2.9
-        default: false
+        default: "concentrated"
         reload: false
         summary: controls whether all spans that belongs to the same trace are sent to a single Refinery for processing.
         description: >
-          When `false`, Refinery will route all spans that belong to the same trace to a single peer. This is the
-          default behavior ("Trace Locality") and the way Refinery has worked in the past. When `true`, Refinery
+          When `concentrated`, Refinery will route all spans that belong to the same trace to a single peer. This is the
+          default behavior ("Trace Locality") and the way Refinery has worked in the past. When `distributed`, Refinery
           will instead keep spans on the node where they were received, and forward proxy spans that contain only
           the key information needed to make a trace decision. This can reduce the amount of traffic between peers
           in most cases, and can help avoid a situation where a single large trace can cause a memory overrun on
           a single node.

-          If `true`, the amount of traffic between peers will be reduced, but the amount of traffic between Refinery
+          If `distributed`, the amount of traffic between peers will be reduced, but the amount of traffic between Refinery
           and Redis will significantly increase, because Refinery uses Redis to distribute the trace decisions to all
           nodes in the cluster. It is important to adjust the size of the Redis cluster in this case.
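As a usage sketch (the `Collection` section name is assumed from Refinery's config file layout and is not shown in this diff), opting into the new distributed behavior in a config file would look like:

Collection:
  # "concentrated" (the default) routes all spans of a trace to a single peer;
  # "distributed" keeps spans on the receiving node and distributes trace
  # decisions to peers via Redis instead.
  TraceCache: distributed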
2 changes: 1 addition & 1 deletion route/route.go
@@ -614,7 +614,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
 		}
 	}

-	if !r.Config.GetCollectionConfig().DisableTraceLocality {
+	if r.Config.GetCollectionConfig().TraceLocalityEnabled() {
 		// Figure out if we should handle this span locally or pass on to a peer
 		targetShard := r.Sharder.WhichShard(traceID)
 		if !targetShard.Equals(r.Sharder.MyShard()) {
