From dea4d6d518114581c4fd7868c11ad253455d2f33 Mon Sep 17 00:00:00 2001 From: pharr117 Date: Sat, 14 Sep 2024 03:23:26 -0400 Subject: [PATCH] Normalize sdk v0.4x and v0.5x block events by unmerging sdk v0.5x FinalizeBlockEvents back into BeginBlock and EndBlock events --- core/block_events.go | 4 +-- core/processor.go | 3 +- core/rpc_worker.go | 66 +++++++++++++++++++++++++++++++++++++++-- core/tx.go | 3 +- indexer/registration.go | 4 +-- rpc/blocks.go | 22 ++++++++++---- 6 files changed, 87 insertions(+), 15 deletions(-) diff --git a/core/block_events.go b/core/block_events.go index 73f8ac8b..98b5d2a6 100644 --- a/core/block_events.go +++ b/core/block_events.go @@ -10,10 +10,10 @@ import ( "github.com/DefiantLabs/cosmos-indexer/db/models" "github.com/DefiantLabs/cosmos-indexer/filter" "github.com/DefiantLabs/cosmos-indexer/parsers" - ctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/DefiantLabs/cosmos-indexer/rpc" ) -func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *ctypes.ResultBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) { +func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *rpc.CustomBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) { var blockDBWrapper db.BlockDBWrapper blockDBWrapper.Block = &block diff --git a/core/processor.go b/core/processor.go index e40d63d5..e634e80e 100644 --- a/core/processor.go +++ b/core/processor.go @@ -5,6 +5,7 @@ import ( "github.com/DefiantLabs/cosmos-indexer/config" "github.com/DefiantLabs/cosmos-indexer/db/models" + "github.com/DefiantLabs/cosmos-indexer/rpc" ctypes "github.com/cometbft/cometbft/rpc/core/types" sdkTypes "github.com/cosmos/cosmos-sdk/types" ) @@ -24,7 +25,7 @@ const ( type FailedBlockHandler func(height int64, code BlockProcessingFailure, err error) // Process RPC Block data into the model object used by the application. -func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *ctypes.ResultBlockResults, chainID uint) (models.Block, error) { +func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *rpc.CustomBlockResults, chainID uint) (models.Block, error) { block := models.Block{ Height: blockData.Block.Height, ChainID: chainID, diff --git a/core/rpc_worker.go b/core/rpc_worker.go index 21173374..59732d4b 100644 --- a/core/rpc_worker.go +++ b/core/rpc_worker.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "net/http" "sync" @@ -8,6 +9,7 @@ import ( dbTypes "github.com/DefiantLabs/cosmos-indexer/db" "github.com/DefiantLabs/cosmos-indexer/rpc" "github.com/DefiantLabs/probe/client" + abci "github.com/cometbft/cometbft/abci/types" ctypes "github.com/cometbft/cometbft/rpc/core/types" txTypes "github.com/cosmos/cosmos-sdk/types/tx" "gorm.io/gorm" @@ -16,7 +18,7 @@ import ( // Wrapper types for gathering full dataset. type IndexerBlockEventData struct { BlockData *ctypes.ResultBlock - BlockResultsData *ctypes.ResultBlockResults + BlockResultsData *rpc.CustomBlockResults BlockEventRequestsFailed bool GetTxsResponse *txTypes.GetTxsEventResponse TxRequestsFailed bool @@ -78,7 +80,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai currentHeightIndexerData.BlockResultsData = nil currentHeightIndexerData.BlockEventRequestsFailed = true } else { - currentHeightIndexerData.BlockResultsData = bresults + bresults, err = NormalizeCustomBlockResults(bresults) + if err != nil { + config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err) + err := dbTypes.UpsertFailedEventBlock(db, block.Height, chainStringID, cfg.Probe.ChainName) + if err != nil { + config.Log.Fatal("Failed to insert failed block event", err) + } + } else { + currentHeightIndexerData.BlockResultsData = bresults + } } } @@ -106,7 +117,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai // Only set failed when we can't get the block results either. currentHeightIndexerData.TxRequestsFailed = true } else { - currentHeightIndexerData.BlockResultsData = bresults + bresults, err = NormalizeCustomBlockResults(bresults) + if err != nil { + config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err) + err := dbTypes.UpsertFailedBlock(db, block.Height, chainStringID, cfg.Probe.ChainName) + if err != nil { + config.Log.Fatal("Failed to insert failed block", err) + } + } else { + currentHeightIndexerData.BlockResultsData = bresults + } } } @@ -118,3 +138,43 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai outputChannel <- currentHeightIndexerData } } + +func NormalizeCustomBlockResults(blockResults *rpc.CustomBlockResults) (*rpc.CustomBlockResults, error) { + if len(blockResults.FinalizeBlockEvents) != 0 { + beginBlockEvents := []abci.Event{} + endBlockEvents := []abci.Event{} + + for _, event := range blockResults.FinalizeBlockEvents { + eventAttrs := []abci.EventAttribute{} + isBeginBlock := false + isEndBlock := false + for _, attr := range event.Attributes { + if attr.Key == "mode" { + if attr.Value == "BeginBlock" { + isBeginBlock = true + } else if attr.Value == "EndBlock" { + isEndBlock = true + } + } else { + eventAttrs = append(eventAttrs, attr) + } + } + + switch { + case isBeginBlock && isEndBlock: + return nil, fmt.Errorf("finalize block event has both BeginBlock and EndBlock mode") + case !isBeginBlock && !isEndBlock: + return nil, fmt.Errorf("finalize block event has neither BeginBlock nor EndBlock mode") + case isBeginBlock: + beginBlockEvents = append(beginBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs}) + case isEndBlock: + endBlockEvents = append(endBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs}) + } + } + + blockResults.BeginBlockEvents = append(blockResults.BeginBlockEvents, beginBlockEvents...) + blockResults.EndBlockEvents = append(blockResults.EndBlockEvents, endBlockEvents...) + } + + return blockResults, nil +} diff --git a/core/tx.go b/core/tx.go index b20dc734..4b1fda5a 100644 --- a/core/tx.go +++ b/core/tx.go @@ -15,6 +15,7 @@ import ( "github.com/DefiantLabs/cosmos-indexer/db/models" "github.com/DefiantLabs/cosmos-indexer/filter" "github.com/DefiantLabs/cosmos-indexer/parsers" + "github.com/DefiantLabs/cosmos-indexer/rpc" "github.com/DefiantLabs/cosmos-indexer/util" "github.com/DefiantLabs/probe/client" coretypes "github.com/cometbft/cometbft/rpc/core/types" @@ -31,7 +32,7 @@ func getUnexportedField(field reflect.Value) interface{} { return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() } -func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { +func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *rpc.CustomBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) { config.Log.Fatalf("blockResults & resultBlockRes: different length") } diff --git a/indexer/registration.go b/indexer/registration.go index 95312fed..f94c32ae 100644 --- a/indexer/registration.go +++ b/indexer/registration.go @@ -16,7 +16,6 @@ func (indexer *Indexer) RegisterCustomModuleBasics(basics []module.AppModuleBasi } func (indexer *Indexer) RegisterCustomMsgTypesByTypeURLs(customMessageTypeURLSToTypes map[string]sdkTypes.Msg) error { - if indexer.CustomMsgTypeRegistry == nil { indexer.CustomMsgTypeRegistry = make(map[string]sdkTypes.Msg) } @@ -24,9 +23,8 @@ func (indexer *Indexer) RegisterCustomMsgTypesByTypeURLs(customMessageTypeURLSTo for url, msg := range customMessageTypeURLSToTypes { if _, ok := indexer.CustomMsgTypeRegistry[url]; ok { return fmt.Errorf("found duplicate message type with URL \"%s\", message types must be uniquely identified", url) - } else { - indexer.CustomMsgTypeRegistry[url] = msg } + indexer.CustomMsgTypeRegistry[url] = msg } return nil diff --git a/rpc/blocks.go b/rpc/blocks.go index 5041ee23..847d71df 100644 --- a/rpc/blocks.go +++ b/rpc/blocks.go @@ -13,8 +13,9 @@ import ( "time" "github.com/DefiantLabs/cosmos-indexer/config" + abci "github.com/cometbft/cometbft/abci/types" tmjson "github.com/cometbft/cometbft/libs/json" - ctypes "github.com/cometbft/cometbft/rpc/core/types" + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" jsonrpc "github.com/cometbft/cometbft/rpc/jsonrpc/client" types "github.com/cometbft/cometbft/rpc/jsonrpc/types" ) @@ -140,8 +141,19 @@ func validateResponseID(id interface{}) error { return nil } -func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { - result := new(ctypes.ResultBlockResults) +// This type **should** cover SDK v0.4x and v0.50, but updates will need to be monitored +type CustomBlockResults struct { + Height int64 `json:"height"` + TxsResults []*abci.ResponseDeliverTx `json:"txs_results"` + BeginBlockEvents []abci.Event `json:"begin_block_events"` + EndBlockEvents []abci.Event `json:"end_block_events"` + ValidatorUpdates []abci.ValidatorUpdate `json:"validator_updates"` + ConsensusParamUpdates *cmtproto.ConsensusParams `json:"consensus_param_updates"` + FinalizeBlockEvents []abci.Event `json:"finalize_block_events"` +} + +func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*CustomBlockResults, error) { + result := new(CustomBlockResults) params := make(map[string]interface{}) if height != nil { params["height"] = height @@ -155,7 +167,7 @@ func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes. return result, nil } -func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, error) { +func GetBlockResult(client URIClient, height int64) (*CustomBlockResults, error) { brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() @@ -167,7 +179,7 @@ func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, return bresults, nil } -func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*ctypes.ResultBlockResults, error) { +func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*CustomBlockResults, error) { if retryMaxAttempts == 0 { return GetBlockResult(client, height) }