Skip to content

Commit

Permalink
Add Amazon DynamoDB deduplication storage implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
maestre3d committed Apr 24, 2023
1 parent 176f2d9 commit c98b191
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 7 deletions.
1 change: 1 addition & 0 deletions driver/dynamodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
shared-local-instance.db
2 changes: 2 additions & 0 deletions driver/dynamodb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
run-integration-tests:
go test -v ./... -coverprofile coverage.out -tags=integration
63 changes: 58 additions & 5 deletions driver/dynamodb/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Streams Driver for Amazon DynamoDB

The **stream driver** for `Amazon DynamoDB` offers a `Writer` implementation to be used by systems implementing the
The **stream driver** for `Amazon DynamoDB` which offers a deduplication storage implementation to ensure idempotency
for message processing.

It is planned to offer in a near future a `Writer` implementation to be used by systems implementing the
_**transactional outbox**_ messaging pattern.

Moreover, the `Message Egress Proxy` (_aka. log trailing_) component could be used along this driver to
Expand All @@ -10,14 +13,64 @@ Furthermore, `Amazon DynamoDB` has a **_Change-Data-Capture stream feature_** re
services such as _Lambda and Kinesis_. Thus, this feature could be combined along the `Message Egress Proxy` component
in order to stream messages into desired infrastructure non-supported by `Amazon DynamoDB Stream` feature.

## Requirements
## Deduplication Storage Requirements

In order for this driver to work, the database MUST have a deduplication table with the following schema.

```json
{
"TableName": "deduplication-table",
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "worker_id"
},
{
"KeyType": "RANGE",
"AttributeName": "message_id"
}
],
"AttributeDefinitions": [
{
"AttributeName": "worker_id",
"AttributeType": "S"
},
{
"AttributeName": "message_id",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
```

In order for this driver to work, the database MUST have an outbox table with the following schema
(_called streams_egress by default_).
## Transactional Outbox Requirements

In order for this driver to work, the database MUST have an outbox table with the following schema.

```json
{
"fields": {}
"TableName": "deduplication-table",
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "worker_id"
},
{
"KeyType": "RANGE",
"AttributeName": "message_id"
}
],
"AttributeDefinitions": [
{
"AttributeName": "worker_id",
"AttributeType": "S"
},
{
"AttributeName": "message_id",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
```
102 changes: 102 additions & 0 deletions driver/dynamodb/deduplication_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package dynamodb

import (
"context"
"log"
"os"

"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/aws/aws-sdk-go-v2/aws"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"

"github.com/alexandria-oss/streams"
)

// DeduplicationStorageConfig is the configuration schema for Amazon DynamoDB streams.DeduplicationStorage implementation.
type DeduplicationStorageConfig struct {
TableName string
Logger *log.Logger
ErrorLogger *log.Logger
}

// DeduplicationStorage is the Amazon DynamoDB streams.DeduplicationStorage
type DeduplicationStorage struct {
client *dynamodb.Client
cfg DeduplicationStorageConfig
tableRef *string
}

var _ streams.DeduplicationStorage = DeduplicationStorage{}

func NewDeduplicationStorage(cfg DeduplicationStorageConfig, client *dynamodb.Client) DeduplicationStorage {
if cfg.Logger == nil || cfg.ErrorLogger == nil {
logger := log.New(os.Stdout, "streams.dynamodb: ", 0)
if cfg.Logger == nil {
cfg.Logger = logger
}
if cfg.ErrorLogger == nil {
cfg.ErrorLogger = logger
}
}
return DeduplicationStorage{
client: client,
cfg: cfg,
tableRef: aws.String(cfg.TableName),
}
}

func (d DeduplicationStorage) Commit(ctx context.Context, workerID, messageID string) {
_, err := d.client.PutItem(ctx, &dynamodb.PutItemInput{
Item: map[string]types.AttributeValue{
"message_id": &types.AttributeValueMemberS{
Value: messageID,
},
"worker_id": &types.AttributeValueMemberS{
Value: workerID,
},
},
TableName: d.tableRef,
ConditionExpression: nil,
ConditionalOperator: "",
Expected: nil,
ExpressionAttributeNames: nil,
ExpressionAttributeValues: nil,
ReturnConsumedCapacity: "",
ReturnItemCollectionMetrics: "",
ReturnValues: "",
})
if err != nil {
d.cfg.ErrorLogger.Printf("failed to commit message, error %s", err.Error())
return
}

d.cfg.Logger.Printf("committed message with id <%s> and worker id <%s>", workerID, messageID)
}

func (d DeduplicationStorage) IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error) {
out, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{
Key: map[string]types.AttributeValue{
"message_id": &types.AttributeValueMemberS{
Value: messageID,
},
"worker_id": &types.AttributeValueMemberS{
Value: workerID,
},
},
TableName: d.tableRef,
AttributesToGet: nil,
ConsistentRead: nil,
ExpressionAttributeNames: nil,
ProjectionExpression: nil,
ReturnConsumedCapacity: "",
})

if err != nil {
d.cfg.ErrorLogger.Printf("failed to get message commit, error %s", err.Error())
return false, err
}

return len(out.Item) >= 2, nil
}
115 changes: 115 additions & 0 deletions driver/dynamodb/deduplication_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//go:build integration

package dynamodb_test

import (
"context"
"strings"
"testing"

streamsdynamo "github.com/alexandria-oss/streams/driver/dynamodb"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/suite"
)

type dedupeStorageSuit struct {
suite.Suite
client *dynamodb.Client
tableName string
}

func TestDeduplicationStorage(t *testing.T) {
suite.Run(t, &dedupeStorageSuit{})
}

func (s *dedupeStorageSuit) SetupSuite() {
cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("fake", "fake", "TOKEN")),
config.WithRegion("us-east-1"),
config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: "http://localhost:8001",
HostnameImmutable: false,
PartitionID: "aws",
SigningName: "",
SigningRegion: "us-east-1",
SigningMethod: "",
Source: 0,
}, nil
})),
)
s.Require().NoError(err)
s.client = dynamodb.NewFromConfig(cfg)
s.tableName = "deduplication-storage"
s.runMigrations()
}

func (s *dedupeStorageSuit) runMigrations() {
_, err := s.client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("message_id"),
AttributeType: "S",
},
{
AttributeName: aws.String("worker_id"),
AttributeType: "S",
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("message_id"),
KeyType: "HASH",
},
{
AttributeName: aws.String("worker_id"),
KeyType: "RANGE",
},
},
TableName: aws.String(s.tableName),
BillingMode: "",
GlobalSecondaryIndexes: nil,
LocalSecondaryIndexes: nil,
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
SSESpecification: nil,
StreamSpecification: nil,
TableClass: "",
Tags: nil,
})
if err != nil && !strings.Contains(err.Error(), "ResourceInUseException") {
s.Fail(err.Error())
}
}

func (s *dedupeStorageSuit) TearDownSuite() {
_, err := s.client.DeleteTable(context.TODO(), &dynamodb.DeleteTableInput{
TableName: aws.String(s.tableName),
})
s.Assert().NoError(err)
}

func (s *dedupeStorageSuit) TestStorage() {
dedupeStorage := streamsdynamo.NewDeduplicationStorage(streamsdynamo.DeduplicationStorageConfig{
TableName: s.tableName,
Logger: nil,
ErrorLogger: nil,
}, s.client)
worker := "worker-0"
messageID := "123"
isDupe, err := dedupeStorage.IsDuplicated(context.TODO(), worker, messageID)
s.Assert().Nil(err)
s.Assert().False(isDupe)

dedupeStorage.Commit(context.TODO(), worker, messageID)
isDupe, err = dedupeStorage.IsDuplicated(context.TODO(), worker, messageID)
s.Assert().NoError(err)
s.Assert().True(isDupe)
}
11 changes: 11 additions & 0 deletions driver/dynamodb/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: '3.8'
services:
dynamodb-local:
command: "-jar DynamoDBLocal.jar -sharedDb -optimizeDbBeforeStartup -dbPath ./data"
image: "amazon/dynamodb-local:latest"
container_name: dynamodb-local
ports:
- "8001:8000"
volumes:
- "./:/home/dynamodblocal/data"
working_dir: /home/dynamodblocal
41 changes: 41 additions & 0 deletions driver/dynamodb/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,44 @@
module github.com/alexandria-oss/streams/driver/dynamodb

go 1.18

replace github.com/alexandria-oss/streams => ../../

require (
github.com/alexandria-oss/streams v0.0.1-alpha.7
github.com/aws/aws-sdk-go-v2 v1.17.8
github.com/aws/aws-sdk-go-v2/config v1.18.21
github.com/aws/aws-sdk-go-v2/credentials v1.13.20
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.5
github.com/stretchr/testify v1.8.2
)

require (
github.com/allegro/bigcache/v3 v3.1.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.26 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.9 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
google.golang.org/protobuf v1.29.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit c98b191

Please sign in to comment.