From d6456fb13b16d4694bc3ba32fa21d7c6d302d9b2 Mon Sep 17 00:00:00 2001 From: Zhang Xin Date: Mon, 16 Dec 2024 02:28:20 +0800 Subject: [PATCH] [feat] Add time window for GetTrace in span store interface (#6242) ## Which problem is this PR solving? Part of https://github.com/jaegertracing/jaeger/issues/4150 ## Description of the changes Refactor trace get method in span store interface: - use struct instead of literal trace id - add time window in the new struct ## How was this change tested? - unit test ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: rim99 Signed-off-by: Zhang Xin Signed-off-by: Zhang Xin Signed-off-by: Yuri Shkuro Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- cmd/anonymizer/app/query/query.go | 8 +- cmd/anonymizer/app/query/query_test.go | 8 +- .../storageexporter/exporter_test.go | 3 +- .../internal/integration/span_reader.go | 6 +- cmd/query/app/apiv3/gateway_test.go | 15 +-- cmd/query/app/apiv3/grpc_handler.go | 7 +- cmd/query/app/apiv3/grpc_handler_test.go | 78 +++++++++---- cmd/query/app/apiv3/http_gateway.go | 10 +- cmd/query/app/apiv3/http_gateway_test.go | 2 +- cmd/query/app/grpc_handler.go | 14 ++- cmd/query/app/grpc_handler_test.go | 103 +++++++++++++----- cmd/query/app/handler_archive_test.go | 16 +-- cmd/query/app/http_handler.go | 18 ++- cmd/query/app/http_handler_test.go | 32 +++--- cmd/query/app/querysvc/query_service.go | 10 +- cmd/query/app/querysvc/query_service_test.go | 40 ++++--- cmd/query/app/server_test.go | 3 +- .../badger/spanstore/read_write_test.go | 10 +- plugin/storage/badger/spanstore/reader.go | 4 +- .../badger/spanstore/rw_internal_test.go | 4 +- plugin/storage/blackhole/blackhole.go | 2 +- plugin/storage/blackhole/blackhole_test.go | 2 +- .../storage/cassandra/savetracetest/main.go | 2 +- plugin/storage/cassandra/spanstore/reader.go | 6 +- .../cassandra/spanstore/reader_test.go | 4 +- plugin/storage/es/spanstore/reader.go | 5 +- plugin/storage/es/spanstore/reader_test.go | 26 +++-- plugin/storage/grpc/shared/archive.go | 6 +- plugin/storage/grpc/shared/archive_test.go | 8 +- plugin/storage/grpc/shared/grpc_client.go | 6 +- .../storage/grpc/shared/grpc_client_test.go | 45 ++++++-- plugin/storage/grpc/shared/grpc_handler.go | 12 +- .../storage/grpc/shared/grpc_handler_test.go | 14 +-- plugin/storage/integration/integration.go | 8 +- plugin/storage/integration/kafka_test.go | 4 +- plugin/storage/memory/memory.go | 4 +- plugin/storage/memory/memory_test.go | 26 +++-- storage/spanstore/interface.go | 9 +- storage/spanstore/mocks/Reader.go | 18 +-- .../spanstoremetrics/read_metrics.go | 4 +- .../spanstoremetrics/read_metrics_test.go | 8 +- storage_v2/factoryadapter/reader.go | 9 +- storage_v2/factoryadapter/reader_test.go | 3 +- storage_v2/factoryadapter/writer_test.go | 4 +- 44 files changed, 408 insertions(+), 218 deletions(-) diff --git a/cmd/anonymizer/app/query/query.go b/cmd/anonymizer/app/query/query.go index 755d8317d19..6cca46408ba 100644 --- a/cmd/anonymizer/app/query/query.go +++ b/cmd/anonymizer/app/query/query.go @@ -54,10 +54,12 @@ func (q *Query) QueryTrace(traceID string) ([]model.Span, error) { if err != nil { return nil, fmt.Errorf("failed to convert the provided trace id: %w", err) } - - stream, err := q.client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ + // TODO: add start time & end time + request := api_v2.GetTraceRequest{ TraceID: mTraceID, - }) + } + + stream, err := q.client.GetTrace(context.Background(), &request) if err != nil { return nil, unwrapNotFoundErr(err) } diff --git a/cmd/anonymizer/app/query/query_test.go b/cmd/anonymizer/app/query/query_test.go index 768dbc3c1fb..a4a7ed241dd 100644 --- a/cmd/anonymizer/app/query/query_test.go +++ b/cmd/anonymizer/app/query/query_test.go @@ -25,8 +25,8 @@ import ( ) var ( - matchContext = mock.AnythingOfType("*context.valueCtx") - matchTraceID = mock.AnythingOfType("model.TraceID") + matchContext = mock.AnythingOfType("*context.valueCtx") + matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters") mockInvalidTraceID = "xyz" mockTraceID = model.NewTraceID(0, 123456) @@ -108,7 +108,7 @@ func TestQueryTrace(t *testing.T) { defer q.Close() t.Run("No error", func(t *testing.T) { - s.spanReader.On("GetTrace", matchContext, matchTraceID).Return( + s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return( mockTraceGRPC, nil).Once() spans, err := q.QueryTrace(mockTraceID.String()) @@ -122,7 +122,7 @@ func TestQueryTrace(t *testing.T) { }) t.Run("Trace not found", func(t *testing.T) { - s.spanReader.On("GetTrace", matchContext, matchTraceID).Return( + s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return( nil, spanstore.ErrTraceNotFound).Once() spans, err := q.QueryTrace(mockTraceID.String()) diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go index 14f9ae29926..0e684a0bb2f 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go @@ -27,6 +27,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" + "github.com/jaegertracing/jaeger/storage/spanstore" ) type mockStorageExt struct { @@ -152,7 +153,7 @@ func TestExporter(t *testing.T) { spanReader, err := storageFactory.CreateSpanReader() require.NoError(t, err) requiredTraceID := model.NewTraceID(0, 1) // 00000000000000000000000000000001 - requiredTrace, err := spanReader.GetTrace(ctx, requiredTraceID) + requiredTrace, err := spanReader.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: requiredTraceID}) require.NoError(t, err) assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String()) diff --git a/cmd/jaeger/internal/integration/span_reader.go b/cmd/jaeger/internal/integration/span_reader.go index abd98ecd6aa..3aaf0c09ece 100644 --- a/cmd/jaeger/internal/integration/span_reader.go +++ b/cmd/jaeger/internal/integration/span_reader.go @@ -66,9 +66,11 @@ func unwrapNotFoundErr(err error) error { return err } -func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (r *spanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { stream, err := r.client.GetTrace(ctx, &api_v2.GetTraceRequest{ - TraceID: traceID, + TraceID: query.TraceID, + StartTime: query.StartTime, + EndTime: query.EndTime, }) if err != nil { return nil, unwrapNotFoundErr(err) diff --git a/cmd/query/app/apiv3/gateway_test.go b/cmd/query/app/apiv3/gateway_test.go index 6c942fcfb0f..22a08dc2641 100644 --- a/cmd/query/app/apiv3/gateway_test.go +++ b/cmd/query/app/apiv3/gateway_test.go @@ -80,8 +80,9 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) { require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj)) } -func makeTestTrace() (*model.Trace, model.TraceID) { +func makeTestTrace() (*model.Trace, spanstore.GetTraceParameters) { traceID := model.NewTraceID(150, 160) + query := spanstore.GetTraceParameters{TraceID: traceID} return &model.Trace{ Spans: []*model.Span{ { @@ -94,7 +95,7 @@ func makeTestTrace() (*model.Trace, model.TraceID) { }, }, }, - }, traceID + }, query } func runGatewayTests( @@ -140,18 +141,18 @@ func (gw *testGateway) runGatewayGetOperations(t *testing.T) { } func (gw *testGateway) runGatewayGetTrace(t *testing.T) { - trace, traceID := makeTestTrace() - gw.reader.On("GetTrace", matchContext, traceID).Return(trace, nil).Once() - gw.getTracesAndVerify(t, "/api/v3/traces/"+traceID.String(), traceID) + trace, query := makeTestTrace() + gw.reader.On("GetTrace", matchContext, query).Return(trace, nil).Once() + gw.getTracesAndVerify(t, "/api/v3/traces/"+query.TraceID.String(), query.TraceID) } func (gw *testGateway) runGatewayFindTraces(t *testing.T) { - trace, traceID := makeTestTrace() + trace, query := makeTestTrace() q, qp := mockFindQueries() gw.reader. On("FindTraces", matchContext, qp). Return([]*model.Trace{trace}, nil).Once() - gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID) + gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), query.TraceID) } func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID model.TraceID) { diff --git a/cmd/query/app/apiv3/grpc_handler.go b/cmd/query/app/apiv3/grpc_handler.go index 0e684f60406..b12342214c8 100644 --- a/cmd/query/app/apiv3/grpc_handler.go +++ b/cmd/query/app/apiv3/grpc_handler.go @@ -32,7 +32,12 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS return fmt.Errorf("malform trace ID: %w", err) } - trace, err := h.QueryService.GetTrace(stream.Context(), traceID) + query := spanstore.GetTraceParameters{ + TraceID: traceID, + StartTime: request.GetStartTime(), + EndTime: request.GetEndTime(), + } + trace, err := h.QueryService.GetTrace(stream.Context(), query) if err != nil { return fmt.Errorf("cannot retrieve trace: %w", err) } diff --git a/cmd/query/app/apiv3/grpc_handler_test.go b/cmd/query/app/apiv3/grpc_handler_test.go index dbd0c48c82b..ffadfbb9ac5 100644 --- a/cmd/query/app/apiv3/grpc_handler_test.go +++ b/cmd/query/app/apiv3/grpc_handler_test.go @@ -27,8 +27,8 @@ import ( ) var ( - matchContext = mock.AnythingOfType("*context.valueCtx") - matchTraceID = mock.AnythingOfType("model.TraceID") + matchContext = mock.AnythingOfType("*context.valueCtx") + matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters") ) func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) { @@ -79,33 +79,63 @@ func newTestServerClient(t *testing.T) *testServerClient { } func TestGetTrace(t *testing.T) { - tsc := newTestServerClient(t) - tsc.reader.On("GetTrace", matchContext, matchTraceID).Return( - &model.Trace{ - Spans: []*model.Span{ - { - OperationName: "foobar", - }, + traceId, _ := model.TraceIDFromString("156") + testCases := []struct { + name string + expectedQuery spanstore.GetTraceParameters + request api_v3.GetTraceRequest + }{ + { + "TestGetTrace", + spanstore.GetTraceParameters{ + TraceID: traceId, + StartTime: time.Time{}, + EndTime: time.Time{}, }, - }, nil).Once() - - getTraceStream, err := tsc.client.GetTrace(context.Background(), - &api_v3.GetTraceRequest{ - TraceId: "156", + api_v3.GetTraceRequest{TraceId: "156"}, }, - ) - require.NoError(t, err) - recv, err := getTraceStream.Recv() - require.NoError(t, err) - td := recv.ToTraces() - require.EqualValues(t, 1, td.SpanCount()) - assert.Equal(t, "foobar", - td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) + { + "TestGetTraceWithTimeWindow", + spanstore.GetTraceParameters{ + TraceID: traceId, + StartTime: time.Unix(1, 2).UTC(), + EndTime: time.Unix(3, 4).UTC(), + }, + api_v3.GetTraceRequest{ + TraceId: "156", + StartTime: time.Unix(1, 2).UTC(), + EndTime: time.Unix(3, 4).UTC(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tsc := newTestServerClient(t) + tsc.reader.On("GetTrace", matchContext, tc.expectedQuery).Return( + &model.Trace{ + Spans: []*model.Span{ + { + OperationName: "foobar", + }, + }, + }, nil).Once() + + getTraceStream, err := tsc.client.GetTrace(context.Background(), &tc.request) + require.NoError(t, err) + recv, err := getTraceStream.Recv() + require.NoError(t, err) + td := recv.ToTraces() + require.EqualValues(t, 1, td.SpanCount()) + assert.Equal(t, "foobar", + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) + }) + } } func TestGetTraceStorageError(t *testing.T) { tsc := newTestServerClient(t) - tsc.reader.On("GetTrace", matchContext, matchTraceID).Return( + tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return( nil, errors.New("storage_error")).Once() getTraceStream, err := tsc.client.GetTrace(context.Background(), &api_v3.GetTraceRequest{ @@ -119,7 +149,7 @@ func TestGetTraceStorageError(t *testing.T) { func TestGetTraceTraceIDError(t *testing.T) { tsc := newTestServerClient(t) - tsc.reader.On("GetTrace", matchContext, matchTraceID).Return( + tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return( &model.Trace{ Spans: []*model.Span{}, }, nil).Once() diff --git a/cmd/query/app/apiv3/http_gateway.go b/cmd/query/app/apiv3/http_gateway.go index 24f0e6c4396..e4e8874d748 100644 --- a/cmd/query/app/apiv3/http_gateway.go +++ b/cmd/query/app/apiv3/http_gateway.go @@ -27,7 +27,9 @@ import ( ) const ( - paramTraceID = "trace_id" // get trace by ID + paramTraceID = "trace_id" // get trace by ID + paramStartTime = "start_time" + paramEndTime = "end_time" paramServiceName = "query.service_name" // find traces paramOperationName = "query.operation_name" paramTimeMin = "query.start_time_min" @@ -133,7 +135,11 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) { if h.tryParamError(w, err, paramTraceID) { return } - trc, err := h.QueryService.GetTrace(r.Context(), traceID) + // TODO: add start time & end time + request := spanstore.GetTraceParameters{ + TraceID: traceID, + } + trc, err := h.QueryService.GetTrace(r.Context(), request) if h.tryHandleError(w, err, http.StatusInternalServerError) { return } diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index c395715b662..4dcdbbafa9d 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -103,7 +103,7 @@ func TestHTTPGatewayGetTraceErrors(t *testing.T) { // error from span reader const simErr = "simulated error" gw.reader. - On("GetTrace", matchContext, matchTraceID). + On("GetTrace", matchContext, matchGetTraceParameters). Return(nil, errors.New(simErr)).Once() r, err = http.NewRequest(http.MethodGet, "/api/v3/traces/123", nil) diff --git a/cmd/query/app/grpc_handler.go b/cmd/query/app/grpc_handler.go index 7cbeab083cb..0c562a4d5dc 100644 --- a/cmd/query/app/grpc_handler.go +++ b/cmd/query/app/grpc_handler.go @@ -89,7 +89,12 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer if r.TraceID == (model.TraceID{}) { return errUninitializedTraceID } - trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID) + query := spanstore.GetTraceParameters{ + TraceID: r.TraceID, + StartTime: r.StartTime, + EndTime: r.EndTime, + } + trace, err := g.queryService.GetTrace(stream.Context(), query) if errors.Is(err, spanstore.ErrTraceNotFound) { g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err)) return status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err) @@ -109,7 +114,12 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe if r.TraceID == (model.TraceID{}) { return nil, errUninitializedTraceID } - err := g.queryService.ArchiveTrace(ctx, r.TraceID) + query := spanstore.GetTraceParameters{ + TraceID: r.TraceID, + StartTime: r.StartTime, + EndTime: r.EndTime, + } + err := g.queryService.ArchiveTrace(ctx, query) if errors.Is(err, spanstore.ErrTraceNotFound) { g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err)) return nil, status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index a5c59638031..b3c8b93e5ba 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -198,19 +198,41 @@ func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, clien } func TestGetTraceSuccessGRPC(t *testing.T) { - withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). - Return(mockTrace, nil).Once() + inputs := []struct { + expectedQuery spanstore.GetTraceParameters + request api_v2.GetTraceRequest + }{ + { + spanstore.GetTraceParameters{TraceID: mockTraceID}, + api_v2.GetTraceRequest{TraceID: mockTraceID}, + }, + { + spanstore.GetTraceParameters{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }, + api_v2.GetTraceRequest{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }, + }, + } - res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ - TraceID: mockTraceID, - }) + for _, input := range inputs { + withServerAndClient(t, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), input.expectedQuery). + Return(mockTrace, nil).Once() - spanResChunk, _ := res.Recv() + res, err := client.GetTrace(context.Background(), &input.request) - require.NoError(t, err) - assert.Equal(t, spanResChunk.Spans[0].TraceID, mockTraceID) - }) + spanResChunk, _ := res.Recv() + + require.NoError(t, err) + assert.Equal(t, spanResChunk.Spans[0].TraceID, mockTraceID) + }) + } } func assertGRPCError(t *testing.T, err error, code codes.Code, msg string) { @@ -222,7 +244,7 @@ func assertGRPCError(t *testing.T, err error, code codes.Code, msg string) { func TestGetTraceEmptyTraceIDFailure_GRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ @@ -239,7 +261,7 @@ func TestGetTraceEmptyTraceIDFailure_GRPC(t *testing.T) { func TestGetTraceDBFailureGRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, errStorageGRPC).Once() res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ @@ -255,10 +277,10 @@ func TestGetTraceDBFailureGRPC(t *testing.T) { func TestGetTraceNotFoundGRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() - server.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ @@ -279,25 +301,46 @@ func TestGetTraceNilRequestOnHandlerGRPC(t *testing.T) { } func TestArchiveTraceSuccessGRPC(t *testing.T) { - withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). - Return(mockTrace, nil).Once() - server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). - Return(nil).Times(2) + inputs := []struct { + expectedQuery spanstore.GetTraceParameters + request api_v2.ArchiveTraceRequest + }{ + { + spanstore.GetTraceParameters{TraceID: mockTraceID}, + api_v2.ArchiveTraceRequest{TraceID: mockTraceID}, + }, + { + spanstore.GetTraceParameters{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }, + api_v2.ArchiveTraceRequest{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }, + }, + } + for _, input := range inputs { + withServerAndClient(t, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), input.expectedQuery). + Return(mockTrace, nil).Once() + server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). + Return(nil).Times(2) - _, err := client.ArchiveTrace(context.Background(), &api_v2.ArchiveTraceRequest{ - TraceID: mockTraceID, - }) + _, err := client.ArchiveTrace(context.Background(), &input.request) - require.NoError(t, err) - }) + require.NoError(t, err) + }) + } } func TestArchiveTraceNotFoundGRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() - server.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() _, err := client.ArchiveTrace(context.Background(), &api_v2.ArchiveTraceRequest{ @@ -326,7 +369,7 @@ func TestArchiveTraceNilRequestOnHandlerGRPC(t *testing.T) { func TestArchiveTraceFailureGRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(errStorageGRPC).Times(2) @@ -961,7 +1004,7 @@ func TestSearchTenancyGRPC(t *testing.T) { Enabled: true, }) withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() // First try without tenancy header @@ -1018,7 +1061,7 @@ func TestSearchTenancyGRPCExplicitList(t *testing.T) { Tenants: []string{"mercury", "venus", "mars"}, }) withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { - server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() for _, tc := range []struct { @@ -1130,7 +1173,7 @@ func TestTenancyContextFlowGRPC(t *testing.T) { return false } return true - }), mock.AnythingOfType("model.TraceID")).Return(trace, err).Once() + }), mock.AnythingOfType("spanstore.GetTraceParameters")).Return(trace, err).Once() } for tenant, expected := range allExpectedResults { diff --git a/cmd/query/app/handler_archive_test.go b/cmd/query/app/handler_archive_test.go index 78a0afba11f..87ae3e304b2 100644 --- a/cmd/query/app/handler_archive_test.go +++ b/cmd/query/app/handler_archive_test.go @@ -20,7 +20,7 @@ import ( func TestGetArchivedTrace_NotFound(t *testing.T) { mockReader := &spanstoremocks.Reader{} - mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() for _, tc := range []struct { name string @@ -37,7 +37,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { withTestServer(t, func(ts *testServer) { - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse err := getJSON(ts.server.URL+"/api/traces/"+mockTraceID.String(), &response) @@ -52,11 +52,11 @@ func TestGetArchivedTrace_NotFound(t *testing.T) { func TestGetArchivedTraceSuccess(t *testing.T) { traceID := model.NewTraceID(0, 123456) mockReader := &spanstoremocks.Reader{} - mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() withTestServer(t, func(ts *testServer) { // make main reader return NotFound - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredTraceResponse err := getJSON(ts.server.URL+"/api/traces/"+mockTraceID.String(), &response) @@ -79,13 +79,13 @@ func TestArchiveTrace_BadTraceID(t *testing.T) { // Test return of 404 when trace is not found in APIHandler.archive func TestArchiveTrace_TraceNotFound(t *testing.T) { mockReader := &spanstoremocks.Reader{} - mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() mockWriter := &spanstoremocks.Writer{} // Not actually going to write the trace, so no need to define mockWriter action withTestServer(t, func(ts *testServer) { // make main reader return NotFound - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response) @@ -106,7 +106,7 @@ func TestArchiveTrace_Success(t *testing.T) { mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(nil).Times(2) withTestServer(t, func(ts *testServer) { - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response) @@ -119,7 +119,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) { mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(errors.New("cannot save")).Times(2) withTestServer(t, func(ts *testServer) { - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response) diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index bed670240a9..157fc398626 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -266,7 +266,11 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID) var traceErrors []structuredError retMe := make([]*model.Trace, 0, len(traceIDs)) for _, traceID := range traceIDs { - if trc, err := aH.queryService.GetTrace(ctx, traceID); err != nil { + // TODO: add start time & end time + query := spanstore.GetTraceParameters{ + TraceID: traceID, + } + if trc, err := aH.queryService.GetTrace(ctx, query); err != nil { if !errors.Is(err, spanstore.ErrTraceNotFound) { return nil, nil, err } @@ -429,7 +433,11 @@ func (aH *APIHandler) getTrace(w http.ResponseWriter, r *http.Request) { if !ok { return } - trc, err := aH.queryService.GetTrace(r.Context(), traceID) + // TODO: add start time & end time + query := spanstore.GetTraceParameters{ + TraceID: traceID, + } + trc, err := aH.queryService.GetTrace(r.Context(), query) if errors.Is(err, spanstore.ErrTraceNotFound) { aH.handleError(w, err, http.StatusNotFound) return @@ -458,7 +466,11 @@ func (aH *APIHandler) archiveTrace(w http.ResponseWriter, r *http.Request) { } // QueryService.ArchiveTrace can now archive this traceID. - err := aH.queryService.ArchiveTrace(r.Context(), traceID) + // TODO: add start time & end time + query := spanstore.GetTraceParameters{ + TraceID: traceID, + } + err := aH.queryService.ArchiveTrace(r.Context(), query) if errors.Is(err, spanstore.ErrTraceNotFound) { aH.handleError(w, err, http.StatusNotFound) return diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index 511495193c3..6c09caa155f 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -66,6 +66,8 @@ var ( } mockTraceID = model.NewTraceID(0, 123456) + startTime = time.Date(2020, time.January, 1, 13, 0, 0, 0, time.UTC) + endTime = time.Date(2020, time.January, 1, 14, 0, 0, 0, time.UTC) mockTrace = &model.Trace{ Spans: []*model.Span{ { @@ -155,7 +157,7 @@ func withTestServer(t *testing.T, doTest func(s *testServer), queryOptions query func TestGetTraceSuccess(t *testing.T) { ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() var response structuredResponse @@ -182,7 +184,7 @@ func TestGetTraceDedupeSuccess(t *testing.T) { } ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(dupedMockTrace, nil).Once() var response structuredResponse @@ -324,7 +326,7 @@ func TestGetTrace(t *testing.T) { ts := initializeTestServer(t, HandlerOptions.Tracer(jTracer.OTEL)) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 0x123456abc)}). Return(makeMockTrace(t), nil).Once() var response structuredResponse @@ -341,7 +343,7 @@ func TestGetTrace(t *testing.T) { func TestGetTraceDBFailure(t *testing.T) { ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, errStorage).Once() var response structuredResponse @@ -351,7 +353,7 @@ func TestGetTraceDBFailure(t *testing.T) { func TestGetTraceNotFound(t *testing.T) { ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse @@ -368,7 +370,7 @@ func TestGetTraceAdjustmentFailure(t *testing.T) { }), }, ) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() var response structuredResponse @@ -399,7 +401,7 @@ func TestSearchSuccess(t *testing.T) { func TestSearchByTraceIDSuccess(t *testing.T) { ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Twice() var response structuredResponse @@ -414,9 +416,9 @@ func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { ts := initializeTestServerWithOptions(t, &tenancy.Manager{}, querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveReadMock, }) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Twice() - archiveReadMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + archiveReadMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Twice() var response structuredResponse @@ -428,7 +430,7 @@ func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { func TestSearchByTraceIDNotFound(t *testing.T) { ts := initializeTestServer(t) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse @@ -441,7 +443,7 @@ func TestSearchByTraceIDNotFound(t *testing.T) { func TestSearchByTraceIDFailure(t *testing.T) { ts := initializeTestServer(t) whatsamattayou := "whatsamattayou" - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, errors.New(whatsamattayou)).Once() var response structuredResponse @@ -915,7 +917,7 @@ func TestSearchTenancyHTTP(t *testing.T) { ts := initializeTestServerWithOptions(t, tenancy.NewManager(&tenancyOptions), querysvc.QueryServiceOptions{}) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Twice() var response structuredResponse @@ -939,7 +941,7 @@ func TestSearchTenancyRejectionHTTP(t *testing.T) { Enabled: true, } ts := initializeTestServerWithOptions(t, tenancy.NewManager(&tenancyOptions), querysvc.QueryServiceOptions{}) - ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Twice() req, err := http.NewRequest(http.MethodGet, ts.server.URL+`/api/traces?traceID=1&traceID=2`, nil) @@ -971,14 +973,14 @@ func TestSearchTenancyFlowTenantHTTP(t *testing.T) { return false } return true - }), mock.AnythingOfType("model.TraceID")).Return(mockTrace, nil).Twice() + }), mock.AnythingOfType("spanstore.GetTraceParameters")).Return(mockTrace, nil).Twice() ts.spanReader.On("GetTrace", mock.MatchedBy(func(v any) bool { ctx, ok := v.(context.Context) if !ok || tenancy.GetTenant(ctx) != "megacorp" { return false } return true - }), mock.AnythingOfType("model.TraceID")).Return(nil, errStorage).Once() + }), mock.AnythingOfType("spanstore.GetTraceParameters")).Return(nil, errStorage).Once() var responseAcme structuredResponse err := getJSONCustomHeaders( diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 1a34cd1e408..02abd07074a 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -62,17 +62,17 @@ func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Re } // GetTrace is the queryService implementation of spanstore.Reader.GetTrace -func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (qs QueryService) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) if err != nil { return nil, err } - trace, err := spanReader.GetTrace(ctx, traceID) + trace, err := spanReader.GetTrace(ctx, query) if errors.Is(err, spanstore.ErrTraceNotFound) { if qs.options.ArchiveSpanReader == nil { return nil, err } - trace, err = qs.options.ArchiveSpanReader.GetTrace(ctx, traceID) + trace, err = qs.options.ArchiveSpanReader.GetTrace(ctx, query) } return trace, err } @@ -108,11 +108,11 @@ func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQue } // ArchiveTrace is the queryService utility to archive traces. -func (qs QueryService) ArchiveTrace(ctx context.Context, traceID model.TraceID) error { +func (qs QueryService) ArchiveTrace(ctx context.Context, query spanstore.GetTraceParameters) error { if qs.options.ArchiveSpanWriter == nil { return errNoArchiveSpanStorage } - trace, err := qs.GetTrace(ctx, traceID) + trace, err := qs.GetTrace(ctx, query) if err != nil { return err } diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 96e07047cf7..10a3b21e5af 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -111,12 +111,13 @@ func initializeTestService(optionAppliers ...testOption) *testQueryService { // Test QueryService.GetTrace() func TestGetTraceSuccess(t *testing.T) { tqs := initializeTestService() - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() type contextKey string ctx := context.Background() - res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) require.NoError(t, err) assert.Equal(t, res, mockTrace) } @@ -124,12 +125,13 @@ func TestGetTraceSuccess(t *testing.T) { // Test QueryService.GetTrace() without ArchiveSpanReader func TestGetTraceNotFound(t *testing.T) { tqs := initializeTestService() - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() type contextKey string ctx := context.Background() - _, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + _, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) assert.Equal(t, err, spanstore.ErrTraceNotFound) } @@ -138,21 +140,23 @@ func TestGetTrace_V1ReaderNotFound(t *testing.T) { qs := QueryService{ traceReader: fr, } - _, err := qs.GetTrace(context.Background(), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + _, err := qs.GetTrace(context.Background(), query) require.Error(t, err) } // Test QueryService.GetTrace() with ArchiveSpanReader func TestGetTraceFromArchiveStorage(t *testing.T) { tqs := initializeTestService(withArchiveSpanReader()) - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() - tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() type contextKey string ctx := context.Background() - res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) require.NoError(t, err) assert.Equal(t, res, mockTrace) } @@ -252,35 +256,38 @@ func TestArchiveTraceNoOptions(t *testing.T) { type contextKey string ctx := context.Background() - err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) assert.Equal(t, errNoArchiveSpanStorage, err) } // Test QueryService.ArchiveTrace() with ArchiveSpanWriter but invalid traceID. func TestArchiveTraceWithInvalidTraceID(t *testing.T) { tqs := initializeTestService(withArchiveSpanReader(), withArchiveSpanWriter()) - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() - tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(nil, spanstore.ErrTraceNotFound).Once() type contextKey string ctx := context.Background() - err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) assert.Equal(t, spanstore.ErrTraceNotFound, err) } // Test QueryService.ArchiveTrace(), save error with ArchiveSpanWriter. func TestArchiveTraceWithArchiveWriterError(t *testing.T) { tqs := initializeTestService(withArchiveSpanWriter()) - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() tqs.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(errors.New("cannot save")).Times(2) type contextKey string ctx := context.Background() - joinErr := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + joinErr := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) // There are two spans in the mockTrace, ArchiveTrace should return a wrapped error. require.EqualError(t, joinErr, "cannot save\ncannot save") } @@ -288,14 +295,15 @@ func TestArchiveTraceWithArchiveWriterError(t *testing.T) { // Test QueryService.ArchiveTrace() with correctly configured ArchiveSpanWriter. func TestArchiveTraceSuccess(t *testing.T) { tqs := initializeTestService(withArchiveSpanWriter()) - tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")). Return(mockTrace, nil).Once() tqs.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(nil).Times(2) type contextKey string ctx := context.Background() - err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID) + query := spanstore.GetTraceParameters{TraceID: mockTraceID} + err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) require.NoError(t, err) } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index fc2c0d298f0..f12723439a6 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -39,6 +39,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" @@ -895,7 +896,7 @@ func TestServerHTTP_TracesRequest(t *testing.T) { tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy) querySvc := makeQuerySvc() - querySvc.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). + querySvc.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 0x123456abc)}). Return(makeMockTrace(t), nil).Once() telset := initTelSet(zaptest.NewLogger(t), &tracer, healthcheck.New()) diff --git a/plugin/storage/badger/spanstore/read_write_test.go b/plugin/storage/badger/spanstore/read_write_test.go index 9c4d8f22be7..b324a9ffccc 100644 --- a/plugin/storage/badger/spanstore/read_write_test.go +++ b/plugin/storage/badger/spanstore/read_write_test.go @@ -68,10 +68,10 @@ func TestWriteReadBack(t *testing.T) { } for i := 0; i < traces; i++ { - tr, err := sr.GetTrace(context.Background(), model.TraceID{ + tr, err := sr.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.TraceID{ Low: uint64(i), High: 1, - }) + }}) require.NoError(t, err) assert.Len(t, tr.Spans, spans) @@ -291,7 +291,7 @@ func TestFindNothing(t *testing.T) { require.NoError(t, err) assert.Empty(t, trs) - tr, err := sr.GetTrace(context.Background(), model.TraceID{High: 0, Low: 0}) + tr, err := sr.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.TraceID{Low: 0, High: 0}}) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, tr) }) @@ -418,10 +418,10 @@ func TestPersist(t *testing.T) { }) p(t, dir, func(t *testing.T, _ spanstore.Writer, sr spanstore.Reader) { - trace, err := sr.GetTrace(context.Background(), model.TraceID{ + trace, err := sr.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.TraceID{ Low: uint64(1), High: 1, - }) + }}) require.NoError(t, err) assert.Equal(t, "operation-p", trace.Spans[0].OperationName) diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 84c5212d775..810f774a2c2 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -146,8 +146,8 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (r *TraceReader) GetTrace(_ context.Context, traceID model.TraceID) (*model.Trace, error) { - traces, err := r.getTraces([]model.TraceID{traceID}) +func (r *TraceReader) GetTrace(_ context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { + traces, err := r.getTraces([]model.TraceID{query.TraceID}) if err != nil { return nil, err } diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index 60828a8b80a..01a78a820cd 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -31,7 +31,7 @@ func TestEncodingTypes(t *testing.T) { err := sw.WriteSpan(context.Background(), &testSpan) require.NoError(t, err) - tr, err := rw.GetTrace(context.Background(), model.TraceID{Low: 0, High: 1}) + tr, err := rw.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.TraceID{Low: 0, High: 1}}) require.NoError(t, err) assert.Len(t, tr.Spans, 1) }) @@ -74,7 +74,7 @@ func TestEncodingTypes(t *testing.T) { return nil }) - _, err = rw.GetTrace(context.Background(), model.TraceID{Low: 0, High: 1}) + _, err = rw.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.TraceID{Low: 0, High: 1}}) require.EqualError(t, err, "unknown encoding type: 0x04") }) } diff --git a/plugin/storage/blackhole/blackhole.go b/plugin/storage/blackhole/blackhole.go index 5ffa51bb3af..e75d2b1be17 100644 --- a/plugin/storage/blackhole/blackhole.go +++ b/plugin/storage/blackhole/blackhole.go @@ -36,7 +36,7 @@ func (*Store) WriteSpan(context.Context, *model.Span) error { } // GetTrace gets nothing. -func (*Store) GetTrace(context.Context, model.TraceID) (*model.Trace, error) { +func (*Store) GetTrace(context.Context, spanstore.GetTraceParameters) (*model.Trace, error) { return nil, spanstore.ErrTraceNotFound } diff --git a/plugin/storage/blackhole/blackhole_test.go b/plugin/storage/blackhole/blackhole_test.go index 236ce82406d..3c71fa9896d 100644 --- a/plugin/storage/blackhole/blackhole_test.go +++ b/plugin/storage/blackhole/blackhole_test.go @@ -37,7 +37,7 @@ func TestStoreWriteSpan(t *testing.T) { func TestStoreGetTrace(t *testing.T) { withBlackhole(func(store *Store) { - trace, err := store.GetTrace(context.Background(), model.NewTraceID(1, 2)) + trace, err := store.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: model.NewTraceID(1, 2)}) require.Error(t, err) assert.Nil(t, trace) }) diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index d6c2b1f3fa1..a208c44dc59 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -60,7 +60,7 @@ func main() { logger.Info("Saved span", zap.String("spanID", getSomeSpan().SpanID.String())) } s := getSomeSpan() - trace, err := spanReader.GetTrace(ctx, s.TraceID) + trace, err := spanReader.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: s.TraceID}) if err != nil { logger.Fatal("Failed to read", zap.Error(err)) } else { diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index 88ba7b8f335..e79d8224ae0 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -204,8 +204,8 @@ func (s *SpanReader) readTraceInSpan(_ context.Context, traceID dbmodel.TraceID) } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID)) +func (s *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { + return s.readTrace(ctx, dbmodel.TraceIDFromDomain(query.TraceID)) } func validateQuery(p *spanstore.TraceQueryParameters) error { @@ -238,7 +238,7 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace } var retMe []*model.Trace for _, traceID := range uniqueTraceIDs { - jTrace, err := s.GetTrace(ctx, traceID) + jTrace, err := s.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: traceID}) if err != nil { s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err)) continue diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 7dbef606ccb..afbcf48f0fb 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -171,7 +171,7 @@ func TestSpanReaderGetTrace(t *testing.T) { r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + trace, err := r.reader.GetTrace(context.Background(), spanstore.GetTraceParameters{}) if testCase.expectedErr == "" { require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) @@ -197,7 +197,7 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) { r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + trace, err := r.reader.GetTrace(context.Background(), spanstore.GetTraceParameters{}) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") assert.Nil(t, trace) require.EqualError(t, err, "trace not found") diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 622e3bcd01b..47bc32abf75 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -231,11 +231,12 @@ func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, en } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (s *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { ctx, span := s.tracer.Start(ctx, "GetTrace") defer span.End() currentTime := time.Now() - traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) + // TODO: use start time & end time in "query" struct + traces, err := s.multiRead(ctx, []model.TraceID{query.TraceID}, currentTime.Add(-s.maxSpanAge), currentTime) if err != nil { return nil, es.DetailedError(err) } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index f40041c2abd..d597dd35fb5 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -327,8 +327,8 @@ func TestSpanReader_GetTrace(t *testing.T) { {Hits: searchHits}, }, }, nil) - - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -443,7 +443,8 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, }, nil).Times(2) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -463,7 +464,8 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -483,7 +485,8 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -507,7 +510,8 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "invalid span") require.Nil(t, trace) @@ -532,7 +536,8 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + query := spanstore.GetTraceParameters{TraceID: model.NewTraceID(0, 1)} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) @@ -1269,8 +1274,8 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + query := spanstore.GetTraceParameters{} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) require.EqualError(t, err, "trace not found") @@ -1286,7 +1291,8 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + query := spanstore.GetTraceParameters{} + trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) require.EqualError(t, err, "trace not found") diff --git a/plugin/storage/grpc/shared/archive.go b/plugin/storage/grpc/shared/archive.go index 2bd14685d27..8eeb3b609b2 100644 --- a/plugin/storage/grpc/shared/archive.go +++ b/plugin/storage/grpc/shared/archive.go @@ -32,9 +32,11 @@ type archiveWriter struct { } // GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage -func (r *archiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (r *archiveReader) GetTrace(ctx context.Context, q spanstore.GetTraceParameters) (*model.Trace, error) { stream, err := r.client.GetArchiveTrace(ctx, &storage_v1.GetTraceRequest{ - TraceID: traceID, + TraceID: q.TraceID, + StartTime: q.StartTime, + EndTime: q.EndTime, }) if status.Code(err) == codes.NotFound { return nil, spanstore.ErrTraceNotFound diff --git a/plugin/storage/grpc/shared/archive_test.go b/plugin/storage/grpc/shared/archive_test.go index 75857f9e3cc..3f5128ff590 100644 --- a/plugin/storage/grpc/shared/archive_test.go +++ b/plugin/storage/grpc/shared/archive_test.go @@ -59,7 +59,9 @@ func TestArchiveReader_GetTrace(t *testing.T) { }).Return(traceClient, nil) reader := &archiveReader{client: archiveSpanReader} - trace, err := reader.GetTrace(context.Background(), mockTraceID) + trace, err := reader.GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + }) require.NoError(t, err) assert.Equal(t, expected, trace) } @@ -73,7 +75,9 @@ func TestArchiveReaderGetTrace_NoTrace(t *testing.T) { }).Return(nil, status.Errorf(codes.NotFound, "")) reader := &archiveReader{client: archiveSpanReader} - _, err := reader.GetTrace(context.Background(), mockTraceID) + _, err := reader.GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + }) assert.Equal(t, spanstore.ErrTraceNotFound, err) } diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index abe296b70e3..f6323be4568 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -81,9 +81,11 @@ func (c *GRPCClient) ArchiveSpanWriter() spanstore.Writer { } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (c *GRPCClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (c *GRPCClient) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { stream, err := c.readerClient.GetTrace(ctx, &storage_v1.GetTraceRequest{ - TraceID: traceID, + TraceID: query.TraceID, + StartTime: query.StartTime, + EndTime: query.EndTime, }) if status.Code(err) == codes.NotFound { return nil, spanstore.ErrTraceNotFound diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index a8bb88fcf34..85814a3163a 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -157,13 +157,17 @@ func TestGRPCClientGetOperationsV2(t *testing.T) { func TestGRPCClientGetTrace(t *testing.T) { withGRPCClient(func(r *grpcClientTest) { + startTime := time.Date(2020, time.January, 1, 13, 0, 0, 0, time.UTC) + endTime := time.Date(2020, time.January, 1, 14, 0, 0, 0, time.UTC) traceClient := new(grpcMocks.SpanReaderPlugin_GetTraceClient) traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{ Spans: mockTraceSpans, }, nil).Once() traceClient.On("Recv").Return(nil, io.EOF) r.spanReader.On("GetTrace", mock.Anything, &storage_v1.GetTraceRequest{ - TraceID: mockTraceID, + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, }).Return(traceClient, nil) var expectedSpans []*model.Span @@ -171,7 +175,11 @@ func TestGRPCClientGetTrace(t *testing.T) { expectedSpans = append(expectedSpans, &mockTraceSpans[i]) } - s, err := r.client.GetTrace(context.Background(), mockTraceID) + s, err := r.client.GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }) require.NoError(t, err) assert.Equal(t, &model.Trace{ Spans: expectedSpans, @@ -187,7 +195,7 @@ func TestGRPCClientGetTrace_StreamError(t *testing.T) { TraceID: mockTraceID, }).Return(traceClient, nil) - s, err := r.client.GetTrace(context.Background(), mockTraceID) + s, err := r.client.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: mockTraceID}) require.Error(t, err) assert.Nil(t, s) }) @@ -199,7 +207,7 @@ func TestGRPCClientGetTrace_NoTrace(t *testing.T) { TraceID: mockTraceID, }).Return(nil, status.Errorf(codes.NotFound, "")) - s, err := r.client.GetTrace(context.Background(), mockTraceID) + s, err := r.client.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: mockTraceID}) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, s) }) @@ -215,7 +223,7 @@ func TestGRPCClientGetTrace_StreamErrorTraceNotFound(t *testing.T) { TraceID: mockTraceID, }).Return(traceClient, nil) - s, err := r.client.GetTrace(context.Background(), mockTraceID) + s, err := r.client.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: mockTraceID}) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, s) }) @@ -364,13 +372,17 @@ func TestGrpcClientStreamWriterWriteSpan(t *testing.T) { func TestGrpcClientGetArchiveTrace(t *testing.T) { withGRPCClient(func(r *grpcClientTest) { + startTime := time.Date(2020, time.January, 1, 13, 0, 0, 0, time.UTC) + endTime := time.Date(2020, time.January, 1, 14, 0, 0, 0, time.UTC) traceClient := new(grpcMocks.ArchiveSpanReaderPlugin_GetArchiveTraceClient) traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{ Spans: mockTraceSpans, }, nil).Once() traceClient.On("Recv").Return(nil, io.EOF) r.archiveReader.On("GetArchiveTrace", mock.Anything, &storage_v1.GetTraceRequest{ - TraceID: mockTraceID, + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, }).Return(traceClient, nil) var expectedSpans []*model.Span @@ -378,7 +390,11 @@ func TestGrpcClientGetArchiveTrace(t *testing.T) { expectedSpans = append(expectedSpans, &mockTraceSpans[i]) } - s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID) + s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + StartTime: startTime, + EndTime: endTime, + }) require.NoError(t, err) assert.Equal(t, &model.Trace{ Spans: expectedSpans, @@ -394,7 +410,9 @@ func TestGrpcClientGetArchiveTrace_StreamError(t *testing.T) { TraceID: mockTraceID, }).Return(traceClient, nil) - s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID) + s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + }) require.Error(t, err) assert.Nil(t, s) }) @@ -406,7 +424,12 @@ func TestGrpcClientGetArchiveTrace_NoTrace(t *testing.T) { TraceID: mockTraceID, }).Return(nil, spanstore.ErrTraceNotFound) - s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID) + s, err := r.client.ArchiveSpanReader().GetTrace( + context.Background(), + spanstore.GetTraceParameters{ + TraceID: mockTraceID, + }, + ) require.Error(t, err) assert.Nil(t, s) }) @@ -420,7 +443,9 @@ func TestGrpcClientGetArchiveTrace_StreamErrorTraceNotFound(t *testing.T) { TraceID: mockTraceID, }).Return(traceClient, nil) - s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID) + s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), spanstore.GetTraceParameters{ + TraceID: mockTraceID, + }) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, s) }) diff --git a/plugin/storage/grpc/shared/grpc_handler.go b/plugin/storage/grpc/shared/grpc_handler.go index 564a625ebf0..28dc0adf8cb 100644 --- a/plugin/storage/grpc/shared/grpc_handler.go +++ b/plugin/storage/grpc/shared/grpc_handler.go @@ -151,7 +151,11 @@ func (s *GRPCHandler) Close(context.Context, *storage_v1.CloseWriterRequest) (*s // GetTrace takes a traceID and streams a Trace associated with that traceID func (s *GRPCHandler) GetTrace(r *storage_v1.GetTraceRequest, stream storage_v1.SpanReaderPlugin_GetTraceServer) error { - trace, err := s.impl.SpanReader().GetTrace(stream.Context(), r.TraceID) + trace, err := s.impl.SpanReader().GetTrace(stream.Context(), spanstore.GetTraceParameters{ + TraceID: r.TraceID, + StartTime: r.StartTime, + EndTime: r.EndTime, + }) if errors.Is(err, spanstore.ErrTraceNotFound) { return status.Error(codes.NotFound, spanstore.ErrTraceNotFound.Error()) } @@ -276,7 +280,11 @@ func (s *GRPCHandler) GetArchiveTrace(r *storage_v1.GetTraceRequest, stream stor if reader == nil { return status.Error(codes.Unimplemented, "not implemented") } - trace, err := reader.GetTrace(stream.Context(), r.TraceID) + trace, err := reader.GetTrace(stream.Context(), spanstore.GetTraceParameters{ + TraceID: r.TraceID, + StartTime: r.StartTime, + EndTime: r.EndTime, + }) if errors.Is(err, spanstore.ErrTraceNotFound) { return status.Error(codes.NotFound, spanstore.ErrTraceNotFound.Error()) } diff --git a/plugin/storage/grpc/shared/grpc_handler_test.go b/plugin/storage/grpc/shared/grpc_handler_test.go index 6fe228387c7..726ea30c6b4 100644 --- a/plugin/storage/grpc/shared/grpc_handler_test.go +++ b/plugin/storage/grpc/shared/grpc_handler_test.go @@ -136,7 +136,7 @@ func TestGRPCServerGetTrace(t *testing.T) { for i := range mockTraceSpans { traceSpans = append(traceSpans, &mockTraceSpans[i]) } - r.impl.spanReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(&model.Trace{Spans: traceSpans}, nil) err := r.server.GetTrace(&storage_v1.GetTraceRequest{ @@ -151,7 +151,7 @@ func TestGRPCServerGetTrace_NotFound(t *testing.T) { traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer) traceSteam.On("Context").Return(context.Background()) - r.impl.spanReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(nil, spanstore.ErrTraceNotFound) err := r.server.GetTrace(&storage_v1.GetTraceRequest{ @@ -284,7 +284,7 @@ func TestGRPCServerGetArchiveTrace(t *testing.T) { for i := range mockTraceSpans { traceSpans = append(traceSpans, &mockTraceSpans[i]) } - r.impl.archiveReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.archiveReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(&model.Trace{Spans: traceSpans}, nil) err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{ @@ -299,7 +299,7 @@ func TestGRPCServerGetArchiveTrace_NotFound(t *testing.T) { traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer) traceSteam.On("Context").Return(context.Background()) - r.impl.archiveReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.archiveReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(nil, spanstore.ErrTraceNotFound) err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{ @@ -314,7 +314,7 @@ func TestGRPCServerGetArchiveTrace_Error(t *testing.T) { traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer) traceSteam.On("Context").Return(context.Background()) - r.impl.archiveReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.archiveReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(nil, errors.New("some error")) err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{ @@ -329,7 +329,7 @@ func TestGRPCServerGetArchiveTrace_NoImpl(t *testing.T) { r.server.impl.ArchiveSpanReader = func() spanstore.Reader { return nil } traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer) - r.impl.archiveReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.archiveReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(nil, errors.New("some error")) err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{ @@ -350,7 +350,7 @@ func TestGRPCServerGetArchiveTrace_StreamError(t *testing.T) { for i := range mockTraceSpans { traceSpans = append(traceSpans, &mockTraceSpans[i]) } - r.impl.archiveReader.On("GetTrace", mock.Anything, mockTraceID). + r.impl.archiveReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}). Return(&model.Trace{Spans: traceSpans}, nil) err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{ diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index a778776e63f..35f7c6e6025 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -198,7 +198,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { var err error - actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), tID) + actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: tID}) return err == nil && len(actual.Spans) == 1 }) require.True(t, found) @@ -217,7 +217,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) + actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -290,7 +290,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) + actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) if err != nil { t.Log(err) } @@ -302,7 +302,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { t.Run("NotFound error", func(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} - trace, err := s.SpanReader.GetTrace(context.Background(), fakeTraceID) + trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, trace) }) diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 987f49057b1..24b2131993d 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -101,8 +101,8 @@ type ingester struct { traceStore *memory.Store } -func (r *ingester) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - return r.traceStore.GetTrace(ctx, traceID) +func (r *ingester) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { + return r.traceStore.GetTrace(ctx, query) } func (*ingester) GetServices(context.Context) ([]string, error) { diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 48276aabffb..a20e1a5afc6 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -182,11 +182,11 @@ func (st *Store) WriteSpan(ctx context.Context, span *model.Span) error { } // GetTrace gets a trace -func (st *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (st *Store) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { m := st.getTenant(tenancy.GetTenant(ctx)) m.RLock() defer m.RUnlock() - trace, ok := m.traces[traceID] + trace, ok := m.traces[query.TraceID] if !ok { return nil, spanstore.ErrTraceNotFound } diff --git a/plugin/storage/memory/memory_test.go b/plugin/storage/memory/memory_test.go index 7005c273b33..af130edd566 100644 --- a/plugin/storage/memory/memory_test.go +++ b/plugin/storage/memory/memory_test.go @@ -187,7 +187,8 @@ func TestStoreWithLimit(t *testing.T) { func TestStoreGetTraceSuccess(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - trace, err := store.GetTrace(context.Background(), testingSpan.TraceID) + query := spanstore.GetTraceParameters{TraceID: testingSpan.TraceID} + trace, err := store.GetTrace(context.Background(), query) require.NoError(t, err) assert.Len(t, trace.Spans, 1) assert.Equal(t, testingSpan, trace.Spans[0]) @@ -196,7 +197,8 @@ func TestStoreGetTraceSuccess(t *testing.T) { func TestStoreGetAndMutateTrace(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - trace, err := store.GetTrace(context.Background(), testingSpan.TraceID) + query := spanstore.GetTraceParameters{TraceID: testingSpan.TraceID} + trace, err := store.GetTrace(context.Background(), query) require.NoError(t, err) assert.Len(t, trace.Spans, 1) assert.Equal(t, testingSpan, trace.Spans[0]) @@ -204,7 +206,7 @@ func TestStoreGetAndMutateTrace(t *testing.T) { trace.Spans[0].Warnings = append(trace.Spans[0].Warnings, "the end is near") - trace, err = store.GetTrace(context.Background(), testingSpan.TraceID) + trace, err = store.GetTrace(context.Background(), query) require.NoError(t, err) assert.Len(t, trace.Spans, 1) assert.Equal(t, testingSpan, trace.Spans[0]) @@ -217,14 +219,16 @@ func TestStoreGetTraceError(t *testing.T) { store.getTenant("").traces[testingSpan.TraceID] = &model.Trace{ Spans: []*model.Span{nonSerializableSpan}, } - _, err := store.GetTrace(context.Background(), testingSpan.TraceID) + query := spanstore.GetTraceParameters{TraceID: testingSpan.TraceID} + _, err := store.GetTrace(context.Background(), query) require.Error(t, err) }) } func TestStoreGetTraceFailure(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - trace, err := store.GetTrace(context.Background(), model.TraceID{}) + query := spanstore.GetTraceParameters{} + trace, err := store.GetTrace(context.Background(), query) require.EqualError(t, err, spanstore.ErrTraceNotFound.Error()) assert.Nil(t, trace) }) @@ -448,12 +452,14 @@ func TestTenantStore(t *testing.T) { require.NoError(t, store.WriteSpan(ctxWonka, testingSpan2)) // Can we retrieve the spans with correct tenancy - trace1, err := store.GetTrace(ctxAcme, testingSpan.TraceID) + query := spanstore.GetTraceParameters{TraceID: testingSpan.TraceID} + trace1, err := store.GetTrace(ctxAcme, query) require.NoError(t, err) assert.Len(t, trace1.Spans, 1) assert.Equal(t, testingSpan, trace1.Spans[0]) - trace2, err := store.GetTrace(ctxWonka, testingSpan2.TraceID) + query2 := spanstore.GetTraceParameters{TraceID: testingSpan2.TraceID} + trace2, err := store.GetTrace(ctxWonka, query2) require.NoError(t, err) assert.Len(t, trace2.Spans, 1) assert.Equal(t, testingSpan2, trace2.Spans[0]) @@ -476,13 +482,13 @@ func TestTenantStore(t *testing.T) { assert.Equal(t, testingSpan2, traces2[0].Spans[0]) // Do the spans fail with incorrect tenancy? - _, err = store.GetTrace(ctxAcme, testingSpan2.TraceID) + _, err = store.GetTrace(ctxAcme, query2) require.Error(t, err) - _, err = store.GetTrace(ctxWonka, testingSpan.TraceID) + _, err = store.GetTrace(ctxWonka, query) require.Error(t, err) - _, err = store.GetTrace(context.Background(), testingSpan.TraceID) + _, err = store.GetTrace(context.Background(), query) require.Error(t, err) }) } diff --git a/storage/spanstore/interface.go b/storage/spanstore/interface.go index c4c29181502..3c4dd0dd4e7 100644 --- a/storage/spanstore/interface.go +++ b/storage/spanstore/interface.go @@ -25,7 +25,7 @@ type Reader interface { // GetTrace retrieves the trace with a given id. // // If no spans are stored for this trace, it returns ErrTraceNotFound. - GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) + GetTrace(ctx context.Context, query GetTraceParameters) (*model.Trace, error) // GetServices returns all service names known to the backend from spans // within its retention period. @@ -50,6 +50,13 @@ type Reader interface { FindTraceIDs(ctx context.Context, query *TraceQueryParameters) ([]model.TraceID, error) } +// GetTraceParameters contains parameters of a trace get. +type GetTraceParameters struct { + TraceID model.TraceID + StartTime time.Time // optional + EndTime time.Time // optional +} + // TraceQueryParameters contains parameters of a trace query. type TraceQueryParameters struct { ServiceName string diff --git a/storage/spanstore/mocks/Reader.go b/storage/spanstore/mocks/Reader.go index d48658937c3..29d745996f3 100644 --- a/storage/spanstore/mocks/Reader.go +++ b/storage/spanstore/mocks/Reader.go @@ -141,9 +141,9 @@ func (_m *Reader) GetServices(ctx context.Context) ([]string, error) { return r0, r1 } -// GetTrace provides a mock function with given fields: ctx, traceID -func (_m *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - ret := _m.Called(ctx, traceID) +// GetTrace provides a mock function with given fields: ctx, query +func (_m *Reader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { + ret := _m.Called(ctx, query) if len(ret) == 0 { panic("no return value specified for GetTrace") @@ -151,19 +151,19 @@ func (_m *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.T var r0 *model.Trace var r1 error - if rf, ok := ret.Get(0).(func(context.Context, model.TraceID) (*model.Trace, error)); ok { - return rf(ctx, traceID) + if rf, ok := ret.Get(0).(func(context.Context, spanstore.GetTraceParameters) (*model.Trace, error)); ok { + return rf(ctx, query) } - if rf, ok := ret.Get(0).(func(context.Context, model.TraceID) *model.Trace); ok { - r0 = rf(ctx, traceID) + if rf, ok := ret.Get(0).(func(context.Context, spanstore.GetTraceParameters) *model.Trace); ok { + r0 = rf(ctx, query) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*model.Trace) } } - if rf, ok := ret.Get(1).(func(context.Context, model.TraceID) error); ok { - r1 = rf(ctx, traceID) + if rf, ok := ret.Get(1).(func(context.Context, spanstore.GetTraceParameters) error); ok { + r1 = rf(ctx, query) } else { r1 = ret.Error(1) } diff --git a/storage/spanstore/spanstoremetrics/read_metrics.go b/storage/spanstore/spanstoremetrics/read_metrics.go index 0f4ca13d0c0..b87453cc053 100644 --- a/storage/spanstore/spanstoremetrics/read_metrics.go +++ b/storage/spanstore/spanstoremetrics/read_metrics.go @@ -78,9 +78,9 @@ func (m *ReadMetricsDecorator) FindTraceIDs(ctx context.Context, traceQuery *spa } // GetTrace implements spanstore.Reader#GetTrace -func (m *ReadMetricsDecorator) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (m *ReadMetricsDecorator) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { start := time.Now() - retMe, err := m.spanReader.GetTrace(ctx, traceID) + retMe, err := m.spanReader.GetTrace(ctx, query) m.getTraceMetrics.emit(err, time.Since(start), 1) return retMe, err } diff --git a/storage/spanstore/spanstoremetrics/read_metrics_test.go b/storage/spanstore/spanstoremetrics/read_metrics_test.go index 8f9c40863b5..8581cbf9875 100644 --- a/storage/spanstore/spanstoremetrics/read_metrics_test.go +++ b/storage/spanstore/spanstoremetrics/read_metrics_test.go @@ -29,8 +29,8 @@ func TestSuccessfulUnderlyingCalls(t *testing.T) { mockReader.On("GetOperations", context.Background(), operationQuery). Return([]spanstore.Operation{}, nil) mrs.GetOperations(context.Background(), operationQuery) - mockReader.On("GetTrace", context.Background(), model.TraceID{}).Return(&model.Trace{}, nil) - mrs.GetTrace(context.Background(), model.TraceID{}) + mockReader.On("GetTrace", context.Background(), spanstore.GetTraceParameters{}).Return(&model.Trace{}, nil) + mrs.GetTrace(context.Background(), spanstore.GetTraceParameters{}) mockReader.On("FindTraces", context.Background(), &spanstore.TraceQueryParameters{}). Return([]*model.Trace{}, nil) mrs.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) @@ -97,9 +97,9 @@ func TestFailingUnderlyingCalls(t *testing.T) { mockReader.On("GetOperations", context.Background(), operationQuery). Return(nil, errors.New("Failure")) mrs.GetOperations(context.Background(), operationQuery) - mockReader.On("GetTrace", context.Background(), model.TraceID{}). + mockReader.On("GetTrace", context.Background(), spanstore.GetTraceParameters{}). Return(nil, errors.New("Failure")) - mrs.GetTrace(context.Background(), model.TraceID{}) + mrs.GetTrace(context.Background(), spanstore.GetTraceParameters{}) mockReader.On("FindTraces", context.Background(), &spanstore.TraceQueryParameters{}). Return(nil, errors.New("Failure")) mrs.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) diff --git a/storage_v2/factoryadapter/reader.go b/storage_v2/factoryadapter/reader.go index dd3f51041a1..d90a31479c8 100644 --- a/storage_v2/factoryadapter/reader.go +++ b/storage_v2/factoryadapter/reader.go @@ -46,9 +46,12 @@ func (tr *TraceReader) GetTraces( ) iter.Seq2[[]ptrace.Traces, error] { return func(yield func([]ptrace.Traces, error) bool) { for _, idParams := range traceIDs { - // TODO start/end times are not supported by v1 reader - // https://github.com/jaegertracing/jaeger/pull/6242 - t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(idParams.TraceID)) + query := spanstore.GetTraceParameters{ + TraceID: model.TraceIDFromOTEL(idParams.TraceID), + StartTime: idParams.Start, + EndTime: idParams.End, + } + t, err := tr.spanReader.GetTrace(ctx, query) if err != nil { if errors.Is(err, spanstore.ErrTraceNotFound) { continue diff --git a/storage_v2/factoryadapter/reader_test.go b/storage_v2/factoryadapter/reader_test.go index ddab5192bfe..c870b6a72d9 100644 --- a/storage_v2/factoryadapter/reader_test.go +++ b/storage_v2/factoryadapter/reader_test.go @@ -58,7 +58,8 @@ func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) { }, }, } - sr.On("GetTrace", mock.Anything, model.NewTraceID(2, 3)).Return(modelTrace, nil) + expectedQuery := spanstore.GetTraceParameters{TraceID: model.NewTraceID(2, 3)} + sr.On("GetTrace", mock.Anything, expectedQuery).Return(modelTrace, nil) traceReader := &TraceReader{ spanReader: sr, } diff --git a/storage_v2/factoryadapter/writer_test.go b/storage_v2/factoryadapter/writer_test.go index df5966cd33f..68a09af6e4c 100644 --- a/storage_v2/factoryadapter/writer_test.go +++ b/storage_v2/factoryadapter/writer_test.go @@ -16,6 +16,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage/spanstore" spanstoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -32,7 +33,8 @@ func TestWriteTraces(t *testing.T) { tdID := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() traceID, err := model.TraceIDFromBytes(tdID[:]) require.NoError(t, err) - trace, err := memstore.GetTrace(context.Background(), traceID) + query := spanstore.GetTraceParameters{TraceID: traceID} + trace, err := memstore.GetTrace(context.Background(), query) require.NoError(t, err) require.NotNil(t, trace) assert.Len(t, trace.Spans, 1)