Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][grpc] Disable tracing in grpc storage writer clients #6125

Merged
merged 8 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
package integration

import (
"fmt"
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
)

type GRPCStorageIntegration struct {
Expand All @@ -32,8 +30,6 @@ func TestGRPCStorage(t *testing.T) {
s := &GRPCStorageIntegration{
E2EStorageIntegration: E2EStorageIntegration{
ConfigFile: "../../config-remote-storage.yaml",
// TODO this should be removed in favor of default health check endpoint
HealthCheckEndpoint: fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP),
},
}
s.CleanUp = s.cleanUp
Expand Down
89 changes: 55 additions & 34 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand All @@ -42,13 +43,14 @@

// Factory implements storage.Factory and creates storage components backed by a storage plugin.
type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider
config Config
services *ClientPluginServices
remoteConn *grpc.ClientConn
host component.Host
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider
config Config
services *ClientPluginServices
tracedRemoteConn *grpc.ClientConn
untracedRemoteConn *grpc.ClientConn
host component.Host
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -91,15 +93,9 @@
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: f.tracerProvider,
// TODO needs to be joined with the metricsFactory
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
},
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
tracedTelset := getTelset(logger, f.tracerProvider)
untracedTelset := getTelset(logger, noop.NewTracerProvider())
newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
clientOpts := make([]configgrpc.ToClientConnOption, 0)
for _, opt := range opts {
clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt))
Expand All @@ -108,40 +104,51 @@
}

var err error
f.services, err = f.newRemoteStorage(telset, newClientFn)
f.services, err = f.newRemoteStorage(tracedTelset, untracedTelset, newClientFn)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}
logger.Info("Remote storage configuration", zap.Any("configuration", f.config))
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)
type newClientFn func(telset component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
func (f *Factory) newRemoteStorage(
tracedTelset component.TelemetrySettings,
untracedTelset component.TelemetrySettings,
newClient newClientFn,
) (*ClientPluginServices, error) {
c := f.config
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}

tenancyMgr := tenancy.NewManager(&c.Tenancy)
baseOpts := []grpc.DialOption{}
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
baseOpts = append(baseOpts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
baseOpts = append(baseOpts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

opts = append(opts, grpc.WithUnaryInterceptor(bearertoken.NewUnaryClientInterceptor()))
opts = append(opts, grpc.WithStreamInterceptor(bearertoken.NewStreamClientInterceptor()))

remoteConn, err := newClient(opts...)
baseOpts = append(baseOpts, grpc.WithUnaryInterceptor(bearertoken.NewUnaryClientInterceptor()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure declaring two options for an interceptor type won't have latest-wins effect? In the pr that introduced bearer interceptors we used a composite interceptor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro looks like grpc.NewClient chains the interceptors into one (ref: https://github.com/grpc/grpc-go/blob/cb329375b14e0ddd4e454e18405a732f7c2ccd72/clientconn.go#L170-L171)

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about that, but the way the functional options are processed is they just override the only interceptor slot: https://github.com/grpc/grpc-go/blob/cb329375b14e0ddd4e454e18405a732f7c2ccd72/dialoptions.go#L535

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chained the unary and stream interceptors together

baseOpts = append(baseOpts, grpc.WithStreamInterceptor(bearertoken.NewStreamClientInterceptor()))
opts := append(baseOpts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracedTelset.TracerProvider))))
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
tracedRemoteConn, err := newClient(tracedTelset, opts...)
if err != nil {
return nil, fmt.Errorf("error creating traced remote storage client: %w", err)
}
f.tracedRemoteConn = tracedRemoteConn
untracedOpts := append(
baseOpts,
grpc.WithStatsHandler(
otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(untracedTelset.TracerProvider))))
untracedRemoteConn, err := newClient(tracedTelset, untracedOpts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
return nil, fmt.Errorf("error creating untraced remote storage client: %w", err)

Check warning on line 148 in plugin/storage/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/factory.go#L148

Added line #L148 was not covered by tests
}
f.remoteConn = remoteConn
grpcClient := shared.NewGRPCClient(remoteConn)
f.untracedRemoteConn = untracedRemoteConn
grpcClient := shared.NewGRPCClient(tracedRemoteConn, untracedRemoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down Expand Up @@ -205,8 +212,22 @@
// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
if f.tracedRemoteConn != nil {
errs = append(errs, f.tracedRemoteConn.Close())
}
if f.untracedRemoteConn != nil {
errs = append(errs, f.untracedRemoteConn.Close())
}
return errors.Join(errs...)
}

func getTelset(logger *zap.Logger, tracerProvider trace.TracerProvider) component.TelemetrySettings {
return component.TelemetrySettings{
Logger: logger,
TracerProvider: tracerProvider,
// TODO needs to be joined with the metricsFactory
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
},
}
}
6 changes: 3 additions & 3 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func TestNewFactoryError(t *testing.T) {
f, err := NewFactoryWithConfig(Config{}, metrics.NullFactory, zap.NewNop(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
}
_, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn)
_, err = f.newRemoteStorage(component.TelemetrySettings{}, component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
require.Contains(t, err.Error(), "error creating traced remote storage client")
})
}

Expand Down
8 changes: 4 additions & 4 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type GRPCClient struct {
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *GRPCClient {
func NewGRPCClient(c *grpc.ClientConn, noTracingConn *grpc.ClientConn) *GRPCClient {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
return &GRPCClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(noTracingConn),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(noTracingConn),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(noTracingConn),
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) {

func TestNewGRPCClient(t *testing.T) {
conn := &grpc.ClientConn{}
client := NewGRPCClient(conn)
client := NewGRPCClient(conn, conn)
assert.NotNil(t, client)

assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)
Expand Down
Loading