Skip to content

Commit

Permalink
Fix GRPC configuration and add memory size
Browse files Browse the repository at this point in the history
validation
  • Loading branch information
kentquirk committed Nov 30, 2023
1 parent b975a87 commit 0a8f9cb
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 103 deletions.
13 changes: 3 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Config interface {
// incoming events over gRPC
GetGRPCListenAddr() (string, error)

// Returns the entire GRPC config block
GetGRPCConfig() GRPCServerParameters

// IsAPIKeyValid checks if the given API key is valid according to the rules
IsAPIKeyValid(key string) bool

Expand Down Expand Up @@ -164,16 +167,6 @@ type Config interface {
// GetQueryAuthToken returns the token that must be used to access the /query endpoints
GetQueryAuthToken() string

GetGRPCMaxConnectionIdle() time.Duration

GetGRPCMaxConnectionAge() time.Duration

GetGRPCMaxConnectionAgeGrace() time.Duration

GetGRPCKeepAlive() time.Duration

GetGRPCKeepAliveTimeout() time.Duration

GetPeerTimeout() time.Duration

GetAdditionalErrorFields() []string
Expand Down
42 changes: 37 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,36 @@ func TestHoneycombLoggerConfigDefaults(t *testing.T) {
assert.Equal(t, 10, loggerConfig.SamplerThroughput)
}

func TestHoneycombGRPCConfigDefaults(t *testing.T) {
cm := makeYAML(
"General.ConfigurationVersion", 2,
"GRPCServerParameters.Enabled", true,
"GRPCServerParameters.ListenAddr", "localhost:4343",
)
rm := makeYAML("ConfigVersion", 2)
config, rules := createTempConfigs(t, cm, rm)
defer os.Remove(rules)
defer os.Remove(config)
c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules})
assert.NoError(t, err)

grpcConfig := c.GetGRPCConfig()

assert.NoError(t, err)

assert.Equal(t, true, c.GetGRPCEnabled())
a, err := c.GetGRPCListenAddr()
assert.NoError(t, err)
assert.Equal(t, "localhost:4343", a)
assert.Equal(t, true, grpcConfig.Enabled)
assert.Equal(t, "localhost:4343", grpcConfig.ListenAddr)
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionIdle))
assert.Equal(t, 3*time.Minute, time.Duration(grpcConfig.MaxConnectionAge))
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionAgeGrace))
assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.KeepAlive))
assert.Equal(t, 20*time.Second, time.Duration(grpcConfig.KeepAliveTimeout))
}

func TestStdoutLoggerConfig(t *testing.T) {
cm := makeYAML(
"General.ConfigurationVersion", 2,
Expand Down Expand Up @@ -730,11 +760,13 @@ func TestGRPCServerParameters(t *testing.T) {
c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules})
assert.NoError(t, err)

assert.Equal(t, 1*time.Minute, c.GetGRPCMaxConnectionIdle())
assert.Equal(t, 2*time.Minute, c.GetGRPCMaxConnectionAge())
assert.Equal(t, 3*time.Minute, c.GetGRPCMaxConnectionAgeGrace())
assert.Equal(t, 4*time.Minute, c.GetGRPCKeepAlive())
assert.Equal(t, 5*time.Minute, c.GetGRPCKeepAliveTimeout())
gc := c.GetGRPCConfig()

assert.Equal(t, 1*time.Minute, time.Duration(gc.MaxConnectionIdle))
assert.Equal(t, 2*time.Minute, time.Duration(gc.MaxConnectionAge))
assert.Equal(t, 3*time.Minute, time.Duration(gc.MaxConnectionAgeGrace))
assert.Equal(t, 4*time.Minute, time.Duration(gc.KeepAlive))
assert.Equal(t, 5*time.Minute, time.Duration(gc.KeepAliveTimeout))
assert.Equal(t, true, c.GetGRPCEnabled())
addr, err := c.GetGRPCListenAddr()
assert.NoError(t, err)
Expand Down
58 changes: 16 additions & 42 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,15 @@ type IDFieldsConfig struct {
// by refinery's own GRPC server:
// https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters
type GRPCServerParameters struct {
Enabled bool `yaml:"Enabled" default:"true"`
ListenAddr string `yaml:"ListenAddr" cmdenv:"GRPCListenAddr"`
MaxConnectionIdle Duration `yaml:"MaxConnectionIdle" default:"1m"`
MaxConnectionAge Duration `yaml:"MaxConnectionAge" default:"3m"`
MaxConnectionAgeGrace Duration `yaml:"MaxConnectionAgeGrace" default:"1m"`
KeepAlive Duration `yaml:"KeepAlive" default:"1m"`
KeepAliveTimeout Duration `yaml:"KeepAliveTimeout" default:"20s"`
Enabled bool `yaml:"Enabled" default:"true"`
ListenAddr string `yaml:"ListenAddr" cmdenv:"GRPCListenAddr"`
MaxConnectionIdle Duration `yaml:"MaxConnectionIdle" default:"1m"`
MaxConnectionAge Duration `yaml:"MaxConnectionAge" default:"3m"`
MaxConnectionAgeGrace Duration `yaml:"MaxConnectionAgeGrace" default:"1m"`
KeepAlive Duration `yaml:"KeepAlive" default:"1m"`
KeepAliveTimeout Duration `yaml:"KeepAliveTimeout" default:"20s"`
MaxSendMsgSize MemorySize `yaml:"MaxSendMsgSize" default:"5MB"`
MaxRecvMsgSize MemorySize `yaml:"MaxRecvMsgSize" default:"5MB"`
}

type SampleCacheConfig struct {
Expand Down Expand Up @@ -490,6 +492,13 @@ func (f *fileConfig) GetGRPCListenAddr() (string, error) {
return f.mainConfig.GRPCServerParameters.ListenAddr, nil
}

func (f *fileConfig) GetGRPCConfig() GRPCServerParameters {
f.mux.RLock()
defer f.mux.RUnlock()

return f.mainConfig.GRPCServerParameters
}

func (f *fileConfig) IsAPIKeyValid(key string) bool {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down Expand Up @@ -799,41 +808,6 @@ func (f *fileConfig) GetQueryAuthToken() string {
return f.mainConfig.Debugging.QueryAuthToken
}

func (f *fileConfig) GetGRPCMaxConnectionIdle() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionIdle)
}

func (f *fileConfig) GetGRPCMaxConnectionAge() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionAge)
}

func (f *fileConfig) GetGRPCMaxConnectionAgeGrace() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.MaxConnectionAgeGrace)
}

func (f *fileConfig) GetGRPCKeepAlive() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.KeepAlive)
}

func (f *fileConfig) GetGRPCKeepAliveTimeout() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return time.Duration(f.mainConfig.GRPCServerParameters.KeepAliveTimeout)
}

func (f *fileConfig) GetPeerTimeout() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down
34 changes: 34 additions & 0 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,40 @@ groups:
activity, then it pings the client to see if the transport is still
alive. "0s" sets duration to 20 seconds.
- name: MaxSendMsgSize
type: memorysize
valuetype: memorysize
default: 6MiB
reload: false
firstversion: v2.2
validations:
- type: minimum
arg: 1MB
- type: maximum
arg: 1GiB
summary: is the maximum message size the server can send.
description: >
The server enforces a maximum message size to avoid exhausting the
memory available to the process by a single request. The size is
expressed in bytes.
- name: MaxRecvMsgSize
type: memorysize
valuetype: memorysize
default: 7MiB
reload: false
firstversion: v2.2
validations:
- type: minimum
arg: 1MB
- type: maximum
arg: 1GiB
summary: is the maximum message size the server can receive.
description: >
The server enforces a maximum message size to avoid exhausting the
memory available to the process by a single request. The size is
expressed in bytes.
- name: SampleCache
title: "Sample Cache"
description: >
Expand Down
38 changes: 3 additions & 35 deletions config/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type MockConfig struct {
GetGRPCEnabledVal bool
GetGRPCListenAddrErr error
GetGRPCListenAddrVal string
GetGRPCServerParameters GRPCServerParameters
GetLoggerTypeErr error
GetLoggerTypeVal string
GetHoneycombLoggerConfigErr error
Expand Down Expand Up @@ -77,11 +78,6 @@ type MockConfig struct {
EnvironmentCacheTTL time.Duration
DatasetPrefix string
QueryAuthToken string
GRPCMaxConnectionIdle time.Duration
GRPCMaxConnectionAge time.Duration
GRPCMaxConnectionAgeGrace time.Duration
GRPCTime time.Duration
GRPCTimeout time.Duration
PeerTimeout time.Duration
AdditionalErrorFields []string
AddSpanCountToRoot bool
Expand Down Expand Up @@ -454,39 +450,11 @@ func (f *MockConfig) GetQueryAuthToken() string {
return f.QueryAuthToken
}

func (f *MockConfig) GetGRPCMaxConnectionIdle() time.Duration {
func (f *MockConfig) GetGRPCConfig() GRPCServerParameters {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionIdle
}

func (f *MockConfig) GetGRPCMaxConnectionAge() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionAge
}

func (f *MockConfig) GetGRPCMaxConnectionAgeGrace() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCMaxConnectionAgeGrace
}

func (f *MockConfig) GetGRPCKeepAlive() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCTime
}

func (f *MockConfig) GetGRPCKeepAliveTimeout() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()

return f.GRPCTimeout
return f.GetGRPCServerParameters
}

func (f *MockConfig) GetPeerTimeout() time.Duration {
Expand Down
6 changes: 6 additions & 0 deletions config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func asFloat(v any) (float64, string) {
if err == nil {
return float64(f.Milliseconds()), ""
}
// can we interpret it as a memory size?
var m MemorySize
err = m.UnmarshalText([]byte(v.(string)))
if err == nil {
return float64(m), ""
}
default:
}
return 0, fmt.Sprintf("%#v (%T) cannot be interpreted as a quantity", v, v)
Expand Down
11 changes: 7 additions & 4 deletions config/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func Test_asFloat(t *testing.T) {
{"Duration2", "1s", 1000, ""},
{"Duration3", "1m", 60000, ""},
{"Duration4", "1h", 3600000, ""},
{"string1", "1", 0, `"1" (string) cannot be interpreted as a quantity`},
{"MemorySize1", "1", 1, ""},
{"MemorySize1K", "1K", 1000, ""},
{"MemorySize1KiB", "1Kib", 1024, ""},
{"MemorySize1G", "1G", 1_000_000_000, ""},
{"nil", nil, 0, `<nil> (<nil>) cannot be interpreted as a quantity`},
}
for _, tt := range tests {
Expand Down Expand Up @@ -291,10 +294,10 @@ func Test_validate(t *testing.T) {
{"good require A", mm("RequireTest.FieldA", 1, "RequireTest.FieldC", 3), ""},
{"good require B", mm("RequireTest.FieldB", 2, "RequireTest.FieldC", 3, "RequireTest.FieldD", 4), ""},
{"good require C", mm("RequireTest.FieldC", 3), ""},
{"good require E with default", mm("RequireTest.FieldA", 2, "RequireTest.FieldC", 3), ""},
{"good require E with default", mm("RequireTest.FieldA", 2, "RequireTest.FieldC", 3), ""},
{"bad require", mm("RequireTest.FieldA", 1), "the group RequireTest is missing its required field FieldC"},
{"bad conflicts with A", mm("ConflictTest.FieldA", 2, "ConflictTest.FieldB", 3), "the group ConflictTest includes FieldA, which conflicts with FieldB"},
{"good conflicts with A", mm("ConflictTest.FieldA", 2), ""},
{"bad conflicts with A", mm("ConflictTest.FieldA", 2, "ConflictTest.FieldB", 3), "the group ConflictTest includes FieldA, which conflicts with FieldB"},
{"good conflicts with A", mm("ConflictTest.FieldA", 2), ""},
{"good slice elementType", mm("Traces.AStringArray", []any{"0.0.0.0:8080", "192.168.1.1:8080"}), ""},
{"bad slice elementType", mm("Traces.AStringArray", []any{"0.0.0.0"}), "field Traces.AStringArray[0] (0.0.0.0) must be a hostport: address 0.0.0.0: missing port in address"},
{"good map elementType", mm("Traces.AStringMap", map[string]any{"k": "v"}), ""},
Expand Down
15 changes: 8 additions & 7 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,16 @@ func (r *Router) LnS(incomingOrPeer string) {
}

r.iopLogger.Info().Logf("gRPC listening on %s", grpcAddr)
grpcConfig := r.Config.GetGRPCConfig()
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(GRPCMessageSizeMax), // default is math.MaxInt32
grpc.MaxRecvMsgSize(GRPCMessageSizeMax), // default is 4MB
grpc.MaxSendMsgSize(int(grpcConfig.MaxSendMsgSize)),
grpc.MaxRecvMsgSize(int(grpcConfig.MaxRecvMsgSize)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: r.Config.GetGRPCMaxConnectionIdle(),
MaxConnectionAge: r.Config.GetGRPCMaxConnectionAge(),
MaxConnectionAgeGrace: r.Config.GetGRPCMaxConnectionAgeGrace(),
Time: r.Config.GetGRPCKeepAlive(),
Timeout: r.Config.GetGRPCKeepAliveTimeout(),
MaxConnectionIdle: time.Duration(grpcConfig.MaxConnectionIdle),
MaxConnectionAge: time.Duration(grpcConfig.MaxConnectionAge),
MaxConnectionAgeGrace: time.Duration(grpcConfig.MaxConnectionAgeGrace),
Time: time.Duration(grpcConfig.KeepAlive),
Timeout: time.Duration(grpcConfig.KeepAliveTimeout),
}),
}
traceServer := NewTraceServer(r)
Expand Down

0 comments on commit 0a8f9cb

Please sign in to comment.