-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtypes.go
120 lines (97 loc) · 2.37 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package prosumer
import (
"errors"
"log"
"time"
)
var (
ErrDiscard = errors.New("discard current element")
ErrDiscardOldest = errors.New("discard oldest element")
)
// Consumer process elements from queue
type Element interface{}
type Consumer func(lst []Element) error
type Callback func([]Element, error)
// rejectPolicy control which elements get discarded when the queue is full
type RejectPolicy int
const (
// Block current goroutine, no elements discarded
Block RejectPolicy = iota
// Discard current element
Discard
// DiscardOldest remove the oldest to make room for new element
DiscardOldest
)
// Config defines params for Coordinator
type Config struct {
bufferSize int
rp RejectPolicy
batchSize int
batchInterval time.Duration
cb Callback
consumer Consumer
numConsumer int
}
// SetBufferSize defines inner buffer queue's size
func SetBufferSize(bufferSize int) Option {
return func(config *Config) {
config.bufferSize = bufferSize
}
}
// SetRejectPolicy defines which elements get discarded when the queue is full
func SetRejectPolicy(rp RejectPolicy) Option {
return func(config *Config) {
config.rp = rp
}
}
func SetBatchSize(batchSize int) Option {
return func(config *Config) {
config.batchSize = batchSize
}
}
func SetBatchInterval(interval time.Duration) Option {
return func(config *Config) {
config.batchInterval = interval
}
}
// SetCallback defines callback invoked with elements and err returned from consumer
func SetCallback(cb Callback) Option {
return func(config *Config) {
config.cb = cb
}
}
func SetNumConsumer(numConsumer int) Option {
return func(config *Config) {
config.numConsumer = numConsumer
}
}
// Option constructs a Config
type Option func(*Config)
// NewConfig returns a config with well-defined defaults
// Warn: default rejectPolicy is Block.
func NewConfig(c Consumer, opts ...Option) Config {
conf := Config{consumer: c}
for _, opt := range opts {
opt(&conf)
}
if conf.bufferSize == 0 {
conf.bufferSize = 10000
}
if conf.batchSize == 0 {
conf.batchSize = 100
}
if conf.batchInterval == 0 {
conf.batchInterval = 500 * time.Millisecond
}
if conf.cb == nil {
conf.cb = func(lst []Element, err error) {
if err != nil {
log.Printf("consumer failed. list: %v, err: %v", lst, err)
}
}
}
if conf.numConsumer == 0 {
conf.numConsumer = 2
}
return conf
}