Skip to content

Commit

Permalink
Address Go routine memory access issue that caused #164 by simplifyin…
Browse files Browse the repository at this point in the history
…g startup

Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Sep 19, 2024
1 parent d499ac3 commit dd3fa18
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 155 deletions.
1 change: 0 additions & 1 deletion core/go/internal/msgs/en_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ var (
MsgBlockIndexerESAlreadyInit = ffe("PD011304", "Event stream already initialized")
MsgBlockIndexerConfirmedReceiptNotFound = ffe("PD011305", "Receipt for confirmed transaction %s not found")
MsgBlockIndexerInvalidEventStreamType = ffe("PD011306", "Unsupported event stream type: %s")
MsgBlockMissingHandler = ffe("PD011307", "Handler not registered for stream")
MsgBlockIndexerNoBlocksIndexed = ffe("PD011308", "No confirmed blocks have yet been indexed")
MsgBlockIndexerTransactionReverted = ffe("PD011309", "Transaction reverted: %s")

Expand Down
70 changes: 17 additions & 53 deletions core/go/pkg/blockindexer/event_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package blockindexer

import (
"context"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -45,8 +44,6 @@ type eventStream struct {
batchTimeout time.Duration
blocks chan *eventStreamBlock
dispatch chan *eventDispatch
handlerLock sync.Mutex
waitForHandler chan struct{}
handler InternalStreamCallback
detectorDone chan struct{}
dispatcherDone chan struct{}
Expand Down Expand Up @@ -88,7 +85,7 @@ func (bi *blockIndexer) loadEventStreams(ctx context.Context) error {
}

for _, esDefinition := range eventStreams {
bi.initEventStream(esDefinition)
bi.initEventStream(esDefinition, nil /* no handler at this point */)
}
return nil
}
Expand Down Expand Up @@ -160,25 +157,13 @@ func (bi *blockIndexer) upsertInternalEventStream(ctx context.Context, ies *Inte

// We call init here
// TODO: Full stop/start lifecycle
es := bi.initEventStream(def)

// Register the internal handler against the new or existing stream
es.attachHandler(ies.Handler)
es := bi.initEventStream(def, ies.Handler)

return es, nil
}

func (es *eventStream) attachHandler(handler InternalStreamCallback) {
es.handlerLock.Lock()
prevHandler := es.handler
es.handler = handler
if prevHandler == nil {
close(es.waitForHandler)
}
es.handlerLock.Unlock()
}

func (bi *blockIndexer) initEventStream(definition *EventStream) *eventStream {
// Note that the event stream must be stopped when this is called
func (bi *blockIndexer) initEventStream(definition *EventStream, handler InternalStreamCallback) *eventStream {
bi.eventStreamsLock.Lock()
defer bi.eventStreamsLock.Unlock()

Expand All @@ -190,20 +175,22 @@ func (bi *blockIndexer) initEventStream(definition *EventStream) *eventStream {
es.definition.Config = definition.Config
} else {
es = &eventStream{
bi: bi,
definition: definition,
eventABIs: []*abi.Entry{},
signatures: make(map[string]bool),
blocks: make(chan *eventStreamBlock, bi.esBlockDispatchQueueLength),
dispatch: make(chan *eventDispatch, batchSize),
waitForHandler: make(chan struct{}),
bi: bi,
definition: definition,
eventABIs: []*abi.Entry{},
signatures: make(map[string]bool),
blocks: make(chan *eventStreamBlock, bi.esBlockDispatchQueueLength),
dispatch: make(chan *eventDispatch, batchSize),
}
}

// Set the batch config
es.batchSize = batchSize
es.batchTimeout = confutil.DurationMin(definition.Config.BatchTimeout, 0, *EventStreamDefaults.BatchTimeout)

// Note the handler will be nil when this is first called on startup before we've been passed handlers.
es.handler = handler

// Calculate all the signatures we require
for _, abiEntry := range definition.ABI {
if abiEntry.Type == abi.Event {
Expand Down Expand Up @@ -231,30 +218,15 @@ func (bi *blockIndexer) startEventStreams() {
}

func (es *eventStream) start() {
if es.detectorDone == nil && es.dispatcherDone == nil {
if es.handler != nil && es.detectorDone == nil && es.dispatcherDone == nil {
es.ctx, es.cancelCtx = context.WithCancel(log.WithLogField(es.bi.parentCtxForReset, "eventstream", es.definition.ID.String()))
es.detectorDone = make(chan struct{})
es.dispatcherDone = make(chan struct{})
es.run()
go es.detector()
go es.dispatcher()
}
}

func (es *eventStream) run() {

select {
case <-es.waitForHandler:
case <-es.ctx.Done():
log.L(es.ctx).Debugf("stopping before event handler registered")
close(es.detectorDone)
close(es.dispatcherDone)
return
}

go es.detector()
go es.dispatcher()

}

func (es *eventStream) stop() {
if es.cancelCtx != nil {
es.cancelCtx()
Expand Down Expand Up @@ -475,15 +447,7 @@ func (es *eventStream) runBatch(batch *eventBatch) error {
return es.bi.retry.Do(es.ctx, func(attempt int) (retryable bool, err error) {
var postCommit PostCommit
err = es.bi.persistence.DB().Transaction(func(tx *gorm.DB) (err error) {

es.handlerLock.Lock()
handler := es.handler
es.handlerLock.Unlock()

if handler == nil {
return i18n.NewError(es.ctx, msgs.MsgBlockMissingHandler)
}
postCommit, err = handler(es.ctx, tx, &batch.EventDeliveryBatch)
postCommit, err = es.handler(es.ctx, tx, &batch.EventDeliveryBatch)
if err != nil {
return err
}
Expand Down
101 changes: 0 additions & 101 deletions core/go/pkg/blockindexer/event_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,107 +649,6 @@ func TestDispatcherDispatchClosed(t *testing.T) {
assert.True(t, called)
}

func TestDispatcherRunLateHandler(t *testing.T) {
ctx, bi, _, p, done := newMockBlockIndexer(t, &Config{})
defer done()

p.Mock.ExpectBegin()
p.Mock.ExpectRollback()

called := false

bi.retry.UTSetMaxAttempts(1)
es := &eventStream{
bi: bi,
ctx: ctx,
definition: &EventStream{
ID: uuid.New(),
Type: tktypes.Enum[EventStreamType]("wrong"),
ABI: testABI,
},
eventABIs: testABI,
batchSize: 2, // aim for two
batchTimeout: 1 * time.Microsecond, // but not going to wait
dispatch: make(chan *eventDispatch),
dispatcherDone: make(chan struct{}),
detectorDone: make(chan struct{}),
waitForHandler: make(chan struct{}),
}
go func() {
assert.NotPanics(t, func() { es.run() })
}()

time.Sleep(1 * time.Millisecond)
es.attachHandler(func(ctx context.Context, tx *gorm.DB, batch *EventDeliveryBatch) (PostCommit, error) {
called = true
return nil, fmt.Errorf("pop")
})

es.dispatch <- &eventDispatch{
event: &EventWithData{
IndexedEvent: &IndexedEvent{},
},
}

<-es.dispatcherDone

assert.True(t, called)
}

func TestDispatcherRunMissingHandler(t *testing.T) {
ctx, bi, _, p, done := newMockBlockIndexer(t, &Config{})
defer done()

p.Mock.ExpectBegin()
p.Mock.ExpectRollback()

bi.retry.UTSetMaxAttempts(1)
es := &eventStream{
bi: bi,
ctx: ctx,
definition: &EventStream{
ID: uuid.New(),
Type: tktypes.Enum[EventStreamType]("wrong"),
ABI: testABI,
},
eventABIs: testABI,
batchSize: 2, // aim for two
batchTimeout: 1 * time.Microsecond, // but not going to wait
dispatch: make(chan *eventDispatch),
dispatcherDone: make(chan struct{}),
waitForHandler: make(chan struct{}),
}
close(es.waitForHandler)

go func() {
assert.NotPanics(t, func() { es.dispatcher() })
}()

es.dispatch <- &eventDispatch{
event: &EventWithData{
IndexedEvent: &IndexedEvent{},
},
}

<-es.dispatcherDone
}

func TestDispatcherCloseBeforeHandler(t *testing.T) {
ctx, bi, _, _, done := newMockBlockIndexer(t, &Config{})
defer done()

cancelledCtx, cancelCtx := context.WithCancel(ctx)
cancelCtx()
es := &eventStream{
bi: bi,
ctx: cancelledCtx,
waitForHandler: make(chan struct{}),
detectorDone: make(chan struct{}),
dispatcherDone: make(chan struct{}),
}
es.run()
}

func TestProcessCatchupEventPageFailRPC(t *testing.T) {
ctx, bi, mRPC, p, done := newMockBlockIndexer(t, &Config{})
defer done()
Expand Down

0 comments on commit dd3fa18

Please sign in to comment.