package redisc

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/garyburd/redigo/redis"
)

const hashSlots = 16384
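
// For reference, a key's hash slot is CRC16(key) mod hashSlots, and Redis
// honors "hash tags": when a key contains a "{...}" section, only the part
// between the first "{" and the next "}" is hashed, so "{user1000}.following"
// and "{user1000}.followers" map to the same slot. The slot computation
// itself lives outside this file.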

// Cluster manages a redis cluster. If the CreatePool field is not nil,
// a redis.Pool is used for each node in the cluster to get connections
// via Get. If it is nil or if Dial is called, redis.Dial
// is used to get the connection.
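//
// A minimal setup sketch (the address and pool settings are placeholders,
// not part of this file):
//
//	c := &Cluster{
//		StartupNodes: []string{"127.0.0.1:7000"},
//		CreatePool: func(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
//			return &redis.Pool{
//				MaxIdle:     5,
//				IdleTimeout: time.Minute,
//				Dial: func() (redis.Conn, error) {
//					return redis.Dial("tcp", addr, opts...)
//				},
//			}, nil
//		},
//	}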
type Cluster struct {
	// StartupNodes is the list of initial nodes that make up
	// the cluster. The values are expected as "address:port"
	// (e.g.: "127.0.0.1:6379"). Only master nodes should be
	// specified.
	StartupNodes []string

	// DialOptions is the list of options to set on each new connection.
	DialOptions []redis.DialOption

	// CreatePool is the function to call to create a redis.Pool for
	// the specified TCP address, using the provided options
	// as set in DialOptions. If this field is not nil, a
	// redis.Pool is created for each node in the cluster and the
	// pool is used to manage the connections returned by Get.
	CreatePool func(address string, options ...redis.DialOption) (*redis.Pool, error)

	mu         sync.Mutex             // protects following fields
	err        error                  // broken connection error
	pools      map[string]*redis.Pool // created pools per node
	masters    map[string]bool        // set of known active master nodes, kept up-to-date
	replicas   map[string]bool        // set of known active replica nodes, kept up-to-date
	mapping    [hashSlots][]string    // hash slot number to master and replica(s) server addresses, master is always at [0]
	refreshing bool                   // indicates if there's a refresh in progress
}

// Refresh updates the cluster's internal mapping of hash slots
// to redis nodes. It calls CLUSTER SLOTS on each known node until one
// of them succeeds.
//
// It should typically be called after creating the Cluster and before
// using it. The cluster automatically keeps its mapping up-to-date
// afterwards, based on the redis commands' MOVED responses.
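//
// A minimal sketch of typical startup (the address is a placeholder and
// error handling is illustrative):
//
//	c := &Cluster{StartupNodes: []string{"127.0.0.1:7000"}}
//	if err := c.Refresh(); err != nil {
//		// no node could serve CLUSTER SLOTS, fail fast
//		log.Fatal(err)
//	}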
func (c *Cluster) Refresh() error {
	c.mu.Lock()
	err := c.err
	if err == nil {
		c.refreshing = true
	}
	c.mu.Unlock()
	if err != nil {
		return err
	}
	return c.refresh()
}

func (c *Cluster) refresh() error {
	addrs := c.getNodeAddrs(false)
	for _, addr := range addrs {
		m, err := c.getClusterSlots(addr)
		if err == nil {
			// succeeded, save as mapping
			c.mu.Lock()
			// mark all current nodes as false
			for k := range c.masters {
				c.masters[k] = false
			}
			for k := range c.replicas {
				c.replicas[k] = false
			}
			for _, sm := range m {
				for i, node := range sm.nodes {
					if node != "" {
						target := c.masters
						if i > 0 {
							target = c.replicas
						}
						target[node] = true
					}
				}
				for ix := sm.start; ix <= sm.end; ix++ {
					c.mapping[ix] = sm.nodes
				}
			}

			// remove all nodes that are gone from the cluster
			for _, nodes := range []map[string]bool{c.masters, c.replicas} {
				for k, ok := range nodes {
					if !ok {
						delete(nodes, k)

						// close and remove all existing pools for removed nodes
						if p := c.pools[k]; p != nil {
							p.Close()
							delete(c.pools, k)
						}
					}
				}
			}

			// mark that no refresh is needed until another MOVED
			c.refreshing = false
			c.mu.Unlock()
			return nil
		}
	}

	// reset the refreshing flag
	c.mu.Lock()
	c.refreshing = false
	c.mu.Unlock()
	return errors.New("redisc: all nodes failed")
}

// needsRefresh handles automatic update of the mapping.
func (c *Cluster) needsRefresh(re *RedirError) {
	c.mu.Lock()
	if re != nil {
		// update the mapping only if the address has changed, so that if
		// a READONLY replica read returns a MOVED to a master, it doesn't
		// overwrite that slot's replicas by setting just the master (i.e. this
		// is not a MOVED because the cluster is updating, it is a MOVED
		// because the replica cannot serve that key). The same goes for a
		// request on a random connection that gets a MOVED: it should not
		// overwrite the moved-to slot's configuration if the master's
		// address is the same.
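		//
		// For reference, a MOVED redirection from the server looks like
		// "MOVED 3999 127.0.0.1:6381": the hash slot followed by the
		// "address:port" of the master now serving it; RedirError carries
		// those values as NewSlot and Addr.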
		if current := c.mapping[re.NewSlot]; len(current) == 0 || current[0] != re.Addr {
			c.mapping[re.NewSlot] = []string{re.Addr}
		}
	}
	if !c.refreshing {
		// refreshing is reset to false only once the goroutine has
		// finished updating the mapping, so a new refresh goroutine
		// will only be started if none is running.
		c.refreshing = true
		go c.refresh()
	}
	c.mu.Unlock()
}

type slotMapping struct {
	start, end int
	nodes      []string // master is always at [0]
}

func (c *Cluster) getClusterSlots(addr string) ([]slotMapping, error) {
	conn, err := c.getConnForAddr(addr, false)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	vals, err := redis.Values(conn.Do("CLUSTER", "SLOTS"))
	if err != nil {
		return nil, err
	}
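
	// For reference, each CLUSTER SLOTS reply entry has the form
	// [start, end, [master-host, master-port, ...], [replica-host, replica-port, ...]...];
	// recent servers append a node ID after each port, which the Scan calls
	// below simply leave in the remainder and ignore.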
	m := make([]slotMapping, 0, len(vals))
	for len(vals) > 0 {
		var slotRange []interface{}
		vals, err = redis.Scan(vals, &slotRange)
		if err != nil {
			return nil, err
		}

		var start, end int
		slotRange, err = redis.Scan(slotRange, &start, &end)
		if err != nil {
			return nil, err
		}

		sm := slotMapping{start: start, end: end}
		// store the master address and all replicas
		for len(slotRange) > 0 {
			var nodes []interface{}
			slotRange, err = redis.Scan(slotRange, &nodes)
			if err != nil {
				return nil, err
			}

			var addr string
			var port int
			if _, err = redis.Scan(nodes, &addr, &port); err != nil {
				return nil, err
			}
			sm.nodes = append(sm.nodes, addr+":"+strconv.Itoa(port))
		}
		m = append(m, sm)
	}
	return m, nil
}

func (c *Cluster) getConnForAddr(addr string, forceDial bool) (redis.Conn, error) {
	// non-pooled doesn't require a lock
	if c.CreatePool == nil || forceDial {
		return redis.Dial("tcp", addr, c.DialOptions...)
	}

	c.mu.Lock()
	p := c.pools[addr]
	if p == nil {
		c.mu.Unlock()
		pool, err := c.CreatePool(addr, c.DialOptions...)
		if err != nil {
			return nil, err
		}

		c.mu.Lock()
		// check again, a concurrent request may have set the pool in the meantime
		if p = c.pools[addr]; p == nil {
			if c.pools == nil {
				c.pools = make(map[string]*redis.Pool, len(c.StartupNodes))
			}
			c.pools[addr] = pool
			p = pool
		} else {
			// Don't assume CreatePool just returned the pool struct; it may have
			// used a connection, so always match a CreatePool call with a Close.
			// Do it in a defer to keep lock time short.
			defer pool.Close()
		}
	}
	c.mu.Unlock()

	conn := p.Get()
	return conn, conn.Err()
}

var errNoNodeForSlot = errors.New("redisc: no node for slot")

func (c *Cluster) getConnForSlot(slot int, forceDial, readOnly bool) (redis.Conn, string, error) {
	c.mu.Lock()
	addrs := c.mapping[slot]
	c.mu.Unlock()
	if len(addrs) == 0 {
		return nil, "", errNoNodeForSlot
	}

	// mapping slices are never altered; they are replaced when refreshing
	// or on a MOVED response, so it's non-racy to read them outside the lock.
	addr := addrs[0]
	if readOnly && len(addrs) > 1 {
		// get the address of a replica
		if len(addrs) == 2 {
			addr = addrs[1]
		} else {
			rnd.Lock()
			ix := rnd.Intn(len(addrs) - 1)
			rnd.Unlock()
			addr = addrs[ix+1] // +1 because 0 is the master
		}
	} else {
		readOnly = false
	}

	conn, err := c.getConnForAddr(addr, forceDial)
	if err == nil && readOnly {
		conn.Do("READONLY")
	}
	return conn, addr, err
}

// a *rand.Rand is not safe for concurrent access
var rnd = struct {
	sync.Mutex
	*rand.Rand
}{Rand: rand.New(rand.NewSource(time.Now().UnixNano()))}

func (c *Cluster) getRandomConn(forceDial, readOnly bool) (redis.Conn, string, error) {
	addrs := c.getNodeAddrs(readOnly)
	rnd.Lock()
	perms := rnd.Perm(len(addrs))
	rnd.Unlock()

	for _, ix := range perms {
		addr := addrs[ix]
		conn, err := c.getConnForAddr(addr, forceDial)
		if err == nil {
			if readOnly {
				conn.Do("READONLY")
			}
			return conn, addr, nil
		}
	}
	return nil, "", errors.New("redisc: failed to get a connection")
}

func (c *Cluster) getConn(preferredSlot int, forceDial, readOnly bool) (conn redis.Conn, addr string, err error) {
	if preferredSlot >= 0 {
		conn, addr, err = c.getConnForSlot(preferredSlot, forceDial, readOnly)
		if err == errNoNodeForSlot {
			c.needsRefresh(nil)
		}
	}
	if preferredSlot < 0 || err != nil {
		conn, addr, err = c.getRandomConn(forceDial, readOnly)
	}
	return conn, addr, err
}

func (c *Cluster) getNodeAddrs(preferReplicas bool) []string {
	c.mu.Lock()

	// populate nodes lazily, only once
	if c.masters == nil {
		c.masters = make(map[string]bool)
		c.replicas = make(map[string]bool)

		// StartupNodes should be masters
		for _, n := range c.StartupNodes {
			c.masters[n] = true
		}
	}

	from := c.masters
	if preferReplicas && len(c.replicas) > 0 {
		from = c.replicas
	}

	// grab a slice of addresses
	addrs := make([]string, 0, len(from))
	for addr := range from {
		addrs = append(addrs, addr)
	}
	c.mu.Unlock()

	return addrs
}

// Dial returns a connection the same way as Get, but
// it guarantees that the connection will not be managed by the
// pool, even if CreatePool is set. The actual returned
// type is *Conn, see its documentation for details.
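//
// A minimal sketch (illustrative only; Dial fails only once the
// cluster has been closed):
//
//	conn, err := c.Dial()
//	if err != nil {
//		return err // the cluster has been closed
//	}
//	defer conn.Close()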
func (c *Cluster) Dial() (redis.Conn, error) {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	if err != nil {
		return nil, err
	}
	return &Conn{
		cluster:   c,
		forceDial: true,
	}, nil
}

// Get returns a redis.Conn interface that can be used to call
// redis commands on the cluster. The application must close the
// returned connection. The actual returned type is *Conn,
// see its documentation for details.
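//
// A minimal usage sketch (the key name is a placeholder; on a live
// cluster the command is routed to the node serving that key's slot):
//
//	conn := c.Get()
//	defer conn.Close()
//	v, err := redis.String(conn.Do("GET", "some-key"))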
func (c *Cluster) Get() redis.Conn {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return &Conn{
		cluster: c,
		err:     err,
	}
}

// Close releases the resources used by the cluster. It closes all the
// pools that were created, if any.
func (c *Cluster) Close() error {
	c.mu.Lock()
	err := c.err
	if err == nil {
		c.err = errors.New("redisc: closed")
		for _, p := range c.pools {
			if e := p.Close(); e != nil && err == nil {
				err = e
			}
		}
	}
	c.mu.Unlock()
	return err
}