Skip to content

Commit

Permalink
Merge pull request #123 from DefiantLabs/pharr117/finalize-block-even…
Browse files Browse the repository at this point in the history
…ts-normalization

pharr117/block events sdk v0.4x and v0.5x normalization
  • Loading branch information
pharr117 authored Sep 14, 2024
2 parents 4b15faa + dea4d6d commit e61737b
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 15 deletions.
4 changes: 2 additions & 2 deletions core/block_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/filter"
"github.com/DefiantLabs/cosmos-indexer/parsers"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/DefiantLabs/cosmos-indexer/rpc"
)

func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *ctypes.ResultBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) {
func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *rpc.CustomBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) {
var blockDBWrapper db.BlockDBWrapper

blockDBWrapper.Block = &block
Expand Down
3 changes: 2 additions & 1 deletion core/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/DefiantLabs/cosmos-indexer/config"
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/rpc"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
sdkTypes "github.com/cosmos/cosmos-sdk/types"
)
Expand All @@ -24,7 +25,7 @@ const (
type FailedBlockHandler func(height int64, code BlockProcessingFailure, err error)

// Process RPC Block data into the model object used by the application.
func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *ctypes.ResultBlockResults, chainID uint) (models.Block, error) {
func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *rpc.CustomBlockResults, chainID uint) (models.Block, error) {
block := models.Block{
Height: blockData.Block.Height,
ChainID: chainID,
Expand Down
66 changes: 63 additions & 3 deletions core/rpc_worker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package core

import (
"fmt"
"net/http"
"sync"

"github.com/DefiantLabs/cosmos-indexer/config"
dbTypes "github.com/DefiantLabs/cosmos-indexer/db"
"github.com/DefiantLabs/cosmos-indexer/rpc"
"github.com/DefiantLabs/probe/client"
abci "github.com/cometbft/cometbft/abci/types"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
txTypes "github.com/cosmos/cosmos-sdk/types/tx"
"gorm.io/gorm"
Expand All @@ -16,7 +18,7 @@ import (
// Wrapper types for gathering full dataset.
type IndexerBlockEventData struct {
BlockData *ctypes.ResultBlock
BlockResultsData *ctypes.ResultBlockResults
BlockResultsData *rpc.CustomBlockResults
BlockEventRequestsFailed bool
GetTxsResponse *txTypes.GetTxsEventResponse
TxRequestsFailed bool
Expand Down Expand Up @@ -78,7 +80,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
currentHeightIndexerData.BlockResultsData = nil
currentHeightIndexerData.BlockEventRequestsFailed = true
} else {
currentHeightIndexerData.BlockResultsData = bresults
bresults, err = NormalizeCustomBlockResults(bresults)
if err != nil {
config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err)
err := dbTypes.UpsertFailedEventBlock(db, block.Height, chainStringID, cfg.Probe.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block event", err)
}
} else {
currentHeightIndexerData.BlockResultsData = bresults
}
}
}

Expand Down Expand Up @@ -106,7 +117,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
// Only set failed when we can't get the block results either.
currentHeightIndexerData.TxRequestsFailed = true
} else {
currentHeightIndexerData.BlockResultsData = bresults
bresults, err = NormalizeCustomBlockResults(bresults)
if err != nil {
config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err)
err := dbTypes.UpsertFailedBlock(db, block.Height, chainStringID, cfg.Probe.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block", err)
}
} else {
currentHeightIndexerData.BlockResultsData = bresults
}
}

}
Expand All @@ -118,3 +138,43 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
outputChannel <- currentHeightIndexerData
}
}

func NormalizeCustomBlockResults(blockResults *rpc.CustomBlockResults) (*rpc.CustomBlockResults, error) {
if len(blockResults.FinalizeBlockEvents) != 0 {
beginBlockEvents := []abci.Event{}
endBlockEvents := []abci.Event{}

for _, event := range blockResults.FinalizeBlockEvents {
eventAttrs := []abci.EventAttribute{}
isBeginBlock := false
isEndBlock := false
for _, attr := range event.Attributes {
if attr.Key == "mode" {
if attr.Value == "BeginBlock" {
isBeginBlock = true
} else if attr.Value == "EndBlock" {
isEndBlock = true
}
} else {
eventAttrs = append(eventAttrs, attr)
}
}

switch {
case isBeginBlock && isEndBlock:
return nil, fmt.Errorf("finalize block event has both BeginBlock and EndBlock mode")
case !isBeginBlock && !isEndBlock:
return nil, fmt.Errorf("finalize block event has neither BeginBlock nor EndBlock mode")
case isBeginBlock:
beginBlockEvents = append(beginBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs})
case isEndBlock:
endBlockEvents = append(endBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs})
}
}

blockResults.BeginBlockEvents = append(blockResults.BeginBlockEvents, beginBlockEvents...)
blockResults.EndBlockEvents = append(blockResults.EndBlockEvents, endBlockEvents...)
}

return blockResults, nil
}
3 changes: 2 additions & 1 deletion core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/filter"
"github.com/DefiantLabs/cosmos-indexer/parsers"
"github.com/DefiantLabs/cosmos-indexer/rpc"
"github.com/DefiantLabs/cosmos-indexer/util"
"github.com/DefiantLabs/probe/client"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
Expand All @@ -31,7 +32,7 @@ func getUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *rpc.CustomBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) {
config.Log.Fatalf("blockResults & resultBlockRes: different length")
}
Expand Down
4 changes: 1 addition & 3 deletions indexer/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ func (indexer *Indexer) RegisterCustomModuleBasics(basics []module.AppModuleBasi
}

func (indexer *Indexer) RegisterCustomMsgTypesByTypeURLs(customMessageTypeURLSToTypes map[string]sdkTypes.Msg) error {

if indexer.CustomMsgTypeRegistry == nil {
indexer.CustomMsgTypeRegistry = make(map[string]sdkTypes.Msg)
}

for url, msg := range customMessageTypeURLSToTypes {
if _, ok := indexer.CustomMsgTypeRegistry[url]; ok {
return fmt.Errorf("found duplicate message type with URL \"%s\", message types must be uniquely identified", url)
} else {
indexer.CustomMsgTypeRegistry[url] = msg
}
indexer.CustomMsgTypeRegistry[url] = msg
}

return nil
Expand Down
22 changes: 17 additions & 5 deletions rpc/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"time"

"github.com/DefiantLabs/cosmos-indexer/config"
abci "github.com/cometbft/cometbft/abci/types"
tmjson "github.com/cometbft/cometbft/libs/json"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
jsonrpc "github.com/cometbft/cometbft/rpc/jsonrpc/client"
types "github.com/cometbft/cometbft/rpc/jsonrpc/types"
)
Expand Down Expand Up @@ -140,8 +141,19 @@ func validateResponseID(id interface{}) error {
return nil
}

func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
// This type **should** cover SDK v0.4x and v0.50, but updates will need to be monitored
type CustomBlockResults struct {
Height int64 `json:"height"`
TxsResults []*abci.ResponseDeliverTx `json:"txs_results"`
BeginBlockEvents []abci.Event `json:"begin_block_events"`
EndBlockEvents []abci.Event `json:"end_block_events"`
ValidatorUpdates []abci.ValidatorUpdate `json:"validator_updates"`
ConsensusParamUpdates *cmtproto.ConsensusParams `json:"consensus_param_updates"`
FinalizeBlockEvents []abci.Event `json:"finalize_block_events"`
}

func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*CustomBlockResults, error) {
result := new(CustomBlockResults)
params := make(map[string]interface{})
if height != nil {
params["height"] = height
Expand All @@ -155,7 +167,7 @@ func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.
return result, nil
}

func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, error) {
func GetBlockResult(client URIClient, height int64) (*CustomBlockResults, error) {
brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

Expand All @@ -167,7 +179,7 @@ func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults,
return bresults, nil
}

func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*ctypes.ResultBlockResults, error) {
func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*CustomBlockResults, error) {
if retryMaxAttempts == 0 {
return GetBlockResult(client, height)
}
Expand Down

0 comments on commit e61737b

Please sign in to comment.