Skip to content

Commit

Permalink
feat: Add concurrency option to ConsumerOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 4, 2023
1 parent 20d5f48 commit 3fee838
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
46 changes: 33 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
defaultPollingInterval = time.Second
defaultMaximumReceives = 0 // unlimited
defaultQueueType = QueueTypeStandard
defaultConcurrency = 3
)

func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) {
Expand All @@ -22,6 +23,12 @@ func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions)
}
}

func WithConcurrency(concurrency int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.Concurrency = concurrency
}
}

func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.MaximumReceives = maximumReceives
Expand All @@ -48,6 +55,7 @@ func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) {

type ConsumerOptions struct {
PollingInterval time.Duration
Concurrency int
MaximumReceives int
QueueType QueueType
// errorLog specifies an optional logger for errors accepting
Expand All @@ -61,6 +69,7 @@ type ConsumerOptions struct {
func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] {
o := &ConsumerOptions{
PollingInterval: defaultPollingInterval,
Concurrency: defaultConcurrency,
MaximumReceives: defaultMaximumReceives,
QueueType: defaultQueueType,
}
Expand All @@ -71,6 +80,7 @@ func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ..
client: client,
messageProcessor: processor,
pollingInterval: o.PollingInterval,
concurrency: o.Concurrency,
maximumReceives: o.MaximumReceives,
queueType: o.QueueType,
errorLog: o.ErrorLog,
Expand All @@ -96,12 +106,12 @@ func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error {
type Consumer[T any] struct {
client Client[T]
messageProcessor MessageProcessor[T]

pollingInterval time.Duration
maximumReceives int
queueType QueueType
errorLog *log.Logger
onShutdown []func()
concurrency int
pollingInterval time.Duration
maximumReceives int
queueType QueueType
errorLog *log.Logger
onShutdown []func()

inShutdown int32
mu sync.Mutex
Expand All @@ -113,10 +123,22 @@ type Consumer[T any] struct {
var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed")

func (c *Consumer[T]) StartConsuming() error {
msgChan := make(chan *Message[T], c.concurrency)
defer close(msgChan)

for i := 0; i < c.concurrency; i++ {
go func() {
for msg := range msgChan {
c.trackAndProcessMessage(context.Background(), msg)
}
}()
}

for {
ctx := context.Background()
r, err := c.client.ReceiveMessage(ctx, &ReceiveMessageInput{
QueueType: c.queueType,
QueueType: c.queueType,
VisibilityTimeout: DefaultVisibilityTimeoutInSeconds,
})
if err != nil {
if c.shuttingDown() {
Expand All @@ -128,9 +150,9 @@ func (c *Consumer[T]) StartConsuming() error {
time.Sleep(c.pollingInterval)
continue
}
go c.trackAndProcessMessage(ctx, r.PeekedMessageObject)
time.Sleep(c.pollingInterval)
msgChan <- r.PeekedMessageObject
}

}

func (c *Consumer[T]) trackAndProcessMessage(ctx context.Context, msg *Message[T]) {
Expand Down Expand Up @@ -196,10 +218,8 @@ func (c *Consumer[T]) trackMessage(msg *Message[T], add bool) {
c.mu.Lock()
defer c.mu.Unlock()
if add {
if !c.shuttingDown() {
c.activeMessages[msg] = struct{}{}
c.activeMessagesWG.Add(1)
}
c.activeMessages[msg] = struct{}{}
c.activeMessagesWG.Add(1)
} else {
delete(c.activeMessages, msg)
c.activeMessagesWG.Done()
Expand Down
2 changes: 2 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestConsumerStartConsuming(t *testing.T) {
}
consumer := dynamomq.NewConsumer[test.MessageData](client, processor,
dynamomq.WithPollingInterval(0),
dynamomq.WithConcurrency(3),
dynamomq.WithMaximumReceives(tt.MaximumReceives),
dynamomq.WithQueueType(tt.QueueType),
dynamomq.WithErrorLog(log.New(os.Stderr, "", 0)),
Expand Down Expand Up @@ -190,6 +191,7 @@ func TestConsumerShutdown(t *testing.T) {
}
consumer := dynamomq.NewConsumer[test.MessageData](client, processor,
dynamomq.WithPollingInterval(0),
dynamomq.WithConcurrency(3),
dynamomq.WithMaximumReceives(1),
dynamomq.WithQueueType(dynamomq.QueueTypeStandard),
dynamomq.WithErrorLog(log.New(os.Stderr, "", 0)),
Expand Down

0 comments on commit 3fee838

Please sign in to comment.