[v2][query] Create archive reader/writer using regular factory methods (#6519)

## Which problem is this PR solving?
- Towards #6065

## Description of the changes
- This PR changes the jaegerquery extension to remove usages of
`CreateArchiveSpanReader` and `CreateArchiveSpanWriter` and replace them
with their primary storage counterparts from the v2 storage API
`CreateTraceReader` and `CreateTraceWriter`.
- 🛑 **This PR contains a breaking change for users of jaeger-v2 that
have an Elasticsearch storage configured for `traces_archive`** 🛑
- The distinction between primary and archive storage was removed, which
causes some breaking changes that can be remediated. Refer to Step 9 of
the test report below for remediation steps.
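
As a rough illustration of the first bullet, the sketch below creates an archive reader and writer through the regular factory methods and gives up on archive support if either call fails. The `Reader`, `Writer`, `Factory`, and `fakeFactory` types are hypothetical stand-ins defined only for this example; they are not the real `tracestore` types.

```go
package main

import (
	"errors"
	"fmt"
)

// Reader and Writer are hypothetical stand-ins for tracestore.Reader and
// tracestore.Writer; the real interfaces declare methods that are omitted here.
type (
	Reader interface{}
	Writer interface{}
)

// Factory is a hypothetical stand-in for the v2 trace store factory.
type Factory interface {
	CreateTraceReader() (Reader, error)
	CreateTraceWriter() (Writer, error)
}

// initArchive creates the archive reader and writer with the regular factory
// methods. It returns (nil, nil) if either call fails.
func initArchive(f Factory) (Reader, Writer) {
	r, err := f.CreateTraceReader()
	if err != nil {
		fmt.Println("cannot init archive reader:", err)
		return nil, nil
	}
	w, err := f.CreateTraceWriter()
	if err != nil {
		fmt.Println("cannot init archive writer:", err)
		return nil, nil
	}
	return r, w
}

// fakeFactory is a test double; failReader makes CreateTraceReader fail.
type fakeFactory struct{ failReader bool }

func (f fakeFactory) CreateTraceReader() (Reader, error) {
	if f.failReader {
		return nil, errors.New("reader unavailable")
	}
	return "reader", nil
}

func (fakeFactory) CreateTraceWriter() (Writer, error) { return "writer", nil }

func main() {
	r, w := initArchive(fakeFactory{})
	fmt.Println(r != nil, w != nil)

	r, w = initArchive(fakeFactory{failReader: true})
	fmt.Println(r != nil, w != nil)
}
```

Returning a nil pair rather than an error keeps a missing archive store non-fatal for the caller, which matches the behavior described for this PR.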

## Test Report

### 1. Establish Ground Truth on `main`
To begin, configure the storage and query settings in the
`all-in-one.yaml` file as follows:

```yaml
jaeger_storage:
  backends:
    some_storage:
      elasticsearch:
        indices:
          index_prefix: "jaeger-main"
          spans:
            date_layout: "2006-01-02"
            rollover_frequency: "day"
            shards: 5
            replicas: 1
          services:
            date_layout: "2006-01-02"
            rollover_frequency: "day"
            shards: 5
            replicas: 1
          dependencies:
            date_layout: "2006-01-02"
            rollover_frequency: "day"
            shards: 5
            replicas: 1
          sampling:
            date_layout: "2006-01-02"
            rollover_frequency: "day"
            shards: 5
            replicas: 1
    another_storage:
      elasticsearch:
        indices:
          index_prefix: "jaeger-archive"
```

### 2. Spin Up Elasticsearch Using Docker
To bring up Elasticsearch, run the following commands:

```zsh
jaeger % cd docker-compose/elasticsearch/v8 
v8 % docker compose up
```

### 3. Start Jaeger
Start the Jaeger service:

```zsh
jaeger % go run ./cmd/jaeger
```

### 4. Archive a Trace
From the Jaeger UI, select a trace and archive it.

**traceID**: `0dc3e460bd9b8e0dddfa29a2f751cfb9`

![Trace
Screenshot](https://github.com/user-attachments/assets/c9ef72a6-30da-4496-9356-20b5db42663f)

### 5. Update `index_prefix`
Stop Jaeger and change the `index_prefix` in the primary storage
configuration to `jaeger-main-1`. This ensures that the trace can no
longer be found in the primary storage but can still be found in the
archive storage.

### 6. Query for the Same Trace
```zsh
curl -s http://localhost:16686/api/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq '.data[].spans[] | {traceID, operationName}'
{
  "traceID": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "operationName": "/api/services"
}
{
  "traceID": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "operationName": "GetService"
}
```
```zsh
curl -s http://localhost:16686/api/v3/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq '.result.resourceSpans[].scopeSpans[].spans[] | {traceId, name}'
{
  "traceId": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "name": "GetService"
}
{
  "traceId": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "name": "/api/services"
}
```

### 7. Test Changes from This PR
Stop Jaeger, check out this PR, and restart Jaeger:

```zsh
gh pr checkout 6519
go run ./cmd/jaeger
```

### 8. Query for the Same Trace After PR Changes
```zsh
curl -s http://localhost:16686/api/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq .
{
  "data": null,
  "total": 0,
  "limit": 0,
  "offset": 0,
  "errors": [
    {
      "code": 404,
      "msg": "trace not found"
    }
  ]
}
```
```zsh
curl -s http://localhost:16686/api/v3/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq .
{
  "error": {
    "httpCode": 404,
    "message": "No traces found"
  }
}
```

🛑 **This is where the breaking change occurs** 🛑
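
The 404 shown in Step 8 can also be detected programmatically. The following is a minimal sketch that decodes only the `errors` array visible in the `/api/traces` output above; the `apiError` and `apiResponse` types and the `traceMissing` helper are hypothetical names introduced for this example and are not part of Jaeger.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError mirrors one entry of the "errors" array in the response above.
type apiError struct {
	Code int    `json:"code"`
	Msg  string `json:"msg"`
}

// apiResponse decodes only the field this check needs.
type apiResponse struct {
	Errors []apiError `json:"errors"`
}

// traceMissing reports whether the response body carries a 404 error entry.
func traceMissing(body []byte) (bool, error) {
	var resp apiResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return false, err
	}
	for _, e := range resp.Errors {
		if e.Code == 404 {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Body copied from the Step 8 output.
	body := []byte(`{"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":404,"msg":"trace not found"}]}`)
	missing, err := traceMissing(body)
	fmt.Println(missing, err)
}
```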

### 9. Mitigation for Users

#### Set `use_aliases` for Archive Storage
To mitigate the issue, set the `use_aliases` configuration for your
archive storage to `true`. Update the configuration as follows:

```yaml
another_storage:
  elasticsearch:
    indices:
      index_prefix: "jaeger-archive"
    use_aliases: true
```

#### Add Alias from Old Index to New Index
To ensure backwards compatibility, add an alias that maps the new index
name to the old index. You can query the current set of aliases in
Elasticsearch with:

```bash
curl -X GET "http://localhost:9200/_aliases?pretty"
```
```json
{
  "jaeger-main-1-jaeger-span-2025-01-16": { "aliases": {} },
  "jaeger-main-2-jaeger-span-2025-01-16": { "aliases": {} },
  "jaeger-archive-jaeger-span-archive": { "aliases": {} },
  "jaeger-main-1-jaeger-service-2025-01-16": { "aliases": {} }
}
```

To point the new name (`jaeger-archive-jaeger-span-read`) at the old
index (`jaeger-archive-jaeger-span-archive`), run the following:

```bash
curl -X POST "http://localhost:9200/_aliases" -H 'Content-Type: application/json' -d'
{
  "actions": [
    {
      "add": {
        "index": "jaeger-archive-jaeger-span-archive",
        "alias": "jaeger-archive-jaeger-span-read"
      }
    }
  ]
}'
```

Confirm that the alias has been added:

```bash
curl -X GET "http://localhost:9200/_aliases?pretty"
```
```json
{
  "jaeger-main-1-jaeger-span-2025-01-16": { "aliases": {} },
  "jaeger-archive-jaeger-span-archive": { "aliases": { "jaeger-archive-jaeger-span-read": {} } },
  "jaeger-main-2-jaeger-span-2025-01-16": { "aliases": {} },
  "jaeger-main-2-jaeger-service-2025-01-16": { "aliases": {} },
  "jaeger-main-1-jaeger-service-2025-01-16": { "aliases": {} }
}
```

Note that if you already had `use_aliases` set to `true` for your archive
storage before the upgrade, Jaeger would have been using an index name
with a `-read` suffix: `jaeger-archive-jaeger-span-archive-read`. In that
case, for the example above, you would create an alias
`jaeger-archive-jaeger-span-read` pointing to
`jaeger-archive-jaeger-span-archive-read`.
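
The two cases above can be summarized in a small helper. This is only a sketch: it assumes that the names for other prefixes follow the same pattern as the `jaeger-archive` example in this report, and `archiveAliasPlan` is a hypothetical function, not something Jaeger provides.

```go
package main

import "fmt"

// archiveAliasPlan returns the existing archive index (or read alias) and the
// new alias that should point to it. The naming pattern is extrapolated from
// the example in this report and is an assumption for other prefixes.
func archiveAliasPlan(prefix string, hadAliases bool) (index, alias string) {
	index = prefix + "-jaeger-span-archive"
	if hadAliases {
		// With use_aliases enabled before the upgrade, the old name carried
		// a -read suffix.
		index += "-read"
	}
	return index, prefix + "-jaeger-span-read"
}

func main() {
	for _, hadAliases := range []bool{false, true} {
		index, alias := archiveAliasPlan("jaeger-archive", hadAliases)
		fmt.Printf("use_aliases before upgrade=%v: alias %s -> %s\n", hadAliases, alias, index)
	}
}
```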

### 10. Restart Jaeger and Retry the Query
Finally, restart Jaeger and run the same trace queries again:

```zsh
curl -s http://localhost:16686/api/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq '.data[].spans[] | {traceID, operationName}'
{
  "traceID": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "operationName": "/api/services"
}
{
  "traceID": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "operationName": "GetService"
}
```

```zsh
curl -s http://localhost:16686/api/v3/traces/0dc3e460bd9b8e0dddfa29a2f751cfb9 | jq '.result.resourceSpans[].scopeSpans[].spans[] | {traceId, name}'
{
  "traceId": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "name": "/api/services"
}
{
  "traceId": "0dc3e460bd9b8e0dddfa29a2f751cfb9",
  "name": "GetService"
}
```

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
mahadzaryab1 and yurishkuro authored Jan 17, 2025
1 parent 463f5f3 commit 7f16f49
Showing 8 changed files with 117 additions and 52 deletions.
3 changes: 1 addition & 2 deletions cmd/collector/app/span_processor.go
@@ -227,8 +227,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) {
 }

 func (sp *spanProcessor) writeSpan(ctx context.Context, span *model.Span) error {
-	spanWriter, err := v1adapter.GetV1Writer(sp.traceWriter)
-	if err == nil {
+	if spanWriter, ok := v1adapter.GetV1Writer(sp.traceWriter); ok {
 		return spanWriter.WriteSpan(ctx, span)
 	}
 	traces := v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}})
58 changes: 46 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
@@ -11,6 +11,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/extension"
 	"go.opentelemetry.io/collector/extension/extensioncapabilities"
+	"go.uber.org/zap"

 	"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
 	queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
@@ -22,7 +23,9 @@ import (
 	"github.com/jaegertracing/jaeger/pkg/tenancy"
 	"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
 	"github.com/jaegertracing/jaeger/storage/metricstore"
+	"github.com/jaegertracing/jaeger/storage/spanstore"
 	"github.com/jaegertracing/jaeger/storage_v2/depstore"
+	"github.com/jaegertracing/jaeger/storage_v2/tracestore"
 	"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
 )

@@ -94,7 +97,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

 	var opts querysvc.QueryServiceOptions
 	var v2opts v2querysvc.QueryServiceOptions
-	// TODO archive storage still uses v1 factory
 	if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
 		return err
 	}
@@ -140,26 +142,58 @@ func (s *server) addArchiveStorage(
 		return nil
 	}

-	f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesArchive, host)
+	f, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesArchive, host)
 	if err != nil {
-		return fmt.Errorf("cannot find archive storage factory: %w", err)
+		return fmt.Errorf("cannot find traces archive storage factory: %w", err)
 	}

-	if !opts.InitArchiveStorage(f, s.telset.Logger) {
-		s.telset.Logger.Info("Archive storage not initialized")
+	traceReader, traceWriter := s.initArchiveStorage(f)
+	if traceReader == nil || traceWriter == nil {
 		return nil
 	}

-	ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger)
-	if ar != nil && aw != nil {
-		v2opts.ArchiveTraceReader = ar
-		v2opts.ArchiveTraceWriter = aw
-	} else {
-		s.telset.Logger.Info("Archive storage not initialized")
-	}
+	v2opts.ArchiveTraceReader = traceReader
+	v2opts.ArchiveTraceWriter = traceWriter
+
+	spanReader, spanWriter := getV1Adapters(traceReader, traceWriter)
+
+	opts.ArchiveSpanReader = spanReader
+	opts.ArchiveSpanWriter = spanWriter
+
 	return nil
 }

+func (s *server) initArchiveStorage(f tracestore.Factory) (tracestore.Reader, tracestore.Writer) {
+	reader, err := f.CreateTraceReader()
+	if err != nil {
+		s.telset.Logger.Error("Cannot init traces archive storage reader", zap.Error(err))
+		return nil, nil
+	}
+	writer, err := f.CreateTraceWriter()
+	if err != nil {
+		s.telset.Logger.Error("Cannot init traces archive storage writer", zap.Error(err))
+		return nil, nil
+	}
+	return reader, writer
+}
+
+func getV1Adapters(
+	reader tracestore.Reader,
+	writer tracestore.Writer,
+) (spanstore.Reader, spanstore.Writer) {
+	v1Reader, ok := v1adapter.GetV1Reader(reader)
+	if !ok {
+		v1Reader = v1adapter.NewSpanReader(reader)
+	}
+
+	v1Writer, ok := v1adapter.GetV1Writer(writer)
+	if !ok {
+		v1Writer = v1adapter.NewSpanWriter(writer)
+	}
+
+	return v1Reader, v1Writer
+}
+
 func (s *server) createMetricReader(host component.Host) (metricstore.Reader, error) {
 	if s.config.Storage.Metrics == "" {
 		s.telset.Logger.Info("Metric storage not configured")
72 changes: 54 additions & 18 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
@@ -36,6 +36,9 @@ import (
 	metricstoremocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
 	"github.com/jaegertracing/jaeger/storage/spanstore"
 	spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
+	"github.com/jaegertracing/jaeger/storage_v2/tracestore"
+	tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
+	"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
 )

 type fakeFactory struct {
@@ -63,14 +66,6 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
 	return &spanstoremocks.Writer{}, nil
 }

-func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
-	return &spanstoremocks.Reader{}, nil
-}
-
-func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
-	return &spanstoremocks.Writer{}, nil
-}
-
 func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
 	if ff.name == "need-initialize-error" {
 		return errors.New("test-error")
@@ -104,11 +99,6 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
 func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) {
 	if name == "need-factory-error" {
 		return nil, false
-	} else if name == "no-archive" {
-		f := fakeFactory{name: name}
-		return struct {
-			storage.Factory
-		}{f}, true
 	}

 	return fakeFactory{name: name}, true
@@ -193,7 +183,7 @@ func TestServerStart(t *testing.T) {
 					TracesPrimary: "jaeger_storage",
 				},
 			},
-			expectedErr: "cannot find archive storage factory",
+			expectedErr: "cannot find traces archive storage factory",
 		},
 		{
 			name: "metrics storage error",
@@ -296,19 +286,32 @@ func TestServerAddArchiveStorage(t *testing.T) {
 			qSvcOpts: &querysvc.QueryServiceOptions{},
 			v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
 			expectedOutput: "",
-			expectedErr: "cannot find archive storage factory: cannot find extension",
+			expectedErr: "cannot find traces archive storage factory: cannot find extension",
 		},
 		{
-			name: "Archive storage not supported",
+			name: "Error in trace reader",
 			config: &Config{
 				Storage: Storage{
-					TracesArchive: "no-archive",
+					TracesArchive: "need-span-reader-error",
 				},
 			},
 			qSvcOpts: &querysvc.QueryServiceOptions{},
 			v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
 			extension: fakeStorageExt{},
-			expectedOutput: "Archive storage not supported by the factory",
+			expectedOutput: "Cannot init traces archive storage reader",
 			expectedErr: "",
 		},
+		{
+			name: "Error in trace writer",
+			config: &Config{
+				Storage: Storage{
+					TracesArchive: "need-span-writer-error",
+				},
+			},
+			qSvcOpts: &querysvc.QueryServiceOptions{},
+			v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
+			extension: fakeStorageExt{},
+			expectedOutput: "Cannot init traces archive storage writer",
+			expectedErr: "",
+		},
 		{
@@ -350,6 +353,39 @@ func TestServerAddArchiveStorage(t *testing.T) {
 	}
 }

+func TestGetV1Adapters(t *testing.T) {
+	tests := []struct {
+		name string
+		reader tracestore.Reader
+		writer tracestore.Writer
+		expectedReader spanstore.Reader
+		expectedWriter spanstore.Writer
+	}{
+		{
+			name: "native tracestore.Reader and tracestore.Writer",
+			reader: &tracestoremocks.Reader{},
+			writer: &tracestoremocks.Writer{},
+			expectedReader: v1adapter.NewSpanReader(&tracestoremocks.Reader{}),
+			expectedWriter: v1adapter.NewSpanWriter(&tracestoremocks.Writer{}),
+		},
+		{
+			name: "wrapped spanstore.Reader and spanstore.Writer",
+			reader: v1adapter.NewTraceReader(&spanstoremocks.Reader{}),
+			writer: v1adapter.NewTraceWriter(&spanstoremocks.Writer{}),
+			expectedReader: &spanstoremocks.Reader{},
+			expectedWriter: &spanstoremocks.Writer{},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			gotReader, gotWriter := getV1Adapters(test.reader, test.writer)
+			require.Equal(t, test.expectedReader, gotReader)
+			require.Equal(t, test.expectedWriter, gotWriter)
+		})
+	}
+}
+
 func TestServerAddMetricsStorage(t *testing.T) {
 	host := componenttest.NewNopHost()

4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
@@ -61,8 +61,8 @@ type TraceQueryParameters struct {

 // NewQueryService returns a new QueryService.
 func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
-	spanReader, err := v1adapter.GetV1Reader(traceReader)
-	if err != nil {
+	spanReader, ok := v1adapter.GetV1Reader(traceReader)
+	if !ok {
 		// if the spanstore.Reader is not available, downgrade the native tracestore.Reader to
 		// a spanstore.Reader
 		spanReader = v1adapter.NewSpanReader(traceReader)
8 changes: 3 additions & 5 deletions storage_v2/v1adapter/tracereader.go
@@ -18,19 +18,17 @@ import (
 	"github.com/jaegertracing/jaeger/storage_v2/tracestore"
 )

-var ErrV1ReaderNotAvailable = errors.New("spanstore.Reader is not a wrapper around v1 reader")
-
 var _ tracestore.Reader = (*TraceReader)(nil)

 type TraceReader struct {
 	spanReader spanstore.Reader
 }

-func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, error) {
+func GetV1Reader(reader tracestore.Reader) (spanstore.Reader, bool) {
 	if tr, ok := reader.(*TraceReader); ok {
-		return tr.spanReader, nil
+		return tr.spanReader, ok
 	}
-	return nil, ErrV1ReaderNotAvailable
+	return nil, false
 }

 func NewTraceReader(spanReader spanstore.Reader) *TraceReader {
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/tracereader_test.go
@@ -31,15 +31,15 @@ func TestGetV1Reader_NoError(t *testing.T) {
 	traceReader := &TraceReader{
 		spanReader: memstore,
 	}
-	v1Reader, err := GetV1Reader(traceReader)
-	require.NoError(t, err)
+	v1Reader, ok := GetV1Reader(traceReader)
+	require.True(t, ok)
 	require.Equal(t, memstore, v1Reader)
 }

 func TestGetV1Reader_Error(t *testing.T) {
 	fr := new(tracestoremocks.Reader)
-	_, err := GetV1Reader(fr)
-	require.ErrorIs(t, err, ErrV1ReaderNotAvailable)
+	_, ok := GetV1Reader(fr)
+	require.False(t, ok)
 }

 func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) {
8 changes: 3 additions & 5 deletions storage_v2/v1adapter/tracewriter.go
@@ -13,17 +13,15 @@ import (
 	"github.com/jaegertracing/jaeger/storage_v2/tracestore"
 )

-var ErrV1WriterNotAvailable = errors.New("spanstore.Writer is not a wrapper around v1 writer")
-
 type TraceWriter struct {
 	spanWriter spanstore.Writer
 }

-func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, error) {
+func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, bool) {
 	if tr, ok := writer.(*TraceWriter); ok {
-		return tr.spanWriter, nil
+		return tr.spanWriter, ok
 	}
-	return nil, ErrV1WriterNotAvailable
+	return nil, false
 }

 func NewTraceWriter(spanWriter spanstore.Writer) *TraceWriter {
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/tracewriter_test.go
@@ -62,15 +62,15 @@ func TestGetV1Writer_NoError(t *testing.T) {
 	traceWriter := &TraceWriter{
 		spanWriter: memstore,
 	}
-	v1Writer, err := GetV1Writer(traceWriter)
-	require.NoError(t, err)
+	v1Writer, ok := GetV1Writer(traceWriter)
+	require.True(t, ok)
 	require.Equal(t, memstore, v1Writer)
 }

 func TestGetV1Writer_Error(t *testing.T) {
 	w := new(tracestoremocks.Writer)
-	_, err := GetV1Writer(w)
-	require.ErrorIs(t, err, ErrV1WriterNotAvailable)
+	_, ok := GetV1Writer(w)
+	require.False(t, ok)
 }

 func makeTraces() ptrace.Traces {
