Skip to content

Commit

Permalink
Add queue (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
mslipper authored Jan 8, 2025
1 parent b57f931 commit d0c628f
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 5 deletions.
3 changes: 2 additions & 1 deletion cmd/bailiff/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func main() {
wl := bailiff.NewTeamWhitelist(cfg.Org, cfg.AdminTeams, gh)

repusher := bailiff.NewShellRepusher(l.New("module", "shell-repusher"), workdir, envCfg.PrivateKeyFile)
eh := bailiff.NewEventHandler(gh, wl, cfg, workdir, l, repusher)
asyncRepusher := bailiff.NewAsyncRepusher(l.New("module", "async-repusher"), repusher)
eh := bailiff.NewEventHandler(gh, wl, cfg, workdir, l, asyncRepusher)
srv := bailiff.NewServer(l, envCfg.WebhookSecret, eh)

repoURL := fmt.Sprintf("[email protected]:%s/%s.git", cfg.Org, cfg.Repo)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down
107 changes: 103 additions & 4 deletions internal/pkg/bailiff/repush.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,123 @@ import (
"io"
"os/exec"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
)

//go:embed repush.sh
var scriptSrc string

const maxAsyncRepushes = 10

type Repusher interface {
Repush(ctx context.Context, forkRepo, srcBranch, upstreamBranch, requestedSHA string) error
}

type repushReq struct {
forkRepo string
srcBranch string
upstreamBranch string
requestedSHA string
}

type AsnycRepusher struct {
lgr log.Logger
queue []repushReq
repusher Repusher
mtx sync.Mutex
doneC chan struct{}
closed bool
workers int
}

func NewAsyncRepusher(lgr log.Logger, repusher Repusher) *AsnycRepusher {
return &AsnycRepusher{
lgr: lgr,
repusher: repusher,
doneC: make(chan struct{}),
}
}

func (a *AsnycRepusher) Repush(ctx context.Context, forkRepo, srcBranch, upstreamBranch, requestedSHA string) error {
a.mtx.Lock()
defer a.mtx.Unlock()

if a.closed {
return fmt.Errorf("repusher is closed")
}

if len(a.queue) >= maxAsyncRepushes {
return fmt.Errorf("queue is full")
}

a.queue = append(a.queue, repushReq{
forkRepo: forkRepo,
srcBranch: srcBranch,
upstreamBranch: upstreamBranch,
requestedSHA: requestedSHA,
})

a.workers++
go a.processQueue()

return nil
}

func (a *AsnycRepusher) processQueue() {
a.mtx.Lock()
head := a.queue[0]
a.queue = a.queue[1:]
a.mtx.Unlock()

defer func() {
a.mtx.Lock()
a.workers--
if a.workers == 0 && a.closed {
a.lgr.Info("all workers finished, shutting down")
close(a.doneC)
}
a.mtx.Unlock()
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

lgr := a.lgr.New(
"forkRepo", head.forkRepo,
"srcBranch", head.srcBranch,
"upstreamBranch", head.upstreamBranch,
"requestedSHA", head.requestedSHA,
)

if err := a.repusher.Repush(ctx, head.forkRepo, head.srcBranch, head.upstreamBranch, head.requestedSHA); err != nil {
lgr.Error(fmt.Sprintf("repush failed: %s", err))
return
}

lgr.Info("repush succeeded")
}

func (a *AsnycRepusher) Close() {
a.mtx.Lock()
if a.closed {
a.mtx.Unlock()
return
}
a.closed = true
a.mtx.Unlock()

<-a.doneC
}

type ShellRepusher struct {
lgr log.Logger
workdir string
privateKeyFile string
mtx sync.Mutex
}

type Repusher interface {
Repush(ctx context.Context, forkRepo, srcBranch, upstreamBranch, requestedSHA string) error
}

func NewShellRepusher(lgr log.Logger, workdir string, privateKeyFile string) *ShellRepusher {
return &ShellRepusher{
lgr: lgr,
Expand Down
43 changes: 43 additions & 0 deletions internal/pkg/bailiff/repush_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package bailiff

import (
"context"
"fmt"
"testing"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type testifyMockRepusher struct {
mock.Mock
}

func (m *testifyMockRepusher) Repush(ctx context.Context, forkRepo, srcBranch, upstreamBranch, requestedSHA string) error {
args := m.Called(ctx, forkRepo, srcBranch, upstreamBranch, requestedSHA)
return args.Error(0)
}

func TestAsyncRepusher(t *testing.T) {
innerRepusher := new(testifyMockRepusher)
asyncRepusher := NewAsyncRepusher(testlog.Logger(t, log.LevelInfo), innerRepusher)

for i := 0; i < maxAsyncRepushes; i++ {
iStr := fmt.Sprintf("%d", i)
innerRepusher.On("Repush", mock.Anything, "forkRepo"+iStr, "srcBranch"+iStr, "upstreamBranch"+iStr, "requestedSHA"+iStr).Return(nil)
require.NoError(t, asyncRepusher.Repush(
context.Background(),
"forkRepo"+iStr,
"srcBranch"+iStr,
"upstreamBranch"+iStr,
"requestedSHA"+iStr,
))
}

asyncRepusher.Close()
innerRepusher.AssertExpectations(t)

require.ErrorContains(t, asyncRepusher.Repush(context.Background(), "forkRepo", "srcBranch", "upstreamBranch", "requestedSHA"), "closed")
}

0 comments on commit d0c628f

Please sign in to comment.