-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathgcp_multiendpoint.go
408 lines (368 loc) · 12.7 KB
/
gcp_multiendpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcgcp
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
)
var gmeCounter uint32
type contextMEKey int
var meKey contextMEKey
// NewMEContext returns a new Context that carries Multiendpoint name.
func NewMEContext(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, meKey, name)
}
// FromMEContext returns the MultiEndpoint name stored in ctx, if any.
func FromMEContext(ctx context.Context) (string, bool) {
name, ok := ctx.Value(meKey).(string)
return name, ok
}
// GCPMultiEndpoint holds the state of MultiEndpoints-enabled gRPC client connection.
//
// The purposes of GCPMultiEndpoint are:
//
// - Fallback to an alternative endpoint (host:port) of a gRPC service when the original
// endpoint is completely unavailable.
// - Be able to route an RPC call to a specific group of endpoints.
// - Be able to reconfigure endpoints in runtime.
//
// A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints
// where priority is defined by the position in the list with the first endpoint having top
// priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an
// RPC call, it picks the top priority endpoint that is currently available. More information on the
// [multiendpoint.MultiEndpoint].
//
// GCPMultiEndpoint can have one or more MultiEndpoint identified by its name -- arbitrary
// string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name
// can be used to route an RPC call to this MultiEndpoint by using the [NewMEContext].
//
// GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration.
// An updated configuration can be provided at any time later using [UpdateMultiEndpoints].
//
// Example:
//
// Let's assume we have a service with read and write operations and the following backends:
//
// - service.example.com -- the main set of backends supporting all operations
// - service-fallback.example.com -- read-write replica supporting all operations
// - ro-service.example.com -- read-only replica supporting only read operations
//
// Example configuration:
//
// - MultiEndpoint named "default" with endpoints:
//
// 1. service.example.com:443
//
// 2. service-fallback.example.com:443
//
// - MultiEndpoint named "read" with endpoints:
//
// 1. ro-service.example.com:443
//
// 2. service-fallback.example.com:443
//
// 3. service.example.com:443
//
// With the configuration above GCPMultiEndpoint will use the "default" MultiEndpoint by
// default. It means that RPC calls by default will use the main endpoint and if it is not available
// then the read-write replica.
//
// To offload some read calls to the read-only replica we can specify "read" MultiEndpoint in the
// context. Then these calls will use the read-only replica endpoint and if it is not available
// then the read-write replica and if it is also not available then the main endpoint.
//
// GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique
// endpoint. For the example above three connection pools will be created.
//
// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
// as a [grpc.ClientConn] when creating gRPC clients.
type GCPMultiEndpoint struct {
mu sync.RWMutex
defaultName string
mes map[string]multiendpoint.MultiEndpoint
pools map[string]*monitoredConn
opts []grpc.DialOption
gcpConfig *pb.ApiConfig
dialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
log grpclog.LoggerV2
grpc.ClientConnInterface
}
// Make sure GCPMultiEndpoint implements grpc.ClientConnInterface.
var _ grpc.ClientConnInterface = (*GCPMultiEndpoint)(nil)
func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...)
}
func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...)
}
func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn {
name, ok := FromMEContext(ctx)
me, ook := gme.mes[name]
if !ok || !ook {
me = gme.mes[gme.defaultName]
}
return gme.pools[me.Current()].conn
}
func (gme *GCPMultiEndpoint) Close() error {
var errs multiError
for e, mc := range gme.pools {
mc.stopMonitoring()
if err := mc.conn.Close(); err != nil {
errs = append(errs, err)
gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
if gme.log.V(FINE) {
gme.log.Infof("closed channel pool for %q endpoint.", e)
}
}
return errs.Combine()
}
func (gme *GCPMultiEndpoint) GCPConfig() *pb.ApiConfig {
return proto.Clone(gme.gcpConfig).(*pb.ApiConfig)
}
// GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client
// connection.
type GCPMultiEndpointOptions struct {
// Regular gRPC-GCP configuration to be applied to every endpoint.
GRPCgcpConfig *pb.ApiConfig
// Map of MultiEndpoints where key is the MultiEndpoint name.
MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions
// Name of the default MultiEndpoint.
Default string
// Func to dial grpc ClientConn.
DialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
}
// NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client
// connection.
//
// Deprecated: use NewGCPMultiEndpoint.
func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) {
return NewGCPMultiEndpoint(meOpts, opts...)
}
// NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client
// connection.
//
// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
// as a [grpc.ClientConn] when creating gRPC clients.
func NewGCPMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) {
// Read config, create multiendpoints and pools.
o, err := makeOpts(meOpts, opts)
if err != nil {
return nil, err
}
gme := &GCPMultiEndpoint{
mes: make(map[string]multiendpoint.MultiEndpoint),
pools: make(map[string]*monitoredConn),
defaultName: meOpts.Default,
opts: o,
gcpConfig: proto.Clone(meOpts.GRPCgcpConfig).(*pb.ApiConfig),
dialFunc: meOpts.DialFunc,
log: NewGCPLogger(compLogger, fmt.Sprintf("[GCPMultiEndpoint #%d]", atomic.AddUint32(&gmeCounter, 1))),
}
if gme.dialFunc == nil {
gme.dialFunc = func(_ context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.Dial(target, opts...)
}
}
if err := gme.UpdateMultiEndpoints(meOpts); err != nil {
return nil, err
}
return gme, nil
}
func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) {
grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig)
if err != nil {
return nil, err
}
o := append([]grpc.DialOption{}, opts...)
o = append(o, []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))),
grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor),
grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor),
}...)
return o, nil
}
type monitoredConn struct {
endpoint string
conn *grpc.ClientConn
gme *GCPMultiEndpoint
cancel context.CancelFunc
}
func newMonitoredConn(endpoint string, conn *grpc.ClientConn, gme *GCPMultiEndpoint) (mc *monitoredConn) {
ctx, cancel := context.WithCancel(context.Background())
mc = &monitoredConn{
endpoint: endpoint,
conn: conn,
gme: gme,
cancel: cancel,
}
go mc.monitor(ctx)
return
}
func (mc *monitoredConn) notify(state connectivity.State) {
if mc.gme.log.V(FINE) {
mc.gme.log.Infof("%q endpoint state changed to %v", mc.endpoint, state)
}
// Inform all multiendpoints.
mc.gme.mu.RLock()
for _, me := range mc.gme.mes {
me.SetEndpointAvailability(mc.endpoint, state == connectivity.Ready)
}
mc.gme.mu.RUnlock()
}
func (mc *monitoredConn) monitor(ctx context.Context) {
for {
currentState := mc.conn.GetState()
mc.notify(currentState)
if !mc.conn.WaitForStateChange(ctx, currentState) {
break
}
}
}
func (mc *monitoredConn) stopMonitoring() {
mc.cancel()
}
// UpdateMultiEndpoints reconfigures MultiEndpoints.
//
// MultiEndpoints are matched with the current ones by name.
//
// - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be
// removed.
// - A new MultiEndpoint will be created for every new name in the list.
// - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout
// change).
//
// Endpoints are matched by the endpoint address (usually in the form of address:port).
//
// - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the
// connection poll for this endpoint will be shutdown.
// - A connection pool will be created for every new endpoint.
// - For an existing endpoint nothing will change (the connection pool will not be re-created,
// thus no connection credentials change, nor connection configuration change).
func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error {
gme.mu.Lock()
defer gme.mu.Unlock()
if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok {
return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default)
}
validPools := make(map[string]bool)
for _, meo := range meOpts.MultiEndpoints {
for _, e := range meo.Endpoints {
validPools[e] = true
}
}
// Add missing pools.
for e := range validPools {
if _, ok := gme.pools[e]; !ok {
// This creates a ClientConn with the gRPC-GCP balancer managing connection pool.
conn, err := gme.dialFunc(context.Background(), e, gme.opts...)
if err != nil {
return err
}
if gme.log.V(FINE) {
gme.log.Infof("created new channel pool for %q endpoint.", e)
}
gme.pools[e] = newMonitoredConn(e, conn, gme)
}
}
// Add new multi-endpoints and update existing.
for name, meo := range meOpts.MultiEndpoints {
if me, ok := gme.mes[name]; ok {
// Updating existing MultiEndpoint.
me.SetEndpoints(meo.Endpoints)
continue
}
// Add new MultiEndpoint.
if gme.log.V(FINE) {
gme.log.Infof("creating new %q multiendpoint.", name)
}
me, err := multiendpoint.NewMultiEndpoint(meo)
if err != nil {
return err
}
gme.mes[name] = me
}
gme.defaultName = meOpts.Default
// Remove obsolete MultiEndpoints.
for name := range gme.mes {
if _, ok := meOpts.MultiEndpoints[name]; !ok {
delete(gme.mes, name)
if gme.log.V(FINE) {
gme.log.Infof("removed obsolete %q multiendpoint.", name)
}
}
}
// Remove obsolete pools.
for e, mc := range gme.pools {
if _, ok := validPools[e]; !ok {
if err := mc.conn.Close(); err != nil {
gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
if gme.log.V(FINE) {
gme.log.Infof("closed channel pool for %q endpoint.", e)
}
mc.stopMonitoring()
delete(gme.pools, e)
}
}
// Trigger status update.
for e, mc := range gme.pools {
s := mc.conn.GetState()
for _, me := range gme.mes {
me.SetEndpointAvailability(e, s == connectivity.Ready)
}
}
return nil
}
type multiError []error
func (m multiError) Error() string {
s, n := "", 0
for _, e := range m {
if e != nil {
if n == 0 {
s = e.Error()
}
n++
}
}
switch n {
case 0:
return "(0 errors)"
case 1:
return s
case 2:
return s + " (and 1 other error)"
}
return fmt.Sprintf("%s (and %d other errors)", s, n-1)
}
func (m multiError) Combine() error {
if len(m) == 0 {
return nil
}
return m
}