-
Notifications
You must be signed in to change notification settings - Fork 103
/
Copy pathtask.go
119 lines (99 loc) · 2.64 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package taskq
import (
"context"
"fmt"
"time"
)
var unknownTaskOpt *TaskOptions
func init() {
SetUnknownTaskOptions(&TaskOptions{
Name: "unknown",
})
}
func SetUnknownTaskOptions(opt *TaskOptions) {
opt.init()
unknownTaskOpt = opt
}
type TaskOptions struct {
// Task name.
Name string
// Function called to process a message.
// There are three permitted types of signature:
// 1. A zero-argument function
// 2. A function whose arguments are assignable in type from those which are passed in the message
// 3. A function which takes a single `*Message` argument
// The handler function may also optionally take a Context as a first argument and may optionally return an error.
// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
// `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/.
Handler interface{}
// Function called to process failed message after the specified number of retries have all failed.
// The FallbackHandler accepts the same types of function as the Handler.
FallbackHandler interface{}
// Optional function used by Consumer with defer statement
// to recover from panics.
DeferFunc func()
// Number of tries/releases after which the message fails permanently
// and is deleted.
// Default is 64 retries.
RetryLimit int
// Minimum backoff time between retries.
// Default is 30 seconds.
MinBackoff time.Duration
// Maximum backoff time between retries.
// Default is 30 minutes.
MaxBackoff time.Duration
inited bool
}
func (opt *TaskOptions) init() {
if opt.inited {
return
}
opt.inited = true
if opt.Name == "" {
panic("TaskOptions.Name is required")
}
if opt.RetryLimit == 0 {
opt.RetryLimit = 64
}
if opt.MinBackoff == 0 {
opt.MinBackoff = 30 * time.Second
}
if opt.MaxBackoff == 0 {
opt.MaxBackoff = 30 * time.Minute
}
}
type Task struct {
opt *TaskOptions
handler Handler
fallbackHandler Handler
}
func RegisterTask(opt *TaskOptions) *Task {
task, err := Tasks.Register(opt)
if err != nil {
panic(err)
}
return task
}
func (t *Task) Name() string {
return t.opt.Name
}
func (t *Task) String() string {
return fmt.Sprintf("task=%q", t.Name())
}
func (t *Task) Options() *TaskOptions {
return t.opt
}
func (t *Task) HandleMessage(msg *Message) error {
if msg.Err != nil {
if t.fallbackHandler != nil {
return t.fallbackHandler.HandleMessage(msg)
}
return nil
}
return t.handler.HandleMessage(msg)
}
func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message {
msg := NewMessage(ctx, args...)
msg.TaskName = t.opt.Name
return msg
}