Skip to content

Commit

Permalink
Transition metric writers away from accepting endpoint arg
Browse files Browse the repository at this point in the history
The writers we have used have de facto only written to one endpoint.
Some implementations such as the storage writer only ever supported one
endpoint and ignored the endpoint argument provided to the Write method.
This split was confusing and error-prone.

Future work involves adding alternate forwarders, which may use
different protocols from our normal collector-to-ingestor flow.
Instead of each metric collector having one writer instance and
multiple endpoints configured, each metric collector now has a
collection of writers to send its data to.
  • Loading branch information
Mike Keesey authored and mkeesey committed Jan 7, 2025
1 parent b20f915 commit 035217c
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 70 deletions.
23 changes: 11 additions & 12 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,20 @@ func realMain(ctx *cli.Context) error {

cfg.ReplaceVariable("$(HOSTNAME)", hostname)

var endpoints []string
var endpoint string
if cfg.Endpoint != "" {
for _, endpoint := range []string{cfg.Endpoint} {
u, err := url.Parse(endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err)
}
endpoint = cfg.Endpoint

if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint %s must be http or https", endpoint)
}
u, err := url.Parse(cfg.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err)
}

logger.Infof("Using remote write endpoint %s", endpoint)
endpoints = append(endpoints, endpoint)
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint %s must be http or https", endpoint)
}

logger.Infof("Using remote write endpoint %s", endpoint)
}

if cfg.StorageDir == "" {
Expand Down Expand Up @@ -298,7 +297,7 @@ func realMain(ctx *cli.Context) error {
Scraper: scraperOpts,
ListenAddr: cfg.ListenAddr,
NodeName: hostname,
Endpoints: endpoints,
Endpoint: endpoint,
LiftLabels: sortedLiftedLabels,
AddAttributes: addAttributes,
LiftAttributes: liftAttributes,
Expand Down
5 changes: 3 additions & 2 deletions collector/otlp/logs_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
type LogsProxyServiceOpts struct {
AddAttributes map[string]string
LiftAttributes []string
Endpoints []string
Endpoint string
InsecureSkipVerify bool
}

Expand Down Expand Up @@ -60,7 +60,8 @@ func NewLogsProxyService(opts LogsProxyServiceOpts) *LogsProxyService {
}

rpcClients := make(map[string]logsv1connect.LogsServiceClient)
for _, endpoint := range opts.Endpoints {
if opts.Endpoint != "" {
endpoint := opts.Endpoint
// We have to strip the path component from our endpoint so gRPC can correctly setup its routing
uri, err := url.Parse(endpoint)
if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions collector/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,20 @@ type OltpMetricWriterOpts struct {
// MaxBatchSize is the maximum number of samples to send in a single batch.
MaxBatchSize int

Endpoints []string

Client remote.RemoteWriteClient
Clients []remote.RemoteWriteClient
}

type OltpMetricWriter struct {
requestTransformer *transform.RequestTransformer
endpoints []string
remoteClient remote.RemoteWriteClient
remoteClients []remote.RemoteWriteClient
maxBatchSize int
disableMetricsForwarding bool
}

func NewOltpMetricWriter(opts OltpMetricWriterOpts) *OltpMetricWriter {
return &OltpMetricWriter{
requestTransformer: opts.RequestTransformer,
endpoints: opts.Endpoints,
remoteClient: opts.Client,
remoteClients: opts.Clients,
maxBatchSize: opts.MaxBatchSize,
disableMetricsForwarding: opts.DisableMetricsForwarding,
}
Expand Down Expand Up @@ -187,7 +183,7 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques
return nil
}

if len(t.endpoints) == 0 || logger.IsDebug() {
if len(t.remoteClients) == 0 || logger.IsDebug() {
var sb strings.Builder
for _, ts := range wr.Timeseries {
sb.Reset()
Expand All @@ -212,14 +208,14 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques

start := time.Now()
defer func() {
logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.endpoints), time.Since(start))
logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.remoteClients), time.Since(start))
}()

g, gCtx := errgroup.WithContext(ctx)
for _, endpoint := range t.endpoints {
endpoint := endpoint
for _, remoteClient := range t.remoteClients {
remoteClient := remoteClient
g.Go(func() error {
return t.remoteClient.Write(gCtx, endpoint, wr)
return remoteClient.Write(gCtx, wr)
})
}
if err := g.Wait(); err != nil {
Expand Down
21 changes: 10 additions & 11 deletions collector/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ type ScraperOpts struct {
// MaxBatchSize is the maximum number of samples to send in a single batch.
MaxBatchSize int

Endpoints []string

RemoteClient remote.RemoteWriteClient
RemoteClients []remote.RemoteWriteClient
}

func (s *ScraperOpts) RequestTransformer() *transform.RequestTransformer {
Expand Down Expand Up @@ -110,7 +108,7 @@ type Scraper struct {
informerRegistration cache.ResourceEventHandlerRegistration

requestTransformer *transform.RequestTransformer
remoteClient remote.RemoteWriteClient
remoteClients []remote.RemoteWriteClient
scrapeClient *MetricsClient
seriesCreator *seriesCreator

Expand All @@ -127,7 +125,7 @@ func NewScraper(opts *ScraperOpts) *Scraper {
opts: *opts,
seriesCreator: &seriesCreator{},
requestTransformer: opts.RequestTransformer(),
remoteClient: opts.RemoteClient,
remoteClients: opts.RemoteClients,
targets: make(map[string]ScrapeTarget),
}
}
Expand Down Expand Up @@ -201,7 +199,9 @@ func (s *Scraper) scrape(ctx context.Context) {
case <-t.C:
s.scrapeTargets(ctx)
case <-reconnectTimer.C:
s.remoteClient.CloseIdleConnections()
for _, remoteClient := range s.remoteClients {
remoteClient.CloseIdleConnections()
}
}
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error
return nil
}

if len(s.opts.Endpoints) == 0 || logger.IsDebug() {
if len(s.remoteClients) == 0 || logger.IsDebug() {
var sb strings.Builder
for _, ts := range wr.Timeseries {
sb.Reset()
Expand All @@ -325,14 +325,13 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error

start := time.Now()
defer func() {
logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.opts.Endpoints), time.Since(start))
logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.remoteClients), time.Since(start))
}()

g, gCtx := errgroup.WithContext(ctx)
for _, endpoint := range s.opts.Endpoints {
endpoint := endpoint
for _, remoteClient := range s.remoteClients {
g.Go(func() error {
return s.remoteClient.Write(gCtx, endpoint, wr)
return remoteClient.Write(gCtx, wr)
})
}
return g.Wait()
Expand Down
16 changes: 7 additions & 9 deletions collector/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -29,15 +30,13 @@ func TestScraper_sendBatch(t *testing.T) {
name: "TestEmptyWriteRequest",
writeRequest: &prompb.WriteRequest{},
opts: &ScraperOpts{
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 0},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}},
},
},
{
name: "TestValidWriteRequest",
opts: &ScraperOpts{
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 1},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 1}},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Expand All @@ -56,8 +55,7 @@ func TestScraper_sendBatch(t *testing.T) {
name: "TestDefaultDropMetrics",
opts: &ScraperOpts{
DefaultDropMetrics: true,
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 0},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Expand All @@ -80,8 +78,8 @@ func TestScraper_sendBatch(t *testing.T) {
wr := s.flushBatchIfNecessary(context.Background(), tt.writeRequest)
err := s.sendBatch(context.Background(), wr)
require.NoError(t, err)
if tt.opts.RemoteClient.(*fakeClient).expectedSamples > 0 {
require.True(t, tt.opts.RemoteClient.(*fakeClient).called)
if tt.opts.RemoteClients[0].(*fakeClient).expectedSamples > 0 {
require.True(t, tt.opts.RemoteClients[0].(*fakeClient).called)
}
})
}
Expand Down Expand Up @@ -143,7 +141,7 @@ type fakeClient struct {
called bool
}

func (f *fakeClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (f *fakeClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
f.called = true
if len(wr.Timeseries) != f.expectedSamples {
return fmt.Errorf("expected %d samples, got %d", f.expectedSamples, len(wr.Timeseries))
Expand Down
17 changes: 8 additions & 9 deletions collector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Azure/adx-mon/pkg/http"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/storage"
"github.com/Azure/adx-mon/transform"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Service struct {
type ServiceOpts struct {
ListenAddr string
NodeName string
Endpoints []string
Endpoint string

// LogCollectionHandlers is the list of log collection handlers
LogCollectionHandlers []LogCollectorOpts
Expand Down Expand Up @@ -197,7 +198,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
logsProxySvc := otlp.NewLogsProxyService(otlp.LogsProxyServiceOpts{
LiftAttributes: opts.LiftAttributes,
AddAttributes: opts.AddAttributes,
Endpoints: opts.Endpoints,
Endpoint: opts.Endpoint,
InsecureSkipVerify: opts.InsecureSkipVerify,
})

Expand All @@ -220,8 +221,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
for _, handlerOpts := range opts.OtlpMetricsHandlers {
writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{
RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
Client: &StoreRemoteClient{store},
Endpoints: opts.Endpoints,
Clients: []remote.RemoteWriteClient{&StoreRemoteClient{store}},
MaxBatchSize: opts.MaxBatchSize,
DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
})
Expand Down Expand Up @@ -249,11 +249,11 @@ func NewService(opts *ServiceOpts) (*Service, error) {
transferQueue chan *cluster.Batch
partitioner cluster.MetricPartitioner
)
if len(opts.Endpoints) > 0 {
if opts.Endpoint != "" {
// This is a static partitioner that forces all entries to be assigned to the remote endpoint.
partitioner = remotePartitioner{
host: "remote",
addr: opts.Endpoints[0],
addr: opts.Endpoint,
}

r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
Expand Down Expand Up @@ -293,8 +293,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
var scraper *Scraper
if opts.Scraper != nil {
scraperOpts := opts.Scraper
scraperOpts.RemoteClient = &StoreRemoteClient{store}
scraperOpts.Endpoints = opts.Endpoints
scraperOpts.RemoteClients = []remote.RemoteWriteClient{&StoreRemoteClient{store}}

scraper = NewScraper(opts.Scraper)
}
Expand Down Expand Up @@ -524,7 +523,7 @@ type StoreRemoteClient struct {
store storage.Store
}

func (s *StoreRemoteClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (s *StoreRemoteClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
return s.store.WriteTimeSeries(ctx, wr.Timeseries)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/promremote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// Client is a client for the prometheus remote write API. It is safe to be shared between goroutines.
type Client struct {
httpClient *http.Client
endpoint string
opts ClientOpts
}

Expand Down Expand Up @@ -64,6 +65,9 @@ type ClientOpts struct {

// DisableKeepAlives controls whether the client disables HTTP keep-alives.
DisableKeepAlives bool

// Endpoint for writing to the prometheus remote write API.
Endpoint string
}

func (c ClientOpts) WithDefaults() ClientOpts {
Expand Down Expand Up @@ -112,6 +116,7 @@ func NewClient(opts ClientOpts) (*Client, error) {

return &Client{
httpClient: httpClient,
endpoint: opts.Endpoint,
opts: opts,
}, nil
}
Expand All @@ -131,7 +136,7 @@ func (c *Client) Write(ctx context.Context, endpoint string, wr *prompb.WriteReq
encoded := snappy.Encode(b1[:0], b)
body := bytes.NewReader(encoded)

req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", endpoint), body)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", c.endpoint), body)
if err != nil {
return fmt.Errorf("new request: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/promremote/promremote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"time"

"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/stretchr/testify/require"
)

func TestSendBatchWithValidData(t *testing.T) {
client := &MockClient{}
proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 10, false)
proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 10, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -41,7 +42,7 @@ func TestSendBatchWithValidData(t *testing.T) {

func TestSendBatchWithEmptyBatch(t *testing.T) {
client := &MockClient{}
proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 1, false)
proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 1, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -76,6 +77,6 @@ type MockClient struct{}

// CloseIdleConnections is a no-op; it exists so that MockClient can be passed
// wherever a remote.RemoteWriteClient is expected in these tests.
func (m *MockClient) CloseIdleConnections() {}

func (m *MockClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (m *MockClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
return nil
}
Loading

0 comments on commit 035217c

Please sign in to comment.