Skip to content

Commit

Permalink
refactor: change MoveMessageToDLQOutput to be generic
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 10, 2023
1 parent e10c0b0 commit 91874f3
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 49 deletions.
35 changes: 13 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Client[T any] interface {
// DeleteMessage deletes a specific message from a DynamoDB-based queue.
DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error)
MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
// RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error)
// GetMessage get a specific message from a DynamoDB-based queue.
Expand Down Expand Up @@ -475,37 +475,31 @@ type MoveMessageToDLQInput struct {
ID string
}

type MoveMessageToDLQOutput struct {
ID string `json:"id"`
Status Status `json:"status"`
UpdatedAt string `json:"updated_at"`
Version int `json:"version"`
type MoveMessageToDLQOutput[T any] struct {
MovedMessage *Message[T]
}

// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
// It locates the message based on the specified message ID and marks it for the DLQ.
// Moving a message to the DLQ allows for the isolation of failed message processing, facilitating later analysis and reprocessing.
func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error) {
func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error) {
if params == nil {
params = &MoveMessageToDLQInput{}
}
retrieved, err := c.GetMessage(ctx, &GetMessageInput{
ID: params.ID,
})
if err != nil {
return &MoveMessageToDLQOutput{}, err
return &MoveMessageToDLQOutput[T]{}, err
}
if retrieved.Message == nil {
return &MoveMessageToDLQOutput{}, &IDNotFoundError{}
return &MoveMessageToDLQOutput[T]{}, &IDNotFoundError{}
}
message := retrieved.Message
if markedErr := message.markAsMovedToDLQ(c.clock.Now()); markedErr != nil {
//lint:ignore nilerr reason
return &MoveMessageToDLQOutput{
ID: params.ID,
Status: message.GetStatus(c.clock.Now()),
UpdatedAt: message.UpdatedAt,
Version: message.Version,
return &MoveMessageToDLQOutput[T]{
MovedMessage: message,
}, nil
}
builder := expression.NewBuilder().
Expand All @@ -515,22 +509,19 @@ func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessag
Set(expression.Name("queue_type"), expression.Value(message.QueueType)).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("sent_at"), expression.Value(message.SentAt)).
Set(expression.Name("received_at"), expression.Value(message.SentAt)).
Set(expression.Name("received_at"), expression.Value(message.ReceivedAt)).
Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
return &MoveMessageToDLQOutput{}, BuildingExpressionError{Cause: err}
return &MoveMessageToDLQOutput[T]{}, BuildingExpressionError{Cause: err}
}
updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr)
if err != nil {
return &MoveMessageToDLQOutput{}, err
return &MoveMessageToDLQOutput[T]{}, err
}
return &MoveMessageToDLQOutput{
ID: params.ID,
Status: updated.GetStatus(c.clock.Now()),
UpdatedAt: updated.UpdatedAt,
Version: updated.Version,
return &MoveMessageToDLQOutput[T]{
MovedMessage: updated,
}, nil
}

Expand Down
24 changes: 9 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,20 +438,17 @@ func TestDynamoMQClientDeleteMessage(t *testing.T) {

func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) {
t.Parallel()
tests := []ClientTestCase[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput]{
tests := []ClientTestCase[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput[test.MessageData]]{
{
name: "should succeed when id is found and queue type is standard",
setup: NewSetupFunc(newPutRequestWithDLQItem("A-101", test.DefaultTestDate)),
args: &dynamomq.MoveMessageToDLQInput{
ID: "A-101",
},
want: func() *dynamomq.MoveMessageToDLQOutput {
want: func() *dynamomq.MoveMessageToDLQOutput[test.MessageData] {
s := NewTestMessageItemAsDLQ("A-101", test.DefaultTestDate)
r := &dynamomq.MoveMessageToDLQOutput{
ID: s.ID,
Status: dynamomq.StatusReady,
UpdatedAt: s.UpdatedAt,
Version: s.Version,
r := &dynamomq.MoveMessageToDLQOutput[test.MessageData]{
MovedMessage: s,
}
return r
}(),
Expand All @@ -466,23 +463,20 @@ func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) {
args: &dynamomq.MoveMessageToDLQInput{
ID: "A-101",
},
want: func() *dynamomq.MoveMessageToDLQOutput {
want: func() *dynamomq.MoveMessageToDLQOutput[test.MessageData] {
m := NewTestMessageItemAsReady("A-101", test.DefaultTestDate)
MarkAsMovedToDLQ(m, test.DefaultTestDate.Add(10*time.Second))
m.Version = 2
r := &dynamomq.MoveMessageToDLQOutput{
ID: m.ID,
Status: dynamomq.StatusReady,
UpdatedAt: m.UpdatedAt,
Version: m.Version,
r := &dynamomq.MoveMessageToDLQOutput[test.MessageData]{
MovedMessage: m,
}
return r
}(),
wantErr: nil,
},
}
runTestsParallel[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput](t, "MoveMessageToDLQ()", tests,
func(client dynamomq.Client[test.MessageData], args *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) {
runTestsParallel[*dynamomq.MoveMessageToDLQInput, *dynamomq.MoveMessageToDLQOutput[test.MessageData]](t, "MoveMessageToDLQ()", tests,
func(client dynamomq.Client[test.MessageData], args *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[test.MessageData], error) {
return client.MoveMessageToDLQ(context.Background(), args)
})
}
Expand Down
9 changes: 3 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,18 +294,15 @@ func NewClientForConsumerTest(queue, dlq chan *dynamomq.Message[test.MessageData
}
return &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{}, nil
},
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) {
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[test.MessageData], error) {
if cfg.SimulateMoveMessageToDLQError {
return nil, test.ErrTest
}
v, _ := store.Load(params.ID)
msg, _ := v.(*dynamomq.Message[test.MessageData])
dlq <- msg
return &dynamomq.MoveMessageToDLQOutput{
ID: msg.ID,
Status: dynamomq.StatusReady,
UpdatedAt: msg.UpdatedAt,
Version: 2,
return &dynamomq.MoveMessageToDLQOutput[test.MessageData]{
MovedMessage: msg,
}, nil
},
}
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd/interactive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func TestRunInteractiveFailReturnError(t *testing.T) {
func TestRunInteractiveInvalidShouldReturnError(t *testing.T) {
c := &cmd.Interactive{
Client: mock.Client[any]{
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) {
return &dynamomq.MoveMessageToDLQOutput{}, nil
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) {
return &dynamomq.MoveMessageToDLQOutput[any]{}, nil
},
GetQueueStatsFunc: func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) {
return &dynamomq.GetQueueStatsOutput{}, test.ErrTest
Expand Down
8 changes: 4 additions & 4 deletions internal/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Client[T any] struct {
ReceiveMessageFunc func(ctx context.Context, params *dynamomq.ReceiveMessageInput) (*dynamomq.ReceiveMessageOutput[T], error)
ChangeMessageVisibilityFunc func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error)
DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error)
MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error)
MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error)
RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error)
GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error)
GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error)
Expand Down Expand Up @@ -53,7 +53,7 @@ func (m Client[T]) DeleteMessage(ctx context.Context, params *dynamomq.DeleteMes
return nil, ErrNotImplemented
}

func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) {
func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error) {
if m.MoveMessageToDLQFunc != nil {
return m.MoveMessageToDLQFunc(ctx, params)
}
Expand Down Expand Up @@ -121,8 +121,8 @@ var SuccessfulMockClient = &Client[any]{
DeleteMessageFunc: func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) {
return &dynamomq.DeleteMessageOutput{}, nil
},
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) {
return &dynamomq.MoveMessageToDLQOutput{}, nil
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) {
return &dynamomq.MoveMessageToDLQOutput[any]{}, nil
},
RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) {
return &dynamomq.RedriveMessageOutput{}, nil
Expand Down

0 comments on commit 91874f3

Please sign in to comment.