Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maint: DO NOT MERGE experiment with duration as a hist #1377

Closed
wants to merge 8 commits into from
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Refinery Changelog

## 2.8.3 2024-10-08

### Changes

- fix: Only set incoming user agent if not already present (#1366) | [Mike Goldsmith](https://github.com/MikeGoldsmith)
- fix: Put a limit on the size of sampler keys (#1364) | [Kent Quirk](https://github.com/kentquirk)
- feat: Record original user agent for spans and logs (#1358) | [Mike Goldsmith](https://github.com/MikeGoldsmith)
- feat: Make collector health check timeout configurable (#1371) | [Yingrong Zhao](https://github.com/vinozzZ)
- fix: ConvertNumeric now handles bools (#1336) | [Kent Quirk](https://github.com/kentquirk)

## 2.8.2 2024-09-13

Expand Down
8 changes: 8 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

While [CHANGELOG.md](./CHANGELOG.md) contains detailed documentation and links to all the source code changes in a given release, this document is intended to be aimed at a more comprehensible version of the contents of the release from the point of view of users of Refinery.

## Version 2.8.3

This is a bug fix release and includes the follow changes+~+:
* Fixes a bug when building sampler key using a very high cardinality field. The fix is to only take the first 100 unique field values.
* Fixes a bug so the `is_alive` and `is_ready` metrics now report either 0 or 1 correctly.

It also adds support for Refinery to record the original sender user agent in event data under the key `meta.refinery.incoming_user_agent` adds a configurtion option for the health check timeout.

## Version 2.8.2

This is a bug fix release.
Expand Down
6 changes: 6 additions & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func newStartedApp(
}

func post(t testing.TB, req *http.Request) {
req.Header.Set("User-Agent", "Test-Client")
resp, err := httpClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
Expand Down Expand Up @@ -392,6 +393,7 @@ func TestPeerRouting(t *testing.T) {
"field10": float64(10),
"long": "this is a test of the emergency broadcast system",
"meta.refinery.original_sample_rate": uint(2),
"meta.refinery.incoming_user_agent": "Test-Client",
"foo": "bar",
},
Metadata: map[string]any{
Expand Down Expand Up @@ -517,6 +519,7 @@ func TestEventsEndpoint(t *testing.T) {
"trace.trace_id": "1",
"foo": "bar",
"meta.refinery.original_sample_rate": uint(10),
"meta.refinery.incoming_user_agent": "Test-Client",
},
Metadata: map[string]any{
"api_host": "http://api.honeycomb.io",
Expand Down Expand Up @@ -566,6 +569,7 @@ func TestEventsEndpoint(t *testing.T) {
"trace.trace_id": "1",
"foo": "bar",
"meta.refinery.original_sample_rate": uint(10),
"meta.refinery.incoming_user_agent": "Test-Client",
},
Metadata: map[string]any{
"api_host": "http://api.honeycomb.io",
Expand Down Expand Up @@ -639,6 +643,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
"trace.trace_id": traceID,
"foo": "bar",
"meta.refinery.original_sample_rate": uint(10),
"meta.refinery.incoming_user_agent": "Test-Client",
},
Metadata: map[string]any{
"api_host": "http://api.honeycomb.io",
Expand Down Expand Up @@ -688,6 +693,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
"trace.trace_id": traceID,
"foo": "bar",
"meta.refinery.original_sample_rate": uint(10),
"meta.refinery.incoming_user_agent": "Test-Client",
},
Metadata: map[string]any{
"api_host": "http://api.honeycomb.io",
Expand Down
8 changes: 5 additions & 3 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (i *InMemCollector) Start() error {
// listen for config reloads
i.Config.RegisterReloadCallback(i.sendReloadSignal)

i.Health.Register(CollectorHealthKey, 3*time.Second)
i.Health.Register(CollectorHealthKey, time.Duration(imcConfig.HealthCheckTimeout))

i.Metrics.Register("trace_duration_ms", "histogram")
i.Metrics.Register("trace_span_count", "histogram")
Expand Down Expand Up @@ -330,6 +330,8 @@ func (i *InMemCollector) collect() {
defer i.mutex.Unlock()

for {
startTime := time.Now()

i.Health.Ready(CollectorHealthKey, true)
// record channel lengths as histogram but also as gauges
i.Metrics.Histogram("collector_incoming_queue", float64(len(i.incoming)))
Expand Down Expand Up @@ -376,18 +378,18 @@ func (i *InMemCollector) collect() {
return
}
i.processSpan(sp)
continue
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
return
}
i.processSpan(sp)
continue
case <-i.reload:
i.reloadConfigs()
}
}

i.Metrics.Histogram("collector_collect_loop_duration_ms", float64(time.Now().Sub(startTime).Milliseconds()))
}
}

Expand Down
1 change: 1 addition & 0 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ type CollectionConfig struct {
PeerQueueSize int `yaml:"PeerQueueSize"`
IncomingQueueSize int `yaml:"IncomingQueueSize"`
AvailableMemory MemorySize `yaml:"AvailableMemory" cmdenv:"AvailableMemory"`
HealthCheckTimeout Duration `yaml:"HealthCheckTimeout" default:"3s"`
MaxMemoryPercentage int `yaml:"MaxMemoryPercentage" default:"75"`
MaxAlloc MemorySize `yaml:"MaxAlloc"`
DisableRedistribution bool `yaml:"DisableRedistribution"`
Expand Down
10 changes: 10 additions & 0 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,16 @@ groups:
This value should be set to a bit less than the normal timeout period
for shutting down without forcibly terminating the process.

- name: HealthCheckTimeout
type: duration
valuetype: nondefault
firstversion: v2.8
default: 3s
reload: false
summary: Controls the maximum duration allowed for collection health checks to complete.
description: >
The `HealthCheckTimeout` setting specifies the maximum duration allowed for the health checks of the collection subsystems to complete. If a subsystem does not respond within this timeout period, it will be marked as unhealthy. This timeout value should be set carefully to ensure that transient delays do not lead to unnecessary failure detection while still allowing for timely identification of actual health issues.

- name: BufferSizes
title: "Buffer Sizes"
description: >
Expand Down
7 changes: 5 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ func ConvertNumeric(val interface{}) float64 {
return n
case float32:
return float64(n)
default:
return 0
case bool:
if n {
return 1
}
}
return 0
}

func PrefixMetricName(prefix string, name string) string {
Expand Down
34 changes: 34 additions & 0 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,35 @@
package metrics

import "testing"

func TestConvertNumeric(t *testing.T) {
tests := []struct {
name string
val interface{}
want float64
}{
// TODO: Add test cases.
{"int", 17, 17},
{"uint", uint(17), 17},
{"int64", int64(17), 17},
{"uint64", uint64(17), 17},
{"int32", int32(17), 17},
{"uint32", uint32(17), 17},
{"int16", int16(17), 17},
{"uint16", uint16(17), 17},
{"int8", int8(17), 17},
{"uint8", uint8(17), 17},
{"float64", float64(17), 17},
{"float32", float32(17), 17},
{"bool", true, 1},
{"bool", false, 0},
{"string", "17", 0},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ConvertNumeric(tt.val); got != tt.want {
t.Errorf("ConvertNumeric() = %v, want %v", got, tt.want)
}
})
}
}
4 changes: 2 additions & 2 deletions route/otlp_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (r *Router) postOTLPLogs(w http.ResponseWriter, req *http.Request) {
return
}

if err := r.processOTLPRequest(req.Context(), result.Batches, keyToUse); err != nil {
if err := r.processOTLPRequest(req.Context(), result.Batches, keyToUse, ri.UserAgent); err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (l *LogsServer) Export(ctx context.Context, req *collectorlogs.ExportLogsSe
return nil, huskyotlp.AsGRPCError(err)
}

if err := l.router.processOTLPRequest(ctx, result.Batches, keyToUse); err != nil {
if err := l.router.processOTLPRequest(ctx, result.Batches, keyToUse, ri.UserAgent); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

Expand Down
57 changes: 57 additions & 0 deletions route/otlp_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,63 @@ func TestLogsOTLPHandler(t *testing.T) {
assert.Equal(t, 0, len(router.Collector.(*collect.MockCollector).Spans))
mockCollector.Flush()
})

t.Run("logs record incoming user agent - gRPC", func(t *testing.T) {
md := metadata.New(map[string]string{"x-honeycomb-team": legacyAPIKey, "x-honeycomb-dataset": "ds", "user-agent": "my-user-agent"})
ctx := metadata.NewIncomingContext(context.Background(), md)

req := &collectorlogs.ExportLogsServiceRequest{
ResourceLogs: []*logs.ResourceLogs{{
ScopeLogs: []*logs.ScopeLogs{{
LogRecords: createLogsRecords(),
}},
}},
}
_, err := logsServer.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 1, len(mockTransmission.Events))
event := mockTransmission.Events[0]
assert.Equal(t, "my-user-agent", event.Data["meta.refinery.incoming_user_agent"])

mockTransmission.Flush()
assert.Equal(t, 0, len(router.Collector.(*collect.MockCollector).Spans))
mockCollector.Flush()
})

t.Run("logs record incoming user agent - HTTP", func(t *testing.T) {
req := &collectorlogs.ExportLogsServiceRequest{
ResourceLogs: []*logs.ResourceLogs{{
ScopeLogs: []*logs.ScopeLogs{{
LogRecords: createLogsRecords(),
}},
}},
}
body, err := protojson.Marshal(req)
if err != nil {
t.Error(err)
}

request, _ := http.NewRequest("POST", "/v1/logs", bytes.NewReader(body))
request.Header = http.Header{}
request.Header.Set("content-type", "application/json")
request.Header.Set("x-honeycomb-team", legacyAPIKey)
request.Header.Set("x-honeycomb-dataset", "dataset")
request.Header.Set("user-agent", "my-user-agent")

w := httptest.NewRecorder()
router.postOTLPLogs(w, request)
assert.Equal(t, w.Code, http.StatusOK)

assert.Equal(t, 1, len(mockTransmission.Events))
event := mockTransmission.Events[0]
assert.Equal(t, "my-user-agent", event.Data["meta.refinery.incoming_user_agent"])

mockTransmission.Flush()
assert.Equal(t, 0, len(router.Collector.(*collect.MockCollector).Spans))
mockCollector.Flush()
})
}

func createLogsRecords() []*logs.LogRecord {
Expand Down
4 changes: 2 additions & 2 deletions route/otlp_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (r *Router) postOTLPTrace(w http.ResponseWriter, req *http.Request) {
return
}

if err := r.processOTLPRequest(req.Context(), result.Batches, keyToUse); err != nil {
if err := r.processOTLPRequest(req.Context(), result.Batches, keyToUse, ri.UserAgent); err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (t *TraceServer) Export(ctx context.Context, req *collectortrace.ExportTrac
return nil, huskyotlp.AsGRPCError(err)
}

if err := t.router.processOTLPRequest(ctx, result.Batches, keyToUse); err != nil {
if err := t.router.processOTLPRequest(ctx, result.Batches, keyToUse, ri.UserAgent); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

Expand Down
58 changes: 58 additions & 0 deletions route/otlp_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,13 @@ func TestOTLPHandler(t *testing.T) {
ReceiveKeys: []string{},
AcceptOnlyListedKeys: true,
}
defer func() {
router.Config.(*config.MockConfig).GetAccessKeyConfigVal = config.AccessKeyConfig{
ReceiveKeys: []string{legacyAPIKey},
AcceptOnlyListedKeys: false,
}
}()

req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
ScopeSpans: []*trace.ScopeSpans{{
Expand All @@ -498,6 +505,57 @@ func TestOTLPHandler(t *testing.T) {
assert.Equal(t, 0, len(mockTransmission.Events))
mockTransmission.Flush()
})

t.Run("spans record incoming user agent - gRPC", func(t *testing.T) {
md := metadata.New(map[string]string{"x-honeycomb-team": legacyAPIKey, "x-honeycomb-dataset": "ds", "user-agent": "my-user-agent"})
ctx := metadata.NewIncomingContext(context.Background(), md)

req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
ScopeSpans: []*trace.ScopeSpans{{
Spans: helperOTLPRequestSpansWithStatus(),
}},
}},
}
traceServer := NewTraceServer(router)
_, err := traceServer.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 2, len(mockTransmission.Events))
event := mockTransmission.Events[0]
assert.Equal(t, "my-user-agent", event.Data["meta.refinery.incoming_user_agent"])
mockTransmission.Flush()
})

t.Run("spans record incoming user agent - HTTP", func(t *testing.T) {
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
ScopeSpans: []*trace.ScopeSpans{{
Spans: helperOTLPRequestSpansWithStatus(),
}},
}},
}
body, err := protojson.Marshal(req)
if err != nil {
t.Error(err)
}

request, _ := http.NewRequest("POST", "/v1/traces", bytes.NewReader(body))
request.Header = http.Header{}
request.Header.Set("content-type", "application/json")
request.Header.Set("x-honeycomb-team", legacyAPIKey)
request.Header.Set("x-honeycomb-dataset", "dataset")
request.Header.Set("user-agent", "my-user-agent")

w := httptest.NewRecorder()
router.postOTLPTrace(w, request)

assert.Equal(t, 2, len(mockTransmission.Events))
event := mockTransmission.Events[0]
assert.Equal(t, "my-user-agent", event.Data["meta.refinery.incoming_user_agent"])
mockTransmission.Flush()
})
}

func helperOTLPRequestSpansWithoutStatus() []*trace.Span {
Expand Down
Loading
Loading