Skip to content

Commit

Permalink
Add remote write v2 api
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 7, 2024
1 parent 6865ff8 commit 3475726
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 43 deletions.
25 changes: 20 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/compactor"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
Expand All @@ -45,6 +46,9 @@ import (

// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface.
type DistributorPushWrapper func(next push.Func) push.Func

// DistributorPushWrapperV2 wraps around a push. It is similar to middleware.Interface.
type DistributorPushWrapperV2 func(next push.FuncV2) push.FuncV2
type ConfigHandler func(actualCfg interface{}, defaultCfg interface{}) http.HandlerFunc

type Config struct {
Expand All @@ -60,7 +64,8 @@ type Config struct {

// This allows downstream projects to wrap the distributor push function
// and access the deserialized write requests before/after they are pushed.
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`
DistributorPushWrapperV2 DistributorPushWrapperV2 `yaml:"-"`

// The CustomConfigHandler allows for providing a different handler for the
// `/config` endpoint. If this field is set _before_ the API module is
Expand Down Expand Up @@ -107,6 +112,15 @@ func (cfg *Config) Validate() error {
return nil
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPushV2(d *distributor.Distributor) push.FuncV2 {
if cfg.DistributorPushWrapperV2 != nil {
return cfg.DistributorPushWrapperV2(d.PushV2)
}

return d.PushV2
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
if cfg.DistributorPushWrapper != nil {
Expand Down Expand Up @@ -277,7 +291,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), a.cfg.wrapDistributorPushV2(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
Expand All @@ -289,7 +303,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), a.cfg.wrapDistributorPushV2(d)), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand All @@ -304,6 +318,7 @@ type Ingester interface {
AllUserStatsHandler(http.ResponseWriter, *http.Request)
ModeHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
PushV2(context.Context, *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)
}

// RegisterIngester registers the ingesters HTTP and GRPC service
Expand All @@ -322,12 +337,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, i.PushV2), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, i.PushV2), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring/client"
Expand All @@ -33,6 +34,7 @@ import (
// Pusher is an ingester server that accepts pushes.
type Pusher interface {
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)
}

type PusherAppender struct {
Expand Down
14 changes: 11 additions & 3 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,22 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type fakePusher struct {
request *cortexpb.WriteRequest
response *cortexpb.WriteResponse
err error
request *cortexpb.WriteRequest
requestV2 *cortexpbv2.WriteRequest
response *cortexpb.WriteResponse
responseV2 *cortexpbv2.WriteResponse
err error
}

func (p *fakePusher) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
p.requestV2 = r
return p.responseV2, p.err
}

func (p *fakePusher) Push(ctx context.Context, r *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ruler/pusher_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/mock"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
)

type pusherMock struct {
Expand All @@ -16,6 +17,11 @@ func newPusherMock() *pusherMock {
return &pusherMock{}
}

func (m *pusherMock) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1)
}

func (m *pusherMock) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*cortexpb.WriteResponse), args.Error(1)
Expand Down
148 changes: 131 additions & 17 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,44 @@ package push

import (
"context"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/remote"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
)

const (
remoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
remoteWriteVersion1HeaderValue = "0.1.0"
remoteWriteVersion20HeaderValue = "2.0.0"
appProtoContentType = "application/x-protobuf"
appProtoV1ContentType = "application/x-protobuf;proto=prometheus.WriteRequest"
appProtoV2ContentType = "application/x-protobuf;proto=io.prometheus.write.v2.Request"

rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
)

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// FuncV2 defines the type of the pushV2. It is similar to http.HandlerFunc.
type FuncV2 func(ctx context.Context, request *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushV2 FuncV2) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand All @@ -28,31 +50,123 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
logger = log.WithSourceIPs(source, logger)
}
}
var req cortexpb.PreallocWriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)

contentType := r.Header.Get("Content-Type")
if contentType == "" {
contentType = appProtoContentType
}

msgType, err := parseProtoMsg(contentType)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

req.SkipLabelNameValidation = false
if req.Source == 0 {
req.Source = cortexpb.API
if msgType != config.RemoteWriteProtoMsgV1 && msgType != config.RemoteWriteProtoMsgV2 {
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

enc := r.Header.Get("Content-Encoding")
if enc == "" {
} else if enc != string(remote.SnappyBlockCompression) {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, remote.SnappyBlockCompression)
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

if _, err := push(ctx, &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
switch msgType {
case config.RemoteWriteProtoMsgV1:
var req cortexpb.PreallocWriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

req.SkipLabelNameValidation = false
if req.Source == 0 {
req.Source = cortexpb.API
}

if _, err := push(ctx, &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode()/100 == 5 {
level.Error(logger).Log("msg", "push error", "err", err)
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
level.Warn(logger).Log("msg", "push refused", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
case config.RemoteWriteProtoMsgV2:
var req cortexpbv2.WriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
fmt.Println("err", err)
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if resp.GetCode()/100 == 5 {
level.Error(logger).Log("msg", "push error", "err", err)
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
level.Warn(logger).Log("msg", "push refused", "err", err)

req.SkipLabelNameValidation = false
if req.Source == 0 {
req.Source = cortexpbv2.API
}

if resp, err := pushV2(ctx, &req); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
w.Header().Set(rw20WrittenSamplesHeader, "0")
w.Header().Set(rw20WrittenHistogramsHeader, "0")
w.Header().Set(rw20WrittenExemplarsHeader, "0")
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode()/100 == 5 {
level.Error(logger).Log("msg", "push error", "err", err)
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
level.Warn(logger).Log("msg", "push refused", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
} else {
w.Header().Set(rw20WrittenSamplesHeader, strconv.FormatInt(resp.Samples, 10))
w.Header().Set(rw20WrittenHistogramsHeader, strconv.FormatInt(resp.Histograms, 10))
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(resp.Exemplars, 10))
}
http.Error(w, string(resp.Body), int(resp.Code))
}
})
}

// Refer to parseProtoMsg in https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
func parseProtoMsg(contentType string) (config.RemoteWriteProtoMsg, error) {
contentType = strings.TrimSpace(contentType)

parts := strings.Split(contentType, ";")
if parts[0] != appProtoContentType {
return "", fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType)
}
// Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter
for _, p := range parts[1:] {
pair := strings.Split(p, "=")
if len(pair) != 2 {
return "", fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType)
}
if pair[0] == "proto" {
ret := config.RemoteWriteProtoMsg(pair[1])
if err := ret.Validate(); err != nil {
return "", fmt.Errorf("got %v content type; %w", contentType, err)
}
return ret, nil
}
}
// No "proto=" parameter, assuming v1.
return config.RemoteWriteProtoMsgV1, nil
}
Loading

0 comments on commit 3475726

Please sign in to comment.