diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1406549a3..7bc8d94cd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 - Fixed a bug where errors did not result a non-zero exit code. ([#304](https://github.com/peak/s5cmd/issues/304))
 - Print error if the commands file of `run` command is not accessible. ([#410](https://github.com/peak/s5cmd/pull/410))
 - Updated region detection call to use current session's address resolving method ([#314](https://github.com/peak/s5cmd/issues/314))
+- Fixed a bug where lines with large tokens failed in `run` command. `sync` was failing when it found multiple files to remove. ([#435](https://github.com/peak/s5cmd/issues/435), [#436](https://github.com/peak/s5cmd/issues/436))
 
 ## v1.4.0 - 21 Sep 2021
 
diff --git a/command/run.go b/command/run.go
index 37e8cb923..0b1774823 100644
--- a/command/run.go
+++ b/command/run.go
@@ -81,7 +81,6 @@ func NewRun(c *cli.Context, r io.Reader) Run {
 }
 
 func (r Run) Run(ctx context.Context) error {
-	reader := r.reader
 	pm := parallel.New(r.numWorkers)
 	defer pm.Close()
 
@@ -96,10 +95,10 @@ func (r Run) Run(ctx context.Context) error {
 		}
 	}()
 
-	scanner := NewScanner(ctx, reader)
+	reader := NewReader(ctx, r.reader)
 	lineno := -1
 
-	for line := range scanner.Scan() {
+	for line := range reader.Read() {
 		lineno++
 
 		line = strings.TrimSpace(line)
@@ -153,60 +152,67 @@ func (r Run) Run(ctx context.Context) error {
 	waiter.Wait()
 	<-errDoneCh
 
-	return multierror.Append(merrorWaiter, scanner.Err()).ErrorOrNil()
+	if reader.Err() != nil {
+		printError(commandFromContext(r.c), r.c.Command.Name, reader.Err())
+	}
+
+	return multierror.Append(merrorWaiter, reader.Err()).ErrorOrNil()
 }
 
-// Scanner is a cancelable scanner.
-type Scanner struct {
-	*bufio.Scanner
+// Reader is a cancelable reader.
+type Reader struct {
+	*bufio.Reader
 	err    error
 	linech chan string
 	ctx    context.Context
 }
 
-// NewScanner creates a new scanner with cancellation.
-func NewScanner(ctx context.Context, r io.Reader) *Scanner {
-	scanner := &Scanner{
-		ctx:     ctx,
-		Scanner: bufio.NewScanner(r),
-		linech:  make(chan string),
+// NewReader creates a new reader with cancellation.
+func NewReader(ctx context.Context, r io.Reader) *Reader {
+	reader := &Reader{
+		ctx:    ctx,
+		Reader: bufio.NewReader(r),
+		linech: make(chan string),
 	}
-	go scanner.scan()
-	return scanner
+	go reader.read()
+	return reader
 }
 
-// scan read the underlying reader.
-func (s *Scanner) scan() {
-	defer close(s.linech)
+// read reads lines from the underlying reader.
+func (r *Reader) read() {
+	defer close(r.linech)
 	for {
 		select {
-		case <-s.ctx.Done():
-			s.err = s.ctx.Err()
+		case <-r.ctx.Done():
+			r.err = r.ctx.Err()
 			return
 		default:
-			if !s.Scanner.Scan() {
-				return
+			// If ReadString encounters an error before finding a delimiter,
+			// it returns the data read before the error and the error itself (often io.EOF).
+			line, err := r.ReadString('\n')
+			if line != "" {
+				r.linech <- line
+			}
+			if err != nil {
+				// NOTE(review): stop reading on any error. Continuing the loop on a
+				// persistent non-EOF error would re-read the failing source forever,
+				// appending the same error endlessly.
+				if err != io.EOF {
+					r.err = multierror.Append(r.err, err)
+				}
+				return
 			}
-
-			s.linech <- s.Scanner.Text()
 		}
 	}
 }
 
-// Scan returns read-only channel to consume lines.
-func (s *Scanner) Scan() <-chan string {
-	return s.linech
+// Read returns read-only channel to consume lines.
+func (r *Reader) Read() <-chan string {
+	return r.linech
 }
 
 // Err returns encountered errors, if any.
-func (s *Scanner) Err() error {
-	if s.err != nil {
-		return s.err
-	}
-
-	return s.Scanner.Err()
+func (r *Reader) Err() error {
+	return r.err
 }
 
 func validateRunCommand(c *cli.Context) error {
diff --git a/e2e/sync_test.go b/e2e/sync_test.go
index ca4489178..cc1554878 100644
--- a/e2e/sync_test.go
+++ b/e2e/sync_test.go
@@ -1621,3 +1621,59 @@ func TestSyncLocalDirectoryToS3WithExcludeFilter(t *testing.T) {
 		})
 	}
 }
+
+// sync --delete somedir s3://bucket/ (removes 10k objects)
+func TestIssue435(t *testing.T) {
+	t.Parallel()
+
+	bucket := s3BucketFromTestName(t)
+
+	s3client, s5cmd, cleanup := setup(t, withS3Backend("mem"))
+	defer cleanup()
+
+	createBucket(t, s3client, bucket)
+
+	// empty folder
+	folderLayout := []fs.PathOp{}
+
+	workdir := fs.NewDir(t, "somedir", folderLayout...)
+	defer workdir.Remove()
+
+	const filecount = 10_000
+
+	filenameFunc := func(i int) string { return fmt.Sprintf("file_%06d", i) }
+	contentFunc := func(i int) string { return fmt.Sprintf("file body %06d", i) }
+
+	for i := 0; i < filecount; i++ {
+		filename := filenameFunc(i)
+		content := contentFunc(i)
+		putFile(t, s3client, bucket, filename, content)
+	}
+
+	src := fmt.Sprintf("%v/", workdir.Path())
+	src = filepath.ToSlash(src)
+	dst := fmt.Sprintf("s3://%v/", bucket)
+
+	cmd := s5cmd("--log", "debug", "sync", "--delete", src, dst)
+	result := icmd.RunCmd(cmd)
+
+	result.Assert(t, icmd.Success)
+
+	assertLines(t, result.Stderr(), map[int]compareFunc{})
+
+	expected := make(map[int]compareFunc)
+	for i := 0; i < filecount; i++ {
+		expected[i] = contains("rm s3://%v/file_%06d", bucket, i)
+	}
+
+	assertLines(t, result.Stdout(), expected, sortInput(true))
+
+	// assert s3 objects
+	for i := 0; i < filecount; i++ {
+		filename := filenameFunc(i)
+		content := contentFunc(i)
+
+		err := ensureS3Object(s3client, bucket, filename, content)
+		assertError(t, err, errS3NoSuchKey)
+	}
+}