Skip to content

Commit

Permalink
Revert "[fix][grpc] Disable tracing in grpc storage writer clients (j…
Browse files Browse the repository at this point in the history
…aegertracing#6125)"

This reverts commit a4efe57.
  • Loading branch information
yurishkuro committed Nov 2, 2024
1 parent c0f3995 commit d7fb346
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 77 deletions.
4 changes: 4 additions & 0 deletions cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
package integration

import (
"fmt"
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
)

type GRPCStorageIntegration struct {
Expand All @@ -30,6 +32,8 @@ func TestGRPCStorage(t *testing.T) {
s := &GRPCStorageIntegration{
E2EStorageIntegration: E2EStorageIntegration{
ConfigFile: "../../config-remote-storage.yaml",
// TODO this should be removed in favor of default health check endpoint
HealthCheckEndpoint: fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP),
},
}
s.CleanUp = s.cleanUp
Expand Down
98 changes: 33 additions & 65 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand All @@ -43,14 +42,13 @@ var ( // interface comformance checks

// Factory implements storage.Factory and creates storage components backed by a storage plugin.
type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider
config Config
services *ClientPluginServices
tracedRemoteConn *grpc.ClientConn
untracedRemoteConn *grpc.ClientConn
host component.Host
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider
config Config
services *ClientPluginServices
remoteConn *grpc.ClientConn
host component.Host
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -93,9 +91,15 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

tracedTelset := getTelset(logger, f.tracerProvider)
untracedTelset := getTelset(logger, noop.NewTracerProvider())
newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: f.tracerProvider,
// TODO needs to be joined with the metricsFactory
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
},
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
clientOpts := make([]configgrpc.ToClientConnOption, 0)
for _, opt := range opts {
clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt))
Expand All @@ -104,62 +108,40 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}

var err error
f.services, err = f.newRemoteStorage(tracedTelset, untracedTelset, newClientFn)
f.services, err = f.newRemoteStorage(telset, newClientFn)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}
logger.Info("Remote storage configuration", zap.Any("configuration", f.config))
return nil
}

type newClientFn func(telset component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error)
type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(
tracedTelset component.TelemetrySettings,
untracedTelset component.TelemetrySettings,
newClient newClientFn,
) (*ClientPluginServices, error) {
func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.config
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}

tenancyMgr := tenancy.NewManager(&c.Tenancy)
unaryInterceptors := []grpc.UnaryClientInterceptor{
bearertoken.NewUnaryClientInterceptor(),
}
streamInterceptors := []grpc.StreamClientInterceptor{
tenancy.NewClientStreamInterceptor(tenancyMgr),
}
if tenancyMgr.Enabled {
unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr))
streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr))
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

baseOpts := append(
[]grpc.DialOption{},
grpc.WithChainUnaryInterceptor(unaryInterceptors...),
grpc.WithChainStreamInterceptor(streamInterceptors...),
)
opts := append([]grpc.DialOption{}, baseOpts...)
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracedTelset.TracerProvider))))
opts = append(opts, grpc.WithUnaryInterceptor(bearertoken.NewUnaryClientInterceptor()))
opts = append(opts, grpc.WithStreamInterceptor(bearertoken.NewStreamClientInterceptor()))

tracedRemoteConn, err := newClient(tracedTelset, opts...)
remoteConn, err := newClient(opts...)
if err != nil {
return nil, fmt.Errorf("error creating traced remote storage client: %w", err)
}
f.tracedRemoteConn = tracedRemoteConn
untracedOpts := append([]grpc.DialOption{}, baseOpts...)
untracedOpts = append(
untracedOpts,
grpc.WithStatsHandler(
otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(untracedTelset.TracerProvider))))
untracedRemoteConn, err := newClient(tracedTelset, untracedOpts...)
if err != nil {
return nil, fmt.Errorf("error creating untraced remote storage client: %w", err)
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}
f.untracedRemoteConn = untracedRemoteConn
grpcClient := shared.NewGRPCClient(tracedRemoteConn, untracedRemoteConn)
f.remoteConn = remoteConn
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down Expand Up @@ -223,22 +205,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
if f.tracedRemoteConn != nil {
errs = append(errs, f.tracedRemoteConn.Close())
}
if f.untracedRemoteConn != nil {
errs = append(errs, f.untracedRemoteConn.Close())
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
}
return errors.Join(errs...)
}

func getTelset(logger *zap.Logger, tracerProvider trace.TracerProvider) component.TelemetrySettings {
return component.TelemetrySettings{
Logger: logger,
TracerProvider: tracerProvider,
// TODO needs to be joined with the metricsFactory
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
},
}
}
6 changes: 3 additions & 3 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func TestNewFactoryError(t *testing.T) {
f, err := NewFactoryWithConfig(Config{}, metrics.NullFactory, zap.NewNop(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
}
_, err = f.newRemoteStorage(component.TelemetrySettings{}, component.TelemetrySettings{}, newClientFn)
_, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating traced remote storage client")
require.Contains(t, err.Error(), "error creating remote storage client")
})
}

Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type GRPCClient struct {
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
}

func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *GRPCClient {
func NewGRPCClient(c *grpc.ClientConn) *GRPCClient {
return &GRPCClient{
readerClient: storage_v1.NewSpanReaderPluginClient(tracedConn),
writerClient: storage_v1.NewSpanWriterPluginClient(untracedConn),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(tracedConn),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(untracedConn),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn),
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) {

func TestNewGRPCClient(t *testing.T) {
conn := &grpc.ClientConn{}
client := NewGRPCClient(conn, conn)
client := NewGRPCClient(conn)
assert.NotNil(t, client)

assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)
Expand Down

0 comments on commit d7fb346

Please sign in to comment.