-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackground.go
254 lines (215 loc) · 6.83 KB
/
background.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package background
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"go.strv.io/background/observer"
"go.strv.io/background/task"
"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)
var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
)
// Manager keeps track of running goroutines and provides mechanisms to wait for them to finish or cancel their
// execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track
// of the tasks in the observer.
type Manager struct {
stalledThreshold time.Duration
observer observer.Observer
retry task.Retry
taskmgr taskmgr
loopmgr loopmgr
}
// Options provides a means for configuring the background manager and providing the observer to it.
type Options struct {
// StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note
// that no effort is made to actually stop or kill the task.
StalledThreshold time.Duration
// Observer allows you to register monitoring functions that are called when something happens with the tasks that you
// execute. These are useful for logging, monitoring, etc.
Observer observer.Observer
// Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several
// strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
Retry task.Retry
}
// NewManager creates a new instance of Manager with default options and no observer.
func NewManager() *Manager {
return NewManagerWithOptions(Options{})
}
// NewManagerWithOptions creates a new instance of Manager with the provided options and observer.
func NewManagerWithOptions(options Options) *Manager {
o := options.Observer
if o == nil {
o = observer.Default{}
}
return &Manager{
stalledThreshold: options.StalledThreshold,
retry: options.Retry,
observer: o,
loopmgr: mkloopmgr(),
}
}
// Run executes the provided function once in a goroutine.
func (m *Manager) Run(ctx context.Context, fn task.Fn) {
definition := task.Task{Fn: fn}
m.RunTask(ctx, definition)
}
// RunTask executes the provided task in a goroutine. The task will be executed according to its type; by default, only
// once (TaskTypeOneOff).
func (m *Manager) RunTask(ctx context.Context, definition task.Task) {
ctx = context.WithoutCancel(ctx)
done := make(chan error, 1)
m.observer.OnTaskAdded(ctx, definition)
switch definition.Type {
case task.TypeOneOff:
m.taskmgr.start()
go m.observe(ctx, definition, done)
go m.run(ctx, definition, done)
case task.TypeLoop:
m.loopmgr.start()
go m.loop(ctx, definition, done)
default:
m.observer.OnTaskFailed(ctx, definition, ErrUnknownType)
}
}
// Wait blocks until all running one-off tasks have finished. Adding more one-off tasks will prolong the wait time.
func (m *Manager) Wait() {
m.taskmgr.group.Wait()
}
// Cancel blocks until all loop tasks finish their current loop and stops looping further. The tasks' context is not
// cancelled. Adding a new loop task after calling Cancel() will cause the task to be ignored and not run.
func (m *Manager) Cancel() {
m.loopmgr.cancel()
}
// Close is a convenience method that calls Wait() and Cancel() in parallel. It blocks until all tasks have finished.
func (m *Manager) Close() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
m.Wait()
wg.Done()
}()
go func() {
m.Cancel()
wg.Done()
}()
wg.Wait()
}
// CountOf returns the number of tasks of the specified type that are currently running. When the TaskType is invalid it
// returns 0.
func (m *Manager) CountOf(t task.Type) int {
switch t {
case task.TypeOneOff:
return int(m.taskmgr.count.Load())
case task.TypeLoop:
return int(m.loopmgr.count.Load())
default:
return 0
}
}
func (m *Manager) run(ctx context.Context, definition task.Task, done chan<- error) {
strategies := mkstrategies(m.retry, definition.Retry)
done <- retry.Do(ctx, definition.Fn, strategies...)
}
func (m *Manager) loop(ctx context.Context, definition task.Task, done chan error) {
defer m.loopmgr.finish()
for {
if m.loopmgr.ctx.Err() != nil {
return
}
m.run(ctx, definition, done)
err := <-done
if err != nil {
m.observer.OnTaskFailed(ctx, definition, err)
}
}
}
func (m *Manager) observe(ctx context.Context, definition task.Task, done <-chan error) {
timeout := mktimeout(m.stalledThreshold)
defer m.taskmgr.finish()
for {
select {
case <-timeout:
m.observer.OnTaskStalled(ctx, definition)
case err := <-done:
if err != nil {
m.observer.OnTaskFailed(ctx, definition, err)
} else {
m.observer.OnTaskSucceeded(ctx, definition)
}
return
}
}
}
// MARK: Internal
// taskmgr is used internally for task tracking and synchronization.
type taskmgr struct {
group sync.WaitGroup
count atomic.Int32
}
// start tells the taskmgr that a new task has started.
func (m *taskmgr) start() {
m.group.Add(1)
m.count.Add(1)
}
// finish tells the taskmgr that a task has finished.
func (m *taskmgr) finish() {
m.group.Done()
m.count.Add(-1)
}
// loopmgr is used internally for loop tracking and synchronization and cancellation of the loops.
type loopmgr struct {
group sync.WaitGroup
count atomic.Int32
ctx context.Context
cancelfn context.CancelFunc
}
func mkloopmgr() loopmgr {
ctx, cancelfn := context.WithCancel(context.Background())
return loopmgr{
ctx: ctx,
cancelfn: cancelfn,
}
}
// start tells the loopmgr that a new loop has started.
func (m *loopmgr) start() {
m.group.Add(1)
m.count.Add(1)
}
// cancel tells the loopmgr that a loop has finished.
func (m *loopmgr) finish() {
m.group.Done()
m.count.Add(-1)
}
func (m *loopmgr) cancel() {
m.cancelfn()
m.group.Wait()
}
// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0,
// the channel will never receive any message.
func mktimeout(duration time.Duration) <-chan time.Time {
if duration == 0 {
return make(<-chan time.Time)
}
return time.After(duration)
}
// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides), 1))
if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}
// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}
return result
}