From fd2854125676fb91159f5ff36ba771199d93bea3 Mon Sep 17 00:00:00 2001 From: "Emmanuel E. Ebenezer" Date: Thu, 26 Dec 2024 06:09:06 +0100 Subject: [PATCH] Upgrade storage integration test to v2 Trace Reader (#6388) ## Which problem is this PR solving? - Part of #6366 ## Description of the changes - Incrementally swaps the fields of `StorageIntegration` to align with v2 storage api while supporting v1 api - Updates test functions accordingly to work with the updated fields ## How was this change tested? - make test ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [ ] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Emmanuel Emonueje Ebenezer Signed-off-by: Ebenezer Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- .../internal/integration/e2e_integration.go | 8 +- .../internal/integration/tailsampling_test.go | 2 +- .../storage/integration/badgerstore_test.go | 4 +- plugin/storage/integration/cassandra_test.go | 4 +- .../storage/integration/elasticsearch_test.go | 4 +- plugin/storage/integration/grpc_test.go | 4 +- plugin/storage/integration/integration.go | 56 ++++--- plugin/storage/integration/kafka_test.go | 4 +- plugin/storage/integration/memstore_test.go | 4 +- storage_v2/v1adapter/translator.go | 49 +++++- storage_v2/v1adapter/translator_test.go | 148 ++++++++++++++++++ 11 files changed, 254 insertions(+), 33 deletions(-) diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index c8681e4ed7e..9cb44d210ab 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -24,6 +24,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" "github.com/jaegertracing/jaeger/plugin/storage/integration" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const otlpPort = 4317 @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { s.SpanWriter, err = createSpanWriter(logger, otlpPort) require.NoError(t, err) - s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC) + spanReader, err := createSpanReader(logger, ports.QueryGRPC) require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) t.Cleanup(func() { // Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection. @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { // e2eCleanUp closes the SpanReader and SpanWriter gRPC connection. // This function should be called after all the tests are finished. func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) { - require.NoError(t, s.SpanReader.(io.Closer).Close()) + spanReader, err := v1adapter.GetV1Reader(s.TraceReader) + require.NoError(t, err) + require.NoError(t, spanReader.(io.Closer).Close()) require.NoError(t, s.SpanWriter.(io.Closer).Close()) } diff --git a/cmd/jaeger/internal/integration/tailsampling_test.go b/cmd/jaeger/internal/integration/tailsampling_test.go index 6ce27cc649e..d548552c112 100644 --- a/cmd/jaeger/internal/integration/tailsampling_test.go +++ b/cmd/jaeger/internal/integration/tailsampling_test.go @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) { var actual []string assert.Eventually(t, func() bool { var err error - actual, err = ts.SpanReader.GetServices(context.Background()) + actual, err = ts.TraceReader.GetServices(context.Background()) require.NoError(t, err) sort.Strings(actual) return assert.ObjectsAreEqualValues(ts.expectedServices, actual) diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 6bec5b45365..23778fcb87a 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -14,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type BadgerIntegrationStorage struct { @@ -35,8 +36,9 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { s.SpanWriter, err = s.factory.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = s.factory.CreateSpanReader() + spanReader, err := s.factory.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.SamplingStore, err = s.factory.CreateSamplingStore(0) require.NoError(t, err) diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 41d67c94544..559529498d9 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type CassandraStorageIntegration struct { @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 36f44a2bd19..37d91aab4f3 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const ( @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 8ca6df51ddb..17e58887a1b 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type GRPCStorageIntegrationTestSuite struct { @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index c03c6fb911c..f9e6e9a9a78 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -27,6 +27,8 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) //go:embed fixtures @@ -42,7 +44,7 @@ var fixtures embed.FS // and RunAll() under different conditions. type StorageIntegration struct { SpanWriter spanstore.Writer - SpanReader spanstore.Reader + TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer @@ -79,7 +81,7 @@ type StorageIntegration struct { // the service name is formatted "query##-service". type QueryFixtures struct { Caption string - Query *spanstore.TraceQueryParameters + Query *tracestore.TraceQueryParams ExpectedFixtures []string } @@ -143,7 +145,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetServices(context.Background()) + actual, err = s.TraceReader.GetServices(context.Background()) if err != nil { t.Log(err) return false @@ -154,9 +156,10 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { // If the storage backend returns more services than expected, let's log traces for those t.Log("🛑 Found unexpected services!") for _, service := range actual { - traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ + iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ ServiceName: service, }) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) continue @@ -214,10 +217,13 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { expected := s.writeLargeTraceWithDuplicateSpanIds(t) expectedTraceID := expected.Spans[0].TraceID - var actual *model.Trace + actual := &model.Trace{} // no spans found := s.waitForCondition(t, func(_ *testing.T) bool { - var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) + if len(traces) > 0 { + actual = traces[0] + } return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -242,15 +248,15 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) - var expected []spanstore.Operation + var expected []tracestore.Operation if s.GetOperationsMissingSpanKind { - expected = []spanstore.Operation{ + expected = []tracestore.Operation{ {Name: "example-operation-1"}, {Name: "example-operation-3"}, {Name: "example-operation-4"}, } } else { - expected = []spanstore.Operation{ + expected = []tracestore.Operation{ {Name: "example-operation-1", SpanKind: ""}, {Name: "example-operation-3", SpanKind: "server"}, {Name: "example-operation-4", SpanKind: "client"}, @@ -258,11 +264,11 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { } s.loadParseAndWriteExampleTrace(t) - var actual []spanstore.Operation + var actual []tracestore.Operation found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetOperations(context.Background(), - spanstore.OperationQueryParameters{ServiceName: "example-service-1"}) + actual, err = s.TraceReader.GetOperations(context.Background(), + tracestore.OperationQueryParameters{ServiceName: "example-service-1"}) if err != nil { t.Log(err) return false @@ -287,14 +293,18 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { expected := s.loadParseAndWriteExampleTrace(t) expectedTraceID := expected.Spans[0].TraceID - var actual *model.Trace + actual := &model.Trace{} // no spans found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) + return false + } + if len(traces) > 0 { + actual = traces[0] } - return err == nil && len(actual.Spans) == len(expected.Spans) + return len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { CompareTraces(t, expected, actual) @@ -302,9 +312,10 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { t.Run("NotFound error", func(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} - trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) - assert.Equal(t, spanstore.ErrTraceNotFound, err) - assert.Nil(t, trace) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()}) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) + require.NoError(t, err) // v2 TraceReader no longer returns an error for not found + assert.Empty(t, traces) }) } @@ -342,11 +353,12 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { } } -func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace { +func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace { var traces []*model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - traces, err = s.SpanReader.FindTraces(context.Background(), query) + iterTraces := s.TraceReader.FindTraces(context.Background(), *query) + traces, err = v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) return false diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 24b2131993d..2b42773ca4a 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -25,6 +25,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/kafka" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const defaultLocalKafkaBroker = "127.0.0.1:9092" @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) { spanConsumer.Start() s.SpanWriter = spanWriter - s.SpanReader = &ingester{traceStore} + spanReader := &ingester{traceStore} + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.CleanUp = func(_ *testing.T) {} s.SkipArchiveTest = true } diff --git a/plugin/storage/integration/memstore_test.go b/plugin/storage/integration/memstore_test.go index 35eaede7517..f910a2f6ed0 100644 --- a/plugin/storage/integration/memstore_test.go +++ b/plugin/storage/integration/memstore_test.go @@ -11,6 +11,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type MemStorageIntegrationTestSuite struct { @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) { store := memory.NewStore() archiveStore := memory.NewStore() s.SamplingStore = memory.NewSamplingStore(2) - s.SpanReader = store + spanReader := store + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.SpanWriter = store s.ArchiveSpanReader = archiveStore s.ArchiveSpanWriter = archiveStore diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index dda96a5630a..835bff80437 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -10,17 +10,26 @@ import ( "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" ) -// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) +// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces) // to Jaeger model batches ([]*model.Batch). -func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { +func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch { batches := jaegerTranslator.ProtoFromTraces(traces) spanMap := createSpanMapFromBatches(batches) transferWarningsToModelSpans(traces, spanMap) return batches } +// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) +// to Jaeger model batches ([]*model.Batch). +// +// TODO remove this function in favor of V1BatchesFromTraces +func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { + return V1BatchesFromTraces(traces) +} + // V1BatchesToTraces converts Jaeger model batches ([]*model.Batch) // to OpenTelemetry traces (ptrace.Traces). func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces { @@ -32,6 +41,42 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces { return traces } +// V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces. +func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { + var ( + jaegerTraces []*model.Trace + iterErr error + ) + jptrace.AggregateTraces(otelSeq)(func(otelTrace ptrace.Traces, err error) bool { + if err != nil { + iterErr = err + return false + } + jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace)) + return true + }) + if iterErr != nil { + return nil, iterErr + } + return jaegerTraces, nil +} + +// modelTraceFromOtelTrace extracts spans from otel traces +func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace { + var spans []*model.Span + batches := V1BatchesFromTraces(otelTrace) + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + proc := *batch.Process // shallow clone + span.Process = &proc + } + spans = append(spans, span) + } + } + return &model.Trace{Spans: spans} +} + func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { spanMap := make(map[model.SpanID]*model.Span) for _, batch := range batches { diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index e78a71f423f..a3b5ca5ab17 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -4,14 +4,18 @@ package v1adapter import ( + "errors" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" ) func TestProtoFromTraces_AddsWarnings(t *testing.T) { @@ -105,3 +109,147 @@ func TestProtoToTraces_AddsWarnings(t *testing.T) { assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) assert.Equal(t, []string{"test-warning-3"}, jptrace.GetWarnings(span3)) } + +func TestV1TracesFromSeq2(t *testing.T) { + var ( + processNoServiceName string = "OTLPResourceNoServiceName" + startTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time + ) + + testCases := []struct { + name string + expectedModelTraces []*model.Trace + seqTrace iter.Seq2[[]ptrace.Traces, error] + expectedErr error + }{ + { + name: "sequence with one trace", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-success-a", + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + // Add a new span and set attributes + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-success-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + yield([]ptrace.Traces{testTrace}, nil) + }, + expectedErr: nil, + }, + { + name: "sequence with two chunks of a trace", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-two-chunks-a", + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, + }, { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(2), + OperationName: "op-two-chunks-b", + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + traceChunk1 := ptrace.NewTraces() + rSpans1 := traceChunk1.ResourceSpans().AppendEmpty() + sSpans1 := rSpans1.ScopeSpans().AppendEmpty() + spans1 := sSpans1.Spans() + modelTraceID := model.NewTraceID(2, 3) + span1 := spans1.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-two-chunks-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + traceChunk2 := ptrace.NewTraces() + rSpans2 := traceChunk2.ResourceSpans().AppendEmpty() + sSpans2 := rSpans2.ScopeSpans().AppendEmpty() + spans2 := sSpans2.Spans() + span2 := spans2.AppendEmpty() + span2.SetTraceID(modelTraceID.ToOTELTraceID()) + span2.SetName("op-two-chunks-b") + span2.SetSpanID(model.NewSpanID(2).ToOTELSpanID()) + // Yield the test trace + yield([]ptrace.Traces{traceChunk1, traceChunk2}, nil) + }, + expectedErr: nil, + }, + { + // a case that occurs when no trace is contained in the iterator + name: "empty sequence", + expectedModelTraces: nil, + seqTrace: func(_ func([]ptrace.Traces, error) bool) {}, + expectedErr: nil, + }, + { + name: "sequence containing error", + expectedModelTraces: nil, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-error-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + if !yield([]ptrace.Traces{testTrace}, nil) { + return + } + yield(nil, errors.New("unexpected-op-err")) + }, + expectedErr: errors.New("unexpected-op-err"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualTraces, err := V1TracesFromSeq2(tc.seqTrace) + require.Equal(t, tc.expectedErr, err) + require.Equal(t, len(tc.expectedModelTraces), len(actualTraces)) + if len(tc.expectedModelTraces) < 1 { + return + } + for i, etrace := range tc.expectedModelTraces { + eSpans := etrace.Spans + aSpans := actualTraces[i].Spans + require.Equal(t, len(eSpans), len(aSpans)) + for j, espan := range eSpans { + assert.Equal(t, espan.TraceID, aSpans[j].TraceID) + assert.Equal(t, espan.OperationName, aSpans[j].OperationName) + assert.Equal(t, espan.Process, aSpans[j].Process) + } + } + }) + } +}