diff --git a/Makefile b/Makefile index 210bca719..839c7a7b3 100644 --- a/Makefile +++ b/Makefile @@ -85,12 +85,10 @@ gen-sqlite: .PHONY: lint test check lint: - @echo you need to install 'sudo apt-get install -y libpcap-dev' staticcheck -version staticcheck ./... test: - @echo you need to install 'sudo apt-get install -y libpcap-dev' go test -v -race ./... check: lint test diff --git a/internal/aggregator/aggregator_handlers.go b/internal/aggregator/aggregator_handlers.go index d30fa260b..fa60913d4 100644 --- a/internal/aggregator/aggregator_handlers.go +++ b/internal/aggregator/aggregator_handlers.go @@ -195,6 +195,10 @@ func (a *Aggregator) handleSendSourceBucket3(_ context.Context, hctx *rpc.Handle } func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tlstatshouse.SendSourceBucket2Bytes, bucket tlstatshouse.SourceBucket2Bytes, version3 bool) (string, error, bool) { + a.configMu.RLock() + configR := a.configR + a.configMu.RUnlock() + rng := rand.New() now := time.Now() nowUnix := uint32(now.Unix()) @@ -229,7 +233,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl } // opportunistic mapping. We do not map addrStr. To find hosts with hostname not set use internal_log - if a.configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs { + if configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs { key := a.aggKey(nowUnix, format.BuiltinMetricIDAggOutdatedAgents, [16]int32{0, 0, 0, 0, ownerTagId, 0, int32(addrIPV4)}) key.WithAgentEnvRouteArch(agentEnv, route, buildArch) a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAggOutdatedAgents) @@ -446,7 +450,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl } usedMetrics = append(usedMetrics, k.Metric) } - if a.configR.Shard == ShardAggregatorHash || a.configR.Shard == ShardAggregatorXXHash { + if configR.Shard == ShardAggregatorHash || configR.Shard == ShardAggregatorXXHash { // we unlock shard to caclculate hash not under it aggBucket.lockShard(&lockedShard, -1, &measurementLocks) } @@ -482,7 +486,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl now2 := time.Now() addValueCounterHost := func(metricInfo *format.MetricMetaValue, keys [16]int32, value float64, counter float64) { key := a.aggKey(args.Time, metricInfo.MetricID, keys) - key.WithAgentEnvRouteArch(agentEnv, route, buildArch) + if metricInfo.WithAgentEnvRouteArch { + key.WithAgentEnvRouteArch(agentEnv, route, buildArch) + } a.sh2.AddValueCounterHost(key, value, counter, hostTagId, metricInfo) } diff --git a/internal/aggregator/aggregator_insert.go b/internal/aggregator/aggregator_insert.go index 9f8888007..cc90c9eeb 100644 --- a/internal/aggregator/aggregator_insert.go +++ b/internal/aggregator/aggregator_insert.go @@ -419,9 +419,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, return res } - var config ConfigAggregatorRemote + var configR ConfigAggregatorRemote a.configMu.RLock() - config = a.configR + configR = a.configR a.configMu.RUnlock() insertSizes := make(map[uint32]insertSize, len(buckets)) @@ -489,9 +489,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, recentTime := buckets[0].time // by convention first bucket is recent all other are historic sampler := data_model.NewSampler(itemsCount, data_model.SamplerConfig{ Meta: a.metricStorage, - SampleNamespaces: config.SampleNamespaces, - SampleGroups: config.SampleGroups, - SampleKeys: config.SampleKeys, + SampleNamespaces: configR.SampleNamespaces, + SampleGroups: configR.SampleGroups, + SampleKeys: configR.SampleKeys, Rand: rnd, SampleFactorF: func(metricID int32, sf float64) { key := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingFactor, [16]int32{0, 0, 0, 0, metricID, format.TagValueIDAggSamplingFactorReasonInsertSize}) @@ -506,7 +506,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, is := insertSize{} for si := 0; si < len(b.shards); si++ { for _, item := range b.shards[si].MultiItems { - whaleWeight := item.FinishStringTop(rnd, config.StringTopCountInsert) // all excess items are baked into Tail + whaleWeight := item.FinishStringTop(rnd, configR.StringTopCountInsert) // all excess items are baked into Tail resPos := len(res) res = appendMultiBadge(rnd, res, &item.Key, item, metricCache, usedTimestamps, v3Format) @@ -556,7 +556,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, numContributors += int(b.contributorsCount()) } resPos := len(res) - remainingBudget := int64(data_model.InsertBudgetFixed) + int64(config.InsertBudget*numContributors) + remainingBudget := int64(data_model.InsertBudgetFixed) + int64(configR.InsertBudget*numContributors) // Budget is per contributor, so if they come in 1% groups, total size will approx. fit // Also if 2x contributors come to spare, budget is also 2x sampler.Run(remainingBudget)