diff --git a/.golangci.yml b/.golangci.yml index 6490cac8c8..511f556fc4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,7 +9,6 @@ linters: disable-all: true enable: - bodyclose - - deadcode # - depguard ## see https://github.com/golangci/golangci-lint/issues/3906 - dogsled - exportloopref diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index b2abb4303b..f7b2bbe0ba 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -20,6 +20,7 @@ import ( ) const SleepInterval = 5 * time.Second +const NewHeadsListenerBuffer = 10 type SubscriptionAPI struct { tmClient rpcclient.Client @@ -64,17 +65,14 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, logFetcher *LogFetcher, subsc continue } api.newHeadListenersMtx.Lock() - for _, c := range api.newHeadListeners { - c := c - go func() { - defer func() { - // if the channel is already closed, sending to it will panic - if err := recover(); err != nil { - return - } - }() - c <- ethHeader - }() + toDelete := []rpc.ID{} + for id, c := range api.newHeadListeners { + if !handleListener(c, ethHeader) { + toDelete = append(toDelete, id) + } + } + for _, id := range toDelete { + delete(api.newHeadListeners, id) } api.newHeadListenersMtx.Unlock() } @@ -82,6 +80,20 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, logFetcher *LogFetcher, subsc return api } +func handleListener(c chan map[string]interface{}, ethHeader map[string]interface{}) bool { + // if the channel is already closed, sending to it/closing it will panic + defer func() { _ = recover() }() + select { + case c <- ethHeader: + return true + default: + // this path is hit when the buffer is full, meaning that the subscriber is not consuming + // fast enough + close(c) + return false + } +} + func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, err error) { defer recordMetrics("eth_newHeads", a.connectionType, time.Now(), err == nil) notifier, supported := rpc.NotifierFromContext(ctx) @@ -90,7 +102,7 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er } rpcSub := notifier.CreateSubscription() - listener := make(chan map[string]interface{}) + listener := make(chan map[string]interface{}, NewHeadsListenerBuffer) a.newHeadListenersMtx.Lock() defer a.newHeadListenersMtx.Unlock() if uint64(len(a.newHeadListeners)) >= a.subscriptonConfig.newHeadLimit { @@ -102,11 +114,14 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er OUTER: for { select { - case res := <-listener: + case res, ok := <-listener: err = notifier.Notify(rpcSub.ID, res) if err != nil { break OUTER } + if !ok { + break OUTER + } case <-rpcSub.Err(): break OUTER case <-notifier.Closed(): @@ -116,6 +131,7 @@ func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, er a.newHeadListenersMtx.Lock() defer a.newHeadListenersMtx.Unlock() delete(a.newHeadListeners, rpcSub.ID) + defer func() { _ = recover() }() // might have already been closed close(listener) }()