From 275a5bf4f5bf8e053b52cdab54c819e88009dc28 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 25 Oct 2024 14:43:46 +0900 Subject: [PATCH] Disable chunk trimming in ingester (#6270) --- CHANGELOG.md | 1 + integration/query_fuzz_test.go | 160 ++++++++++++++++++++++++++++++++- pkg/ingester/ingester.go | 7 +- 3 files changed, 166 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30515f6472..524d7690e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280 * [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228 +* [ENHANCEMENT] Ingester: Disable chunk trimming. #6270 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188 diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 292f94421d..68dd2163ba 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -5,6 +5,7 @@ package integration import ( "context" + "fmt" "math/rand" "os" "path" @@ -35,6 +36,164 @@ import ( "github.com/cortexproject/cortex/pkg/util/log" ) +func TestDisableChunkTrimmingFuzz(t *testing.T) { + noneChunkTrimmingImage := "quay.io/cortexproject/cortex:v1.18.0" + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul1 := e2edb.NewConsulWithName("consul1") + consul2 := e2edb.NewConsulWithName("consul2") + require.NoError(t, s.StartAndWaitReady(consul1, consul2)) + + flags1 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul1.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + flags2 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul2.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path1 := path.Join(s.SharedDir(), "cortex-1") + path2 := path.Join(s.SharedDir(), "cortex-2") + + flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1}) + flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2}) + // Start Cortex replicas. + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "") + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, noneChunkTrimmingImage) + require.NoError(t, s.StartAndWaitReady(cortex1, cortex2)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + // Push some series to Cortex. + start := now.Add(-time.Minute * 120) + scrapeInterval := 30 * time.Second + + numSeries := 10 + numSamples := 240 + serieses := make([]prompb.TimeSeries, numSeries) + lbls := make([]labels.Labels, numSeries) + for i := 0; i < numSeries; i++ { + series := e2e.GenerateSeriesWithSamples(fmt.Sprintf("test_series_%d", i), start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "foo", Value: "bar"}) + serieses[i] = series + + builder := labels.NewBuilder(labels.EmptyLabels()) + for _, lbl := range series.Labels { + builder.Set(lbl.Name, lbl.Value) + } + lbls[i] = builder.Labels() + } + + res, err := c1.Push(serieses) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c2.Push(serieses) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + rnd := rand.New(rand.NewSource(now.Unix())) + opts := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(true), + promqlsmith.WithEnableAtModifier(true), + } + ps := promqlsmith.New(rnd, lbls, opts...) + + type testCase struct { + query string + res1, res2 model.Value + err1, err2 error + } + + queryStart := time.Now().Add(-time.Minute * 40) + queryEnd := time.Now().Add(-time.Minute * 20) + cases := make([]*testCase, 0, 200) + testRun := 500 + for i := 0; i < testRun; i++ { + expr := ps.WalkRangeQuery() + query := expr.Pretty(0) + res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval) + res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval) + cases = append(cases, &testCase{ + query: query, + res1: res1, + res2: res2, + err1: err1, + err2: err2, + }) + } + + failures := 0 + for i, tc := range cases { + qt := "range query" + if tc.err1 != nil || tc.err2 != nil { + if !cmp.Equal(tc.err1, tc.err2) { + t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2) + failures++ + } + } else if !cmp.Equal(tc.res1, tc.res2, comparer) { + t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + failures++ + } + } + if failures > 0 { + require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures) + } +} + func TestVerticalShardingFuzz(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -159,7 +318,6 @@ func TestVerticalShardingFuzz(t *testing.T) { instantQuery bool } - now = time.Now() cases := make([]*testCase, 0, 200) for i := 0; i < 100; i++ { expr := ps.WalkInstantQuery() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7093138802..2a3187a282 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1982,8 +1982,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th if err != nil { return 0, 0, 0, err } + hints := &storage.SelectHints{ + Start: from, + End: through, + DisableTrimming: true, + } // It's not required to return sorted series because series are sorted by the Cortex querier. - ss := q.Select(ctx, false, nil, matchers...) + ss := q.Select(ctx, false, hints, matchers...) c() if ss.Err() != nil { return 0, 0, 0, ss.Err()