Skip to content

Commit

Permalink
tiny tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
hrissan committed Jan 15, 2025
1 parent 0005cf0 commit 4433e7f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ gen-sqlite:

.PHONY: lint test check
lint:
@echo you need to install 'sudo apt-get install -y libpcap-dev'
staticcheck -version
staticcheck ./...

test:
@echo you need to install 'sudo apt-get install -y libpcap-dev'
go test -v -race ./...

check: lint test
12 changes: 9 additions & 3 deletions internal/aggregator/aggregator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func (a *Aggregator) handleSendSourceBucket3(_ context.Context, hctx *rpc.Handle
}

func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tlstatshouse.SendSourceBucket2Bytes, bucket tlstatshouse.SourceBucket2Bytes, version3 bool) (string, error, bool) {
a.configMu.RLock()
configR := a.configR
a.configMu.RUnlock()

rng := rand.New()
now := time.Now()
nowUnix := uint32(now.Unix())
Expand Down Expand Up @@ -229,7 +233,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
// opportunistic mapping. We do not map addrStr. To find hosts with hostname not set use internal_log

if a.configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs {
if configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs {
key := a.aggKey(nowUnix, format.BuiltinMetricIDAggOutdatedAgents, [16]int32{0, 0, 0, 0, ownerTagId, 0, int32(addrIPV4)})
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAggOutdatedAgents)
Expand Down Expand Up @@ -446,7 +450,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
usedMetrics = append(usedMetrics, k.Metric)
}
if a.configR.Shard == ShardAggregatorHash || a.configR.Shard == ShardAggregatorXXHash {
if configR.Shard == ShardAggregatorHash || configR.Shard == ShardAggregatorXXHash {
// we unlock shard to caclculate hash not under it
aggBucket.lockShard(&lockedShard, -1, &measurementLocks)
}
Expand Down Expand Up @@ -482,7 +486,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
now2 := time.Now()
addValueCounterHost := func(metricInfo *format.MetricMetaValue, keys [16]int32, value float64, counter float64) {
key := a.aggKey(args.Time, metricInfo.MetricID, keys)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
if metricInfo.WithAgentEnvRouteArch {
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
}
a.sh2.AddValueCounterHost(key, value, counter, hostTagId, metricInfo)
}

Expand Down
14 changes: 7 additions & 7 deletions internal/aggregator/aggregator_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
return res
}

var config ConfigAggregatorRemote
var configR ConfigAggregatorRemote
a.configMu.RLock()
config = a.configR
configR = a.configR
a.configMu.RUnlock()

insertSizes := make(map[uint32]insertSize, len(buckets))
Expand Down Expand Up @@ -489,9 +489,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
recentTime := buckets[0].time // by convention first bucket is recent all other are historic
sampler := data_model.NewSampler(itemsCount, data_model.SamplerConfig{
Meta: a.metricStorage,
SampleNamespaces: config.SampleNamespaces,
SampleGroups: config.SampleGroups,
SampleKeys: config.SampleKeys,
SampleNamespaces: configR.SampleNamespaces,
SampleGroups: configR.SampleGroups,
SampleKeys: configR.SampleKeys,
Rand: rnd,
SampleFactorF: func(metricID int32, sf float64) {
key := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingFactor, [16]int32{0, 0, 0, 0, metricID, format.TagValueIDAggSamplingFactorReasonInsertSize})
Expand All @@ -506,7 +506,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
is := insertSize{}
for si := 0; si < len(b.shards); si++ {
for _, item := range b.shards[si].MultiItems {
whaleWeight := item.FinishStringTop(rnd, config.StringTopCountInsert) // all excess items are baked into Tail
whaleWeight := item.FinishStringTop(rnd, configR.StringTopCountInsert) // all excess items are baked into Tail

resPos := len(res)
res = appendMultiBadge(rnd, res, &item.Key, item, metricCache, usedTimestamps, v3Format)
Expand Down Expand Up @@ -556,7 +556,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
numContributors += int(b.contributorsCount())
}
resPos := len(res)
remainingBudget := int64(data_model.InsertBudgetFixed) + int64(config.InsertBudget*numContributors)
remainingBudget := int64(data_model.InsertBudgetFixed) + int64(configR.InsertBudget*numContributors)
// Budget is per contributor, so if they come in 1% groups, total size will approx. fit
// Also if 2x contributors come to spare, budget is also 2x
sampler.Run(remainingBudget)
Expand Down

0 comments on commit 4433e7f

Please sign in to comment.