-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriber_scheduler.go
127 lines (114 loc) · 3.78 KB
/
subscriber_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package streams
import (
"context"
"sync"
"github.com/hashicorp/go-multierror"
)
// A SubscriberScheduler is a high-level component used to manage and schedule Reader tasks.
//
// Tasks are registered through the Subscribe* methods, launched by Start and stopped by Shutdown.
//
// Zero value is NOT ready to use.
type SubscriberScheduler struct {
	// reader performs the actual stream reading; Start calls its Read method once per registered task.
	reader Reader
	// eventReg is queried (and, by Subscribe, updated) to resolve the topic bound to an Event.
	eventReg EventRegistry
	// reg holds every ReadTask registered so far, in registration order.
	reg []*ReadTask
	// baseCtx is the context handed to every worker; it is nil until Start creates it.
	baseCtx context.Context
	// baseCtxCancel cancels baseCtx; it is nil until Start creates it and is invoked by Shutdown.
	baseCtxCancel context.CancelFunc
	// inFlightWorkers counts the worker goroutines spawned by Start; Shutdown waits on it.
	inFlightWorkers sync.WaitGroup
	// isShutdown is set by Shutdown and makes any later Start call fail with ErrBusIsShutdown.
	isShutdown bool
}
// NewSubscriberScheduler allocates a new SubscriberScheduler instance ready to be used.
//
// The returned scheduler reads streams through r and resolves event topics through eventReg.
// It starts with an empty task registry; the base context and its cancel function stay nil
// until Start is called.
func NewSubscriberScheduler(r Reader, eventReg EventRegistry) SubscriberScheduler {
	// Fields not listed here (base context, cancel function, worker wait group, shutdown flag)
	// are intentionally left at their zero values.
	return SubscriberScheduler{
		reg:      []*ReadTask{},
		eventReg: eventReg,
		reader:   r,
	}
}
// SubscribeTopic registers a stream reading job bound directly to the given topic, without
// consulting the EventRegistry.
//
// Returns the newly registered ReadTask so it can be chained to another builder routine
// (Fluent API-like).
func (r *SubscriberScheduler) SubscribeTopic(topic string, handler ReaderHandleFunc) *ReadTask {
	job := new(ReadTask)
	job.Stream = topic
	job.Handler = handler

	r.reg = append(r.reg, job)
	return job
}
// Subscribe registers a stream reading job using the topic bound to event in the EventRegistry.
//
// If the registry lookup returns an error, event is registered under the given topic and that
// topic is used for the job, automating event-topic registration. Note that any lookup error
// takes this path, not only a "not found" one.
//
// Returns the newly registered ReadTask so it can be chained to another builder routine
// (Fluent API-like).
func (r *SubscriberScheduler) Subscribe(topic string, event Event, handler ReaderHandleFunc) *ReadTask {
	stream, lookupErr := r.eventReg.GetEventTopic(event)
	if lookupErr != nil {
		// Fall back to the caller-supplied topic and record the binding for later lookups.
		r.eventReg.RegisterEvent(event, topic)
		stream = topic
	}

	r.reg = append(r.reg, &ReadTask{Stream: stream, Handler: handler})
	return r.reg[len(r.reg)-1]
}
// SubscribeEvent registers a stream reading job using the topic bound to event in the EventRegistry.
// It delegates to SubscribeEventSafe and panics with the returned error if that call fails
// (e.g. when event was not previously registered).
//
// Returns the newly registered ReadTask so it can be chained to another builder routine
// (Fluent API-like).
func (r *SubscriberScheduler) SubscribeEvent(event Event, handler ReaderHandleFunc) *ReadTask {
	job, subscribeErr := r.SubscribeEventSafe(event, handler)
	if subscribeErr == nil {
		return job
	}
	panic(subscribeErr)
}
// SubscribeEventSafe registers a stream reading job using the topic bound to event in the EventRegistry.
//
// If the registry lookup fails, nothing is registered and the lookup error is returned unchanged
// together with a nil task (the registry is expected to report ErrEventNotFound for an event that
// was not previously registered). On success the newly registered ReadTask is returned.
func (r *SubscriberScheduler) SubscribeEventSafe(event Event, handler ReaderHandleFunc) (*ReadTask, error) {
	stream, lookupErr := r.eventReg.GetEventTopic(event)
	if lookupErr != nil {
		return nil, lookupErr
	}

	job := new(ReadTask)
	job.Stream = stream
	job.Handler = handler
	r.reg = append(r.reg, job)
	return job, nil
}
// Start schedules and spins up a worker goroutine for each registered ReadTask.
//
// Every worker calls Reader.Read with a cancellable context derived from context.Background();
// Shutdown cancels that context and waits for the workers. Each worker receives its own copy of
// the ReadTask taken at the moment it is spawned.
//
// Start does NOT wait for the workers to finish. The returned error therefore only aggregates
// read failures that workers recorded before Start returned; failures happening afterwards are
// not surfaced to the caller.
//
// Returns ErrBusIsShutdown if Shutdown has already been called.
func (r *SubscriberScheduler) Start() error {
	if r.isShutdown {
		return ErrBusIsShutdown
	}

	errs := &multierror.Error{}
	errMu := sync.Mutex{}
	r.baseCtx, r.baseCtxCancel = context.WithCancel(context.Background())
	r.inFlightWorkers.Add(len(r.reg))
	for _, readerTask := range r.reg {
		go func(task ReadTask) {
			defer r.inFlightWorkers.Done()
			if errRead := r.reader.Read(r.baseCtx, task); errRead != nil {
				errMu.Lock()
				// Append builds the aggregate from errRead, so a value already handed back to the
				// caller is normally not mutated by later failures.
				errs = multierror.Append(errRead, errs)
				errMu.Unlock()
			}
		}(*readerTask)
	}

	// Workers may already be writing errs; read it under the same mutex to avoid a data race.
	errMu.Lock()
	defer errMu.Unlock()
	return errs.ErrorOrNil()
}
// Shutdown triggers graceful shutdown of the running ReadTask workers.
//
// It marks the scheduler as shut down (so later Start calls return ErrBusIsShutdown), cancels the
// base context if Start created one, and then blocks until every worker has finished.
// It always returns nil.
func (r *SubscriberScheduler) Shutdown() error {
	r.isShutdown = true

	// The cancel function only exists once Start has run.
	if cancel := r.baseCtxCancel; cancel != nil {
		cancel()
	}

	r.inFlightWorkers.Wait()
	return nil
}