Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][query] Create archive reader/writer using regular factory methods #6519

Merged
merged 15 commits into from
Jan 17, 2025
Merged
3 changes: 1 addition & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) {
}

func (sp *spanProcessor) writeSpan(ctx context.Context, span *model.Span) error {
spanWriter, err := v1adapter.GetV1Writer(sp.traceWriter)
if err == nil {
if spanWriter, ok := v1adapter.GetV1Writer(sp.traceWriter); ok {
return spanWriter.WriteSpan(ctx, span)
}
traces := v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}})
Expand Down
58 changes: 46 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
Expand All @@ -22,7 +23,9 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/storage/metricstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down Expand Up @@ -94,7 +97,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

var opts querysvc.QueryServiceOptions
var v2opts v2querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
}
Expand Down Expand Up @@ -140,26 +142,58 @@ func (s *server) addArchiveStorage(
return nil
}

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesArchive, host)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this method anywhere? If yes, can that code be upgraded as well? If no, I would remove it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok let's book child tickets to address that

f, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesArchive, host)
if err != nil {
return fmt.Errorf("cannot find archive storage factory: %w", err)
return fmt.Errorf("cannot find traces archive storage factory: %w", err)
}

if !opts.InitArchiveStorage(f, s.telset.Logger) {
s.telset.Logger.Info("Archive storage not initialized")
traceReader, traceWriter := s.initArchiveStorage(f)
if traceReader == nil || traceWriter == nil {
return nil
}

ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger)
if ar != nil && aw != nil {
v2opts.ArchiveTraceReader = ar
v2opts.ArchiveTraceWriter = aw
} else {
s.telset.Logger.Info("Archive storage not initialized")
}
v2opts.ArchiveTraceReader = traceReader
v2opts.ArchiveTraceWriter = traceWriter

spanReader, spanWriter := getV1Adapters(traceReader, traceWriter)

opts.ArchiveSpanReader = spanReader
opts.ArchiveSpanWriter = spanWriter

return nil
}

func (s *server) initArchiveStorage(f tracestore.Factory) (tracestore.Reader, tracestore.Writer) {
reader, err := f.CreateTraceReader()
if err != nil {
s.telset.Logger.Error("Cannot init traces archive storage reader", zap.Error(err))
return nil, nil
}
writer, err := f.CreateTraceWriter()
if err != nil {
s.telset.Logger.Error("Cannot init traces archive storage writer", zap.Error(err))
return nil, nil
}
return reader, writer
}

func getV1Adapters(
reader tracestore.Reader,
writer tracestore.Writer,
) (spanstore.Reader, spanstore.Writer) {
v1Reader, ok := v1adapter.GetV1Reader(reader)
if !ok {
v1Reader = v1adapter.NewSpanReader(reader)
}

v1Writer, ok := v1adapter.GetV1Writer(writer)
if !ok {
v1Writer = v1adapter.NewSpanWriter(writer)
}

return v1Reader, v1Writer
}

func (s *server) createMetricReader(host component.Host) (metricstore.Reader, error) {
if s.config.Storage.Metrics == "" {
s.telset.Logger.Info("Metric storage not configured")
Expand Down
72 changes: 54 additions & 18 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
metricstoremocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type fakeFactory struct {
Expand Down Expand Up @@ -63,14 +66,6 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return &spanstoremocks.Reader{}, nil
}

func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
if ff.name == "need-initialize-error" {
return errors.New("test-error")
Expand Down Expand Up @@ -104,11 +99,6 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) {
if name == "need-factory-error" {
return nil, false
} else if name == "no-archive" {
f := fakeFactory{name: name}
return struct {
storage.Factory
}{f}, true
}

return fakeFactory{name: name}, true
Expand Down Expand Up @@ -193,7 +183,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "jaeger_storage",
},
},
expectedErr: "cannot find archive storage factory",
expectedErr: "cannot find traces archive storage factory",
},
{
name: "metrics storage error",
Expand Down Expand Up @@ -296,19 +286,32 @@ func TestServerAddArchiveStorage(t *testing.T) {
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: "",
expectedErr: "cannot find archive storage factory: cannot find extension",
expectedErr: "cannot find traces archive storage factory: cannot find extension",
},
{
name: "Archive storage not supported",
name: "Error in trace reader",
config: &Config{
Storage: Storage{
TracesArchive: "no-archive",
TracesArchive: "need-span-reader-error",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Archive storage not supported by the factory",
expectedOutput: "Cannot init traces archive storage reader",
expectedErr: "",
},
{
name: "Error in trace writer",
config: &Config{
Storage: Storage{
TracesArchive: "need-span-writer-error",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Cannot init traces archive storage writer",
expectedErr: "",
},
{
Expand Down Expand Up @@ -350,6 +353,39 @@ func TestServerAddArchiveStorage(t *testing.T) {
}
}

func TestGetV1Adapters(t *testing.T) {
tests := []struct {
name string
reader tracestore.Reader
writer tracestore.Writer
expectedReader spanstore.Reader
expectedWriter spanstore.Writer
}{
{
name: "native tracestore.Reader and tracestore.Writer",
reader: &tracestoremocks.Reader{},
writer: &tracestoremocks.Writer{},
expectedReader: v1adapter.NewSpanReader(&tracestoremocks.Reader{}),
expectedWriter: v1adapter.NewSpanWriter(&tracestoremocks.Writer{}),
},
{
name: "wrapped spanstore.Reader and spanstore.Writer",
reader: v1adapter.NewTraceReader(&spanstoremocks.Reader{}),
writer: v1adapter.NewTraceWriter(&spanstoremocks.Writer{}),
expectedReader: &spanstoremocks.Reader{},
expectedWriter: &spanstoremocks.Writer{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotReader, gotWriter := getV1Adapters(test.reader, test.writer)
require.Equal(t, test.expectedReader, gotReader)
require.Equal(t, test.expectedWriter, gotWriter)
})
}
}

func TestServerAddMetricsStorage(t *testing.T) {
host := componenttest.NewNopHost()

Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type TraceQueryParameters struct {

// NewQueryService returns a new QueryService.
func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
spanReader, err := v1adapter.GetV1Reader(traceReader)
if err != nil {
spanReader, ok := v1adapter.GetV1Reader(traceReader)
if !ok {
// if the spanstore.Reader is not available, downgrade the native tracestore.Reader to
// a spanstore.Reader
spanReader = v1adapter.NewSpanReader(traceReader)
Expand Down
8 changes: 3 additions & 5 deletions storage_v2/v1adapter/tracereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ import (
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var ErrV1ReaderNotAvailable = errors.New("spanstore.Reader is not a wrapper around v1 reader")

var _ tracestore.Reader = (*TraceReader)(nil)

type TraceReader struct {
spanReader spanstore.Reader
}

func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, error) {
func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, bool) {
if tr, ok := reader.(*TraceReader); ok {
return tr.spanReader, nil
return tr.spanReader, ok
}
return nil, ErrV1ReaderNotAvailable
return nil, false
}

func NewTraceReader(spanReader spanstore.Reader) *TraceReader {
Expand Down
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/tracereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestGetV1Reader_NoError(t *testing.T) {
traceReader := &TraceReader{
spanReader: memstore,
}
v1Reader, err := GetV1Reader(traceReader)
require.NoError(t, err)
v1Reader, ok := GetV1Reader(traceReader)
require.True(t, ok)
require.Equal(t, memstore, v1Reader)
}

func TestGetV1Reader_Error(t *testing.T) {
fr := new(tracestoremocks.Reader)
_, err := GetV1Reader(fr)
require.ErrorIs(t, err, ErrV1ReaderNotAvailable)
_, ok := GetV1Reader(fr)
require.False(t, ok)
}

func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) {
Expand Down
8 changes: 3 additions & 5 deletions storage_v2/v1adapter/tracewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ import (
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var ErrV1WriterNotAvailable = errors.New("spanstore.Writer is not a wrapper around v1 writer")

type TraceWriter struct {
spanWriter spanstore.Writer
}

func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, error) {
func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, bool) {
if tr, ok := writer.(*TraceWriter); ok {
return tr.spanWriter, nil
return tr.spanWriter, ok
}
return nil, ErrV1WriterNotAvailable
return nil, false
}

func NewTraceWriter(spanWriter spanstore.Writer) *TraceWriter {
Expand Down
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/tracewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func TestGetV1Writer_NoError(t *testing.T) {
traceWriter := &TraceWriter{
spanWriter: memstore,
}
v1Writer, err := GetV1Writer(traceWriter)
require.NoError(t, err)
v1Writer, ok := GetV1Writer(traceWriter)
require.True(t, ok)
require.Equal(t, memstore, v1Writer)
}

func TestGetV1Writer_Error(t *testing.T) {
w := new(tracestoremocks.Writer)
_, err := GetV1Writer(w)
require.ErrorIs(t, err, ErrV1WriterNotAvailable)
_, ok := GetV1Writer(w)
require.False(t, ok)
}

func makeTraces() ptrace.Traces {
Expand Down
Loading