diff --git a/components/cqrs/command_processor_test.go b/components/cqrs/command_processor_test.go index 4064e78d..3ac8a465 100644 --- a/components/cqrs/command_processor_test.go +++ b/components/cqrs/command_processor_test.go @@ -178,8 +178,10 @@ func TestCommandProcessor_multiple_same_command_handlers(t *testing.T) { } type mockSubscriber struct { - MessagesToSend []*message.Message - out chan *message.Message + MessagesToSend []*message.Message + WaitForAckBeforeSendingNext bool + + out chan *message.Message } func (m *mockSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { @@ -188,6 +190,10 @@ func (m *mockSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *m go func() { for _, msg := range m.MessagesToSend { m.out <- msg + + if m.WaitForAckBeforeSendingNext { + <-msg.Acked() + } } }() diff --git a/components/cqrs/event_processor_group_test.go b/components/cqrs/event_processor_group_test.go index 48f7880e..f1676f8e 100644 --- a/components/cqrs/event_processor_group_test.go +++ b/components/cqrs/event_processor_group_test.go @@ -298,6 +298,7 @@ func TestEventProcessor_handler_group(t *testing.T) { msg1, msg2, }, + WaitForAckBeforeSendingNext: true, } var handlersCalls []int