Skip to content

Commit

Permalink
More simplification and debug
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Sep 19, 2024
1 parent 92c2292 commit 515aed2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 19 deletions.
16 changes: 10 additions & 6 deletions core/go/pkg/blockindexer/block_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,20 +853,23 @@ func (bi *blockIndexer) enrichTransactionEvents(ctx context.Context, abi abi.ABI
}

// Spin through the logs to find the corresponding result entries
for _, l := range receipt.Logs {
for _, e := range events {
for _, e := range events {
for _, l := range receipt.Logs {
if ethtypes.HexUint64(e.LogIndex) == l.LogIndex {
bi.matchLog(ctx, abi, l, e, nil)
// We decode the data if possible (sets .Data on each event if there's a match)
if bi.matchLog(ctx, abi, l, e, nil) {
break // next log
}
}
}
}
return nil
}

func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRPC, out *EventWithData, source *tktypes.EthAddress) {
func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRPC, out *EventWithData, source *tktypes.EthAddress) bool {
if !source.IsZero() && !source.Equals((*tktypes.EthAddress)(in.Address)) {
log.L(ctx).Debugf("Event %d/%d/%d does not match source=%s (tx=%s,address=%s)", in.BlockNumber, in.TransactionIndex, in.LogIndex, source, in.TransactionHash, in.Address)
return
return false
}
// This is one that matches our signature, but we need to check it against our ABI list.
// We stop at the first entry that parses it, and it's perfectly fine and expected that
Expand All @@ -883,9 +886,10 @@ func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRP
if in.Address != nil {
out.Address = tktypes.EthAddress(*in.Address)
}
return
return true
} else {
log.L(ctx).Debugf("Event %d/%d/%d does not match ABI event %s matchSource=%v (tx=%s,address=%s): %s", in.BlockNumber, in.TransactionIndex, in.LogIndex, abiEntry, source, in.TransactionHash, in.Address, err)
}
}
return false
}
20 changes: 7 additions & 13 deletions core/go/pkg/blockindexer/event_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,27 +192,25 @@ func (bi *blockIndexer) initEventStream(ctx context.Context, definition *EventSt
es.handler = handler

location := "*"
if definition.Source != nil {
source := *definition.Source
es.definition.Source = &source
location = source.String()
if es.definition.Source != nil {
location = es.definition.Source.String()
}

// Calculate all the signatures we require
sigStrings := []string{}
solStrings := []string{}
for _, abiEntry := range definition.ABI {
if abiEntry.Type == abi.Event {
sig := tktypes.NewBytes32FromSlice(abiEntry.SignatureHashBytes())
sigStr := sig.String()
es.eventABIs = append(es.eventABIs, abiEntry)
if _, dup := es.signatures[sigStr]; !dup {
es.signatures[sigStr] = true
sigStrings = append(sigStrings, sigStr)
solStrings = append(solStrings, abiEntry.SolString())
es.signatureList = append(es.signatureList, sig)
}
}
}
log.L(ctx).Infof("Event stream %s configured on sourceMatch=%s signatures: %s", es.definition.ID, location, sigStrings)
log.L(ctx).Infof("Event stream %s configured matchSource=%s events: %s", es.definition.ID, location, solStrings)

// ok - all looks good, put ourselves in the blockindexer list
bi.eventStreams[definition.ID] = es
Expand Down Expand Up @@ -370,13 +368,13 @@ func (es *eventStream) detector() {
}

func (es *eventStream) processNotifiedBlock(block *eventStreamBlock, fullBlock bool) {
matchSource := es.definition.Source
for i, l := range block.events {
event := &EventWithData{
IndexedEvent: es.bi.logToIndexedEvent(l),
}
es.matchLog(l, event)
// Only dispatch events that were completed by the validation against our ABI
if event.Data != nil {
if es.bi.matchLog(es.ctx, es.eventABIs, l, event, matchSource) {
es.sendToDispatcher(event,
// Can only move checkpoint past this block once we know we've processed the last one
fullBlock && i == (len(block.events)-1))
Expand Down Expand Up @@ -574,7 +572,3 @@ func (es *eventStream) processCatchupEventPage(checkpointBlock int64, catchUpToB
func (es *eventStream) queryTransactionEvents(tx tktypes.Bytes32, events []*EventWithData, done chan error) {
done <- es.bi.enrichTransactionEvents(es.ctx, es.eventABIs, tx, events, true /* retry indefinitely */)
}

func (es *eventStream) matchLog(in *LogJSONRPC, out *EventWithData) {
es.bi.matchLog(es.ctx, es.eventABIs, in, out, es.definition.Source)
}
42 changes: 42 additions & 0 deletions core/go/pkg/blockindexer/event_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,48 @@ func TestInternalEventStreamDeliveryCatchUp(t *testing.T) {
}
}

func TestNoMatchingEvents(t *testing.T) {

// This test uses a real DB, includes the full block indexer, but simulates the blockchain.
_, bi, mRPC, blDone := newTestBlockIndexer(t)
defer blDone()

// Mock up the block calls to the blockchain for 15 blocks
blocks, receipts := testBlockArray(t, 15)
mockBlocksRPCCalls(mRPC, blocks, receipts)
mockBlockListenerNil(mRPC)

// Create a matcher that only mismatched on indexed - so same signature
testABICopy := testParseABI(testEventABIJSON)
testABICopy[1].Inputs[0].Indexed = !testABICopy[1].Inputs[0].Indexed

// Do a full start now with an internal event listener
err := bi.Start(&InternalEventStream{
Handler: func(ctx context.Context, tx *gorm.DB, batch *EventDeliveryBatch) (PostCommit, error) {
require.Fail(t, "should not be called")
return nil, nil
},
Definition: &EventStream{
Name: "unit_test",
Config: EventStreamConfig{
BatchSize: confutil.P(1),
BatchTimeout: confutil.P("5ms"),
},
// Listen to two out of three event types
ABI: abi.ABI{
// Mismatched only on index
testABICopy[1],
},
},
})
require.NoError(t, err)

for i := 0; i < 15; i++ {
<-bi.utBatchNotify
}

}

func TestStartBadInternalEventStream(t *testing.T) {

// This test uses a real DB, includes the full block indexer, but simulates the blockchain.
Expand Down

0 comments on commit 515aed2

Please sign in to comment.