Skip to content

Commit

Permalink
upload: add OTEL tracing to agent upload/delete/read
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Nov 3, 2023
1 parent cd8a980 commit c5b0890
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 96 deletions.
4 changes: 2 additions & 2 deletions internal/incoming/odfi/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func deleteFilesOnRemote(ctx context.Context, logger log.Logger, agent upload.Ag
var el base.ErrorList
for i := range infos {
path := filepath.Join(suffix, filepath.Base(infos[i].Name()))
if err := agent.Delete(path); err != nil {
if err := agent.Delete(ctx, path); err != nil {
// Ignore the error if it's about deleting a remote file that's gone
if os.IsNotExist(err) {
continue
Expand Down Expand Up @@ -134,7 +134,7 @@ func deleteEmptyFiles(ctx context.Context, logger log.Logger, agent upload.Agent
os.Remove(info.Name())

// Go ahead and delete the remote file
if err := agent.Delete(path); err != nil {
if err := agent.Delete(ctx, path); err != nil {
el.Add(err)
}

Expand Down
16 changes: 8 additions & 8 deletions internal/incoming/odfi/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,35 +145,35 @@ func (dl *downloaderImpl) CopyFilesFromRemote(ctx context.Context, agent upload.
})

// copy down files from our "inbound" directory
filepaths, err := agent.GetInboundFiles()
filepaths, err := agent.GetInboundFiles(ctx)
logger.Logf("%T found %d inbound files in %s", agent, len(filepaths), agent.InboundPath())
if err != nil {
return out, fmt.Errorf("problem downloading inbound files: %v", err)
}
filesDownloaded.With("kind", "inbound").Add(float64(len(filepaths)))
if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.InboundPath())); err != nil {
if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.InboundPath())); err != nil {
return out, fmt.Errorf("problem saving inbound files: %v", err)
}

// copy down files from out "reconciliation" directory
filepaths, err = agent.GetReconciliationFiles()
filepaths, err = agent.GetReconciliationFiles(ctx)
logger.Logf("%T found %d reconciliation files in %s", agent, len(filepaths), agent.ReconciliationPath())
if err != nil {
return out, fmt.Errorf("problem downloading reconciliation files: %v", err)
}
filesDownloaded.With("kind", "reconciliation").Add(float64(len(filepaths)))
if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.ReconciliationPath())); err != nil {
if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.ReconciliationPath())); err != nil {
return out, fmt.Errorf("problem saving reconciliation files: %v", err)
}

// copy down files from out "return" directory
filepaths, err = agent.GetReturnFiles()
filepaths, err = agent.GetReturnFiles(ctx)
logger.Logf("%T found %d return files in %s", agent, len(filepaths), agent.ReturnPath())
if err != nil {
return out, fmt.Errorf("problem downloading return files: %v", err)
}
filesDownloaded.With("kind", "return").Add(float64(len(filepaths)))
if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.ReturnPath())); err != nil {
if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.ReturnPath())); err != nil {
return out, fmt.Errorf("problem saving return files: %v", err)
}

Expand All @@ -182,7 +182,7 @@ func (dl *downloaderImpl) CopyFilesFromRemote(ctx context.Context, agent upload.

// saveFilepaths will create files in dir for each file object provided
// The contents of each file struct will always be closed.
func saveFilepaths(logger log.Logger, agent upload.Agent, filepaths []string, dir string) error {
func saveFilepaths(ctx context.Context, logger log.Logger, agent upload.Agent, filepaths []string, dir string) error {
var firstErr error
var errordFilenames []string

Expand All @@ -201,7 +201,7 @@ func saveFilepaths(logger log.Logger, agent upload.Agent, filepaths []string, di
continue
}

file, err := agent.ReadFile(filepaths[i])
file, err := agent.ReadFile(ctx, filepaths[i])
if err != nil {
// Save the error if it's our first, otherwise log
err = fmt.Errorf("reading %s failed: %w", filepaths[i], err)
Expand Down
2 changes: 1 addition & 1 deletion internal/pipeline/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (xfagg *aggregator) uploadFile(ctx context.Context, index int, agent upload
}

// Upload our file
err = agent.UploadFile(upload.File{
err = agent.UploadFile(ctx, upload.File{
Filepath: filename,
Contents: io.NopCloser(&buf),
})
Expand Down
13 changes: 7 additions & 6 deletions internal/upload/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package upload

import (
"context"
"fmt"
"sync"

Expand All @@ -17,12 +18,12 @@ import (
type Agent interface {
ID() string

GetInboundFiles() ([]string, error)
GetReconciliationFiles() ([]string, error)
GetReturnFiles() ([]string, error)
UploadFile(f File) error
Delete(path string) error
ReadFile(path string) (*File, error)
GetInboundFiles(ctx context.Context) ([]string, error)
GetReconciliationFiles(ctx context.Context) ([]string, error)
GetReturnFiles(ctx context.Context) ([]string, error)
UploadFile(ctx context.Context, f File) error
Delete(ctx context.Context, path string) error
ReadFile(ctx context.Context, path string) (*File, error)

InboundPath() string
OutboundPath() string
Expand Down
45 changes: 35 additions & 10 deletions internal/upload/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
package upload

import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"

"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/log"
"github.com/moov-io/base/telemetry"
go_ftp "github.com/moov-io/go-ftp"

"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
Expand Down Expand Up @@ -111,23 +115,39 @@ func (agent *FTPTransferAgent) Hostname() string {
return agent.cfg.FTP.Hostname
}

func (agent *FTPTransferAgent) Delete(path string) error {
func (agent *FTPTransferAgent) Delete(ctx context.Context, path string) error {
ctx, span := telemetry.StartSpan(ctx, "agent-ftp-delete", trace.WithAttributes(
attribute.String("path", path),
))
defer span.End()

return agent.client.Delete(path)
}

// uploadFile saves the content of File at the given filename in the OutboundPath directory
//
// The File's contents will always be closed
func (agent *FTPTransferAgent) UploadFile(f File) error {
func (agent *FTPTransferAgent) UploadFile(ctx context.Context, f File) error {
if agent == nil || agent.cfg.FTP == nil {
return errors.New("missing FTP client or config")
}

pathToWrite := filepath.Join(agent.OutboundPath(), f.Filepath)

ctx, span := telemetry.StartSpan(ctx, "agent-ftp-upload", trace.WithAttributes(
attribute.String("path", pathToWrite),
))
defer span.End()

return agent.client.UploadFile(pathToWrite, f.Contents)
}

func (agent *FTPTransferAgent) ReadFile(path string) (*File, error) {
func (agent *FTPTransferAgent) ReadFile(ctx context.Context, path string) (*File, error) {
ctx, span := telemetry.StartSpan(ctx, "agent-ftp-read", trace.WithAttributes(
attribute.String("path", path),
))
defer span.End()

file, err := agent.client.Open(path)
if err != nil {
return nil, fmt.Errorf("ftp open %s failed: %w", path, err)
Expand All @@ -138,19 +158,24 @@ func (agent *FTPTransferAgent) ReadFile(path string) (*File, error) {
}, nil
}

func (agent *FTPTransferAgent) GetInboundFiles() ([]string, error) {
return agent.readFilepaths(agent.cfg.Paths.Inbound)
func (agent *FTPTransferAgent) GetInboundFiles(ctx context.Context) ([]string, error) {
return agent.readFilepaths(ctx, agent.cfg.Paths.Inbound)
}

func (agent *FTPTransferAgent) GetReconciliationFiles() ([]string, error) {
return agent.readFilepaths(agent.cfg.Paths.Reconciliation)
func (agent *FTPTransferAgent) GetReconciliationFiles(ctx context.Context) ([]string, error) {
return agent.readFilepaths(ctx, agent.cfg.Paths.Reconciliation)
}

func (agent *FTPTransferAgent) GetReturnFiles() ([]string, error) {
return agent.readFilepaths(agent.cfg.Paths.Return)
func (agent *FTPTransferAgent) GetReturnFiles(ctx context.Context) ([]string, error) {
return agent.readFilepaths(ctx, agent.cfg.Paths.Return)
}

func (agent *FTPTransferAgent) readFilepaths(dir string) ([]string, error) {
func (agent *FTPTransferAgent) readFilepaths(ctx context.Context, dir string) ([]string, error) {
ctx, span := telemetry.StartSpan(ctx, "agent-ftp-list", trace.WithAttributes(
attribute.String("path", dir),
))
defer span.End()

filepaths, err := agent.client.ListFiles(dir)
if err != nil {
return nil, err
Expand Down
45 changes: 27 additions & 18 deletions internal/upload/ftp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upload

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -210,13 +211,14 @@ func TestFTP__getInboundFiles(t *testing.T) {
defer agent.Close()
defer svc.Shutdown()

filenames, err := agent.GetInboundFiles()
ctx := context.Background()
filenames, err := agent.GetInboundFiles(ctx)
require.NoError(t, err)
require.Len(t, filenames, 3)

for i := range filenames {
if filenames[i] == "inbound/iat-credit.ach" {
file, err := agent.ReadFile(filenames[i])
file, err := agent.ReadFile(ctx, filenames[i])
require.NoError(t, err)

bs, _ := io.ReadAll(file.Contents)
Expand All @@ -228,7 +230,7 @@ func TestFTP__getInboundFiles(t *testing.T) {
}

// make sure we perform the same call and get the same result
filenames, err = agent.GetInboundFiles()
filenames, err = agent.GetInboundFiles(ctx)
require.NoError(t, err)
require.Len(t, filenames, 3)
require.ElementsMatch(t, filenames, []string{"inbound/iat-credit.ach", "inbound/cor-c01.ach", "inbound/prenote-ppd-debit.ach"})
Expand All @@ -239,14 +241,15 @@ func TestFTP__getReconciliationFiles(t *testing.T) {
defer agent.Close()
defer svc.Shutdown()

filenames, err := agent.GetReconciliationFiles()
ctx := context.Background()
filenames, err := agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.Len(t, filenames, 1)
require.ElementsMatch(t, filenames, []string{"reconciliation/ppd-debit.ach"})

for i := range filenames {
if filenames[i] == "reconciliation/ppd-debit.ach" {
file, err := agent.ReadFile(filenames[i])
file, err := agent.ReadFile(ctx, filenames[i])
require.NoError(t, err)

bs, _ := io.ReadAll(file.Contents)
Expand All @@ -258,7 +261,7 @@ func TestFTP__getReconciliationFiles(t *testing.T) {
}

// make sure we perform the same call and get the same result
filenames, err = agent.GetReconciliationFiles()
filenames, err = agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.ElementsMatch(t, filenames, []string{"reconciliation/ppd-debit.ach"})
}
Expand All @@ -268,13 +271,14 @@ func TestFTP__getReturnFiles(t *testing.T) {
defer agent.Close()
defer svc.Shutdown()

filenames, err := agent.GetReturnFiles()
ctx := context.Background()
filenames, err := agent.GetReturnFiles(ctx)
require.NoError(t, err)
require.Len(t, filenames, 1)
require.Equal(t, "returned/return-WEB.ach", filenames[0])

// read the returned file and verify its contents
file, err := agent.ReadFile(filenames[0])
file, err := agent.ReadFile(ctx, filenames[0])
require.NoError(t, err)

bs, _ := io.ReadAll(file.Contents)
Expand All @@ -284,7 +288,7 @@ func TestFTP__getReturnFiles(t *testing.T) {
}

// make sure we perform the same call and get the same result
filenames, err = agent.GetReturnFiles()
filenames, err = agent.GetReturnFiles(ctx)
require.NoError(t, err)
require.Len(t, filenames, 1)
require.Equal(t, "returned/return-WEB.ach", filenames[0])
Expand All @@ -307,7 +311,8 @@ func TestFTP__uploadFile(t *testing.T) {
t.Fatal(err)
}

if err := agent.UploadFile(f); err != nil {
ctx := context.Background()
if err := agent.UploadFile(ctx, f); err != nil {
t.Fatal(err)
}

Expand All @@ -322,13 +327,13 @@ func TestFTP__uploadFile(t *testing.T) {
require.Equal(t, content, string(bs))

// delete the file
if err := agent.Delete(path); err != nil {
if err := agent.Delete(ctx, path); err != nil {
t.Fatal(err)
}

// get an error with no FTP configs
agent.cfg.FTP = nil
if err := agent.UploadFile(f); err == nil {
if err := agent.UploadFile(ctx, f); err == nil {
t.Error("expected error")
}
}
Expand All @@ -349,7 +354,8 @@ func TestFTP__Issue494(t *testing.T) {
defer os.Remove(path)

// Read without an error
files, err := agent.GetReturnFiles()
ctx := context.Background()
files, err := agent.GetReturnFiles(ctx)
if err != nil {
t.Error(err)
}
Expand All @@ -363,7 +369,8 @@ func TestFTP__DeleteMissing(t *testing.T) {
defer agent.Close()
defer svc.Shutdown()

err := agent.Delete("/missing.txt")
ctx := context.Background()
err := agent.Delete(ctx, "/missing.txt")
require.NoError(t, err)
}

Expand All @@ -385,13 +392,15 @@ func TestFTP_GetReconciliationFiles(t *testing.T) {
Reconciliation: "reconciliation",
},
}

ctx := context.Background()
logger := log.NewTestLogger()

t.Run("relative path", func(t *testing.T) {
agent, err := newFTPTransferAgent(logger, conf)
require.NoError(t, err)

filepaths, err := agent.GetReconciliationFiles()
filepaths, err := agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"})
})
Expand All @@ -402,7 +411,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) {
agent, err := newFTPTransferAgent(logger, conf)
require.NoError(t, err)

filepaths, err := agent.GetReconciliationFiles()
filepaths, err := agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"})
})
Expand All @@ -413,7 +422,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) {
agent, err := newFTPTransferAgent(logger, conf)
require.NoError(t, err)

filepaths, err := agent.GetReconciliationFiles()
filepaths, err := agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"})
})
Expand All @@ -424,7 +433,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) {
agent, err := newFTPTransferAgent(logger, conf)
require.NoError(t, err)

filepaths, err := agent.GetReconciliationFiles()
filepaths, err := agent.GetReconciliationFiles(ctx)
require.NoError(t, err)
require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"})
})
Expand Down
Loading

0 comments on commit c5b0890

Please sign in to comment.