Skip to content

Commit

Permalink
command/run: handle large tokens in scanner (#438)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonmezonur authored May 13, 2022
1 parent 9a7fa24 commit c978e4a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Fixed a bug where errors did not result a non-zero exit code. ([#304](https://github.com/peak/s5cmd/issues/304))
- Print error if the commands file of `run` command is not accessible. ([#410](https://github.com/peak/s5cmd/pull/410))
- Updated region detection call to use current session's address resolving method ([#314](https://github.com/peak/s5cmd/issues/314))
- Fixed a bug where lines with large tokens fail in `run` command. `sync` was failing when it finds multiple files to remove. ([#435](https://github.com/peak/s5cmd/issues/435), [#436](https://github.com/peak/s5cmd/issues/436))

## v1.4.0 - 21 Sep 2021

Expand Down
72 changes: 39 additions & 33 deletions command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func NewRun(c *cli.Context, r io.Reader) Run {
}

func (r Run) Run(ctx context.Context) error {
reader := r.reader
pm := parallel.New(r.numWorkers)
defer pm.Close()

Expand All @@ -96,10 +95,10 @@ func (r Run) Run(ctx context.Context) error {
}
}()

scanner := NewScanner(ctx, reader)
reader := NewReader(ctx, r.reader)

lineno := -1
for line := range scanner.Scan() {
for line := range reader.Read() {
lineno++

line = strings.TrimSpace(line)
Expand Down Expand Up @@ -153,60 +152,67 @@ func (r Run) Run(ctx context.Context) error {
waiter.Wait()
<-errDoneCh

return multierror.Append(merrorWaiter, scanner.Err()).ErrorOrNil()
if reader.Err() != nil {
printError(commandFromContext(r.c), r.c.Command.Name, reader.Err())
}

return multierror.Append(merrorWaiter, reader.Err()).ErrorOrNil()
}

// Scanner is a cancelable scanner.
type Scanner struct {
*bufio.Scanner
// Reader is a cancelable reader.
type Reader struct {
*bufio.Reader
err error
linech chan string
ctx context.Context
}

// NewScanner creates a new scanner with cancellation.
func NewScanner(ctx context.Context, r io.Reader) *Scanner {
scanner := &Scanner{
ctx: ctx,
Scanner: bufio.NewScanner(r),
linech: make(chan string),
// NewReader creates a new reader with cancellation.
func NewReader(ctx context.Context, r io.Reader) *Reader {
reader := &Reader{
ctx: ctx,
Reader: bufio.NewReader(r),
linech: make(chan string),
}

go scanner.scan()
return scanner
go reader.read()
return reader
}

// scan read the underlying reader.
func (s *Scanner) scan() {
defer close(s.linech)
// read reads lines from the underlying reader.
func (r *Reader) read() {
defer close(r.linech)

for {
select {
case <-s.ctx.Done():
s.err = s.ctx.Err()
case <-r.ctx.Done():
r.err = r.ctx.Err()
return
default:
if !s.Scanner.Scan() {
return
// If ReadString encounters an error before finding a delimiter,
// it returns the data read before the error and the error itself (often io.EOF).
line, err := r.ReadString('\n')
if line != "" {
r.linech <- line
}
if err != nil {
if err == io.EOF {
return
}
r.err = multierror.Append(r.err, err)
}

s.linech <- s.Scanner.Text()
}
}
}

// Scan returns read-only channel to consume lines.
func (s *Scanner) Scan() <-chan string {
return s.linech
// Read returns read-only channel to consume lines.
func (r *Reader) Read() <-chan string {
return r.linech
}

// Err returns encountered errors, if any.
func (s *Scanner) Err() error {
if s.err != nil {
return s.err
}

return s.Scanner.Err()
func (r *Reader) Err() error {
return r.err
}

func validateRunCommand(c *cli.Context) error {
Expand Down
56 changes: 56 additions & 0 deletions e2e/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,3 +1621,59 @@ func TestSyncLocalDirectoryToS3WithExcludeFilter(t *testing.T) {
})
}
}

// sync --delete somedir s3://bucket/ (removes 10k objects)
func TestIssue435(t *testing.T) {
t.Parallel()

bucket := s3BucketFromTestName(t)

s3client, s5cmd, cleanup := setup(t, withS3Backend("mem"))
defer cleanup()

createBucket(t, s3client, bucket)

// empty folder
folderLayout := []fs.PathOp{}

workdir := fs.NewDir(t, "somedir", folderLayout...)
defer workdir.Remove()

const filecount = 10_000

filenameFunc := func(i int) string { return fmt.Sprintf("file_%06d", i) }
contentFunc := func(i int) string { return fmt.Sprintf("file body %06d", i) }

for i := 0; i < filecount; i++ {
filename := filenameFunc(i)
content := contentFunc(i)
putFile(t, s3client, bucket, filename, content)
}

src := fmt.Sprintf("%v/", workdir.Path())
src = filepath.ToSlash(src)
dst := fmt.Sprintf("s3://%v/", bucket)

cmd := s5cmd("--log", "debug", "sync", "--delete", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Success)

assertLines(t, result.Stderr(), map[int]compareFunc{})

expected := make(map[int]compareFunc)
for i := 0; i < filecount; i++ {
expected[i] = contains("rm s3://%v/file_%06d", bucket, i)
}

assertLines(t, result.Stdout(), expected, sortInput(true))

// assert s3 objects
for i := 0; i < filecount; i++ {
filename := filenameFunc(i)
content := contentFunc(i)

err := ensureS3Object(s3client, bucket, filename, content)
assertError(t, err, errS3NoSuchKey)
}
}

0 comments on commit c978e4a

Please sign in to comment.