Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka exporter and receiver configuration #5703

Merged
merged 30 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f02e2fc
Add configs for collector and ingester
joeyyy09 Jul 3, 2024
a728c2e
Update collector config
joeyyy09 Jul 10, 2024
fc7da59
Update ingester config
joeyyy09 Jul 14, 2024
53173c5
Add initial kafka_test.go
joeyyy09 Jul 14, 2024
b2133cc
Add debug logs in the config
joeyyy09 Jul 15, 2024
2ccff53
Update conig files for UI
joeyyy09 Jul 15, 2024
c9c32cc
Update conig files
joeyyy09 Jul 16, 2024
802413a
Add kafka e2e setup
joeyyy09 Jul 18, 2024
819ee21
Add kafka e2e setup
joeyyy09 Jul 18, 2024
d02136e
Add Readme for Kafka
joeyyy09 Jul 18, 2024
3cf0151
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
d09eaa4
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
0acf969
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
17ebd78
Fixes
joeyyy09 Jul 19, 2024
dfcba73
Fix broken syntax
joeyyy09 Jul 19, 2024
fe794c1
Add injectStorageCleaner parameter
joeyyy09 Jul 19, 2024
6ab1285
fix merge conflicts
joeyyy09 Jul 19, 2024
66ef1f9
fix merge conflicts
joeyyy09 Jul 19, 2024
4d4a882
Modify E2EStorageIntegration struct
joeyyy09 Jul 20, 2024
7cc6a6f
Fixes
joeyyy09 Jul 20, 2024
29581a4
Fixes
joeyyy09 Jul 20, 2024
580bdc9
Update Lint
joeyyy09 Jul 20, 2024
fea62c6
Fix Lint
joeyyy09 Jul 20, 2024
284acc9
Add e2eCleanup in e2eInitialize
joeyyy09 Jul 21, 2024
4ba5917
Fixes
joeyyy09 Jul 24, 2024
06e68ac
Lint fixes
joeyyy09 Jul 24, 2024
07f903e
fix ui
yurishkuro Jul 24, 2024
46121f5
Merge branch 'main' into kafka-config
yurishkuro Jul 24, 2024
4d624d6
schedule clean-up after reader/writer are created
yurishkuro Jul 24, 2024
d7c70cf
remove duplicate call to cleanup
yurishkuro Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:
jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
37 changes: 37 additions & 0 deletions cmd/jaeger/ingester-remote-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889
logs:
level: debug

extensions:
jaeger_query:
trace_storage: some_storage

jaeger_storage:
backends:
some_storage:
memory:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: some_storage
50 changes: 50 additions & 0 deletions cmd/jaeger/internal/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,56 @@ flowchart LR
end
```

## Kafka Integration

The primary difference between the Kafka integration tests and other integration tests lies in the flow of data. In the standard tests, spans are written by the SpanWriter, sent through an RPC_client directly to a receiver, then to an exporter, and written to a storage backend. Spans are read by the SpanReader, which queries the jaeger_query process accessing the storage backend. In contrast, the Kafka tests introduce Kafka as an intermediary. Spans go from the SpanWriter through an RPC_client to an OTLP receiver in the Jaeger Collector, exported to Kafka, received by the Jaeger Ingester, and then stored. For details, see the [Architecture](#KafkaArchitecture) section below.


## Kafka Architecture

``` mermaid
flowchart LR
Test -->|writeSpan| SpanWriter
SpanWriter --> RPCW[RPC_client]
RPCW --> OTLP_Receiver[Receiver]
OTLP_Receiver --> CollectorExporter[Kafka Exporter]
CollectorExporter --> Kafka[Kafka]
Kafka --> IngesterReceiver[Kafka Receiver]
IngesterReceiver --> IngesterExporter[Exporter]
IngesterExporter --> StorageBackend[(In-Memory Store)]
Test -->|readSpan| SpanReader
SpanReader --> RPCR[RPC_client]
RPCR --> QueryProcess[jaeger_query]
StorageCleaner -->|purge| StorageBackend
QueryProcess --> StorageBackend


subgraph Integration_Test_Executable
Test
SpanWriter
SpanReader
RPCW
RPCR
end

subgraph Jaeger Collector
OTLP_Receiver
CollectorExporter
end

subgraph Jaeger Ingester
IngesterReceiver
IngesterExporter
QueryProcess
StorageBackend
StorageCleaner[Storage Cleaner Extension]
end

subgraph Kafka
Topic
end
```

## Running tests locally

All integration tests can be run locally.
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@ func TestBadgerStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "badger")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunAll(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,5 @@ func TestCassandraStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "cassandra")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
26 changes: 20 additions & 6 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const otlpPort = 4317
// - At last, clean up anything declared in its own test functions.
// (e.g. close remote-storage)
type E2EStorageIntegration struct {
SkipStorageCleaner bool
integration.StorageIntegration
ConfigFile string
}
Expand All @@ -46,7 +47,10 @@ type E2EStorageIntegration struct {
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
configFile := s.ConfigFile
if !s.SkipStorageCleaner {
configFile = createStorageCleanerConfig(t, s.ConfigFile, storage)
}
t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -93,19 +97,29 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
require.NoError(t, cmd.Process.Kill())
if err := cmd.Process.Kill(); err != nil {
t.Errorf("Failed to kill Jaeger-v2 process: %v", err)
}
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
s.e2eCleanUp(t)
if t.Failed() {
// A Github Actions special annotation to create a foldable section
// in the Github runner output.
// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines
fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs")
outLogs, err := os.ReadFile(outFile.Name())
require.NoError(t, err)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)
if err != nil {
t.Errorf("Failed to read output logs: %v", err)
} else {
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)
}

errLogs, err := os.ReadFile(errFile.Name())
require.NoError(t, err)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
if err != nil {
t.Errorf("Failed to read error logs: %v", err)
} else {
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
}
// End of Github Actions foldable section annotation.
fmt.Println("::endgroup::")
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,5 @@ func TestElasticsearchStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "elasticsearch")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
40 changes: 40 additions & 0 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
}

// Initialize and start the collector
collector.e2eInitialize(t, "kafka")

ingester := &E2EStorageIntegration{
ConfigFile: ingesterConfig,
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}

// Initialize and start the ingester
ingester.e2eInitialize(t, "kafka")

// Run the span store tests
ingester.RunSpanStoreTests(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@ func TestMemoryStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "memory")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunAll(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@ func TestOpenSearchStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "opensearch")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var fixtures embed.FS
// - in those functions it instantiates and populates this struct
// - it then calls RunAll.
//
// Some implementations may declate multuple tests, with different settings,
// Some implementations may declare multiple tests, with different settings,
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
Expand Down
Loading