Skip to content

Commit

Permalink
better tags mapper 2
Browse files Browse the repository at this point in the history
  • Loading branch information
hrissan committed Jan 15, 2025
1 parent 67df006 commit 7838c6a
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 27 deletions.
2 changes: 1 addition & 1 deletion internal/agent/agent_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (s *Shard) addBuiltInsLocked() {
if elements > 0 {
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheElements, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(elements), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheSize, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(sumSize), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheTTL, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(s.CurrentTime)-float64(averageTS), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheAverageTTL, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(s.CurrentTime)-float64(averageTS), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventAdd}).Tail.AddCounterHost(s.rng, float64(adds), 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventEvict}).Tail.AddCounterHost(s.rng, float64(evicts), 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventTimestampUpdate}).Tail.AddCounterHost(s.rng, float64(timestampUpdates), 0)
Expand Down
2 changes: 2 additions & 0 deletions internal/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ func (a *Aggregator) agentBeforeFlushBucketFunc(_ *agent.Agent, nowUnix uint32)
key = a.aggKey(nowUnix, format.BuiltinMetricIDAggActiveSenders, [16]int32{0, 0, 0, 0, format.TagValueIDConveyorHistoric})
a.sh2.AddValueCounterHost(key, float64(historicSends), 1, a.aggregatorHost, format.BuiltinMetricMetaAggActiveSenders)

key = a.aggKey(nowUnix, format.BuiltinMetricIDMappingQueueSize, [16]int32{})
a.sh2.AddValueCounterHost(key, float64(a.tagsMapper2.UnknownTagsLen()), 1, a.aggregatorHost, format.BuiltinMetricMetaMappingQueueSize)
/* TODO - replace with direct agent call
a.metricStorage.MetricsMu.Lock()
Expand Down
11 changes: 9 additions & 2 deletions internal/aggregator/aggregator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
aggBucket.lockShard(&lockedShard, -1, &measurementLocks)
}

a.tagsMapper2.AddUnknownTags(unknownTags, aggBucket.time)
unknownMapRemove, unknownMapAdd, unknownListAdd, createMapAdd := a.tagsMapper2.AddUnknownTags(unknownTags, aggBucket.time)

aggBucket.mu.Lock()

Expand Down Expand Up @@ -531,7 +531,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
addCounterHost := func(metricInfo *format.MetricMetaValue, keys [16]int32, counter float64) {
key := a.aggKey(args.Time, metricInfo.MetricID, keys)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
if metricInfo.WithAgentEnvRouteArch {
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
}
a.sh2.AddCounterHost(key, counter, hostTagId, metricInfo)
}

Expand All @@ -549,6 +551,11 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
addCounterHost(format.BuiltinMetricMetaMappingCacheEvent, [16]int32{0, format.TagValueIDComponentAggregator, format.TagValueIDMappingCacheEventHit}, float64(mappingHits))
addCounterHost(format.BuiltinMetricMetaMappingCacheEvent, [16]int32{0, format.TagValueIDComponentAggregator, format.TagValueIDMappingCacheEventMiss}, float64(mappingMisses))

addCounterHost(format.BuiltinMetricMetaMappingQueueEvent, [16]int32{0, format.TagValueIDMappingQueueEventUnknownMapRemove}, float64(unknownMapRemove))
addCounterHost(format.BuiltinMetricMetaMappingQueueEvent, [16]int32{0, format.TagValueIDMappingQueueEventUnknownMapAdd}, float64(unknownMapAdd))
addCounterHost(format.BuiltinMetricMetaMappingQueueEvent, [16]int32{0, format.TagValueIDMappingQueueEventUnknownListAdd}, float64(unknownListAdd))
addCounterHost(format.BuiltinMetricMetaMappingQueueEvent, [16]int32{0, format.TagValueIDMappingQueueEventCreateMapAdd}, float64(createMapAdd))

addValueCounterHost(format.BuiltinMetricMetaAggSizeCompressed, [16]int32{0, 0, 0, 0, conveyor, spare}, float64(len(hctx.Request)), 1)

addValueCounterHost(format.BuiltinMetricMetaAggSizeUncompressed, [16]int32{0, 0, 0, 0, conveyor, spare}, float64(args.OriginalSize), 1)
Expand Down
52 changes: 32 additions & 20 deletions internal/aggregator/tags_mapper2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ import (
"github.com/vkcom/statshouse/internal/format"
"github.com/vkcom/statshouse/internal/metajournal"
"github.com/vkcom/statshouse/internal/pcache"
"pgregory.net/rand"
)

const maxUknownTagsInBucket = 128 // keep for low at first, then increase gradually
const maxCreateOrLoadTagsPerIteration = 1024 // keep for low at first, then increase gradually
const tagUsedSecondsCounterToCreate = 10 // if used in 10 different seconds, then create
const clearUnknownTagsEverySeconds = 120 // must be relatively large so enough time to collect statistics
const maxUknownTagsInBucket = 128 // keep for low at first, then increase gradually
const maxCreateTagsPerIteration = 1024 // keep for low at first, then increase gradually
const maxLoadTagsPerIteration = 1024 // keep for low at first, then increase gradually
const tagUsedSecondsCounterToCreate = 10 // if used in 10 different seconds, then create
const maxUnknownTagsToKeep = 1_000_000
const maxSendKnownTagsToAgent = 256

type unknownTag struct { // fits well into cache line
time uint32
secondsCounter uint32
}

// TODO - metrics
// TODO - evict by size not time
type tagsMapper2 struct {
agg *Aggregator
sh2 *agent.Agent
Expand All @@ -41,9 +39,10 @@ type tagsMapper2 struct {
unknownTags map[string]unknownTag // collect statistics here
unknownTagsList []string // ordered list of keys. IF unknownTags is large, but does not change, we will try to load each key once and stop until new keys are added.
createTags map[string]format.CreateMappingExtra
clearTime time.Time // clear unknownTags periodically (but with random intervals)
}

// TODO make unknownTagsList algo.CircularSlice[string]

func NewTagsMapper2(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal.MetricsStorage, loader *metajournal.MetricMetaLoader) *tagsMapper2 {
ms := &tagsMapper2{
agg: agg,
Expand All @@ -56,26 +55,46 @@ func NewTagsMapper2(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajourna
return ms
}

func (ms *tagsMapper2) AddUnknownTags(unknownTags map[string]format.CreateMappingExtra, time uint32) {
func (ms *tagsMapper2) UnknownTagsLen() int {
ms.mu.Lock()
defer ms.mu.Unlock()
return len(ms.unknownTags)
}

func (ms *tagsMapper2) AddUnknownTags(unknownTags map[string]format.CreateMappingExtra, time uint32) (
unknownMapRemove int, unknownMapAdd int, unknownListAdd int, createMapAdd int) {
ms.mu.Lock()
defer ms.mu.Unlock()
var sumSeconds int64
for k, v := range ms.unknownTags { // can delete a bit more items than strictly necessary
if len(ms.unknownTags)+len(unknownTags) <= maxUnknownTagsToKeep {
break
}
unknownMapRemove++
sumSeconds += int64(v.secondsCounter)
delete(ms.unknownTags, k) // but stays in list, so list and map do not correspond 1-1 to each other
}
for k, v := range unknownTags {
u, ok := ms.unknownTags[k]
if time > u.time {
u.time = time
u.secondsCounter++
if u.secondsCounter > tagUsedSecondsCounterToCreate {
createMapAdd++
ms.createTags[k] = v
u.secondsCounter = 0
// we do not delete from ms.unknownTags, because it will be most likely added back immediately,
// but we clear counter, so we will not add to ms.createTags every iteration
}
ms.unknownTags[k] = u
if !ok {
unknownMapAdd++
if !ok && len(ms.unknownTagsList) < maxUnknownTagsToKeep {
unknownListAdd++
ms.unknownTagsList = append(ms.unknownTagsList, k)
}
}
}
return
}

func (ms *tagsMapper2) goRun() {
Expand All @@ -87,7 +106,7 @@ func (ms *tagsMapper2) goRun() {
counter := 0
for str, extra := range createTags {
counter++
if counter > maxCreateOrLoadTagsPerIteration {
if counter > maxCreateTagsPerIteration {
break // simply forget the rest, will load/create more on the next iteration
}
tagValue := ms.createTag(str, extra)
Expand Down Expand Up @@ -140,21 +159,14 @@ func (ms *tagsMapper2) createTag(str string, extra format.CreateMappingExtra) in
}

func (ms *tagsMapper2) getTagsToCreateOrLoad() (map[string]format.CreateMappingExtra, []string) {
now := time.Now()
ms.mu.Lock()
defer ms.mu.Unlock()
createTags := ms.createTags
ms.createTags = map[string]format.CreateMappingExtra{}
loadTags := ms.unknownTagsList
if len(loadTags) > maxCreateOrLoadTagsPerIteration {
loadTags = loadTags[:maxCreateOrLoadTagsPerIteration]
if len(loadTags) > maxLoadTagsPerIteration {
loadTags = loadTags[:maxLoadTagsPerIteration]
}
ms.unknownTagsList = ms.unknownTagsList[len(loadTags):]
if ms.clearTime.Before(now) {
ms.unknownTags = map[string]unknownTag{} // simply forget everything
ms.unknownTagsList = nil
sec := clearUnknownTagsEverySeconds + time.Duration(rand.Int63n(clearUnknownTagsEverySeconds))
ms.clearTime = now.Add(time.Second * sec)
}
return createTags, loadTags
}
4 changes: 3 additions & 1 deletion internal/format/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ var (
BuiltinMetricIDBudgetOwner: BuiltinMetricMetaBudgetOwner,
BuiltinMetricIDMappingCacheElements: BuiltinMetricMetaMappingCacheElements,
BuiltinMetricIDMappingCacheSize: BuiltinMetricMetaMappingCacheSize,
BuiltinMetricIDMappingCacheTTL: BuiltinMetricMetaMappingCacheTTL,
BuiltinMetricIDMappingCacheAverageTTL: BuiltinMetricMetaMappingCacheAverageTTL,
BuiltinMetricIDMappingCacheEvent: BuiltinMetricMetaMappingCacheEvent,
BuiltinMetricIDMappingQueueSize: BuiltinMetricMetaMappingQueueSize,
BuiltinMetricIDMappingQueueEvent: BuiltinMetricMetaMappingQueueEvent,
}

// this set is very small, and we do not want to set Visible property for hunderds of metrics
Expand Down
46 changes: 43 additions & 3 deletions internal/format/builtin_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2923,10 +2923,10 @@ var BuiltinMetricMetaMappingCacheSize = &MetricMetaValue{
}},
}

const BuiltinMetricIDMappingCacheTTL = -132
const BuiltinMetricIDMappingCacheAverageTTL = -132

var BuiltinMetricMetaMappingCacheTTL = &MetricMetaValue{
Name: "__mapping_cache_ttl",
var BuiltinMetricMetaMappingCacheAverageTTL = &MetricMetaValue{
Name: "__mapping_cache_average_ttl",
Kind: MetricKindValue,
Description: "Average cache element timestamp relative to now",
MetricType: MetricSecond,
Expand Down Expand Up @@ -2965,3 +2965,43 @@ var BuiltinMetricMetaMappingCacheEvent = &MetricMetaValue{
}),
}},
}

const BuiltinMetricIDMappingQueueSize = -134

var BuiltinMetricMetaMappingQueueSize = &MetricMetaValue{
Name: "__mapping_queue_size",
Kind: MetricKindValue,
Description: "Elements in aggregator new conveyor mapping queue",
NoSampleAgent: true,
BuiltinAllowedToReceive: false,
WithAgentEnvRouteArch: false,
WithAggregatorID: true,
Tags: []MetricMetaTag{{
Description: "-", // reserve for component
}},
}

const BuiltinMetricIDMappingQueueEvent = -135

var BuiltinMetricMetaMappingQueueEvent = &MetricMetaValue{
Name: "__mapping_queue_events",
Kind: MetricKindCounter,
Description: "Events in aggregator new conveyor mapping queue",
NoSampleAgent: true,
BuiltinAllowedToReceive: false,
WithAgentEnvRouteArch: false,
WithAggregatorID: true,
Tags: []MetricMetaTag{{
Description: "-", // reserve for component
}, {
Description: "event",
ValueComments: convertToValueComments(map[int32]string{
TagValueIDMappingQueueEventUnknownMapRemove: "uknown_map_remove",
TagValueIDMappingQueueEventUnknownMapAdd: "unknown_map_add",
TagValueIDMappingQueueEventUnknownListRemove: "unknown_list_remove",
TagValueIDMappingQueueEventUnknownListAdd: "unknown_list_add",
TagValueIDMappingQueueEventCreateMapAdd: "create_map_add",
TagValueIDMappingQueueEventCreateMapRemove: "create_map_remove",
}),
}},
}
7 changes: 7 additions & 0 deletions internal/format/builtin_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ const (
TagValueIDMappingCacheEventTimestampUpdateSkip = 4
TagValueIDMappingCacheEventAdd = 5
TagValueIDMappingCacheEventEvict = 6

TagValueIDMappingQueueEventUnknownMapRemove = 1
TagValueIDMappingQueueEventUnknownMapAdd = 2
TagValueIDMappingQueueEventUnknownListRemove = 3
TagValueIDMappingQueueEventUnknownListAdd = 4
TagValueIDMappingQueueEventCreateMapAdd = 5
TagValueIDMappingQueueEventCreateMapRemove = 6
)

var (
Expand Down

0 comments on commit 7838c6a

Please sign in to comment.