-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathqueue.go
91 lines (80 loc) · 1.51 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package prosumer
import (
"context"
"sync"
"time"
)
type queue struct {
ch chan Element
rp RejectPolicy
waitClose *sync.WaitGroup
}
func (q *queue) enqueue(ctx context.Context, e Element) ([]Element, error) {
select {
case q.ch <- e:
return nil, nil
default:
switch q.rp {
case Block:
select {
case <-ctx.Done():
return []Element{e}, ctx.Err()
case q.ch <- e:
return nil, nil
}
case Discard:
return []Element{e}, ErrDiscard
case DiscardOldest:
var discarded []Element
for {
// when discard the oldest, other worker may preempt the slot,
// so may discard more than one element.
if v, ok := <-q.ch; ok {
discarded = append(discarded, v)
}
select {
case q.ch <- e:
return discarded, ErrDiscardOldest
case <-ctx.Done():
return discarded, ctx.Err()
default:
// default is required for nonblocking read
}
}
}
}
return nil, nil
}
func (q *queue) dequeue() (Element, bool) {
select {
case v, ok := <-q.ch:
return v, ok
default:
return nil, false
}
}
func (q *queue) size() int {
return len(q.ch)
}
func (q *queue) cap() int {
return cap(q.ch)
}
func (q *queue) close(graceful bool) {
close(q.ch)
if graceful {
for {
if q.size() == 0 {
break
}
time.Sleep(50 * time.Millisecond)
}
q.waitClose.Done()
}
}
func newQueue(bufferSize int, rp RejectPolicy, waitClose *sync.WaitGroup) *queue {
return &queue{
ch: make(chan Element, bufferSize),
rp: rp,
waitClose: waitClose,
}
}