Skip to content

Commit

Permalink
Implement regex matcher cache on Store gateway (#6491)
Browse files Browse the repository at this point in the history
* Implement regex matcher cache on SG

Signed-off-by: alanprot <[email protected]>

* changelog

Signed-off-by: alanprot <[email protected]>

* lint

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Jan 7, 2025
1 parent bf8af36 commit 2e1700e
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 112 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester: Add support for cache query matchers via `-ingester.matchers-cache-max-items. #6477
* [FEATURE] Ingester/StoreGateway: Add support for caching regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,10 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,10 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to
# ignore blocks that are marked for deletion with some delay. This ensures
Expand Down
6 changes: 5 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1879,6 +1879,10 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]

# Duration after which the blocks marked for deletion will be filtered out
# while fetching blocks. The idea of ignore-deletion-marks-delay is to ignore
# blocks that are marked for deletion with some delay. This ensures store can
Expand Down Expand Up @@ -3182,7 +3186,7 @@ instance_limits:
# CLI flag: -ingester.disable-chunk-trimming
[disable_chunk_trimming: <boolean> | default = false]
# Maximum number of entries in the matchers cache. 0 to disable.
# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -ingester.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
```
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")

f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -719,7 +719,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe

if cfg.MatchersCacheMaxItems > 0 {
r := prometheus.NewRegistry()
registerer.MustRegister(newMatchCacheMetrics(r, logger))
registerer.MustRegister(cortex_tsdb.NewMatchCacheMetrics("cortex_ingester", r, logger))
i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r))
if err != nil {
return nil, err
Expand Down
30 changes: 15 additions & 15 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,21 @@ func TestMatcherCache(t *testing.T) {
}

require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(fmt.Sprintf(`
# HELP ingester_matchers_cache_evicted_total Total number of items evicted from the cache
# TYPE ingester_matchers_cache_evicted_total counter
ingester_matchers_cache_evicted_total 1
# HELP ingester_matchers_cache_hits_total Total number of cache hits for series matchers
# TYPE ingester_matchers_cache_hits_total counter
ingester_matchers_cache_hits_total %v
# HELP ingester_matchers_cache_items Total number of cached items
# TYPE ingester_matchers_cache_items gauge
ingester_matchers_cache_items %v
# HELP ingester_matchers_cache_max_items Maximum number of items that can be cached
# TYPE ingester_matchers_cache_max_items gauge
ingester_matchers_cache_max_items 50
# HELP ingester_matchers_cache_requests_total Total number of cache requests for series matchers
# TYPE ingester_matchers_cache_requests_total counter
ingester_matchers_cache_requests_total %v
# HELP cortex_ingester_matchers_cache_evicted_total Total number of items evicted from the cache
# TYPE cortex_ingester_matchers_cache_evicted_total counter
cortex_ingester_matchers_cache_evicted_total 1
# HELP cortex_ingester_matchers_cache_hits_total Total number of cache hits for series matchers
# TYPE cortex_ingester_matchers_cache_hits_total counter
cortex_ingester_matchers_cache_hits_total %v
# HELP cortex_ingester_matchers_cache_items Total number of cached items
# TYPE cortex_ingester_matchers_cache_items gauge
cortex_ingester_matchers_cache_items %v
# HELP cortex_ingester_matchers_cache_max_items Maximum number of items that can be cached
# TYPE cortex_ingester_matchers_cache_max_items gauge
cortex_ingester_matchers_cache_max_items 50
# HELP cortex_ingester_matchers_cache_requests_total Total number of cache requests for series matchers
# TYPE cortex_ingester_matchers_cache_requests_total counter
cortex_ingester_matchers_cache_requests_total %v
`, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, cfg.MatchersCacheMaxItems, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total"))
}

Expand Down
70 changes: 0 additions & 70 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ingester

import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -678,71 +676,3 @@ func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Re
func (sm *tsdbMetrics) removeRegistryForUser(userID string) {
sm.regs.RemoveUserRegistry(userID, false)
}

// matcherCacheMetrics is a prometheus.Collector that re-exposes the
// thanos_matchers_cache_* metrics gathered from an inner registry under
// ingester_matchers_cache_* names.
type matcherCacheMetrics struct {
	// r is the inner registry the matchers cache registers its thanos_* metrics on.
	r      *prometheus.Registry
	logger log.Logger

	// Renamed descriptors emitted by Collect; values are summed from the
	// corresponding thanos_matchers_cache_* families gathered from r.
	requestsTotal *prometheus.Desc
	hitsTotal     *prometheus.Desc
	numItems      *prometheus.Desc
	maxItems      *prometheus.Desc
	evicted       *prometheus.Desc
}

// newMatchCacheMetrics builds a matcherCacheMetrics that republishes the
// matchers cache metrics found in r under the ingester_ prefix. The returned
// collector must still be registered on the outer registry by the caller.
func newMatchCacheMetrics(r *prometheus.Registry, l log.Logger) *matcherCacheMetrics {
	m := &matcherCacheMetrics{
		r:      r,
		logger: l,
		requestsTotal: prometheus.NewDesc(
			"ingester_matchers_cache_requests_total",
			"Total number of cache requests for series matchers",
			nil, nil),
		hitsTotal: prometheus.NewDesc(
			"ingester_matchers_cache_hits_total",
			"Total number of cache hits for series matchers",
			nil, nil),
		numItems: prometheus.NewDesc(
			"ingester_matchers_cache_items",
			"Total number of cached items",
			nil, nil),
		maxItems: prometheus.NewDesc(
			"ingester_matchers_cache_max_items",
			"Maximum number of items that can be cached",
			nil, nil),
		evicted: prometheus.NewDesc(
			"ingester_matchers_cache_evicted_total",
			"Total number of items evicted from the cache",
			nil, nil),
	}
	return m
}

// Describe implements prometheus.Collector by sending every renamed descriptor.
func (m *matcherCacheMetrics) Describe(out chan<- *prometheus.Desc) {
	out <- m.requestsTotal
	out <- m.hitsTotal
	out <- m.numItems
	out <- m.maxItems
	out <- m.evicted
}

// Collect implements prometheus.Collector. It gathers the thanos_matchers_cache_*
// families from the inner registry, sums each across all its series, and emits
// the totals as const metrics under the ingester_ descriptors.
// On gather/parse failure it logs at warn level and emits nothing for this scrape.
func (m *matcherCacheMetrics) Collect(out chan<- prometheus.Metric) {
	gm, err := m.r.Gather()
	if err != nil {
		level.Warn(m.logger).Log("msg", "failed to gather metrics from registry", "err", err)
		return
	}

	mfm, err := util.NewMetricFamilyMap(gm)

	if err != nil {
		level.Warn(m.logger).Log("msg", "failed to create metric family map", "err", err)
		return
	}

	out <- prometheus.MustNewConstMetric(m.requestsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_requests_total"))
	out <- prometheus.MustNewConstMetric(m.hitsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_hits_total"))
	out <- prometheus.MustNewConstMetric(m.numItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_items"))
	out <- prometheus.MustNewConstMetric(m.maxItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_max_items"))
	out <- prometheus.MustNewConstMetric(m.evicted, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_evicted_total"))
}
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ type BucketStoreConfig struct {
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
Expand Down Expand Up @@ -373,6 +374,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-series-token-factor", 25, "Multiplication factor used for touched series token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
}

// Validate the config.
Expand Down
79 changes: 79 additions & 0 deletions pkg/storage/tsdb/matchers_cache_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package tsdb

import (
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
)

// MatcherCacheMetrics is a prometheus.Collector that re-exposes the
// thanos_matchers_cache_* metrics gathered from an inner registry under a
// caller-supplied metric name prefix (e.g. "cortex_ingester").
type MatcherCacheMetrics struct {
	// r is the inner registry the matchers cache registers its thanos_* metrics on.
	r      *prometheus.Registry
	logger log.Logger

	// Renamed descriptors emitted by Collect; values are summed from the
	// corresponding thanos_matchers_cache_* families gathered from r.
	requestsTotal *prometheus.Desc
	hitsTotal     *prometheus.Desc
	numItems      *prometheus.Desc
	maxItems      *prometheus.Desc
	evicted       *prometheus.Desc
}

// NewMatchCacheMetrics builds a MatcherCacheMetrics that republishes the
// matchers cache metrics found in r under the given metric name prefix.
// The returned collector must still be registered on the outer registry by
// the caller.
// NOTE(review): the name diverges from the type (Match vs Matcher); renaming
// would break existing callers, so it is kept as-is.
func NewMatchCacheMetrics(prefix string, r *prometheus.Registry, l log.Logger) *MatcherCacheMetrics {
	// desc builds a prefixed descriptor with no labels.
	desc := func(suffix, help string) *prometheus.Desc {
		return prometheus.NewDesc(fmt.Sprintf("%v_%v", prefix, suffix), help, nil, nil)
	}

	return &MatcherCacheMetrics{
		r:             r,
		logger:        l,
		requestsTotal: desc("matchers_cache_requests_total", "Total number of cache requests for series matchers"),
		hitsTotal:     desc("matchers_cache_hits_total", "Total number of cache hits for series matchers"),
		numItems:      desc("matchers_cache_items", "Total number of cached items"),
		maxItems:      desc("matchers_cache_max_items", "Maximum number of items that can be cached"),
		evicted:       desc("matchers_cache_evicted_total", "Total number of items evicted from the cache"),
	}
}

// Describe implements prometheus.Collector by sending every renamed descriptor.
func (m *MatcherCacheMetrics) Describe(out chan<- *prometheus.Desc) {
	for _, d := range []*prometheus.Desc{m.requestsTotal, m.hitsTotal, m.numItems, m.maxItems, m.evicted} {
		out <- d
	}
}

// Collect implements prometheus.Collector. It gathers the thanos_matchers_cache_*
// families from the inner registry, sums each across all its series, and emits
// the totals as const metrics under the prefixed descriptors built in
// NewMatchCacheMetrics. On gather/parse failure it logs at warn level and emits
// nothing for this scrape.
func (m *MatcherCacheMetrics) Collect(out chan<- prometheus.Metric) {
	gm, err := m.r.Gather()
	if err != nil {
		level.Warn(m.logger).Log("msg", "failed to gather metrics from registry", "err", err)
		return
	}

	mfm, err := util.NewMetricFamilyMap(gm)

	if err != nil {
		level.Warn(m.logger).Log("msg", "failed to create metric family map", "err", err)
		return
	}

	out <- prometheus.MustNewConstMetric(m.requestsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_requests_total"))
	out <- prometheus.MustNewConstMetric(m.hitsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_hits_total"))
	out <- prometheus.MustNewConstMetric(m.numItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_items"))
	out <- prometheus.MustNewConstMetric(m.maxItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_max_items"))
	out <- prometheus.MustNewConstMetric(m.evicted, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_evicted_total"))
}
16 changes: 16 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type BucketStores struct {
// Index cache shared across all tenants.
indexCache storecache.IndexCache

// Matchers cache shared across all tenants
matcherCache storecache.MatchersCache

// Chunks bytes pool shared across all tenants.
chunksPool pool.Pool[byte]

Expand Down Expand Up @@ -140,6 +143,17 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
}),
}

u.matcherCache = storecache.NoopMatchersCache

if cfg.BucketStore.MatchersCacheMaxItems > 0 {
r := prometheus.NewRegistry()
reg.MustRegister(tsdb.NewMatchCacheMetrics("cortex_storegateway", r, logger))
u.matcherCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.BucketStore.MatchersCacheMaxItems), storecache.WithPromRegistry(r))
if err != nil {
return nil, err
}
}

// Init the index cache.
if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore.IndexCache, logger, reg); err != nil {
return nil, errors.Wrap(err, "create index cache")
Expand Down Expand Up @@ -600,8 +614,10 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
}

bucketStoreReg := prometheus.NewRegistry()

bucketStoreOpts := []store.BucketStoreOption{
store.WithLogger(userLogger),
store.WithMatchersCache(u.matcherCache),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
return util_log.HeadersFromContext(ctx, logger)
}),
Expand Down
23 changes: 0 additions & 23 deletions pkg/util/matchers.go

This file was deleted.

0 comments on commit 2e1700e

Please sign in to comment.