Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka exporter and receiver configuration #5703

Merged
merged 30 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f02e2fc
Add configs for collector and ingester
joeyyy09 Jul 3, 2024
a728c2e
Update collector config
joeyyy09 Jul 10, 2024
fc7da59
Update ingester config
joeyyy09 Jul 14, 2024
53173c5
Add initial kafka_test.go
joeyyy09 Jul 14, 2024
b2133cc
Add debug logs in the config
joeyyy09 Jul 15, 2024
2ccff53
Update conig files for UI
joeyyy09 Jul 15, 2024
c9c32cc
Update conig files
joeyyy09 Jul 16, 2024
802413a
Add kafka e2e setup
joeyyy09 Jul 18, 2024
819ee21
Add kafka e2e setup
joeyyy09 Jul 18, 2024
d02136e
Add Readme for Kafka
joeyyy09 Jul 18, 2024
3cf0151
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
d09eaa4
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
0acf969
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
17ebd78
Fixes
joeyyy09 Jul 19, 2024
dfcba73
Fix broken syntax
joeyyy09 Jul 19, 2024
fe794c1
Add injectStorageCleaner parameter
joeyyy09 Jul 19, 2024
6ab1285
fix merge conflicts
joeyyy09 Jul 19, 2024
66ef1f9
fix merge conflicts
joeyyy09 Jul 19, 2024
4d4a882
Modify E2EStorageIntegration struct
joeyyy09 Jul 20, 2024
7cc6a6f
Fixes
joeyyy09 Jul 20, 2024
29581a4
Fixes
joeyyy09 Jul 20, 2024
580bdc9
Update Lint
joeyyy09 Jul 20, 2024
fea62c6
Fix Lint
joeyyy09 Jul 20, 2024
284acc9
Add e2eCleanup in e2eInitialize
joeyyy09 Jul 21, 2024
4ba5917
Fixes
joeyyy09 Jul 24, 2024
06e68ac
Lint fixes
joeyyy09 Jul 24, 2024
07f903e
fix ui
yurishkuro Jul 24, 2024
46121f5
Merge branch 'main' into kafka-config
yurishkuro Jul 24, 2024
4d624d6
schedule clean-up after reader/writer are created
yurishkuro Jul 24, 2024
d7c70cf
remove duplicate call to cleanup
yurishkuro Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:
jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
37 changes: 37 additions & 0 deletions cmd/jaeger/ingester-remote-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889
logs:
level: debug

extensions:
jaeger_query:
trace_storage: some_storage

jaeger_storage:
backends:
some_storage:
memory:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: some_storage
120 changes: 120 additions & 0 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"context"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
)

// KafkaCollectorIntegration handles the Jaeger collector with a custom configuration.
type KafkaCollectorIntegration struct {
CollectorConfigFile string
collectorCmd *exec.Cmd
}

// e2eInitialize starts the Jaeger collector with the provided config file.
func (s *KafkaCollectorIntegration) e2eInitialize(t *testing.T) {
collectorConfigFile := createStorageCleanerConfig(t, s.CollectorConfigFile, "kafka")
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved

// Start the collector
t.Logf("Starting Jaeger collector in the background with config file %s", collectorConfigFile)
s.startProcess(t, "collector", collectorConfigFile)
}

func (s *KafkaCollectorIntegration) startProcess(t *testing.T, processType, configFile string) {
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved
outFile, err := os.OpenFile(
filepath.Join(t.TempDir(), fmt.Sprintf("jaeger_%s_output_logs.txt", processType)),
os.O_CREATE|os.O_WRONLY,
os.ModePerm,
)
require.NoError(t, err)
t.Logf("Writing the Jaeger %s output logs into %s", processType, outFile.Name())

errFile, err := os.OpenFile(
filepath.Join(t.TempDir(), fmt.Sprintf("jaeger_%s_error_logs.txt", processType)),
os.O_CREATE|os.O_WRONLY,
os.ModePerm,
)
require.NoError(t, err)
t.Logf("Writing the Jaeger %s error logs into %s", processType, errFile.Name())

cmd := &exec.Cmd{
Path: fmt.Sprintf("./cmd/jaeger/jaeger-%s", processType),
Args: []string{fmt.Sprintf("jaeger-%s", processType), "--config", configFile},
Dir: "../../../..",
Stdout: outFile,
Stderr: errFile,
}
require.NoError(t, cmd.Start())

require.Eventually(t, func() bool {
url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved
t.Logf("Checking if Jaeger %s is available on %s", processType, url)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Log(err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 30*time.Second, 500*time.Millisecond, fmt.Sprintf("Jaeger %s did not start", processType))
t.Logf("Jaeger %s is ready", processType)

if processType == "collector" {
s.collectorCmd = cmd
}
}

// e2eCleanUp stops the processes.
func (s *KafkaCollectorIntegration) e2eCleanUp(t *testing.T) {
if s.collectorCmd != nil {
require.NoError(t, s.collectorCmd.Process.Kill())
}
}

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

// Initialize and start the collector
collector := &KafkaCollectorIntegration{
CollectorConfigFile: "../../collector-with-kafka.yaml",
}
collector.e2eInitialize(t)
t.Cleanup(func() {
collector.e2eCleanUp(t)
})

// Reuse E2EStorageIntegration for the ingester
ingester := &E2EStorageIntegration{
ConfigFile: "../../ingester-remote-storage.yaml",
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}
ingester.e2eInitialize(t, "kafka")
t.Cleanup(func() {
ingester.e2eCleanUp(t)
})

// Run the span store tests
ingester.RunSpanStoreTests(t)
}
Loading