diff --git a/internal/incoming/odfi/cleanup.go b/internal/incoming/odfi/cleanup.go index d1b9200..fd9cdd4 100644 --- a/internal/incoming/odfi/cleanup.go +++ b/internal/incoming/odfi/cleanup.go @@ -84,7 +84,7 @@ func deleteFilesOnRemote(ctx context.Context, logger log.Logger, agent upload.Ag var el base.ErrorList for i := range infos { path := filepath.Join(suffix, filepath.Base(infos[i].Name())) - if err := agent.Delete(path); err != nil { + if err := agent.Delete(ctx, path); err != nil { // Ignore the error if it's about deleting a remote file that's gone if os.IsNotExist(err) { continue @@ -134,7 +134,7 @@ func deleteEmptyFiles(ctx context.Context, logger log.Logger, agent upload.Agent os.Remove(info.Name()) // Go ahead and delete the remote file - if err := agent.Delete(path); err != nil { + if err := agent.Delete(ctx, path); err != nil { el.Add(err) } diff --git a/internal/incoming/odfi/download.go b/internal/incoming/odfi/download.go index 287e825..819e2ae 100644 --- a/internal/incoming/odfi/download.go +++ b/internal/incoming/odfi/download.go @@ -145,35 +145,35 @@ func (dl *downloaderImpl) CopyFilesFromRemote(ctx context.Context, agent upload. }) // copy down files from our "inbound" directory - filepaths, err := agent.GetInboundFiles() + filepaths, err := agent.GetInboundFiles(ctx) logger.Logf("%T found %d inbound files in %s", agent, len(filepaths), agent.InboundPath()) if err != nil { return out, fmt.Errorf("problem downloading inbound files: %v", err) } filesDownloaded.With("kind", "inbound").Add(float64(len(filepaths))) - if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.InboundPath())); err != nil { + if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.InboundPath())); err != nil { return out, fmt.Errorf("problem saving inbound files: %v", err) } // copy down files from out "reconciliation" directory - filepaths, err = agent.GetReconciliationFiles() + filepaths, err = agent.GetReconciliationFiles(ctx) logger.Logf("%T found %d reconciliation files in %s", agent, len(filepaths), agent.ReconciliationPath()) if err != nil { return out, fmt.Errorf("problem downloading reconciliation files: %v", err) } filesDownloaded.With("kind", "reconciliation").Add(float64(len(filepaths))) - if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.ReconciliationPath())); err != nil { + if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.ReconciliationPath())); err != nil { return out, fmt.Errorf("problem saving reconciliation files: %v", err) } // copy down files from out "return" directory - filepaths, err = agent.GetReturnFiles() + filepaths, err = agent.GetReturnFiles(ctx) logger.Logf("%T found %d return files in %s", agent, len(filepaths), agent.ReturnPath()) if err != nil { return out, fmt.Errorf("problem downloading return files: %v", err) } filesDownloaded.With("kind", "return").Add(float64(len(filepaths))) - if err := saveFilepaths(logger, agent, filepaths, filepath.Join(out.dir, agent.ReturnPath())); err != nil { + if err := saveFilepaths(ctx, logger, agent, filepaths, filepath.Join(out.dir, agent.ReturnPath())); err != nil { return out, fmt.Errorf("problem saving return files: %v", err) } @@ -182,7 +182,7 @@ func (dl *downloaderImpl) CopyFilesFromRemote(ctx context.Context, agent upload. // saveFilepaths will create files in dir for each file object provided // The contents of each file struct will always be closed. -func saveFilepaths(logger log.Logger, agent upload.Agent, filepaths []string, dir string) error { +func saveFilepaths(ctx context.Context, logger log.Logger, agent upload.Agent, filepaths []string, dir string) error { var firstErr error var errordFilenames []string @@ -201,7 +201,7 @@ func saveFilepaths(logger log.Logger, agent upload.Agent, filepaths []string, di continue } - file, err := agent.ReadFile(filepaths[i]) + file, err := agent.ReadFile(ctx, filepaths[i]) if err != nil { // Save the error if it's our first, otherwise log err = fmt.Errorf("reading %s failed: %w", filepaths[i], err) diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index bb930f0..ad23ace 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -295,7 +295,7 @@ func (xfagg *aggregator) uploadFile(ctx context.Context, index int, agent upload } // Upload our file - err = agent.UploadFile(upload.File{ + err = agent.UploadFile(ctx, upload.File{ Filepath: filename, Contents: io.NopCloser(&buf), }) diff --git a/internal/upload/agent.go b/internal/upload/agent.go index 53af4fe..b0acdaa 100644 --- a/internal/upload/agent.go +++ b/internal/upload/agent.go @@ -5,6 +5,7 @@ package upload import ( + "context" "fmt" "sync" @@ -17,12 +18,12 @@ import ( type Agent interface { ID() string - GetInboundFiles() ([]string, error) - GetReconciliationFiles() ([]string, error) - GetReturnFiles() ([]string, error) - UploadFile(f File) error - Delete(path string) error - ReadFile(path string) (*File, error) + GetInboundFiles(ctx context.Context) ([]string, error) + GetReconciliationFiles(ctx context.Context) ([]string, error) + GetReturnFiles(ctx context.Context) ([]string, error) + UploadFile(ctx context.Context, f File) error + Delete(ctx context.Context, path string) error + ReadFile(ctx context.Context, path string) (*File, error) InboundPath() string OutboundPath() string diff --git a/internal/upload/ftp.go b/internal/upload/ftp.go index a22596a..5641f14 100644 --- a/internal/upload/ftp.go +++ b/internal/upload/ftp.go @@ -5,6 +5,7 @@ package upload import ( + "context" "errors" "fmt" "path/filepath" @@ -12,10 +13,13 @@ import ( "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/base/log" + "github.com/moov-io/base/telemetry" go_ftp "github.com/moov-io/go-ftp" "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ( @@ -111,23 +115,39 @@ func (agent *FTPTransferAgent) Hostname() string { return agent.cfg.FTP.Hostname } -func (agent *FTPTransferAgent) Delete(path string) error { +func (agent *FTPTransferAgent) Delete(ctx context.Context, path string) error { + ctx, span := telemetry.StartSpan(ctx, "agent-ftp-delete", trace.WithAttributes( + attribute.String("path", path), + )) + defer span.End() + return agent.client.Delete(path) } // uploadFile saves the content of File at the given filename in the OutboundPath directory // // The File's contents will always be closed -func (agent *FTPTransferAgent) UploadFile(f File) error { +func (agent *FTPTransferAgent) UploadFile(ctx context.Context, f File) error { if agent == nil || agent.cfg.FTP == nil { return errors.New("missing FTP client or config") } pathToWrite := filepath.Join(agent.OutboundPath(), f.Filepath) + + ctx, span := telemetry.StartSpan(ctx, "agent-ftp-upload", trace.WithAttributes( + attribute.String("path", pathToWrite), + )) + defer span.End() + return agent.client.UploadFile(pathToWrite, f.Contents) } -func (agent *FTPTransferAgent) ReadFile(path string) (*File, error) { +func (agent *FTPTransferAgent) ReadFile(ctx context.Context, path string) (*File, error) { + ctx, span := telemetry.StartSpan(ctx, "agent-ftp-read", trace.WithAttributes( + attribute.String("path", path), + )) + defer span.End() + file, err := agent.client.Open(path) if err != nil { return nil, fmt.Errorf("ftp open %s failed: %w", path, err) @@ -138,19 +158,24 @@ func (agent *FTPTransferAgent) ReadFile(path string) (*File, error) { }, nil } -func (agent *FTPTransferAgent) GetInboundFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Inbound) +func (agent *FTPTransferAgent) GetInboundFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Inbound) } -func (agent *FTPTransferAgent) GetReconciliationFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Reconciliation) +func (agent *FTPTransferAgent) GetReconciliationFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Reconciliation) } -func (agent *FTPTransferAgent) GetReturnFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Return) +func (agent *FTPTransferAgent) GetReturnFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Return) } -func (agent *FTPTransferAgent) readFilepaths(dir string) ([]string, error) { +func (agent *FTPTransferAgent) readFilepaths(ctx context.Context, dir string) ([]string, error) { + ctx, span := telemetry.StartSpan(ctx, "agent-ftp-list", trace.WithAttributes( + attribute.String("path", dir), + )) + defer span.End() + filepaths, err := agent.client.ListFiles(dir) if err != nil { return nil, err diff --git a/internal/upload/ftp_test.go b/internal/upload/ftp_test.go index a233297..a12ac98 100644 --- a/internal/upload/ftp_test.go +++ b/internal/upload/ftp_test.go @@ -6,6 +6,7 @@ package upload import ( "bytes" + "context" "errors" "fmt" "io" @@ -210,13 +211,14 @@ func TestFTP__getInboundFiles(t *testing.T) { defer agent.Close() defer svc.Shutdown() - filenames, err := agent.GetInboundFiles() + ctx := context.Background() + filenames, err := agent.GetInboundFiles(ctx) require.NoError(t, err) require.Len(t, filenames, 3) for i := range filenames { if filenames[i] == "inbound/iat-credit.ach" { - file, err := agent.ReadFile(filenames[i]) + file, err := agent.ReadFile(ctx, filenames[i]) require.NoError(t, err) bs, _ := io.ReadAll(file.Contents) @@ -228,7 +230,7 @@ func TestFTP__getInboundFiles(t *testing.T) { } // make sure we perform the same call and get the same result - filenames, err = agent.GetInboundFiles() + filenames, err = agent.GetInboundFiles(ctx) require.NoError(t, err) require.Len(t, filenames, 3) require.ElementsMatch(t, filenames, []string{"inbound/iat-credit.ach", "inbound/cor-c01.ach", "inbound/prenote-ppd-debit.ach"}) @@ -239,14 +241,15 @@ func TestFTP__getReconciliationFiles(t *testing.T) { defer agent.Close() defer svc.Shutdown() - filenames, err := agent.GetReconciliationFiles() + ctx := context.Background() + filenames, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.Len(t, filenames, 1) require.ElementsMatch(t, filenames, []string{"reconciliation/ppd-debit.ach"}) for i := range filenames { if filenames[i] == "reconciliation/ppd-debit.ach" { - file, err := agent.ReadFile(filenames[i]) + file, err := agent.ReadFile(ctx, filenames[i]) require.NoError(t, err) bs, _ := io.ReadAll(file.Contents) @@ -258,7 +261,7 @@ func TestFTP__getReconciliationFiles(t *testing.T) { } // make sure we perform the same call and get the same result - filenames, err = agent.GetReconciliationFiles() + filenames, err = agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filenames, []string{"reconciliation/ppd-debit.ach"}) } @@ -268,13 +271,14 @@ func TestFTP__getReturnFiles(t *testing.T) { defer agent.Close() defer svc.Shutdown() - filenames, err := agent.GetReturnFiles() + ctx := context.Background() + filenames, err := agent.GetReturnFiles(ctx) require.NoError(t, err) require.Len(t, filenames, 1) require.Equal(t, "returned/return-WEB.ach", filenames[0]) // read the returned file and verify its contents - file, err := agent.ReadFile(filenames[0]) + file, err := agent.ReadFile(ctx, filenames[0]) require.NoError(t, err) bs, _ := io.ReadAll(file.Contents) @@ -284,7 +288,7 @@ func TestFTP__getReturnFiles(t *testing.T) { } // make sure we perform the same call and get the same result - filenames, err = agent.GetReturnFiles() + filenames, err = agent.GetReturnFiles(ctx) require.NoError(t, err) require.Len(t, filenames, 1) require.Equal(t, "returned/return-WEB.ach", filenames[0]) @@ -307,7 +311,8 @@ func TestFTP__uploadFile(t *testing.T) { t.Fatal(err) } - if err := agent.UploadFile(f); err != nil { + ctx := context.Background() + if err := agent.UploadFile(ctx, f); err != nil { t.Fatal(err) } @@ -322,13 +327,13 @@ func TestFTP__uploadFile(t *testing.T) { require.Equal(t, content, string(bs)) // delete the file - if err := agent.Delete(path); err != nil { + if err := agent.Delete(ctx, path); err != nil { t.Fatal(err) } // get an error with no FTP configs agent.cfg.FTP = nil - if err := agent.UploadFile(f); err == nil { + if err := agent.UploadFile(ctx, f); err == nil { t.Error("expected error") } } @@ -349,7 +354,8 @@ func TestFTP__Issue494(t *testing.T) { defer os.Remove(path) // Read without an error - files, err := agent.GetReturnFiles() + ctx := context.Background() + files, err := agent.GetReturnFiles(ctx) if err != nil { t.Error(err) } @@ -363,7 +369,8 @@ func TestFTP__DeleteMissing(t *testing.T) { defer agent.Close() defer svc.Shutdown() - err := agent.Delete("/missing.txt") + ctx := context.Background() + err := agent.Delete(ctx, "/missing.txt") require.NoError(t, err) } @@ -385,13 +392,15 @@ func TestFTP_GetReconciliationFiles(t *testing.T) { Reconciliation: "reconciliation", }, } + + ctx := context.Background() logger := log.NewTestLogger() t.Run("relative path", func(t *testing.T) { agent, err := newFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"}) }) @@ -402,7 +411,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) { agent, err := newFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"}) }) @@ -413,7 +422,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) { agent, err := newFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"}) }) @@ -424,7 +433,7 @@ func TestFTP_GetReconciliationFiles(t *testing.T) { agent, err := newFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"}) }) diff --git a/internal/upload/mock_agent.go b/internal/upload/mock_agent.go index 71bab2d..8402614 100644 --- a/internal/upload/mock_agent.go +++ b/internal/upload/mock_agent.go @@ -6,6 +6,7 @@ package upload import ( "bytes" + "context" "io" "sync" ) @@ -28,28 +29,28 @@ func (a *MockAgent) ID() string { return "mock-agent" } -func (a *MockAgent) GetInboundFiles() ([]string, error) { +func (a *MockAgent) GetInboundFiles(_ context.Context) ([]string, error) { a.mu.RLock() defer a.mu.RUnlock() return a.InboundFilepaths, nil } -func (a *MockAgent) GetReconciliationFiles() ([]string, error) { +func (a *MockAgent) GetReconciliationFiles(_ context.Context) ([]string, error) { a.mu.RLock() defer a.mu.RUnlock() return a.ReconciliationFilepaths, nil } -func (a *MockAgent) GetReturnFiles() ([]string, error) { +func (a *MockAgent) GetReturnFiles(_ context.Context) ([]string, error) { a.mu.RLock() defer a.mu.RUnlock() return a.ReturnFilepaths, nil } -func (a *MockAgent) UploadFile(f File) error { +func (a *MockAgent) UploadFile(_ context.Context, f File) error { a.mu.Lock() defer a.mu.Unlock() @@ -60,7 +61,7 @@ func (a *MockAgent) UploadFile(f File) error { return nil } -func (a *MockAgent) Delete(path string) error { +func (a *MockAgent) Delete(_ context.Context, path string) error { a.mu.Lock() defer a.mu.Unlock() @@ -68,7 +69,7 @@ func (a *MockAgent) Delete(path string) error { return nil } -func (a *MockAgent) ReadFile(path string) (*File, error) { +func (a *MockAgent) ReadFile(_ context.Context, path string) (*File, error) { a.mu.Lock() defer a.mu.Unlock() diff --git a/internal/upload/retry.go b/internal/upload/retry.go index 67a63be..7ebb8f6 100644 --- a/internal/upload/retry.go +++ b/internal/upload/retry.go @@ -59,15 +59,14 @@ func (rt *RetryAgent) newBackoff() (retry.Backoff, error) { return fib, nil } -func (rt *RetryAgent) retryFiles(f func() ([]string, error)) ([]string, error) { +func (rt *RetryAgent) retryFiles(ctx context.Context, f func(context.Context) ([]string, error)) ([]string, error) { backoff, err := rt.newBackoff() if err != nil { return nil, err } var files []string - ctx := context.Background() err = retry.Do(ctx, backoff, func(ctx context.Context) error { - fs, err := f() + fs, err := f(ctx) if err := isRetryableError(err); err != nil { return err } @@ -78,49 +77,46 @@ func (rt *RetryAgent) retryFiles(f func() ([]string, error)) ([]string, error) { } // Network'd calls -func (rt *RetryAgent) GetInboundFiles() ([]string, error) { - return rt.retryFiles(rt.underlying.GetInboundFiles) +func (rt *RetryAgent) GetInboundFiles(ctx context.Context) ([]string, error) { + return rt.retryFiles(ctx, rt.underlying.GetInboundFiles) } -func (rt *RetryAgent) GetReconciliationFiles() ([]string, error) { - return rt.retryFiles(rt.underlying.GetReconciliationFiles) +func (rt *RetryAgent) GetReconciliationFiles(ctx context.Context) ([]string, error) { + return rt.retryFiles(ctx, rt.underlying.GetReconciliationFiles) } -func (rt *RetryAgent) GetReturnFiles() ([]string, error) { - return rt.retryFiles(rt.underlying.GetReturnFiles) +func (rt *RetryAgent) GetReturnFiles(ctx context.Context) ([]string, error) { + return rt.retryFiles(ctx, rt.underlying.GetReturnFiles) } -func (rt *RetryAgent) UploadFile(f File) error { +func (rt *RetryAgent) UploadFile(ctx context.Context, f File) error { backoff, err := rt.newBackoff() if err != nil { return err } - ctx := context.Background() return retry.Do(ctx, backoff, func(ctx context.Context) error { - return isRetryableError(rt.underlying.UploadFile(f)) + return isRetryableError(rt.underlying.UploadFile(ctx, f)) }) } -func (rt *RetryAgent) Delete(path string) error { +func (rt *RetryAgent) Delete(ctx context.Context, path string) error { backoff, err := rt.newBackoff() if err != nil { return err } - ctx := context.Background() return retry.Do(ctx, backoff, func(ctx context.Context) error { - return isRetryableError(rt.underlying.Delete(path)) + return isRetryableError(rt.underlying.Delete(ctx, path)) }) } -func (rt *RetryAgent) ReadFile(path string) (*File, error) { +func (rt *RetryAgent) ReadFile(ctx context.Context, path string) (*File, error) { backoff, err := rt.newBackoff() if err != nil { return nil, err } var file *File - ctx := context.Background() err = retry.Do(ctx, backoff, func(ctx context.Context) error { - file, err = rt.underlying.ReadFile(path) + file, err = rt.underlying.ReadFile(ctx, path) if err := isRetryableError(err); err != nil { return err } diff --git a/internal/upload/sftp.go b/internal/upload/sftp.go index a9ea428..f41793c 100644 --- a/internal/upload/sftp.go +++ b/internal/upload/sftp.go @@ -5,6 +5,7 @@ package upload import ( + "context" "errors" "fmt" "path/filepath" @@ -12,7 +13,10 @@ import ( "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/base/log" + "github.com/moov-io/base/telemetry" go_sftp "github.com/moov-io/go-sftp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type SFTPTransferAgent struct { @@ -62,7 +66,6 @@ func (agent *SFTPTransferAgent) Ping() error { if agent == nil { return errors.New("nil SFTPTransferAgent") } - return agent.client.Ping() } @@ -96,21 +99,36 @@ func (agent *SFTPTransferAgent) Hostname() string { return agent.cfg.SFTP.Hostname } -func (agent *SFTPTransferAgent) Delete(path string) error { +func (agent *SFTPTransferAgent) Delete(ctx context.Context, path string) error { + ctx, span := telemetry.StartSpan(ctx, "agent-sftp-delete", trace.WithAttributes( + attribute.String("path", path), + )) + defer span.End() + return agent.client.Delete(path) } // uploadFile saves the content of File at the given filename in the OutboundPath directory // // The File's contents will always be closed -func (agent *SFTPTransferAgent) UploadFile(f File) error { +func (agent *SFTPTransferAgent) UploadFile(ctx context.Context, f File) error { // Take the base of f.Filepath and our (out of band) OutboundPath to avoid accepting a write like '../../../../etc/passwd'. pathToWrite := filepath.Join(agent.OutboundPath(), filepath.Base(f.Filepath)) + ctx, span := telemetry.StartSpan(ctx, "agent-sftp-upload", trace.WithAttributes( + attribute.String("path", pathToWrite), + )) + defer span.End() + return agent.client.UploadFile(pathToWrite, f.Contents) } -func (agent *SFTPTransferAgent) ReadFile(path string) (*File, error) { +func (agent *SFTPTransferAgent) ReadFile(ctx context.Context, path string) (*File, error) { + ctx, span := telemetry.StartSpan(ctx, "agent-sftp-read", trace.WithAttributes( + attribute.String("path", path), + )) + defer span.End() + file, err := agent.client.Open(path) if err != nil { return nil, fmt.Errorf("sftp open %s failed: %w", path, err) @@ -121,19 +139,24 @@ func (agent *SFTPTransferAgent) ReadFile(path string) (*File, error) { }, nil } -func (agent *SFTPTransferAgent) GetInboundFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Inbound) +func (agent *SFTPTransferAgent) GetInboundFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Inbound) } -func (agent *SFTPTransferAgent) GetReconciliationFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Reconciliation) +func (agent *SFTPTransferAgent) GetReconciliationFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Reconciliation) } -func (agent *SFTPTransferAgent) GetReturnFiles() ([]string, error) { - return agent.readFilepaths(agent.cfg.Paths.Return) +func (agent *SFTPTransferAgent) GetReturnFiles(ctx context.Context) ([]string, error) { + return agent.readFilepaths(ctx, agent.cfg.Paths.Return) } -func (agent *SFTPTransferAgent) readFilepaths(dir string) ([]string, error) { +func (agent *SFTPTransferAgent) readFilepaths(ctx context.Context, dir string) ([]string, error) { + ctx, span := telemetry.StartSpan(ctx, "agent-sftp-list", trace.WithAttributes( + attribute.String("path", dir), + )) + defer span.End() + filepaths, err := agent.client.ListFiles(dir) if err != nil { return nil, err diff --git a/internal/upload/sftp_test.go b/internal/upload/sftp_test.go index 02ae901..d139e0b 100644 --- a/internal/upload/sftp_test.go +++ b/internal/upload/sftp_test.go @@ -8,6 +8,7 @@ package upload import ( + "context" "fmt" "io" "os" @@ -164,13 +165,14 @@ func TestSFTP__password(t *testing.T) { err := deployment.agent.Ping() require.NoError(t, err) - err = deployment.agent.UploadFile(File{ + ctx := context.Background() + err = deployment.agent.UploadFile(ctx, File{ Filepath: "upload.ach", Contents: io.NopCloser(strings.NewReader("test data")), }) require.NoError(t, err) - err = deployment.agent.Delete(deployment.agent.OutboundPath() + "upload.ach") + err = deployment.agent.Delete(ctx, deployment.agent.OutboundPath()+"upload.ach") require.NoError(t, err) // Inbound files (IAT in our testdata/sftp-server/) @@ -181,7 +183,7 @@ func TestSFTP__password(t *testing.T) { ) require.NoError(t, err) - filepaths, err := deployment.agent.GetInboundFiles() + filepaths, err := deployment.agent.GetInboundFiles(ctx) require.NoError(t, err) require.Len(t, filepaths, 1) require.Equal(t, "/upload/inbound/iat-credit.ach", filepaths[0]) @@ -196,7 +198,7 @@ func TestSFTP__password(t *testing.T) { time.Sleep(100 * time.Millisecond) - filepaths, err = deployment.agent.GetReturnFiles() + filepaths, err = deployment.agent.GetReturnFiles(ctx) require.NoError(t, err) require.Len(t, filepaths, 1) require.Equal(t, "/upload/returned/return-WEB.ach", filepaths[0]) @@ -209,22 +211,23 @@ func TestSFTP__readFilesEmpty(t *testing.T) { require.NoError(t, err) // Upload an empty file + ctx := context.Background() filename := fmt.Sprintf("%s.ach", base.ID()) - err = deployment.agent.UploadFile(File{ + err = deployment.agent.UploadFile(ctx, File{ Filepath: filename, Contents: io.NopCloser(strings.NewReader("")), }) require.NoError(t, err) // Read the empty file - filepaths, err := deployment.agent.readFilepaths(deployment.agent.OutboundPath()) + filepaths, err := deployment.agent.readFilepaths(ctx, deployment.agent.OutboundPath()) require.NoError(t, err) require.Len(t, filepaths, 1) require.ElementsMatch(t, filepaths, []string{ filepath.Join(deployment.agent.OutboundPath(), filename), }) - file, err := deployment.agent.ReadFile(filepaths[0]) + file, err := deployment.agent.ReadFile(ctx, filepaths[0]) require.NoError(t, err) bs, err := io.ReadAll(file.Contents) @@ -232,7 +235,7 @@ func TestSFTP__readFilesEmpty(t *testing.T) { require.Equal(t, "", string(bs)) // read a non-existent directory - filepaths, err = deployment.agent.readFilepaths("/dev/null") + filepaths, err = deployment.agent.readFilepaths(ctx, "/dev/null") require.NoError(t, err) require.Len(t, filepaths, 0) } @@ -240,12 +243,14 @@ func TestSFTP__readFilesEmpty(t *testing.T) { func TestSFTP__uploadFile(t *testing.T) { deployment := spawnSFTP(t) + ctx := context.Background() + err := deployment.agent.Ping() require.NoError(t, err) // force out OutboundPath to create more directories deployment.agent.cfg.Paths.Outbound = filepath.Join("upload", "foo") - err = deployment.agent.UploadFile(File{ + err = deployment.agent.UploadFile(ctx, File{ Filepath: "upload.ach", Contents: io.NopCloser(strings.NewReader("test data")), }) @@ -253,7 +258,7 @@ func TestSFTP__uploadFile(t *testing.T) { // fail to create the OutboundPath deployment.agent.cfg.Paths.Outbound = string(os.PathSeparator) + filepath.Join("home", "bad-path") - err = deployment.agent.UploadFile(File{ + err = deployment.agent.UploadFile(ctx, File{ Filepath: "upload.ach", Contents: io.NopCloser(strings.NewReader("test data")), }) @@ -340,7 +345,8 @@ func TestSFTP__Issue494(t *testing.T) { require.NoError(t, err) // Read without an error - files, err := deploy.agent.GetReturnFiles() + ctx := context.Background() + files, err := deploy.agent.GetReturnFiles(ctx) if err != nil { t.Error(err) } @@ -352,7 +358,8 @@ func TestSFTP__Issue494(t *testing.T) { func TestSFTP__DeleteMissing(t *testing.T) { deploy := spawnSFTP(t) - err := deploy.agent.Delete("/missing.txt") + ctx := context.Background() + err := deploy.agent.Delete(ctx, "/missing.txt") require.NoError(t, err) } @@ -374,13 +381,15 @@ func TestSFTP_GetReconciliationFiles(t *testing.T) { Reconciliation: "reconciliation", }, } + + ctx := context.Background() logger := log.NewTestLogger() t.Run("relative path", func(t *testing.T) { agent, err := newSFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"}) }) @@ -391,7 +400,7 @@ func TestSFTP_GetReconciliationFiles(t *testing.T) { agent, err := newSFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"reconciliation/ppd-debit.ach"}) }) @@ -402,7 +411,7 @@ func TestSFTP_GetReconciliationFiles(t *testing.T) { agent, err := newSFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"}) }) @@ -413,7 +422,7 @@ func TestSFTP_GetReconciliationFiles(t *testing.T) { agent, err := newSFTPTransferAgent(logger, conf) require.NoError(t, err) - filepaths, err := agent.GetReconciliationFiles() + filepaths, err := agent.GetReconciliationFiles(ctx) require.NoError(t, err) require.ElementsMatch(t, filepaths, []string{"/reconciliation/ppd-debit.ach"}) })