diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 83811f68fe6..a277c66f62f 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -227,8 +227,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) { } func (sp *spanProcessor) writeSpan(ctx context.Context, span *model.Span) error { - spanWriter, err := v1adapter.GetV1Writer(sp.traceWriter) - if err == nil { + if spanWriter, ok := v1adapter.GetV1Writer(sp.traceWriter); ok { return spanWriter.WriteSpan(ctx, span) } traces := v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}}) diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 72880fc3fd1..063da1fb8b5 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" @@ -22,7 +23,9 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metricstore/disabled" "github.com/jaegertracing/jaeger/storage/metricstore" + "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/depstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) @@ -94,7 +97,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error { var opts querysvc.QueryServiceOptions var v2opts v2querysvc.QueryServiceOptions - // TODO archive storage still uses v1 factory if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil { return err } @@ -140,26 +142,58 @@ func (s *server) addArchiveStorage( return nil } - f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesArchive, host) + f, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesArchive, host) if err != nil { - return fmt.Errorf("cannot find archive storage factory: %w", err) + return fmt.Errorf("cannot find traces archive storage factory: %w", err) } - if !opts.InitArchiveStorage(f, s.telset.Logger) { - s.telset.Logger.Info("Archive storage not initialized") + traceReader, traceWriter := s.initArchiveStorage(f) + if traceReader == nil || traceWriter == nil { + return nil } - ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger) - if ar != nil && aw != nil { - v2opts.ArchiveTraceReader = ar - v2opts.ArchiveTraceWriter = aw - } else { - s.telset.Logger.Info("Archive storage not initialized") - } + v2opts.ArchiveTraceReader = traceReader + v2opts.ArchiveTraceWriter = traceWriter + + spanReader, spanWriter := getV1Adapters(traceReader, traceWriter) + + opts.ArchiveSpanReader = spanReader + opts.ArchiveSpanWriter = spanWriter return nil } +func (s *server) initArchiveStorage(f tracestore.Factory) (tracestore.Reader, tracestore.Writer) { + reader, err := f.CreateTraceReader() + if err != nil { + s.telset.Logger.Error("Cannot init traces archive storage reader", zap.Error(err)) + return nil, nil + } + writer, err := f.CreateTraceWriter() + if err != nil { + s.telset.Logger.Error("Cannot init traces archive storage writer", zap.Error(err)) + return nil, nil + } + return reader, writer +} + +func getV1Adapters( + reader tracestore.Reader, + writer tracestore.Writer, +) (spanstore.Reader, spanstore.Writer) { + v1Reader, ok := v1adapter.GetV1Reader(reader) + if !ok { + v1Reader = v1adapter.NewSpanReader(reader) + } + + v1Writer, ok := v1adapter.GetV1Writer(writer) + if !ok { + v1Writer = v1adapter.NewSpanWriter(writer) + } + + return v1Reader, v1Writer +} + func (s *server) createMetricReader(host component.Host) (metricstore.Reader, error) { if s.config.Storage.Metrics == "" { s.telset.Logger.Info("Metric storage not configured") diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 3f5dc3957a8..1a5effd21e1 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -36,6 +36,9 @@ import ( metricstoremocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type fakeFactory struct { @@ -63,14 +66,6 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) { return &spanstoremocks.Writer{}, nil } -func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { - return &spanstoremocks.Reader{}, nil -} - -func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - return &spanstoremocks.Writer{}, nil -} - func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error { if ff.name == "need-initialize-error" { return errors.New("test-error") @@ -104,11 +99,6 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil) func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) { if name == "need-factory-error" { return nil, false - } else if name == "no-archive" { - f := fakeFactory{name: name} - return struct { - storage.Factory - }{f}, true } return fakeFactory{name: name}, true @@ -193,7 +183,7 @@ func TestServerStart(t *testing.T) { TracesPrimary: "jaeger_storage", }, }, - expectedErr: "cannot find archive storage factory", + expectedErr: "cannot find traces archive storage factory", }, { name: "metrics storage error", @@ -296,19 +286,32 @@ func TestServerAddArchiveStorage(t *testing.T) { qSvcOpts: &querysvc.QueryServiceOptions{}, v2qSvcOpts: &v2querysvc.QueryServiceOptions{}, expectedOutput: "", - expectedErr: "cannot find archive storage factory: cannot find extension", + expectedErr: "cannot find traces archive storage factory: cannot find extension", }, { - name: "Archive storage not supported", + name: "Error in trace reader", config: &Config{ Storage: Storage{ - TracesArchive: "no-archive", + TracesArchive: "need-span-reader-error", }, }, qSvcOpts: &querysvc.QueryServiceOptions{}, v2qSvcOpts: &v2querysvc.QueryServiceOptions{}, extension: fakeStorageExt{}, - expectedOutput: "Archive storage not supported by the factory", + expectedOutput: "Cannot init traces archive storage reader", + expectedErr: "", + }, + { + name: "Error in trace writer", + config: &Config{ + Storage: Storage{ + TracesArchive: "need-span-writer-error", + }, + }, + qSvcOpts: &querysvc.QueryServiceOptions{}, + v2qSvcOpts: &v2querysvc.QueryServiceOptions{}, + extension: fakeStorageExt{}, + expectedOutput: "Cannot init traces archive storage writer", expectedErr: "", }, { @@ -350,6 +353,39 @@ func TestServerAddArchiveStorage(t *testing.T) { } } +func TestGetV1Adapters(t *testing.T) { + tests := []struct { + name string + reader tracestore.Reader + writer tracestore.Writer + expectedReader spanstore.Reader + expectedWriter spanstore.Writer + }{ + { + name: "native tracestore.Reader and tracestore.Writer", + reader: &tracestoremocks.Reader{}, + writer: &tracestoremocks.Writer{}, + expectedReader: v1adapter.NewSpanReader(&tracestoremocks.Reader{}), + expectedWriter: v1adapter.NewSpanWriter(&tracestoremocks.Writer{}), + }, + { + name: "wrapped spanstore.Reader and spanstore.Writer", + reader: v1adapter.NewTraceReader(&spanstoremocks.Reader{}), + writer: v1adapter.NewTraceWriter(&spanstoremocks.Writer{}), + expectedReader: &spanstoremocks.Reader{}, + expectedWriter: &spanstoremocks.Writer{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotReader, gotWriter := getV1Adapters(test.reader, test.writer) + require.Equal(t, test.expectedReader, gotReader) + require.Equal(t, test.expectedWriter, gotWriter) + }) + } +} + func TestServerAddMetricsStorage(t *testing.T) { host := componenttest.NewNopHost() diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 4598acb9924..ca241c71500 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -61,8 +61,8 @@ type TraceQueryParameters struct { // NewQueryService returns a new QueryService. func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService { - spanReader, err := v1adapter.GetV1Reader(traceReader) - if err != nil { + spanReader, ok := v1adapter.GetV1Reader(traceReader) + if !ok { // if the spanstore.Reader is not available, downgrade the native tracestore.Reader to // a spanstore.Reader spanReader = v1adapter.NewSpanReader(traceReader) diff --git a/storage_v2/v1adapter/tracereader.go b/storage_v2/v1adapter/tracereader.go index 67d7f00c126..1e704f55ad5 100644 --- a/storage_v2/v1adapter/tracereader.go +++ b/storage_v2/v1adapter/tracereader.go @@ -18,19 +18,17 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) -var ErrV1ReaderNotAvailable = errors.New("spanstore.Reader is not a wrapper around v1 reader") - var _ tracestore.Reader = (*TraceReader)(nil) type TraceReader struct { spanReader spanstore.Reader } -func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, error) { +func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, bool) { if tr, ok := reader.(*TraceReader); ok { - return tr.spanReader, nil + return tr.spanReader, ok } - return nil, ErrV1ReaderNotAvailable + return nil, false } func NewTraceReader(spanReader spanstore.Reader) *TraceReader { diff --git a/storage_v2/v1adapter/tracereader_test.go b/storage_v2/v1adapter/tracereader_test.go index 4ebceec57c6..d0cdda12495 100644 --- a/storage_v2/v1adapter/tracereader_test.go +++ b/storage_v2/v1adapter/tracereader_test.go @@ -31,15 +31,15 @@ func TestGetV1Reader_NoError(t *testing.T) { traceReader := &TraceReader{ spanReader: memstore, } - v1Reader, err := GetV1Reader(traceReader) - require.NoError(t, err) + v1Reader, ok := GetV1Reader(traceReader) + require.True(t, ok) require.Equal(t, memstore, v1Reader) } func TestGetV1Reader_Error(t *testing.T) { fr := new(tracestoremocks.Reader) - _, err := GetV1Reader(fr) - require.ErrorIs(t, err, ErrV1ReaderNotAvailable) + _, ok := GetV1Reader(fr) + require.False(t, ok) } func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) { diff --git a/storage_v2/v1adapter/tracewriter.go b/storage_v2/v1adapter/tracewriter.go index 59b6a1b408c..4c9c6f75281 100644 --- a/storage_v2/v1adapter/tracewriter.go +++ b/storage_v2/v1adapter/tracewriter.go @@ -13,17 +13,15 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) -var ErrV1WriterNotAvailable = errors.New("spanstore.Writer is not a wrapper around v1 writer") - type TraceWriter struct { spanWriter spanstore.Writer } -func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, error) { +func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, bool) { if tr, ok := writer.(*TraceWriter); ok { - return tr.spanWriter, nil + return tr.spanWriter, ok } - return nil, ErrV1WriterNotAvailable + return nil, false } func NewTraceWriter(spanWriter spanstore.Writer) *TraceWriter { diff --git a/storage_v2/v1adapter/tracewriter_test.go b/storage_v2/v1adapter/tracewriter_test.go index af9858c667a..3342f0fd9cb 100644 --- a/storage_v2/v1adapter/tracewriter_test.go +++ b/storage_v2/v1adapter/tracewriter_test.go @@ -62,15 +62,15 @@ func TestGetV1Writer_NoError(t *testing.T) { traceWriter := &TraceWriter{ spanWriter: memstore, } - v1Writer, err := GetV1Writer(traceWriter) - require.NoError(t, err) + v1Writer, ok := GetV1Writer(traceWriter) + require.True(t, ok) require.Equal(t, memstore, v1Writer) } func TestGetV1Writer_Error(t *testing.T) { w := new(tracestoremocks.Writer) - _, err := GetV1Writer(w) - require.ErrorIs(t, err, ErrV1WriterNotAvailable) + _, ok := GetV1Writer(w) + require.False(t, ok) } func makeTraces() ptrace.Traces {