Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batch size cli arg and config option #335

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type Format struct {
Version bool `name:"version" short:"V" help:"Print version."`
Init bool `name:"init" short:"i" help:"Create a new treefmt.toml."`

Stdin bool `help:"Format the context passed in via stdin."`
OnUnmatched log.Level `name:"on-unmatched" short:"u" default:"warn" help:"Log paths that did not match any formatters at the specified log level, with fatal exiting the process with an error. Possible values are <debug|info|warn|error|fatal>."`
CpuProfile string `optional:"" help:"The file into which a cpu profile will be written."`
BatchSize int `default:"1024" short:"b" help:"Specify the maximum number of paths to apply to a sequence of formatters."`

Paths []string `name:"paths" arg:"" type:"path" optional:"" help:"Paths to format. Defaults to formatting the whole tree."`
Stdin bool `help:"Format the context passed in via stdin."`

CpuProfile string `optional:"" help:"The file into which a cpu profile will be written."`

formatters map[string]*format.Formatter
globalExcludes []glob.Glob
Expand Down
29 changes: 22 additions & 7 deletions cli/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ import (
)

const (
BatchSize = 1024
DefaultBatchSize = 1024
)

var ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
var (
ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
ErrInvalidBatchSize = errors.New("batch size must be >= 1")
)

func (f *Format) Run() (err error) {
// set log level and other options
f.configureLogging()

// validate the cli batch size
// todo move this into kong validation
if f.BatchSize < 1 {
return ErrInvalidBatchSize
}

// cpu profiling
if f.CpuProfile != "" {
cpuProfile, err := os.Create(f.CpuProfile)
Expand Down Expand Up @@ -96,6 +105,12 @@ func (f *Format) Run() (err error) {
return fmt.Errorf("failed to read config file %v: %w", f.ConfigFile, err)
}

// update the batch size only if it has not already been set by the cli arg
// cli arg takes precedence over config
if f.BatchSize == DefaultBatchSize && cfg.Global.BatchSize != 0 {
f.BatchSize = cfg.Global.BatchSize
}

// compile global exclude globs
if f.globalExcludes, err = format.CompileGlobs(cfg.Global.Excludes); err != nil {
return fmt.Errorf("failed to compile global excludes: %w", err)
Expand Down Expand Up @@ -146,7 +161,7 @@ func (f *Format) Run() (err error) {

// create a channel for files needing to be processed
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
f.filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())
f.filesCh = make(chan *walk.File, f.BatchSize*runtime.NumCPU())

// create a channel for files that have been processed
f.processedCh = make(chan *walk.File, cap(f.filesCh))
Expand All @@ -165,7 +180,7 @@ func (f *Format) Run() (err error) {
func (f *Format) updateCache(ctx context.Context) func() error {
return func() error {
// used to batch updates for more efficient txs
batch := make([]*walk.File, 0, BatchSize)
batch := make([]*walk.File, 0, f.BatchSize)

// apply a batch
processBatch := func() error {
Expand Down Expand Up @@ -212,7 +227,7 @@ func (f *Format) updateCache(ctx context.Context) func() error {

// append to batch and process if we have enough
batch = append(batch, file)
if len(batch) == BatchSize {
if len(batch) == f.BatchSize {
if err := processBatch(); err != nil {
return err
}
Expand Down Expand Up @@ -242,7 +257,7 @@ func (f *Format) updateCache(ctx context.Context) func() error {
func (f *Format) walkFilesystem(ctx context.Context) func() error {
return func() error {
eg, ctx := errgroup.WithContext(ctx)
pathsCh := make(chan string, BatchSize)
pathsCh := make(chan string, f.BatchSize)

// By default, we use the cli arg, but if the stdin flag has been set we force a filesystem walk
// since we will only be processing one file from a temp directory
Expand Down Expand Up @@ -352,7 +367,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
}

// process the batch if it's full, or we've been asked to flush partial batches
if flush || len(batch) == BatchSize {
if flush || len(batch) == f.BatchSize {

// copy the batch as we re-use it for the next batch
tasks := make([]*format.Task, len(batch))
Expand Down
27 changes: 27 additions & 0 deletions cli/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,33 @@ func TestOnUnmatched(t *testing.T) {
checkOutput("DEBU", out)
}

func TestBatchSize(t *testing.T) {
as := require.New(t)

// capture current cwd, so we can replace it after the test is finished
cwd, err := os.Getwd()
as.NoError(err)

t.Cleanup(func() {
// return to the previous working directory
as.NoError(os.Chdir(cwd))
})

tempDir := test.TempExamples(t)

// 0 batch size
_, err = cmd(t, "-C", tempDir, "--allow-missing-formatter", "-b", "0")
as.ErrorIs(err, ErrInvalidBatchSize)

_, err = cmd(t, "-C", tempDir, "-c", "--allow-missing-formatter", "-b", "1")
as.NoError(err)
assertStats(t, as, 31, 31, 21, 0)

_, err = cmd(t, "-C", tempDir, "-c", "--allow-missing-formatter", "-b", "100")
as.NoError(err)
assertStats(t, as, 31, 31, 21, 0)
}

func TestCpuProfile(t *testing.T) {
as := require.New(t)
tempDir := test.TempExamples(t)
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
// Config is used to represent the list of configured Formatters.
type Config struct {
Global struct {
// BatchSize controls the maximum number of paths to batch before applying them to a sequence of formatters.
BatchSize int `toml:"batch_size"`
// Excludes is an optional list of glob patterns used to exclude certain files from all formatters.
Excludes []string `toml:"excludes"`
} `toml:"global"`
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestReadConfigFile(t *testing.T) {

as.NotNil(cfg)

as.Equal(10, cfg.Global.BatchSize)
as.Equal([]string{"*.toml"}, cfg.Global.Excludes)

// python
Expand Down
2 changes: 2 additions & 0 deletions config/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ type Formatter struct {
Excludes []string `toml:"excludes,omitempty"`
// Indicates the order of precedence when executing this Formatter in a sequence of Formatters.
Priority int `toml:"priority,omitempty"`
// BatchSize controls the maximum number of paths to apply to the formatter in one invocation.
BatchSize int `toml:"batch_size"`
}
1 change: 1 addition & 0 deletions docs/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ priority = 1
## Global Options

- `excludes` - an optional list of [glob patterns](#glob-patterns-format) used to exclude certain files from all formatters.
- `batch_size` - the maximum number of files each formatter invocation receives. Setting it to a smaller size can be used to increase parallelism. (default: 1024)

## Formatter Options

Expand Down
48 changes: 44 additions & 4 deletions format/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os/exec"
"time"

"golang.org/x/sync/errgroup"

"git.numtide.com/numtide/treefmt/walk"

"git.numtide.com/numtide/treefmt/config"
Expand Down Expand Up @@ -48,6 +50,48 @@ func (f *Formatter) Priority() int {

func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {
start := time.Now()
defer func() {
f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))
}()

if f.config.BatchSize == 0 {
// apply as one single batch
return f.applyBatch(ctx, tasks)
}

// otherwise we create smaller batches and apply them concurrently
// todo how to constrain the overall number of 'threads' as we want a separate group here for the eg.Wait()
eg := errgroup.Group{}

var batch []*Task
for idx := range tasks {
batch = append(batch, tasks[idx])
if len(batch) == f.config.BatchSize {
// copy the batch as we re-use it for the next batch
next := make([]*Task, len(batch))
copy(next, batch)

// fire off a routine to process the next batch
eg.Go(func() error {
return f.applyBatch(ctx, next)
})
// reset batch for next iteration
batch = batch[:0]
}
}

// flush final partial batch
if len(batch) > 0 {
eg.Go(func() error {
return f.applyBatch(ctx, batch)
})
}

return eg.Wait()
}

func (f *Formatter) applyBatch(ctx context.Context, tasks []*Task) error {
f.log.Debugf("applying batch, size = %d", len(tasks))

// construct args, starting with config
args := f.config.Options
Expand Down Expand Up @@ -80,10 +124,6 @@ func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {
return fmt.Errorf("formatter '%s' with options '%v' failed to apply: %w", f.config.Command, f.config.Options, err)
}

//

f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))

return nil
}

Expand Down
1 change: 1 addition & 0 deletions test/examples/treefmt.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# One CLI to format the code tree - https://git.numtide.com/numtide/treefmt

[global]
batch_size = 10
excludes = ["*.toml"]

[formatter.python]
Expand Down
Loading