forked from mna/redisc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry_conn.go
133 lines (113 loc) · 3.29 KB
/
retry_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package redisc
import (
"errors"
"time"
"github.com/garyburd/redigo/redis"
)
// RetryConn wraps the connection c (which must be a *Conn)
// into a connection that automatically handles cluster redirections
// (MOVED and ASK replies) and retries for TRYAGAIN errors.
// Only Do, Close and Err can be called on that connection,
// all other methods return an error.
//
// The maxAtt parameter indicates the maximum number of attempts
// to successfully execute the command. The tryAgainDelay is the
// duration to wait before retrying a TRYAGAIN error.
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
cc, ok := c.(*Conn)
if !ok {
return nil, errors.New("redisc: connection is not a *Conn")
}
return &retryConn{c: cc, maxAttempts: maxAtt, tryAgainDelay: tryAgainDelay}, nil
}
type retryConn struct {
c *Conn
maxAttempts int
tryAgainDelay time.Duration
}
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
return rc.do(cmd, args...)
}
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
var att int
var asking bool
cluster := rc.c.cluster
for rc.maxAttempts <= 0 || att < rc.maxAttempts {
if asking {
if err := rc.c.Send("ASKING"); err != nil {
return nil, err
}
asking = false
}
v, err := rc.c.Do(cmd, args...)
re := ParseRedir(err)
if re == nil {
if IsTryAgain(err) {
// handle retry
time.Sleep(rc.tryAgainDelay)
att++
continue
}
// not a retry error nor a redirection, return result
return v, err
}
// handle redirection
rc.c.mu.Lock()
readOnly := rc.c.readOnly
connAddr := rc.c.boundAddr
rc.c.mu.Unlock()
if readOnly {
// check if the connection was already made to that slot, meaning
// that the redirection is because the command can't be served
// by the replica and a non-readonly connection must be made to
// the slot's master. If that's not the case, then keep the
// readonly flag to true, meaning that it will attempt a connection
// to a replica for the new slot.
cluster.mu.Lock()
slotMappings := cluster.mapping[re.NewSlot]
cluster.mu.Unlock()
if isIn(slotMappings, connAddr) {
readOnly = false
}
}
// forceDial doesn't require locking (immutable)
conn, addr, err := cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
if err != nil {
// could not get connection to that node, return that error
return nil, err
}
rc.c.mu.Lock()
// close and replace the old connection (close must come before assignments)
rc.c.closeLocked()
rc.c.rc = conn
rc.c.boundAddr = addr
rc.c.readOnly = readOnly
rc.c.mu.Unlock()
asking = re.Type == "ASK"
att++
}
return nil, errors.New("redisc: too many attempts")
}
func (rc *retryConn) Err() error {
return rc.c.Err()
}
func (rc *retryConn) Close() error {
return rc.c.Close()
}
func (rc *retryConn) Send(cmd string, args ...interface{}) error {
return errors.New("redisc: unsupported call to Send")
}
func (rc *retryConn) Receive() (interface{}, error) {
return nil, errors.New("redisc: unsupported call to Receive")
}
func (rc *retryConn) Flush() error {
return errors.New("redisc: unsupported call to Flush")
}
func isIn(list []string, v string) bool {
for _, vv := range list {
if v == vv {
return true
}
}
return false
}