diff --git a/client.go b/client.go index 8b5fda0..6e13c90 100644 --- a/client.go +++ b/client.go @@ -34,7 +34,7 @@ type Client[T any] interface { // MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error) // RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ). - RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) + RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error) // GetMessage get a specific message from a DynamoDB-based queue. GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error) // GetQueueStats is a method for obtaining statistical information about a DynamoDB-based queue. @@ -529,17 +529,14 @@ type RedriveMessageInput struct { ID string } -type RedriveMessageOutput struct { - ID string `json:"id"` - Status Status `json:"status"` - UpdatedAt string `json:"updated_at"` - Version int `json:"version"` +type RedriveMessageOutput[T any] struct { + RedroveMessage *Message[T] } // RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ). // It locates the message based on the specified message ID and marks it as restored from the DLQ to the standard queue. // This process is essential for reprocessing messages that have failed to be processed and is a crucial function in error handling within the message queue system. -func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) { +func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error) { if params == nil { params = &RedriveMessageInput{} } @@ -547,15 +544,15 @@ func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessa ID: params.ID, }) if err != nil { - return &RedriveMessageOutput{}, err + return &RedriveMessageOutput[T]{}, err } if retrieved.Message == nil { - return &RedriveMessageOutput{}, &IDNotFoundError{} + return &RedriveMessageOutput[T]{}, &IDNotFoundError{} } message := retrieved.Message err = message.markAsRestoredFromDLQ(c.clock.Now()) if err != nil { - return &RedriveMessageOutput{}, err + return &RedriveMessageOutput[T]{}, err } builder := expression.NewBuilder(). WithUpdate(expression.Add( @@ -582,13 +579,10 @@ func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessa } updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr) if err != nil { - return &RedriveMessageOutput{}, err + return &RedriveMessageOutput[T]{}, err } - return &RedriveMessageOutput{ - ID: updated.ID, - Status: updated.GetStatus(c.clock.Now()), - UpdatedAt: updated.UpdatedAt, - Version: updated.Version, + return &RedriveMessageOutput[T]{ + RedroveMessage: updated, }, nil } diff --git a/client_test.go b/client_test.go index 88df662..d96a927 100644 --- a/client_test.go +++ b/client_test.go @@ -486,7 +486,7 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) { type args struct { id string } - tests := []ClientTestCase[args, *dynamomq.RedriveMessageOutput]{ + tests := []ClientTestCase[args, *dynamomq.RedriveMessageOutput[test.MessageData]]{ { name: "should succeed when id is found and status is ready", setup: NewSetupFunc(newPutRequestWithDLQItem("A-101", test.DefaultTestDate)), @@ -496,11 +496,13 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) { args: args{ id: "A-101", }, - want: &dynamomq.RedriveMessageOutput{ - ID: "A-101", - Status: dynamomq.StatusReady, - UpdatedAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(10 * time.Second)), - Version: 2, + want: &dynamomq.RedriveMessageOutput[test.MessageData]{ + RedroveMessage: func() *dynamomq.Message[test.MessageData] { + m := NewTestMessageItemAsDLQ("A-101", test.DefaultTestDate) + MarkAsRestoredFromDLQ(m, test.DefaultTestDate.Add(10*time.Second)) + m.Version = 2 + return m + }(), }, }, { @@ -512,7 +514,7 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) { args: args{ id: "A-101", }, - want: &dynamomq.RedriveMessageOutput{}, + want: &dynamomq.RedriveMessageOutput[test.MessageData]{}, wantErr: dynamomq.InvalidStateTransitionError{ Msg: "can only redrive messages from DLQ", Operation: "mark as restored from DLQ", @@ -534,7 +536,7 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) { args: args{ id: "A-101", }, - want: &dynamomq.RedriveMessageOutput{}, + want: &dynamomq.RedriveMessageOutput[test.MessageData]{}, wantErr: dynamomq.InvalidStateTransitionError{ Msg: "can only redrive messages from READY", Operation: "mark as restored from DLQ", @@ -542,8 +544,8 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) { }, }, } - runTestsParallel[args, *dynamomq.RedriveMessageOutput](t, "RedriveMessage()", tests, - func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.RedriveMessageOutput, error) { + runTestsParallel[args, *dynamomq.RedriveMessageOutput[test.MessageData]](t, "RedriveMessage()", tests, + func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.RedriveMessageOutput[test.MessageData], error) { return client.RedriveMessage(context.Background(), &dynamomq.RedriveMessageInput{ ID: args.id, }) diff --git a/dynamomq_test.go b/dynamomq_test.go index 74105e4..42784ac 100644 --- a/dynamomq_test.go +++ b/dynamomq_test.go @@ -26,6 +26,16 @@ func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) { m.InvisibleUntilAt = "" } +func MarkAsRestoredFromDLQ[T any](m *dynamomq.Message[T], now time.Time) { + ts := clock.FormatRFC3339Nano(now) + m.QueueType = dynamomq.QueueTypeStandard + m.ReceiveCount = 0 + m.UpdatedAt = ts + m.SentAt = ts + m.ReceivedAt = "" + m.InvisibleUntilAt = "" +} + func NewTestMessageItemAsReady(id string, now time.Time) *dynamomq.Message[test.MessageData] { return dynamomq.NewMessage[test.MessageData](id, test.NewMessageData(id), now) } diff --git a/internal/mock/mock.go b/internal/mock/mock.go index 900617e..68c963f 100644 --- a/internal/mock/mock.go +++ b/internal/mock/mock.go @@ -17,7 +17,7 @@ type Client[T any] struct { ChangeMessageVisibilityFunc func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error) DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error) - RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) + RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[T], error) GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error) GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) GetDLQStatsFunc func(ctx context.Context, params *dynamomq.GetDLQStatsInput) (*dynamomq.GetDLQStatsOutput, error) @@ -60,7 +60,7 @@ func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMe return nil, ErrNotImplemented } -func (m Client[T]) RedriveMessage(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) { +func (m Client[T]) RedriveMessage(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[T], error) { if m.RedriveMessageFunc != nil { return m.RedriveMessageFunc(ctx, params) } @@ -124,8 +124,8 @@ var SuccessfulMockClient = &Client[any]{ MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) { return &dynamomq.MoveMessageToDLQOutput[any]{}, nil }, - RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) { - return &dynamomq.RedriveMessageOutput{}, nil + RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[any], error) { + return &dynamomq.RedriveMessageOutput[any]{}, nil }, GetMessageFunc: func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[any], error) { return &dynamomq.GetMessageOutput[any]{