Skip to content

Commit

Permalink
Support remote write v2
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 4, 2024
1 parent 41d171d commit ab2a6ea
Show file tree
Hide file tree
Showing 30 changed files with 10,652 additions and 435 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependencies And what goes into each exe
pkg/cortexpb/cortex.pb.go: pkg/cortexpb/cortex.proto
pkg/cortexpbv2/cortexv2.pb.go: pkg/cortexpbv2/cortexv2.proto
pkg/ingester/client/ingester.pb.go: pkg/ingester/client/ingester.proto
pkg/distributor/distributorpb/distributor.pb.go: pkg/distributor/distributorpb/distributor.proto
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
Expand Down
123 changes: 123 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

Expand Down Expand Up @@ -149,6 +151,46 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
return
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

labelRefs := []uint32{1, 2}
symbols = []string{"", "__name__", name}

for i, label := range additionalLabels {
symbols = append(symbols, label.Name, label.Value)
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
}

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph cortexpbv2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h)
}

// Generate the series
series = append(series, cortexpbv2.TimeSeries{
LabelsRefs: labelRefs,
Histograms: []cortexpbv2.Histogram{ph},
})

return
}

func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

Expand Down Expand Up @@ -188,6 +230,87 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
st.Symbolize("__name__")
st.Symbolize(name)

lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
st.Symbolize(label.Name)
st.Symbolize(label.Value)
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
symbols = st.Symbols()
labelRefs := cortexpbv2.GetLabelsRefsFromLabels(symbols, lbs)
// Generate the series
series = append(series, cortexpbv2.TimeSeries{
LabelsRefs: labelRefs,
Samples: []cortexpbv2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: cortexpbv2.Metadata{
Type: cortexpbv2.METRIC_TYPE_GAUGE,
},
})

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}

func GenerateSeriesV2WithSamples(
name string,
startTime time.Time,
scrapeInterval time.Duration,
startValue int, numSamples int,
additionalLabels ...prompb.Label,
) (symbols []string, series cortexpbv2.TimeSeries) {
tsMillis := TimeToMilliseconds(startTime)
durMillis := scrapeInterval.Milliseconds()

symbols = []string{"", "__name__", name}
labelRefs := []uint32{1, 2}

for i, label := range additionalLabels {
symbols = append(symbols, label.Name, label.Value)
labelRefs = append(labelRefs, uint32(i*2+3), uint32(i*2+4))
}

startTMillis := tsMillis
samples := make([]cortexpbv2.Sample, numSamples)
for i := 0; i < numSamples; i++ {
samples[i] = cortexpbv2.Sample{
Timestamp: startTMillis,
Value: float64(i + startValue),
}
startTMillis += durMillis
}

return symbols, cortexpbv2.TimeSeries{
LabelsRefs: labelRefs,
Samples: samples,
}
}

func GenerateSeriesWithSamples(
name string,
startTime time.Time,
Expand Down
42 changes: 40 additions & 2 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
yaml "gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
)
Expand Down Expand Up @@ -113,6 +113,39 @@ func NewPromQueryClient(address string) (*Client, error) {
return c, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
// Create write request
Expand Down Expand Up @@ -334,6 +367,11 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
Loading

0 comments on commit ab2a6ea

Please sign in to comment.