Skip to content

Commit

Permalink
Use buffered listener for NewHeads subscription (#1625)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored May 4, 2024
1 parent 3fc8d42 commit aee2743
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ linters:
disable-all: true
enable:
- bodyclose
- deadcode
# - depguard ## see https://github.com/golangci/golangci-lint/issues/3906
- dogsled
- exportloopref
Expand Down
42 changes: 29 additions & 13 deletions evmrpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const SleepInterval = 5 * time.Second
const NewHeadsListenerBuffer = 10

type SubscriptionAPI struct {
tmClient rpcclient.Client
Expand Down Expand Up @@ -64,24 +65,35 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, logFetcher *LogFetcher, subsc
continue
}
api.newHeadListenersMtx.Lock()
for _, c := range api.newHeadListeners {
c := c
go func() {
defer func() {
// if the channel is already closed, sending to it will panic
if err := recover(); err != nil {
return
}
}()
c <- ethHeader
}()
toDelete := []rpc.ID{}
for id, c := range api.newHeadListeners {
if !handleListener(c, ethHeader) {
toDelete = append(toDelete, id)
}
}
for _, id := range toDelete {
delete(api.newHeadListeners, id)
}
api.newHeadListenersMtx.Unlock()
}
}()
return api
}

func handleListener(c chan map[string]interface{}, ethHeader map[string]interface{}) bool {
// if the channel is already closed, sending to it/closing it will panic
defer func() { _ = recover() }()
select {
case c <- ethHeader:
return true
default:
// this path is hit when the buffer is full, meaning that the subscriber is not consuming
// fast enough
close(c)
return false
}
}

func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, err error) {
defer recordMetrics("eth_newHeads", a.connectionType, time.Now(), err == nil)
notifier, supported := rpc.NotifierFromContext(ctx)
Expand All @@ -90,7 +102,7 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er
}

rpcSub := notifier.CreateSubscription()
listener := make(chan map[string]interface{})
listener := make(chan map[string]interface{}, NewHeadsListenerBuffer)
a.newHeadListenersMtx.Lock()
defer a.newHeadListenersMtx.Unlock()
if uint64(len(a.newHeadListeners)) >= a.subscriptonConfig.newHeadLimit {
Expand All @@ -102,11 +114,14 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er
OUTER:
for {
select {
case res := <-listener:
case res, ok := <-listener:
err = notifier.Notify(rpcSub.ID, res)
if err != nil {
break OUTER
}
if !ok {
break OUTER
}
case <-rpcSub.Err():
break OUTER
case <-notifier.Closed():
Expand All @@ -116,6 +131,7 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er
a.newHeadListenersMtx.Lock()
defer a.newHeadListenersMtx.Unlock()
delete(a.newHeadListeners, rpcSub.ID)
defer func() { _ = recover() }() // might have already been closed
close(listener)
}()

Expand Down

0 comments on commit aee2743

Please sign in to comment.