Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade storage integration test to v2 Trace Reader #6388

Merged
merged 15 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const otlpPort = 4317
Expand Down Expand Up @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
spanReader, err := createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
Expand Down Expand Up @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanReader.(io.Closer).Close())
spanReader, err := v1adapter.GetV1Reader(s.TraceReader)
require.NoError(t, err)
require.NoError(t, spanReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/tailsampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) {
var actual []string
assert.Eventually(t, func() bool {
var err error
actual, err = ts.SpanReader.GetServices(context.Background())
actual, err = ts.TraceReader.GetServices(context.Background())
require.NoError(t, err)
sort.Strings(actual)
return assert.ObjectsAreEqualValues(ts.expectedServices, actual)
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type BadgerIntegrationStorage struct {
Expand All @@ -35,8 +36,9 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanWriter, err = s.factory.CreateSpanWriter()
require.NoError(t, err)

s.SpanReader, err = s.factory.CreateSpanReader()
spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type CassandraStorageIntegration struct {
Expand Down Expand Up @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type GRPCStorageIntegrationTestSuite struct {
Expand All @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {

s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
45 changes: 27 additions & 18 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

//go:embed fixtures
Expand All @@ -42,7 +44,7 @@ var fixtures embed.FS
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
Expand Down Expand Up @@ -79,7 +81,7 @@ type StorageIntegration struct {
// the service name is formatted "query##-service".
type QueryFixtures struct {
Caption string
Query *spanstore.TraceQueryParameters
Query *tracestore.TraceQueryParams
ExpectedFixtures []string
}

Expand Down Expand Up @@ -143,7 +145,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
var actual []string
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetServices(context.Background())
actual, err = s.TraceReader.GetServices(context.Background())
if err != nil {
t.Log(err)
return false
Expand All @@ -154,9 +156,10 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
// If the storage backend returns more services than expected, let's log traces for those
t.Log("🛑 Found unexpected services!")
for _, service := range actual {
traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
continue
Expand Down Expand Up @@ -216,8 +219,10 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) >= len(expected.Spans)
})

Expand All @@ -242,27 +247,27 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)

var expected []spanstore.Operation
var expected []tracestore.Operation
if s.GetOperationsMissingSpanKind {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1"},
{Name: "example-operation-3"},
{Name: "example-operation-4"},
}
} else {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1", SpanKind: ""},
{Name: "example-operation-3", SpanKind: "server"},
{Name: "example-operation-4", SpanKind: "client"},
}
}
s.loadParseAndWriteExampleTrace(t)

var actual []spanstore.Operation
var actual []tracestore.Operation
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetOperations(context.Background(),
spanstore.OperationQueryParameters{ServiceName: "example-service-1"})
actual, err = s.TraceReader.GetOperations(context.Background(),
tracestore.OperationQueryParameters{ServiceName: "example-service-1"})
if err != nil {
t.Log(err)
return false
Expand All @@ -289,11 +294,13 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
}
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
Expand All @@ -302,9 +309,10 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {

t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := model.TraceID{High: 0, Low: 1}
trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, trace)
assert.Nil(t, traces)
})
}

Expand Down Expand Up @@ -342,11 +350,12 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
}
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace {
var traces []*model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
traces, err = s.SpanReader.FindTraces(context.Background(), query)
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
return false
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const defaultLocalKafkaBroker = "127.0.0.1:9092"
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
spanConsumer.Start()

s.SpanWriter = spanWriter
s.SpanReader = &ingester{traceStore}
spanReader := &ingester{traceStore}
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.CleanUp = func(_ *testing.T) {}
s.SkipArchiveTest = true
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type MemStorageIntegrationTestSuite struct {
Expand All @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.SpanReader = store
spanReader := store
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.SpanWriter = store
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore
Expand Down
54 changes: 52 additions & 2 deletions storage_v2/v1adapter/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,27 @@ import (

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarningsToModelSpans(traces, spanMap)
return batches
}

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
//
// TODO remove this function in favor of V1BatchesFromTraces
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
return V1BatchesFromTraces(traces)
}

// V1BatchesToTraces converts Jaeger model batches ([]*model.Batch)
// to OpenTelemetry traces (ptrace.Traces).
func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces {
Expand All @@ -32,6 +42,46 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces {
return traces
}

// V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces.
// Returns spanstore.ErrTraceNotFound for empty iterators.
func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
var (
jaegerTraces []*model.Trace
iterErr error
)
jptrace.AggregateTraces(otelSeq)(func(otelTrace ptrace.Traces, err error) bool {
if err != nil {
iterErr = err
return false
}
jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace))
return true
})
if iterErr != nil {
return nil, iterErr
}
if len(jaegerTraces) == 0 {
return nil, spanstore.ErrTraceNotFound
}
return jaegerTraces, nil
}

// modelTraceFromOtelTrace extracts spans from otel traces
func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace {
var spans []*model.Span
batches := V1BatchesFromTraces(otelTrace)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
proc := *batch.Process // shallow clone
span.Process = &proc
}
spans = append(spans, span)
}
}
return &model.Trace{Spans: spans}
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand Down
Loading
Loading