Skip to content

Commit

Permalink
Merge pull request #213 from moov-io/file-tracking-table
Browse files Browse the repository at this point in the history
pipeline: record accepted files in database table
  • Loading branch information
adamdecaf authored Nov 9, 2023
2 parents 7ea6a6f + a6d225d commit 47df8c1
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 6 deletions.
5 changes: 4 additions & 1 deletion internal/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/moov-io/achgateway"
"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/internal/files"
"github.com/moov-io/achgateway/internal/incoming/odfi"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/incoming/web"
Expand Down Expand Up @@ -160,8 +161,10 @@ func NewEnvironment(env *Environment) (*Environment, error) {
return env, fmt.Errorf("unable to create http files subscription: %v", err)
}

fileRepository := files.NewRepository(env.DB)
shardRepository := shards.NewRepository(env.DB, env.Config.Sharding.Mappings)
fileReceiver, err := pipeline.Start(ctx, env.Logger, env.Config, shardRepository, httpSub)

fileReceiver, err := pipeline.Start(ctx, env.Logger, env.Config, shardRepository, fileRepository, httpSub)
if err != nil {
return env, fmt.Errorf("unable to create file pipeline: %v", err)
}
Expand Down
15 changes: 15 additions & 0 deletions internal/files/mock_repo_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package files

import "context"

type MockRepository struct {
Err error
}

func (r *MockRepository) Record(_ context.Context, file AcceptedFile) error {
return r.Err
}

func (r *MockRepository) Cancel(_ context.Context, fileID string) error {
return r.Err
}
13 changes: 13 additions & 0 deletions internal/files/model_accepted_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package files

import "time"

type AcceptedFile struct {
FileID string
ShardKey string

Hostname string

AcceptedAt time.Time
CanceledAt time.Time
}
67 changes: 67 additions & 0 deletions internal/files/repo_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package files

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/moov-io/base/telemetry"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Repository interface {
Record(ctx context.Context, file AcceptedFile) error
Cancel(ctx context.Context, fileID string) error
}

func NewRepository(db *sql.DB) Repository {
if db == nil {
return &MockRepository{}
}
return &sqlRepository{db: db}
}

type sqlRepository struct {
db *sql.DB
}

func (r *sqlRepository) Record(ctx context.Context, file AcceptedFile) error {
ctx, span := telemetry.StartSpan(ctx, "files-record", trace.WithAttributes(
attribute.String("achgateway.file_id", file.FileID),
))
defer span.End()

qry := `INSERT INTO files (file_id, shard_key, hostname, accepted_at) VALUES (?,?,?,?);`
_, err := r.db.ExecContext(ctx, qry,
file.FileID,
file.ShardKey,
file.Hostname,
file.AcceptedAt,
)
if err != nil {
return fmt.Errorf("recording file failed: %w", err)
}
return nil
}

func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error {
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
attribute.String("achgateway.file_id", fileID),
))
defer span.End()

qry := `UPDATE files SET canceled_at = ? WHERE file_id = ? AND canceled_at IS NULL;`
_, err := r.db.ExecContext(ctx, qry,
// SET
time.Now().In(time.UTC),
// WHERE
fileID,
)
if err != nil {
return fmt.Errorf("saving file cancellation failed: %w", err)
}
return nil
}
58 changes: 58 additions & 0 deletions internal/files/repo_files_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package files

import (
"context"
"testing"
"time"

"github.com/moov-io/achgateway/internal/dbtest"
"github.com/moov-io/base"
"github.com/moov-io/base/database"

"github.com/stretchr/testify/require"
)

func TestRepository(t *testing.T) {
if testing.Short() {
t.Skip("-short flag was specified")
}

conf := dbtest.CreateTestDatabase(t, dbtest.LocalDatabaseConfig())
db := dbtest.LoadDatabase(t, conf)
require.NoError(t, db.Ping())

repo := NewRepository(db)
if _, ok := repo.(*sqlRepository); !ok {
t.Errorf("unexpected repository type: %T", repo)
}

ctx := context.Background()
fileID1 := base.ID()
accepted := AcceptedFile{
FileID: fileID1,
ShardKey: base.ID(),
Hostname: "achgateway-0",
AcceptedAt: time.Now(),
}

// Record
err := repo.Record(ctx, accepted)
require.NoError(t, err)

err = repo.Record(ctx, accepted)
require.ErrorContains(t, err, "Duplicate entry")
require.True(t, database.UniqueViolation(err))

// Second File
fileID2 := base.ID()
accepted.FileID = fileID2
err = repo.Record(ctx, accepted)
require.NoError(t, err)

// Cancel
err = repo.Cancel(ctx, fileID1)
require.NoError(t, err)

err = repo.Cancel(ctx, base.ID())
require.NoError(t, err)
}
31 changes: 29 additions & 2 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/internal/files"
"github.com/moov-io/achgateway/internal/incoming"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/service"
Expand Down Expand Up @@ -53,6 +56,8 @@ type FileReceiver struct {
shardRepository shards.Repository
shardAggregators map[string]*aggregator

fileRepository files.Repository

httpFiles stream.Subscription
streamFiles stream.Subscription

Expand All @@ -65,6 +70,7 @@ func newFileReceiver(
eventEmitter events.Emitter,
shardRepository shards.Repository,
shardAggregators map[string]*aggregator,
fileRepository files.Repository,
httpFiles stream.Subscription,
transformConfig *models.TransformConfig,
) (*FileReceiver, error) {
Expand All @@ -76,6 +82,7 @@ func newFileReceiver(
defaultShardName: cfg.Sharding.Default,
shardRepository: shardRepository,
shardAggregators: shardAggregators,
fileRepository: fileRepository,
httpFiles: httpFiles,
transformConfig: transformConfig,
}
Expand Down Expand Up @@ -429,6 +436,7 @@ func (fr *FileReceiver) processACHFile(ctx context.Context, file incoming.ACHFil
return nil
}

// Pull Aggregator from the config
agg := fr.getAggregator(ctx, file.ShardKey)
if agg == nil {
return fmt.Errorf("no aggregator for shard key %s found", file.ShardKey)
Expand All @@ -439,6 +447,19 @@ func (fr *FileReceiver) processACHFile(ctx context.Context, file incoming.ACHFil
"shardName": log.String(agg.shard.Name),
"shardKey": log.String(file.ShardKey),
})

// We only want to handle files once, so become the winner by saving the record.
hostname, _ := os.Hostname()
err = fr.fileRepository.Record(ctx, files.AcceptedFile{
FileID: file.FileID,
ShardKey: file.ShardKey,
Hostname: hostname,
AcceptedAt: time.Now().In(time.UTC),
})
if err != nil {
logger.Warn().LogErrorf("not handling received ACH file: %v", err)
return nil
}
logger.Log("begin handling of received ACH file")

err = agg.acceptFile(ctx, file)
Expand All @@ -458,6 +479,7 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel
return errors.New("missing fileID or shardKey")
}

// Get the Aggregator from the config
agg := fr.getAggregator(ctx, cancel.ShardKey)
if agg == nil {
return nil
Expand All @@ -468,11 +490,16 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel
"shardName": log.String(agg.shard.Name),
"shardKey": log.String(cancel.ShardKey),
})

// Record the cancellation
err := fr.fileRepository.Cancel(ctx, cancel.FileID)
if err != nil {
return logger.Error().LogErrorf("problem recording cancellation: %v", err).Err()
}
logger.Log("begin canceling ACH file")

evt := incoming.CancelACHFile(*cancel)

err := agg.cancelFile(ctx, evt)
err = agg.cancelFile(ctx, evt)
if err != nil {
return logger.Error().LogErrorf("problem canceling file: %v", err).Err()
}
Expand Down
5 changes: 4 additions & 1 deletion internal/pipeline/file_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/internal/files"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/incoming/stream/streamtest"
"github.com/moov-io/achgateway/internal/service"
Expand Down Expand Up @@ -114,8 +115,10 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
shardRepo := shards.NewInMemoryRepository()
shardRepo.Add(service.ShardMapping{ShardKey: "testing", ShardName: "testing"}, database.NopInTx)

fileRepo := &files.MockRepository{}

filesTopic, _ := streamtest.InmemStream(t)
fileReceiver, err := Start(ctx, logger, conf, shardRepo, nil)
fileReceiver, err := Start(ctx, logger, conf, shardRepo, fileRepo, nil)
require.NoError(t, err)
t.Cleanup(func() { fileReceiver.Shutdown() })

Expand Down
4 changes: 3 additions & 1 deletion internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/internal/files"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/achgateway/internal/shards"
Expand All @@ -34,6 +35,7 @@ func Start(
logger log.Logger,
cfg *service.Config,
shardRepository shards.Repository,
fileRepository files.Repository,
httpFiles stream.Subscription,
) (*FileReceiver, error) {

Expand Down Expand Up @@ -62,7 +64,7 @@ func Start(
if cfg.Inbound.Kafka != nil && cfg.Inbound.Kafka.Transform != nil {
transformConfig = cfg.Inbound.Kafka.Transform
}
receiver, err := newFileReceiver(logger, cfg, eventEmitter, shardRepository, shardAggregators, httpFiles, transformConfig)
receiver, err := newFileReceiver(logger, cfg, eventEmitter, shardRepository, shardAggregators, fileRepository, httpFiles, transformConfig)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion internal/test/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/admintest"
"github.com/moov-io/achgateway/internal/files"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/incoming/stream/streamtest"
"github.com/moov-io/achgateway/internal/incoming/web"
Expand Down Expand Up @@ -170,7 +171,9 @@ func TestUploads(t *testing.T) {
fileController.AppendRoutes(r)

outboundPath := setupTestDirectory(t, uploadConf)
fileReceiver, err := pipeline.Start(ctx, logger, uploadConf, shardRepo, httpSub)
fileRepo := &files.MockRepository{}

fileReceiver, err := pipeline.Start(ctx, logger, uploadConf, shardRepo, fileRepo, httpSub)
require.NoError(t, err)
t.Cleanup(func() { fileReceiver.Shutdown() })

Expand Down
7 changes: 7 additions & 0 deletions migrations/002_files.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE files(
file_id VARCHAR(128) PRIMARY KEY,
shard_key VARCHAR(50) NOT NULL,
hostname VARCHAR(100) NOT NULL,
accepted_at TIMESTAMP NOT NULL,
canceled_at TIMESTAMP
);

0 comments on commit 47df8c1

Please sign in to comment.