From 91874f3789e4f1cd74089e571ab64986974fc303 Mon Sep 17 00:00:00 2001 From: vvatanabe Date: Sun, 10 Dec 2023 20:50:10 +0900 Subject: [PATCH] refactor: change MoveMessageToDLQOutput to be generic --- client.go | 35 ++++++++++++-------------------- client_test.go | 24 ++++++++-------------- consumer_test.go | 9 +++----- internal/cmd/interactive_test.go | 4 ++-- internal/mock/mock.go | 8 ++++---- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/client.go b/client.go index eee1f7d..8b5fda0 100644 --- a/client.go +++ b/client.go @@ -32,7 +32,7 @@ type Client[T any] interface { // DeleteMessage deletes a specific message from a DynamoDB-based queue. DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error) // MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). - MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error) + MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error) // RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ). RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) // GetMessage get a specific message from a DynamoDB-based queue. @@ -475,17 +475,14 @@ type MoveMessageToDLQInput struct { ID string } -type MoveMessageToDLQOutput struct { - ID string `json:"id"` - Status Status `json:"status"` - UpdatedAt string `json:"updated_at"` - Version int `json:"version"` +type MoveMessageToDLQOutput[T any] struct { + MovedMessage *Message[T] } // MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). // It locates the message based on the specified message ID and marks it for the DLQ. // Moving a message to the DLQ allows for the isolation of failed message processing, facilitating later analysis and reprocessing. -func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error) { +func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error) { if params == nil { params = &MoveMessageToDLQInput{} } @@ -493,19 +490,16 @@ func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessag ID: params.ID, }) if err != nil { - return &MoveMessageToDLQOutput{}, err + return &MoveMessageToDLQOutput[T]{}, err } if retrieved.Message == nil { - return &MoveMessageToDLQOutput{}, &IDNotFoundError{} + return &MoveMessageToDLQOutput[T]{}, &IDNotFoundError{} } message := retrieved.Message if markedErr := message.markAsMovedToDLQ(c.clock.Now()); markedErr != nil { //lint:ignore nilerr reason - return &MoveMessageToDLQOutput{ - ID: params.ID, - Status: message.GetStatus(c.clock.Now()), - UpdatedAt: message.UpdatedAt, - Version: message.Version, + return &MoveMessageToDLQOutput[T]{ + MovedMessage: message, }, nil } builder := expression.NewBuilder(). @@ -515,22 +509,19 @@ func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessag Set(expression.Name("queue_type"), expression.Value(message.QueueType)). Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)). Set(expression.Name("sent_at"), expression.Value(message.SentAt)). - Set(expression.Name("received_at"), expression.Value(message.SentAt)). + Set(expression.Name("received_at"), expression.Value(message.ReceivedAt)). Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { - return &MoveMessageToDLQOutput{}, BuildingExpressionError{Cause: err} + return &MoveMessageToDLQOutput[T]{}, BuildingExpressionError{Cause: err} } updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr) if err != nil { - return &MoveMessageToDLQOutput{}, err + return &MoveMessageToDLQOutput[T]{}, err } - return &MoveMessageToDLQOutput{ - ID: params.ID, - Status: updated.GetStatus(c.clock.Now()), - UpdatedAt: updated.UpdatedAt, - Version: updated.Version, + return &MoveMessageToDLQOutput[T]{ + MovedMessage: updated, }, nil } diff --git a/client_test.go b/client_test.go index ceee1bd..88df662 100644 --- a/client_test.go +++ b/client_test.go @@ -438,20 +438,17 @@ func TestDynamoMQClientDeleteMessage(t *testing.T) { func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) { t.Parallel() - tests := []ClientTestCase[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput]{ + tests := []ClientTestCase[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput[test.MessageData]]{ { name: "should succeed when id is found and queue type is standard", setup: NewSetupFunc(newPutRequestWithDLQItem("A-101", test.DefaultTestDate)), args: &dynamomq.MoveMessageToDLQInput{ ID: "A-101", }, - want: func() *dynamomq.MoveMessageToDLQOutput { + want: func() *dynamomq.MoveMessageToDLQOutput[test.MessageData] { s := NewTestMessageItemAsDLQ("A-101", test.DefaultTestDate) - r := &dynamomq.MoveMessageToDLQOutput{ - ID: s.ID, - Status: dynamomq.StatusReady, - UpdatedAt: s.UpdatedAt, - Version: s.Version, + r := &dynamomq.MoveMessageToDLQOutput[test.MessageData]{ + MovedMessage: s, } return r }(), @@ -466,23 +463,20 @@ func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) { args: &dynamomq.MoveMessageToDLQInput{ ID: "A-101", }, - want: func() *dynamomq.MoveMessageToDLQOutput { + want: func() *dynamomq.MoveMessageToDLQOutput[test.MessageData] { m := NewTestMessageItemAsReady("A-101", test.DefaultTestDate) MarkAsMovedToDLQ(m, test.DefaultTestDate.Add(10*time.Second)) m.Version = 2 - r := &dynamomq.MoveMessageToDLQOutput{ - ID: m.ID, - Status: dynamomq.StatusReady, - UpdatedAt: m.UpdatedAt, - Version: m.Version, + r := &dynamomq.MoveMessageToDLQOutput[test.MessageData]{ + MovedMessage: m, } return r }(), wantErr: nil, }, } - runTestsParallel[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput](t, "MoveMessageToDLQ()", tests, - func(client dynamomq.Client[test.MessageData], args *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { + runTestsParallel[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput[test.MessageData]](t, "MoveMessageToDLQ()", tests, + func(client dynamomq.Client[test.MessageData], args *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[test.MessageData], error) { return client.MoveMessageToDLQ(context.Background(), args) }) } diff --git a/consumer_test.go b/consumer_test.go index 5e18ecc..d50d1a0 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -294,18 +294,15 @@ func NewClientForConsumerTest(queue, dlq chan *dynamomq.Message[test.MessageData } return &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{}, nil }, - MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { + MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[test.MessageData], error) { if cfg.SimulateMoveMessageToDLQError { return nil, test.ErrTest } v, _ := store.Load(params.ID) msg, _ := v.(*dynamomq.Message[test.MessageData]) dlq <- msg - return &dynamomq.MoveMessageToDLQOutput{ - ID: msg.ID, - Status: dynamomq.StatusReady, - UpdatedAt: msg.UpdatedAt, - Version: 2, + return &dynamomq.MoveMessageToDLQOutput[test.MessageData]{ + MovedMessage: msg, }, nil }, } diff --git a/internal/cmd/interactive_test.go b/internal/cmd/interactive_test.go index edafa0f..7b8c215 100644 --- a/internal/cmd/interactive_test.go +++ b/internal/cmd/interactive_test.go @@ -431,8 +431,8 @@ func TestRunInteractiveFailReturnError(t *testing.T) { func TestRunInteractiveInvalidShouldReturnError(t *testing.T) { c := &cmd.Interactive{ Client: mock.Client[any]{ - MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { - return &dynamomq.MoveMessageToDLQOutput{}, nil + MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) { + return &dynamomq.MoveMessageToDLQOutput[any]{}, nil }, GetQueueStatsFunc: func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) { return &dynamomq.GetQueueStatsOutput{}, test.ErrTest diff --git a/internal/mock/mock.go b/internal/mock/mock.go index 6965f74..900617e 100644 --- a/internal/mock/mock.go +++ b/internal/mock/mock.go @@ -16,7 +16,7 @@ type Client[T any] struct { ReceiveMessageFunc func(ctx context.Context, params *dynamomq.ReceiveMessageInput) (*dynamomq.ReceiveMessageOutput[T], error) ChangeMessageVisibilityFunc func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error) DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) - MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) + MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error) RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error) GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) @@ -53,7 +53,7 @@ func (m Client[T]) DeleteMessage(ctx context.Context, params *dynamomq.DeleteMes return nil, ErrNotImplemented } -func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { +func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error) { if m.MoveMessageToDLQFunc != nil { return m.MoveMessageToDLQFunc(ctx, params) } @@ -121,8 +121,8 @@ var SuccessfulMockClient = &Client[any]{ DeleteMessageFunc: func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) { return &dynamomq.DeleteMessageOutput{}, nil }, - MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { - return &dynamomq.MoveMessageToDLQOutput{}, nil + MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) { + return &dynamomq.MoveMessageToDLQOutput[any]{}, nil }, RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) { return &dynamomq.RedriveMessageOutput{}, nil