diff --git a/cmd/jaeger/config-badger.yaml b/cmd/jaeger/config-badger.yaml index e919cd4411b4..5ce7ce9dbfee 100644 --- a/cmd/jaeger/config-badger.yaml +++ b/cmd/jaeger/config-badger.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [otlp] @@ -7,6 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: + healthcheckv2: + use_v2: true + http: + jaeger_query: trace_storage: some_store trace_storage_archive: another_store diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index e72ae33f411e..00b038144941 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [otlp] @@ -7,6 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: + healthcheckv2: + use_v2: true + http: + jaeger_query: trace_storage: some_storage trace_storage_archive: another_storage diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index bcff0987ca91..c5d16071137c 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [otlp] @@ -7,6 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: + healthcheckv2: + use_v2: true + http: + jaeger_query: trace_storage: some_storage trace_storage_archive: another_storage diff --git a/cmd/jaeger/collector-with-kafka.yaml b/cmd/jaeger/config-kafka-collector.yaml similarity index 85% rename from cmd/jaeger/collector-with-kafka.yaml rename to cmd/jaeger/config-kafka-collector.yaml index a248357b9784..ac438f30022a 100644 --- a/cmd/jaeger/collector-with-kafka.yaml +++ b/cmd/jaeger/config-kafka-collector.yaml @@ -1,4 +1,5 @@ service: + extensions: [healthcheckv2] pipelines: traces: receivers: [otlp, jaeger] @@ -10,6 +11,11 @@ service: metrics: level: detailed +extensions: + healthcheckv2: + use_v2: true + http: + receivers: otlp: protocols: diff --git a/cmd/jaeger/ingester-remote-storage.yaml b/cmd/jaeger/config-kafka-ingester.yaml similarity index 74% rename from cmd/jaeger/ingester-remote-storage.yaml rename to cmd/jaeger/config-kafka-ingester.yaml index 2f200c6533aa..9adfc308a9c3 100644 --- a/cmd/jaeger/ingester-remote-storage.yaml +++ b/cmd/jaeger/config-kafka-ingester.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [kafka] @@ -12,6 +12,12 @@ service: level: debug extensions: + healthcheckv2: + use_v2: true + http: + # use different port to avoid conflict with collector + endpoint: 0.0.0.0:14133 + jaeger_query: trace_storage: some_storage @@ -21,7 +27,7 @@ extensions: memory: max_traces: 100000 -receivers: +receivers: kafka: brokers: - localhost:9092 diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index 14973700aa9a..19be6e941c19 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [otlp] @@ -7,6 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: + healthcheckv2: + use_v2: true + http: + jaeger_query: trace_storage: some_storage trace_storage_archive: another_storage diff --git a/cmd/jaeger/config-remote-storage.yaml b/cmd/jaeger/config-remote-storage.yaml index a5e1b1528e14..7c8b3dd72288 100644 --- a/cmd/jaeger/config-remote-storage.yaml +++ b/cmd/jaeger/config-remote-storage.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, healthcheckv2] pipelines: traces: receivers: [otlp] @@ -7,6 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: + healthcheckv2: + use_v2: true + http: + jaeger_query: trace_storage: some-storage ui_config: ./cmd/jaeger/config-ui.json diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index dba80824d2d8..f068e81ef6a8 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -1,5 +1,5 @@ service: - extensions: [jaeger_storage, jaeger_query, remote_sampling] + extensions: [jaeger_storage, jaeger_query, remote_sampling, healthcheckv2] pipelines: traces: receivers: [otlp, jaeger, zipkin] @@ -7,7 +7,10 @@ service: exporters: [jaeger_storage_exporter] extensions: - # health_check: + healthcheckv2: + use_v2: true + http: + # pprof: # endpoint: 0.0.0.0:1777 # zpages: diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index ea7459c268f6..55f1517f9d77 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -4,13 +4,16 @@ package integration import ( + "bytes" "context" + "encoding/json" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" + "strings" "testing" "time" @@ -41,6 +44,7 @@ type E2EStorageIntegration struct { SkipStorageCleaner bool ConfigFile string + BinaryName string HealthCheckEndpoint string } @@ -49,16 +53,18 @@ type E2EStorageIntegration struct { // This function should be called before any of the tests start. func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) + if s.BinaryName == "" { + s.BinaryName = "jaeger-v2" + } configFile := s.ConfigFile if !s.SkipStorageCleaner { configFile = createStorageCleanerConfig(t, s.ConfigFile, storage) } - configFile, err := filepath.Abs(configFile) require.NoError(t, err, "Failed to get absolute path of the config file") require.FileExists(t, configFile, "Config file does not exist at the resolved path") - t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile) + t.Logf("Starting %s in the background with config file %s", s.BinaryName, configFile) outFile, err := os.OpenFile( filepath.Join(t.TempDir(), "jaeger_output_logs.txt"), @@ -66,7 +72,7 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { os.ModePerm, ) require.NoError(t, err) - t.Logf("Writing the Jaeger-v2 output logs into %s", outFile.Name()) + t.Logf("Writing the %s output logs into %s", s.BinaryName, outFile.Name()) errFile, err := os.OpenFile( filepath.Join(t.TempDir(), "jaeger_error_logs.txt"), @@ -74,7 +80,7 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { os.ModePerm, ) require.NoError(t, err) - t.Logf("Writing the Jaeger-v2 error logs into %s", errFile.Name()) + t.Logf("Writing the %s error logs into %s", s.BinaryName, errFile.Name()) cmd := exec.Cmd{ Path: "./cmd/jaeger/jaeger", @@ -88,57 +94,38 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { } t.Logf("Running command: %v", cmd.Args) require.NoError(t, cmd.Start()) - - // Wait for the binary to start and become ready to serve requests. - healthCheckEndpoint := s.HealthCheckEndpoint - if healthCheckEndpoint == "" { - healthCheckEndpoint = fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP) - } - require.Eventually(t, func() bool { - t.Logf("Checking if Jaeger-v2 is available on %s", healthCheckEndpoint) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil) - if err != nil { - t.Logf("HTTP request creation failed: %v", err) - return false - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - t.Logf("HTTP request failed: %v", err) - return false - } - defer resp.Body.Close() - return resp.StatusCode == http.StatusOK - }, 60*time.Second, 3*time.Second, "Jaeger-v2 did not start") - t.Log("Jaeger-v2 is ready") t.Cleanup(func() { if err := cmd.Process.Kill(); err != nil { - t.Errorf("Failed to kill Jaeger-v2 process: %v", err) + t.Errorf("Failed to kill %s process: %v", s.BinaryName, err) } if t.Failed() { // A Github Actions special annotation to create a foldable section // in the Github runner output. // https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines - fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs") + fmt.Printf("::group::🚧 🚧 🚧 %s binary logs\n", s.BinaryName) outLogs, err := os.ReadFile(outFile.Name()) if err != nil { t.Errorf("Failed to read output logs: %v", err) } else { - fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs) + fmt.Printf("🚧 🚧 🚧 %s output logs:\n%s", s.BinaryName, outLogs) } errLogs, err := os.ReadFile(errFile.Name()) if err != nil { t.Errorf("Failed to read error logs: %v", err) } else { - fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs) + fmt.Printf("🚧 🚧 🚧 %s error logs:\n%s", s.BinaryName, errLogs) } // End of Github Actions foldable section annotation. fmt.Println("::endgroup::") } }) + // Wait for the binary to start and become ready to serve requests. + require.Eventually(t, func() bool { return s.doHealthCheck(t) }, + 60*time.Second, 3*time.Second, "%s did not start", s.BinaryName) + t.Logf("%s is ready", s.BinaryName) + s.SpanWriter, err = createSpanWriter(logger, otlpPort) require.NoError(t, err) s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC) @@ -150,6 +137,56 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { }) } +func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { + healthCheckEndpoint := s.HealthCheckEndpoint + if healthCheckEndpoint == "" { + healthCheckEndpoint = "http://localhost:13133/status" + } + t.Logf("Checking if %s is available on %s", s.BinaryName, healthCheckEndpoint) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil) + if err != nil { + t.Logf("HTTP request creation failed: %v", err) + return false + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Logf("HTTP request failed: %v", err) + return false + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Logf("Failed to read HTTP response body: %v", err) + return false + } + if resp.StatusCode != http.StatusOK { + t.Logf("HTTP response not OK: %v", string(body)) + return false + } + // for backwards compatibility with other healthchecks + if !strings.HasSuffix(healthCheckEndpoint, "/status") { + t.Logf("OK HTTP from endpoint that is not healthcheckv2") + return true + } + + var healthResponse struct { + Status string `json:"status"` + } + if err := json.NewDecoder(bytes.NewReader(body)).Decode(&healthResponse); err != nil { + t.Logf("Failed to decode JSON response '%s': %v", string(body), err) + return false + } + + // Check if the status field in the JSON is "StatusOK" + if healthResponse.Status != "StatusOK" { + t.Logf("Received non-K status %s: %s", healthResponse.Status, string(body)) + return false + } + return true +} + // e2eCleanUp closes the SpanReader and SpanWriter gRPC connection. // This function should be called after all the tests are finished. func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) { @@ -205,6 +242,7 @@ func createStorageCleanerConfig(t *testing.T, configFile string, storage string) err = os.WriteFile(tempFile, newData, 0o600) require.NoError(t, err) + t.Logf("Transformed configuration file %s to %s", configFile, tempFile) return tempFile } diff --git a/cmd/jaeger/internal/integration/grpc_test.go b/cmd/jaeger/internal/integration/grpc_test.go index cd12984bc622..4f7c1ceefd22 100644 --- a/cmd/jaeger/internal/integration/grpc_test.go +++ b/cmd/jaeger/internal/integration/grpc_test.go @@ -4,9 +4,11 @@ package integration import ( + "fmt" "testing" "github.com/jaegertracing/jaeger/plugin/storage/integration" + "github.com/jaegertracing/jaeger/ports" ) type GRPCStorageIntegration struct { @@ -30,6 +32,8 @@ func TestGRPCStorage(t *testing.T) { s := &GRPCStorageIntegration{ E2EStorageIntegration: E2EStorageIntegration{ ConfigFile: "../../config-remote-storage.yaml", + // TODO this should be removed in favor of default health check endpoint + HealthCheckEndpoint: fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP), }, } s.CleanUp = s.cleanUp diff --git a/cmd/jaeger/internal/integration/kafka_test.go b/cmd/jaeger/internal/integration/kafka_test.go index 45038e335b6e..0b98d4b3b6c1 100644 --- a/cmd/jaeger/internal/integration/kafka_test.go +++ b/cmd/jaeger/internal/integration/kafka_test.go @@ -63,8 +63,7 @@ func createConfigWithEncoding(t *testing.T, configFile string, targetEncoding st err = os.WriteFile(tempFile, newData, 0o600) require.NoError(t, err, "Failed to write updated config file to: %s", tempFile) - t.Logf("Configuration file with encoding '%s' and topic '%s' written to: %s", targetEncoding, uniqueTopic, tempFile) - + t.Logf("Transformed configuration file %s to %s", configFile, tempFile) return tempFile } @@ -96,15 +95,17 @@ func TestKafkaStorage(t *testing.T) { // but the tests are run against the ingester. collector := &E2EStorageIntegration{ - SkipStorageCleaner: true, - ConfigFile: createConfigWithEncoding(t, "../../collector-with-kafka.yaml", test.encoding, uniqueTopic), - HealthCheckEndpoint: "http://localhost:8888/metrics", + BinaryName: "jaeger-v2-collector", + ConfigFile: createConfigWithEncoding(t, "../../config-kafka-collector.yaml", test.encoding, uniqueTopic), + SkipStorageCleaner: true, } collector.e2eInitialize(t, "kafka") t.Log("Collector initialized") ingester := &E2EStorageIntegration{ - ConfigFile: createConfigWithEncoding(t, "../../ingester-remote-storage.yaml", test.encoding, uniqueTopic), + BinaryName: "jaeger-v2-ingester", + ConfigFile: createConfigWithEncoding(t, "../../config-kafka-ingester.yaml", test.encoding, uniqueTopic), + HealthCheckEndpoint: "http://localhost:14133/status", StorageIntegration: integration.StorageIntegration{ CleanUp: purge, GetDependenciesReturnsSource: true, diff --git a/cmd/jaeger/internal/integration/span_reader.go b/cmd/jaeger/internal/integration/span_reader.go index 3553551de259..abd98ecd6aa6 100644 --- a/cmd/jaeger/internal/integration/span_reader.go +++ b/cmd/jaeger/internal/integration/span_reader.go @@ -96,7 +96,6 @@ func (r *spanReader) GetServices(ctx context.Context) ([]string, error) { if err != nil { return []string{}, err } - r.logger.Info(fmt.Sprintf("Received %d services", len(res.Services))) return res.Services, nil } @@ -109,8 +108,6 @@ func (r *spanReader) GetOperations(ctx context.Context, query spanstore.Operatio if err != nil { return operations, err } - r.logger.Info(fmt.Sprintf("Received %d operations", len(res.Operations))) - for _, operation := range res.Operations { operations = append(operations, spanstore.Operation{ Name: operation.Name, diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index ab0a89f8ee48..09bb5114cf28 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -146,6 +146,25 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { actual, err = s.SpanReader.GetServices(context.Background()) require.NoError(t, err) sort.Strings(actual) + t.Logf("Retrieved services: %v", actual) + if len(actual) > len(expected) { + // If the storage backend returns more services than expected, let's log traces for those + t.Log("🛑 Found unexpected services!") + for _, service := range actual { + traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ + ServiceName: service, + }) + if err != nil { + t.Log(err) + continue + } + for _, trace := range traces { + for _, span := range trace.Spans { + t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) + } + } + } + } return assert.ObjectsAreEqualValues(expected, actual) }) @@ -231,6 +250,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { sort.Slice(actual, func(i, j int) bool { return actual[i].Name < actual[j].Name }) + t.Logf("Retrieved operations: %v", actual) return assert.ObjectsAreEqualValues(expected, actual) }) diff --git a/scripts/kafka-integration-test.sh b/scripts/kafka-integration-test.sh index eecaaf2092d5..e4a1d375b01f 100755 --- a/scripts/kafka-integration-test.sh +++ b/scripts/kafka-integration-test.sh @@ -45,7 +45,7 @@ setup_kafka() { } dump_logs() { - echo "::group::Kafka logs" + echo "::group::🚧 🚧 🚧 Kafka logs" docker compose -f "${compose_file}" logs echo "::endgroup::" }