Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add GRPC configuration option, rework grpc config a bit #917

Merged
merged 2 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Config interface {
// incoming events over gRPC
GetGRPCListenAddr() (string, error)

// Returns the entire GRPC config block
GetGRPCConfig() GRPCServerParameters

// IsAPIKeyValid checks if the given API key is valid according to the rules
IsAPIKeyValid(key string) bool

Expand Down Expand Up @@ -164,16 +167,6 @@ type Config interface {
// GetQueryAuthToken returns the token that must be used to access the /query endpoints
GetQueryAuthToken() string

GetGRPCMaxConnectionIdle() time.Duration

GetGRPCMaxConnectionAge() time.Duration

GetGRPCMaxConnectionAgeGrace() time.Duration

GetGRPCKeepAlive() time.Duration

GetGRPCKeepAliveTimeout() time.Duration

GetPeerTimeout() time.Duration

GetAdditionalErrorFields() []string
Expand Down
42 changes: 37 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,36 @@ func TestHoneycombLoggerConfigDefaults(t *testing.T) {
assert.Equal(t, 10, loggerConfig.SamplerThroughput)
}

func TestHoneycombGRPCConfigDefaults(t *testing.T) {
cm := makeYAML(
"General.ConfigurationVersion", 2,
"GRPCServerParameters.Enabled", true,
"GRPCServerParameters.ListenAddr", "localhost:4343",
)
rm := makeYAML("ConfigVersion", 2)
config, rules := createTempConfigs(t, cm, rm)
defer os.Remove(rules)
defer os.Remove(config)
c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules})
assert.NoError(t, err)

grpcConfig := c.GetGRPCConfig()

assert.NoError(t, err)
kentquirk marked this conversation as resolved.
Show resolved Hide resolved

assert.Equal(t, true, c.GetGRPCEnabled())
a, err := c.GetGRPCListenAddr()
assert.NoError(t, err)
assert.Equal(t, "localhost:4343", a)
assert.Equal(t, true, grpcConfig.Enabled)
assert.Equal(t, "localhost:4343", grpcConfig.ListenAddr)
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionIdle))
assert.Equal(t, 3*time.Minute, time.Duration(grpcConfig.MaxConnectionAge))
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionAgeGrace))
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.KeepAlive))
assert.Equal(t, 20*time.Second, time.Duration(grpcConfig.KeepAliveTimeout))
}

func TestStdoutLoggerConfig(t *testing.T) {
cm := makeYAML(
"General.ConfigurationVersion", 2,
Expand Down Expand Up @@ -730,11 +760,13 @@ func TestGRPCServerParameters(t *testing.T) {
c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules})
assert.NoError(t, err)

assert.Equal(t, 1*time.Minute, c.GetGRPCMaxConnectionIdle())
assert.Equal(t, 2*time.Minute, c.GetGRPCMaxConnectionAge())
assert.Equal(t, 3*time.Minute, c.GetGRPCMaxConnectionAgeGrace())
assert.Equal(t, 4*time.Minute, c.GetGRPCKeepAlive())
assert.Equal(t, 5*time.Minute, c.GetGRPCKeepAliveTimeout())
gc := c.GetGRPCConfig()

assert.Equal(t, 1*time.Minute, time.Duration(gc.MaxConnectionIdle))
assert.Equal(t, 2*time.Minute, time.Duration(gc.MaxConnectionAge))
assert.Equal(t, 3*time.Minute, time.Duration(gc.MaxConnectionAgeGrace))
assert.Equal(t, 4*time.Minute, time.Duration(gc.KeepAlive))
assert.Equal(t, 5*time.Minute, time.Duration(gc.KeepAliveTimeout))
assert.Equal(t, true, c.GetGRPCEnabled())
addr, err := c.GetGRPCListenAddr()
assert.NoError(t, err)
Expand Down
58 changes: 16 additions & 42 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,15 @@ type IDFieldsConfig struct {
// by refinery's own GRPC server:
// https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters
type GRPCServerParameters struct {
Enabled bool `yaml:"Enabled" default:"true"`
ListenAddr string `yaml:"ListenAddr" cmdenv:"GRPCListenAddr"`
MaxConnectionIdle Duration `yaml:"MaxConnectionIdle" default:"1m"`
MaxConnectionAge Duration `yaml:"MaxConnectionAge" default:"3m"`
MaxConnectionAgeGrace Duration `yaml:"MaxConnectionAgeGrace" default:"1m"`
KeepAlive Duration `yaml:"KeepAlive" default:"1m"`
KeepAliveTimeout Duration `yaml:"KeepAliveTimeout" default:"20s"`
Enabled bool `yaml:"Enabled" default:"true"`
ListenAddr string `yaml:"ListenAddr" cmdenv:"GRPCListenAddr"`
MaxConnectionIdle Duration `yaml:"MaxConnectionIdle" default:"1m"`
MaxConnectionAge Duration `yaml:"MaxConnectionAge" default:"3m"`
MaxConnectionAgeGrace Duration `yaml:"MaxConnectionAgeGrace" default:"1m"`
KeepAlive Duration `yaml:"KeepAlive" default:"1m"`
KeepAliveTimeout Duration `yaml:"KeepAliveTimeout" default:"20s"`
MaxSendMsgSize MemorySize `yaml:"MaxSendMsgSize" default:"5MB"`
MaxRecvMsgSize MemorySize `yaml:"MaxRecvMsgSize" default:"5MB"`
}

type SampleCacheConfig struct {
Expand Down Expand Up @@ -490,6 +492,13 @@ func (f *fileConfig) GetGRPCListenAddr() (string, error) {
return f.mainConfig.GRPCServerParameters.ListenAddr, nil
}

func (f *fileConfig) GetGRPCConfig() GRPCServerParameters {
f.mux.RLock()
defer f.mux.RUnlock()

return f.mainConfig.GRPCServerParameters
}

func (f *fileConfig) IsAPIKeyValid(key string) bool {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down Expand Up @@ -799,41 +808,6 @@ func (f *fileConfig) GetQueryAuthToken() string {
return f.mainConfig.Debugging.QueryAuthToken
}

func (f *fileConfig) GetGRPCMaxConnectionIdle() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionIdle)
}

func (f *fileConfig) GetGRPCMaxConnectionAge() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionAge)
}

func (f *fileConfig) GetGRPCMaxConnectionAgeGrace() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionAgeGrace)
}

func (f *fileConfig) GetGRPCKeepAlive() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.KeepAlive)
}

func (f *fileConfig) GetGRPCKeepAliveTimeout() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.KeepAliveTimeout)
}

func (f *fileConfig) GetPeerTimeout() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down
34 changes: 34 additions & 0 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,40 @@ groups:
activity, then it pings the client to see if the transport is still
alive. "0s" sets duration to 20 seconds.

- name: MaxSendMsgSize
type: memorysize
valuetype: memorysize
default: 6MiB
reload: false
firstversion: v2.2
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
validations:
- type: minimum
arg: 1MB
- type: maximum
arg: 1GiB
summary: is the maximum message size the server can send.
description: >
The server enforces a maximum message size to avoid exhausting the
memory available to the process by a single request. The size is
expressed in bytes.

- name: MaxRecvMsgSize
type: memorysize
valuetype: memorysize
default: 7MiB
reload: false
firstversion: v2.2
validations:
- type: minimum
arg: 1MB
- type: maximum
arg: 1GiB
summary: is the maximum message size the server can receive.
description: >
The server enforces a maximum message size to avoid exhausting the
memory available to the process by a single request. The size is
expressed in bytes.

- name: SampleCache
title: "Sample Cache"
description: >
Expand Down
38 changes: 3 additions & 35 deletions config/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type MockConfig struct {
GetGRPCEnabledVal bool
GetGRPCListenAddrErr error
GetGRPCListenAddrVal string
GetGRPCServerParameters GRPCServerParameters
GetLoggerTypeErr error
GetLoggerTypeVal string
GetHoneycombLoggerConfigErr error
Expand Down Expand Up @@ -77,11 +78,6 @@ type MockConfig struct {
EnvironmentCacheTTL time.Duration
DatasetPrefix string
QueryAuthToken string
GRPCMaxConnectionIdle time.Duration
GRPCMaxConnectionAge time.Duration
GRPCMaxConnectionAgeGrace time.Duration
GRPCTime time.Duration
GRPCTimeout time.Duration
PeerTimeout time.Duration
AdditionalErrorFields []string
AddSpanCountToRoot bool
Expand Down Expand Up @@ -454,39 +450,11 @@ func (f *MockConfig) GetQueryAuthToken() string {
return f.QueryAuthToken
}

func (f *MockConfig) GetGRPCMaxConnectionIdle() time.Duration {
func (f *MockConfig) GetGRPCConfig() GRPCServerParameters {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionIdle
}

func (f *MockConfig) GetGRPCMaxConnectionAge() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionAge
}

func (f *MockConfig) GetGRPCMaxConnectionAgeGrace() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionAgeGrace
}

func (f *MockConfig) GetGRPCKeepAlive() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCTime
}

func (f *MockConfig) GetGRPCKeepAliveTimeout() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCTimeout
return f.GetGRPCServerParameters
}

func (f *MockConfig) GetPeerTimeout() time.Duration {
Expand Down
6 changes: 6 additions & 0 deletions config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func asFloat(v any) (float64, string) {
if err == nil {
return float64(f.Milliseconds()), ""
}
// can we interpret it as a memory size?
var m MemorySize
err = m.UnmarshalText([]byte(v.(string)))
if err == nil {
return float64(m), ""
}
default:
}
return 0, fmt.Sprintf("%#v (%T) cannot be interpreted as a quantity", v, v)
Expand Down
11 changes: 7 additions & 4 deletions config/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func Test_asFloat(t *testing.T) {
{"Duration2", "1s", 1000, ""},
{"Duration3", "1m", 60000, ""},
{"Duration4", "1h", 3600000, ""},
{"string1", "1", 0, `"1" (string) cannot be interpreted as a quantity`},
{"MemorySize1", "1", 1, ""},
{"MemorySize1K", "1K", 1000, ""},
{"MemorySize1KiB", "1Kib", 1024, ""},
{"MemorySize1G", "1G", 1_000_000_000, ""},
{"nil", nil, 0, `<nil> (<nil>) cannot be interpreted as a quantity`},
}
for _, tt := range tests {
Expand Down Expand Up @@ -291,10 +294,10 @@ func Test_validate(t *testing.T) {
{"good require A", mm("RequireTest.FieldA", 1, "RequireTest.FieldC", 3), ""},
{"good require B", mm("RequireTest.FieldB", 2, "RequireTest.FieldC", 3, "RequireTest.FieldD", 4), ""},
{"good require C", mm("RequireTest.FieldC", 3), ""},
{"good require E with default", mm("RequireTest.FieldA", 2, "RequireTest.FieldC", 3), ""},
{"good require E with default", mm("RequireTest.FieldA", 2, "RequireTest.FieldC", 3), ""},
{"bad require", mm("RequireTest.FieldA", 1), "the group RequireTest is missing its required field FieldC"},
{"bad conflicts with A", mm("ConflictTest.FieldA", 2, "ConflictTest.FieldB", 3), "the group ConflictTest includes FieldA, which conflicts with FieldB"},
{"good conflicts with A", mm("ConflictTest.FieldA", 2), ""},
{"bad conflicts with A", mm("ConflictTest.FieldA", 2, "ConflictTest.FieldB", 3), "the group ConflictTest includes FieldA, which conflicts with FieldB"},
{"good conflicts with A", mm("ConflictTest.FieldA", 2), ""},
{"good slice elementType", mm("Traces.AStringArray", []any{"0.0.0.0:8080", "192.168.1.1:8080"}), ""},
{"bad slice elementType", mm("Traces.AStringArray", []any{"0.0.0.0"}), "field Traces.AStringArray[0] (0.0.0.0) must be a hostport: address 0.0.0.0: missing port in address"},
{"good map elementType", mm("Traces.AStringMap", map[string]any{"k": "v"}), ""},
Expand Down
15 changes: 8 additions & 7 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,16 @@ func (r *Router) LnS(incomingOrPeer string) {
}

r.iopLogger.Info().Logf("gRPC listening on %s", grpcAddr)
grpcConfig := r.Config.GetGRPCConfig()
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(GRPCMessageSizeMax), // default is math.MaxInt32
grpc.MaxRecvMsgSize(GRPCMessageSizeMax), // default is 4MB
grpc.MaxSendMsgSize(int(grpcConfig.MaxSendMsgSize)),
grpc.MaxRecvMsgSize(int(grpcConfig.MaxRecvMsgSize)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: r.Config.GetGRPCMaxConnectionIdle(),
MaxConnectionAge: r.Config.GetGRPCMaxConnectionAge(),
MaxConnectionAgeGrace: r.Config.GetGRPCMaxConnectionAgeGrace(),
Time: r.Config.GetGRPCKeepAlive(),
Timeout: r.Config.GetGRPCKeepAliveTimeout(),
MaxConnectionIdle: time.Duration(grpcConfig.MaxConnectionIdle),
MaxConnectionAge: time.Duration(grpcConfig.MaxConnectionAge),
MaxConnectionAgeGrace: time.Duration(grpcConfig.MaxConnectionAgeGrace),
Time: time.Duration(grpcConfig.KeepAlive),
Timeout: time.Duration(grpcConfig.KeepAliveTimeout),
}),
}
traceServer := NewTraceServer(r)
Expand Down