Skip to content

Commit

Permalink
Add push writen header to response
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 4, 2024
1 parent ab2a6ea commit a417e1f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 9 deletions.
11 changes: 11 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package integration

import (
"math/rand"
"net/http"
"path"
"testing"
"time"
Expand Down Expand Up @@ -82,6 +83,7 @@ func TestIngest(t *testing.T) {
res, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "0")

// sample
result, err := c.Query("test_series", now)
Expand All @@ -99,11 +101,13 @@ func TestIngest(t *testing.T) {
res, err = c.PushV2(symbols2, histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "0")

symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
res, err = c.PushV2(symbols3, histogramFloatSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "0")

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
Expand Down Expand Up @@ -192,6 +196,7 @@ func TestExemplar(t *testing.T) {
res, err := c.PushV2(req.Symbols, req.Timeseries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "1")

start := time.Now().Add(-time.Minute)
end := now.Add(time.Minute)
Expand All @@ -200,3 +205,9 @@ func TestExemplar(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(exemplars))
}

func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
}
30 changes: 21 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
return seriesKeys, validatedTimeseries, int64(validatedMetadata), int64(validatedFloatSamples), int64(validatedHistogramSamples), int64(validatedExemplars), firstPartialErr, nil
}

func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.TimeSeries, userID string) error {
func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.TimeSeries, userID string, stats *WriteStats) error {
span, _ := opentracing.StartSpanFromContext(ctx, "doBatchV2")
defer span.Finish()

Expand Down Expand Up @@ -736,13 +736,13 @@ func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteReques
timeseries = append(timeseries, validatedTimeseries[i])
}

return d.sendV2(localCtx, req.Symbols, ingester, timeseries, req.Source)
return d.sendV2(localCtx, req.Symbols, ingester, timeseries, req.Source, stats)
}, func() {
cancel()
})
}

func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.TimeSeries, source cortexpbv2.WriteRequest_SourceEnum) error {
func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.TimeSeries, source cortexpbv2.WriteRequest_SourceEnum, stats *WriteStats) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -760,7 +760,7 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
req.Timeseries = timeseries
req.Source = source

_, err = c.PushV2(ctx, &req)
resp, err := c.PushV2(ctx, &req)

if len(timeseries) > 0 {
d.ingesterAppends.WithLabelValues(id, typeSamples).Inc()
Expand All @@ -774,6 +774,13 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
}
}

if resp != nil {
// track stats
stats.SetSamples(resp.Samples)
stats.SetHistograms(resp.Histograms)
stats.SetExemplars(resp.Exemplars)
}

return err
}

Expand Down Expand Up @@ -885,7 +892,6 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
fmt.Println("V2 d.ingestionRate.Rate()", d.ingestionRate.Rate())
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
Expand Down Expand Up @@ -935,13 +941,20 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)

keys := seriesKeys

err = d.doBatchV2(ctx, req, subRing, keys, validatedTimeseries, userID)
s := WriteStats{}

err = d.doBatchV2(ctx, req, subRing, keys, validatedTimeseries, userID, &s)
if err != nil {
return nil, err
}

// TODO(Sungjin1212) track stat
return &cortexpbv2.WriteResponse{}, firstPartialErr
resp := &cortexpbv2.WriteResponse{
Samples: s.LoadSamples(),
Histograms: s.LoadHistogram(),
Exemplars: s.LoadExemplars(),
}

return resp, firstPartialErr
}

// Push implements client.IngesterServer
Expand Down Expand Up @@ -981,7 +994,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
fmt.Println("V1 d.ingestionRate.Rate()", d.ingestionRate.Rate())
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
Expand Down
62 changes: 62 additions & 0 deletions pkg/distributor/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package distributor

import (
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
)

type WriteStats struct {
// Samples represents X-Prometheus-Remote-Write-Written-Samples
Samples int64
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
Histograms int64
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
Exemplars int64
}

func (w *WriteStats) SetSamples(samples int64) {
if w == nil {
return
}

atomic.StoreInt64(&w.Samples, samples)
}

func (w *WriteStats) SetHistograms(histograms int64) {
if w == nil {
return
}

atomic.StoreInt64(&w.Histograms, histograms)
}

func (w *WriteStats) SetExemplars(exemplars int64) {
if w == nil {
return
}

atomic.StoreInt64(&w.Exemplars, exemplars)
}

func (w *WriteStats) LoadSamples() int64 {
if w == nil {
return 0
}

return atomic.LoadInt64(&w.Samples)
}

func (w *WriteStats) LoadHistogram() int64 {
if w == nil {
return 0
}

return atomic.LoadInt64(&w.Histograms)
}

func (w *WriteStats) LoadExemplars() int64 {
if w == nil {
return 0
}

return atomic.LoadInt64(&w.Exemplars)
}

0 comments on commit a417e1f

Please sign in to comment.