Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maint: DO NOT MERGE Add tracing to collect loop #1385

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0249305
feat: extract key fields from rules config (#1327)
VinozzZ Sep 16, 2024
f8388c8
fix: Use peer transmission during redistribute and shutdown events (#…
MikeGoldsmith Sep 16, 2024
f6589e1
feat: Improve log messages to be more informative (#1322)
MikeGoldsmith Sep 16, 2024
19d24ee
fix: remove unnecessary assertion to any (#1333)
VinozzZ Sep 16, 2024
4625f13
feat: Update Honeycomb logger to use EMAThroughput sampler (#1328)
MikeGoldsmith Sep 16, 2024
5f8b2e6
maint: rename sent_reason_cache to kept_reason_cache (#1346)
VinozzZ Sep 20, 2024
637a3f7
feat: Improve shutdown logic (#1347)
kentquirk Sep 23, 2024
8c3b91a
fix: remove InMemoryCollector from liveness check on shutdown (#1349)
VinozzZ Sep 23, 2024
7528846
fix: set 0 for otel metrics during registration (#1352)
VinozzZ Sep 26, 2024
209cf99
maint: Refactor metrics registration to streamline declaration and en…
VinozzZ Sep 26, 2024
eb061f0
feat: generate metrics documentation (#1351)
VinozzZ Sep 26, 2024
4e49ccb
feat(doc): separate table for metrics contains prefix (#1354)
VinozzZ Sep 27, 2024
19986e4
feat: extract decision span from full span (#1338)
VinozzZ Sep 30, 2024
6104c79
maint(deps): bump the minor-patch group with 13 updates (#1357)
dependabot[bot] Oct 1, 2024
f4d3f9f
feat: forward decision span through peer endpoint (#1342)
VinozzZ Oct 2, 2024
52a689a
feat: Record original user agent for spans and logs (#1358)
MikeGoldsmith Oct 3, 2024
57d2243
fix: Put a limit on the size of sampler keys (#1364)
kentquirk Oct 7, 2024
37c4c8b
fix: Only set incoming user agent if not already present (#1366)
MikeGoldsmith Oct 7, 2024
75a8f31
maint: add collector_redistribute_traces_duration_ms metric (#1368)
VinozzZ Oct 9, 2024
46aec47
feat: make collector health check timeout configurable (#1371)
VinozzZ Oct 10, 2024
0b377ac
docs: Update configMeta.yaml with capitalization fixes (#1373)
mjingle Oct 10, 2024
bebf49b
maint: Update main documentation with 2.8.3 release (#1374)
TylerHelmuth Oct 10, 2024
84eeba9
maint: cherry pick v2.8.4 commits into main. (#1383)
TylerHelmuth Oct 11, 2024
f2d7784
Add tracing to collect loop
TylerHelmuth Oct 11, 2024
0a81b2b
Add more tracing to send expired
TylerHelmuth Oct 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func (a *App) Start() error {
}

a.Logger.Debug().Logf("Starting up App...")
a.Metrics.Register("config_hash", "gauge")
a.Metrics.Register("rule_config_hash", "gauge")
for _, metric := range configHashMetrics {
a.Metrics.Register(metric)
}
a.IncomingRouter.SetVersion(a.Version)
a.PeerRouter.SetVersion(a.Version)

Expand All @@ -64,3 +65,18 @@ func (a *App) Stop() error {
a.Logger.Debug().Logf("Shutting down App...")
return nil
}

// configHashMetrics describes the gauge metrics that report the hashes of
// the current main and rules configurations. They are registered during
// App.Start so operators can detect config drift across a cluster.
var configHashMetrics = []metrics.Metadata{
	// gofmt -s: the element type is implied by the slice literal, so the
	// redundant `metrics.Metadata` prefix on each element is omitted.
	{
		Name:        "config_hash",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The hash of the current configuration",
	},
	{
		Name:        "rule_config_hash",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The hash of the current rules configuration",
	},
}
165 changes: 126 additions & 39 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
)

const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"
Expand Down Expand Up @@ -88,14 +89,8 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
basePort int,
peers peer.Peers,
enableHostMetadata bool,
) (*App, inject.Graph) {
c := &config.MockConfig{
func defaultConfig(basePort int) *config.MockConfig {
return &config.MockConfig{
GetTracesConfigVal: config.TracesConfig{
SendTicker: config.Duration(2 * time.Millisecond),
SendDelay: config.Duration(1 * time.Millisecond),
Expand All @@ -109,8 +104,7 @@ func newStartedApp(
GetListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort),
GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
GetHoneycombAPIVal: "http://api.honeycomb.io",
GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second)},
AddHostMetadataToTrace: enableHostMetadata,
GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second), EnableTraceLocality: true},
TraceIdFieldNames: []string{"trace.trace_id"},
ParentIdFieldNames: []string{"trace.parent_id"},
SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)},
Expand All @@ -119,7 +113,16 @@ func newStartedApp(
AcceptOnlyListedKeys: true,
},
}
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
peerTransmission transmission.Sender,
peers peer.Peers,
cfg *config.MockConfig,
) (*App, inject.Graph) {
c := cfg
var err error
if peers == nil {
peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}}
Expand Down Expand Up @@ -158,13 +161,13 @@ func newStartedApp(
})
assert.NoError(t, err)

sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: &transmission.Honeycomb{
MaxBatchSize: c.GetTracesConfigVal.MaxBatchSize,
if peerTransmission == nil {
sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
peerTransmission = &transmission.Honeycomb{
MaxBatchSize: cfg.GetTracesConfigVal.MaxBatchSize,
BatchTimeout: libhoney.DefaultBatchTimeout,
MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
PendingWorkCapacity: uint(c.GetPeerBufferSize()),
PendingWorkCapacity: uint(cfg.GetPeerBufferSize()),
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Expand All @@ -175,7 +178,10 @@ func newStartedApp(
DisableGzipCompression: true,
EnableMsgpackEncoding: true,
Metrics: sdPeer,
},
}
}
peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: peerTransmission,
})
assert.NoError(t, err)

Expand Down Expand Up @@ -210,7 +216,7 @@ func newStartedApp(
assert.NoError(t, err)

// Racy: wait just a moment for ListenAndServe to start up.
time.Sleep(10 * time.Millisecond)
time.Sleep(15 * time.Millisecond)
return &a, g
}

Expand All @@ -228,7 +234,8 @@ func TestAppIntegration(t *testing.T) {
port := 10500

sender := &transmission.MockSender{}
app, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
app, graph := newStartedApp(t, sender, nil, nil, cfg)

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
Expand Down Expand Up @@ -265,7 +272,8 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
port := 10600

sender := &transmission.MockSender{}
a, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })

Expand Down Expand Up @@ -305,7 +313,8 @@ func TestAppIntegrationWithUnauthorizedKey(t *testing.T) {
port := 10700

sender := &transmission.MockSender{}
a, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })

Expand Down Expand Up @@ -346,7 +355,9 @@ func TestPeerRouting(t *testing.T) {
Peers: peerList,
ID: peerList[i],
}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)

apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

Expand Down Expand Up @@ -404,7 +415,6 @@ func TestPeerRouting(t *testing.T) {
},
}
assert.Equal(t, expectedEvent, senders[0].Events()[0])

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's who the trace belongs to.
req, err = http.NewRequest(
Expand All @@ -418,18 +428,26 @@ func TestPeerRouting(t *testing.T) {

req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
require.Eventually(t, func() bool {
return len(senders[0].Events()) == 2
}, 2*time.Second, 2*time.Millisecond)
assert.Equal(t, expectedEvent, senders[0].Events()[0])
expectedEvent.Metadata = map[string]any{
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": senders[0].Events()[1].Metadata.(map[string]any)["enqueued_at"],
}
assert.Equal(t, expectedEvent, senders[0].Events()[1])
}

func TestHostMetadataSpanAdditions(t *testing.T) {
t.Parallel()
port := 14000

sender := &transmission.MockSender{}
app, graph := newStartedApp(t, sender, port, nil, true)
cfg := defaultConfig(port)
cfg.AddHostMetadataToTrace = true
app, graph := newStartedApp(t, sender, nil, nil, cfg)

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
Expand Down Expand Up @@ -483,11 +501,12 @@ func TestEventsEndpoint(t *testing.T) {
ID: peerList[i],
}

apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)
apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
// Deliver to host 1, it should be passed to host 0
zEnc, _ := zstd.NewWriter(nil)
blob := zEnc.EncodeAll([]byte(`{"foo":"bar","trace.trace_id":"1"}`), nil)
req, err := http.NewRequest(
Expand All @@ -506,7 +525,6 @@ func TestEventsEndpoint(t *testing.T) {
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
Expand All @@ -530,7 +548,6 @@ func TestEventsEndpoint(t *testing.T) {
},
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's the host this trace belongs to.

Expand All @@ -556,7 +573,6 @@ func TestEventsEndpoint(t *testing.T) {
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
Expand All @@ -581,14 +597,16 @@ func TestEventsEndpoint(t *testing.T) {
senders[0].Events()[0],
)
}

func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peerList := []string{
"http://localhost:15001",
"http://localhost:15003",
}
// this traceID was chosen because it hashes to the appropriate shard for this
// test. You can't change it or the number of peers and still expect the test to pass.
traceID := "4"

var apps [2]*App
var senders [2]*transmission.MockSender
Expand All @@ -600,16 +618,15 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
ID: peerList[i],
}

app, graph := newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)

app, graph := newStartedApp(t, senders[i], nil, peers, cfg)
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)
}

// this traceID was chosen because it hashes to the appropriate shard for this
// test. You can't change it or the number of peers and still expect the test to pass.
traceID := "4"
traceData := []byte(fmt.Sprintf(`{"foo":"bar","trace.trace_id":"%s"}`, traceID))
// Deliver to host 1, it should be passed to host 0 and emitted there.
zEnc, _ := zstd.NewWriter(nil)
Expand Down Expand Up @@ -654,7 +671,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
},
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0.

Expand Down Expand Up @@ -706,6 +722,75 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
)
}

// TestPeerRouting_TraceLocalityDisabled verifies that when
// CollectionConfig.EnableTraceLocality is false, a span received on one
// peer's incoming endpoint is forwarded over that peer's *peer*
// transmission to the trace's owning shard, carrying only trimmed-down
// decision-span metadata (meta.refinery.* fields) rather than the full
// span payload.
func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {
	// Parallel integration tests need different ports!
	t.Parallel()

	peerList := []string{"http://localhost:17001", "http://localhost:17003"}

	var apps [2]*App
	var senders [2]*transmission.MockSender
	var peerSenders [2]*transmission.MockSender
	for i := range apps {
		var graph inject.Graph
		basePort := 17000 + (i * 2)
		senders[i] = &transmission.MockSender{}
		// Unlike the other peer-routing tests, we also capture the peer
		// transmission so we can inspect what gets forwarded between hosts.
		peerSenders[i] = &transmission.MockSender{}
		peers := &peer.MockPeers{
			Peers: peerList,
			ID:    peerList[i],
		}
		// Start from the shared default config, but disable trace locality
		// so the decision-span forwarding path is exercised.
		cfg := defaultConfig(basePort)
		collectionCfg := cfg.GetCollectionConfigVal
		collectionCfg.EnableTraceLocality = false
		cfg.GetCollectionConfigVal = collectionCfg

		apps[i], graph = newStartedApp(t, senders[i], peerSenders[i], peers, cfg)
		defer startstop.Stop(graph.Objects(), nil)
	}

	// Deliver to host 1, it should be passed to host 0 and emitted there.
	req, err := http.NewRequest(
		"POST",
		"http://localhost:17002/1/batch/dataset", // host 1's incoming (non-peer) listener
		nil,
	)
	assert.NoError(t, err)
	req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
	req.Header.Set("Content-Type", "application/json")

	// this span index was chosen because it hashes to the appropriate shard for this
	// test. You can't change it and expect the test to pass.
	blob := `[` + string(spans[10]) + `]`
	req.Body = io.NopCloser(strings.NewReader(blob))
	post(t, req)
	// The forwarded event should appear on host 1's peer transmission
	// (peerSenders[1]) — i.e. we assert on the sending side of the
	// peer-to-peer hop, addressed to the owning host 0.
	require.Eventually(t, func() bool {
		return len(peerSenders[1].Events()) == 1
	}, 2*time.Second, 2*time.Millisecond)

	// The forwarded decision span carries only refinery metadata, not the
	// original span attributes; span_data_size records the size of the
	// original span's data. NOTE(review): 157 is tied to the exact content
	// of spans[10] — confirm if that fixture changes.
	expectedEvent := &transmission.Event{
		APIKey:     legacyAPIKey,
		Dataset:    "dataset",
		SampleRate: 2,
		APIHost:    "http://localhost:17001",
		Timestamp:  now,
		Data: map[string]interface{}{
			"trace_id":                     "2",
			"meta.refinery.min_span":       true,
			"meta.annotation_type":         types.SpanAnnotationTypeUnknown,
			"meta.refinery.root":           false,
			"meta.refinery.span_data_size": 157,
		},
		Metadata: map[string]any{
			"api_host":    "http://localhost:17001",
			"dataset":     "dataset",
			"environment": "",
			// enqueued_at is assigned at send time, so copy it from the
			// actual event rather than predicting it.
			"enqueued_at": peerSenders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
		},
	}
	assert.Equal(t, expectedEvent, peerSenders[1].Events()[0])
}

var (
now = time.Now().UTC()
nowString = now.Format(time.RFC3339Nano)
Expand Down Expand Up @@ -766,7 +851,8 @@ func BenchmarkTraces(b *testing.B) {
W: io.Discard,
},
}
_, graph := newStartedApp(b, sender, 11000, nil, false)
cfg := defaultConfig(11000)
_, graph := newStartedApp(b, sender, nil, nil, cfg)

req, err := http.NewRequest(
"POST",
Expand Down Expand Up @@ -868,7 +954,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
ID: peerList[i],
}

apps[i], graph = newStartedApp(b, sender, basePort, peers, false)
cfg := defaultConfig(basePort)
apps[i], graph = newStartedApp(b, sender, nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
Expand Down
Loading
Loading