From 515aed276febf953638fac42b2b04a868685962e Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 19 Sep 2024 18:37:51 -0400 Subject: [PATCH] More simplification and debug Signed-off-by: Peter Broadhurst --- core/go/pkg/blockindexer/block_indexer.go | 16 ++++--- core/go/pkg/blockindexer/event_streams.go | 20 ++++----- .../go/pkg/blockindexer/event_streams_test.go | 42 +++++++++++++++++++ 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/core/go/pkg/blockindexer/block_indexer.go b/core/go/pkg/blockindexer/block_indexer.go index eb37c60f4..56978f2ae 100644 --- a/core/go/pkg/blockindexer/block_indexer.go +++ b/core/go/pkg/blockindexer/block_indexer.go @@ -853,20 +853,23 @@ func (bi *blockIndexer) enrichTransactionEvents(ctx context.Context, abi abi.ABI } // Spin through the logs to find the corresponding result entries - for _, l := range receipt.Logs { - for _, e := range events { + for _, e := range events { + for _, l := range receipt.Logs { if ethtypes.HexUint64(e.LogIndex) == l.LogIndex { - bi.matchLog(ctx, abi, l, e, nil) + // We decode the data if possible (sets .Data on each event if there's a match) + if bi.matchLog(ctx, abi, l, e, nil) { + break // next log + } } } } return nil } -func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRPC, out *EventWithData, source *tktypes.EthAddress) { +func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRPC, out *EventWithData, source *tktypes.EthAddress) bool { if !source.IsZero() && !source.Equals((*tktypes.EthAddress)(in.Address)) { log.L(ctx).Debugf("Event %d/%d/%d does not match source=%s (tx=%s,address=%s)", in.BlockNumber, in.TransactionIndex, in.LogIndex, source, in.TransactionHash, in.Address) - return + return false } // This is one that matches our signature, but we need to check it against our ABI list. // We stop at the first entry that parses it, and it's perfectly fine and expected that @@ -883,9 +886,10 @@ func (bi *blockIndexer) matchLog(ctx context.Context, abi abi.ABI, in *LogJSONRP if in.Address != nil { out.Address = tktypes.EthAddress(*in.Address) } - return + return true } else { log.L(ctx).Debugf("Event %d/%d/%d does not match ABI event %s matchSource=%v (tx=%s,address=%s): %s", in.BlockNumber, in.TransactionIndex, in.LogIndex, abiEntry, source, in.TransactionHash, in.Address, err) } } + return false } diff --git a/core/go/pkg/blockindexer/event_streams.go b/core/go/pkg/blockindexer/event_streams.go index 401177ee0..f7c0481ab 100644 --- a/core/go/pkg/blockindexer/event_streams.go +++ b/core/go/pkg/blockindexer/event_streams.go @@ -192,14 +192,12 @@ func (bi *blockIndexer) initEventStream(ctx context.Context, definition *EventSt es.handler = handler location := "*" - if definition.Source != nil { - source := *definition.Source - es.definition.Source = &source - location = source.String() + if es.definition.Source != nil { + location = es.definition.Source.String() } // Calculate all the signatures we require - sigStrings := []string{} + solStrings := []string{} for _, abiEntry := range definition.ABI { if abiEntry.Type == abi.Event { sig := tktypes.NewBytes32FromSlice(abiEntry.SignatureHashBytes()) @@ -207,12 +205,12 @@ func (bi *blockIndexer) initEventStream(ctx context.Context, definition *EventSt es.eventABIs = append(es.eventABIs, abiEntry) if _, dup := es.signatures[sigStr]; !dup { es.signatures[sigStr] = true - sigStrings = append(sigStrings, sigStr) + solStrings = append(solStrings, abiEntry.SolString()) es.signatureList = append(es.signatureList, sig) } } } - log.L(ctx).Infof("Event stream %s configured on sourceMatch=%s signatures: %s", es.definition.ID, location, sigStrings) + log.L(ctx).Infof("Event stream %s configured matchSource=%s events: %s", es.definition.ID, location, solStrings) // ok - all looks good, put ourselves in the blockindexer list bi.eventStreams[definition.ID] = es @@ -370,13 +368,13 @@ func (es *eventStream) detector() { } func (es *eventStream) processNotifiedBlock(block *eventStreamBlock, fullBlock bool) { + matchSource := es.definition.Source for i, l := range block.events { event := &EventWithData{ IndexedEvent: es.bi.logToIndexedEvent(l), } - es.matchLog(l, event) // Only dispatch events that were completed by the validation against our ABI - if event.Data != nil { + if es.bi.matchLog(es.ctx, es.eventABIs, l, event, matchSource) { es.sendToDispatcher(event, // Can only move checkpoint past this block once we know we've processed the last one fullBlock && i == (len(block.events)-1)) @@ -574,7 +572,3 @@ func (es *eventStream) processCatchupEventPage(checkpointBlock int64, catchUpToB func (es *eventStream) queryTransactionEvents(tx tktypes.Bytes32, events []*EventWithData, done chan error) { done <- es.bi.enrichTransactionEvents(es.ctx, es.eventABIs, tx, events, true /* retry indefinitely */) } - -func (es *eventStream) matchLog(in *LogJSONRPC, out *EventWithData) { - es.bi.matchLog(es.ctx, es.eventABIs, in, out, es.definition.Source) -} diff --git a/core/go/pkg/blockindexer/event_streams_test.go b/core/go/pkg/blockindexer/event_streams_test.go index 8f5c3e97a..3ce9151e9 100644 --- a/core/go/pkg/blockindexer/event_streams_test.go +++ b/core/go/pkg/blockindexer/event_streams_test.go @@ -304,6 +304,48 @@ func TestInternalEventStreamDeliveryCatchUp(t *testing.T) { } } +func TestNoMatchingEvents(t *testing.T) { + + // This test uses a real DB, includes the full block indexer, but simulates the blockchain. + _, bi, mRPC, blDone := newTestBlockIndexer(t) + defer blDone() + + // Mock up the block calls to the blockchain for 15 blocks + blocks, receipts := testBlockArray(t, 15) + mockBlocksRPCCalls(mRPC, blocks, receipts) + mockBlockListenerNil(mRPC) + + // Create a matcher that only mismatched on indexed - so same signature + testABICopy := testParseABI(testEventABIJSON) + testABICopy[1].Inputs[0].Indexed = !testABICopy[1].Inputs[0].Indexed + + // Do a full start now with an internal event listener + err := bi.Start(&InternalEventStream{ + Handler: func(ctx context.Context, tx *gorm.DB, batch *EventDeliveryBatch) (PostCommit, error) { + require.Fail(t, "should not be called") + return nil, nil + }, + Definition: &EventStream{ + Name: "unit_test", + Config: EventStreamConfig{ + BatchSize: confutil.P(1), + BatchTimeout: confutil.P("5ms"), + }, + // Listen to two out of three event types + ABI: abi.ABI{ + // Mismatched only on index + testABICopy[1], + }, + }, + }) + require.NoError(t, err) + + for i := 0; i < 15; i++ { + <-bi.utBatchNotify + } + +} + func TestStartBadInternalEventStream(t *testing.T) { // This test uses a real DB, includes the full block indexer, but simulates the blockchain.