Skip to content

Commit

Permalink
wip: formatter.batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmcgee committed Jul 5, 2024
1 parent 1288c33 commit 106340c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
2 changes: 2 additions & 0 deletions config/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ type Formatter struct {
Excludes []string `toml:"excludes,omitempty"`
// Indicates the order of precedence when executing this Formatter in a sequence of Formatters.
Priority int `toml:"priority,omitempty"`
// BatchSize is the maximum number of paths passed to the formatter in a single invocation.
// A value of zero applies all paths in one invocation.
BatchSize int `toml:"batch_size"`
}
48 changes: 44 additions & 4 deletions format/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os/exec"
"time"

"golang.org/x/sync/errgroup"

"git.numtide.com/numtide/treefmt/walk"

"git.numtide.com/numtide/treefmt/config"
Expand Down Expand Up @@ -48,6 +50,48 @@ func (f *Formatter) Priority() int {

// Apply runs the formatter over tasks and, on return, logs how many files were
// handed to it and how long the whole call took (whether or not it succeeded).
//
// When the configured BatchSize is zero or negative, all tasks are applied in a
// single batch on the calling goroutine. Otherwise tasks are split into
// consecutive batches of at most BatchSize entries, each batch is applied in its
// own goroutine, and Apply waits for all of them before returning the first
// non-nil error reported by the group (or nil).
func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {
	start := time.Now()
	defer func() {
		f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))
	}()

	size := f.config.BatchSize
	if size <= 0 {
		// Batching is disabled (or misconfigured with a negative value):
		// apply everything as one single batch.
		return f.applyBatch(ctx, tasks)
	}

	// otherwise we create smaller batches and apply them concurrently
	// todo how to constrain the overall number of 'threads' as we want a separate group here for the eg.Wait()
	var eg errgroup.Group

	for lo := 0; lo < len(tasks); lo += size {
		// Clamp the upper bound so the final batch may be shorter than size.
		hi := len(tasks)
		if len(tasks)-lo > size {
			hi = lo + size
		}

		// Each batch is a view over a disjoint range of tasks. The capacity is
		// capped at hi so an append inside applyBatch cannot spill into the
		// range owned by the following batch.
		batch := tasks[lo:hi:hi]

		// fire off a routine to process this batch
		eg.Go(func() error {
			return f.applyBatch(ctx, batch)
		})
	}

	return eg.Wait()
}

func (f *Formatter) applyBatch(ctx context.Context, tasks []*Task) error {
f.log.Debugf("applying batch, size = %d", len(tasks))

// construct args, starting with config
args := f.config.Options
Expand Down Expand Up @@ -80,10 +124,6 @@ func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {
return fmt.Errorf("formatter '%s' with options '%v' failed to apply: %w", f.config.Command, f.config.Options, err)
}

//

f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))

return nil
}

Expand Down

0 comments on commit 106340c

Please sign in to comment.