diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index 8b0312db..adf5dece 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -71,16 +71,16 @@ func main() { sm := stateManager.NewEigenStateManager(l, grm) - if _, err := avsOperators.NewAvsOperators(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := avsOperators.NewAvsOperators(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create AvsOperatorsModel", zap.Error(err)) } - if _, err := operatorShares.NewOperatorSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := operatorShares.NewOperatorSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create OperatorSharesModel", zap.Error(err)) } - if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerDelegationsModel", zap.Error(err)) } - if _, err := stakerShares.NewStakerSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerShares.NewStakerSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerSharesModel", zap.Error(err)) } diff --git a/cmd/sidecar/main.go b/cmd/sidecar/main.go index 29b1ce5f..8ef5af60 100644 --- a/cmd/sidecar/main.go +++ b/cmd/sidecar/main.go @@ -75,22 +75,22 @@ func main() { sm := stateManager.NewEigenStateManager(l, grm) - if _, err := avsOperators.NewAvsOperators(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := avsOperators.NewAvsOperators(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create AvsOperatorsModel", zap.Error(err)) } - if _, err := operatorShares.NewOperatorSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := operatorShares.NewOperatorSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create OperatorSharesModel", zap.Error(err)) } - if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerDelegationsModel", zap.Error(err)) } - if _, err := stakerShares.NewStakerSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerShares.NewStakerSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerSharesModel", zap.Error(err)) } - if _, err := submittedDistributionRoots.NewSubmittedDistributionRootsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := submittedDistributionRoots.NewSubmittedDistributionRootsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create SubmittedDistributionRootsModel", zap.Error(err)) } - if _, err := rewardSubmissions.NewRewardSubmissionsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := rewardSubmissions.NewRewardSubmissionsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create RewardSubmissionsModel", zap.Error(err)) } diff --git a/internal/clients/etherscan/client.go b/internal/clients/etherscan/client.go index a87c9fa5..e9ab0045 100644 --- a/internal/clients/etherscan/client.go +++ b/internal/clients/etherscan/client.go @@ -1,10 +1,11 @@ package etherscan import ( + "crypto/rand" "encoding/json" "fmt" "io" - "math/rand/v2" + "math/big" "net/http" "net/url" "regexp" @@ -138,14 +139,18 @@ func (ec *EtherscanClient) makeRequestWithBackoff(values url.Values) (*Etherscan func (ec *EtherscanClient) selectApiKey() string { maxNumber := len(ec.ApiKeys) - randInt := rand.IntN(maxNumber) + randInt, err := rand.Int(rand.Reader, big.NewInt(int64(maxNumber))) + if err != nil { + ec.logger.Sugar().Errorw("Failed to generate random number", zap.Error(err)) + return ec.ApiKeys[0] + } - if randInt > maxNumber { + if randInt.Uint64() > uint64(maxNumber) { ec.logger.Sugar().Warnw("Random number is greater than the number of api keys", "randInt", randInt, "maxNumber", maxNumber) - randInt = 0 + randInt = big.NewInt(int64(0)) } - return ec.ApiKeys[randInt] + return ec.ApiKeys[randInt.Int64()] } func (ec *EtherscanClient) buildBaseUrlParams(module string, action string) url.Values { diff --git a/internal/eigenState/avsOperators/avsOperators.go b/internal/eigenState/avsOperators/avsOperators.go index 4fa291a3..34a2b4be 100644 --- a/internal/eigenState/avsOperators/avsOperators.go +++ b/internal/eigenState/avsOperators/avsOperators.go @@ -14,9 +14,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/eigenState/types" "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -47,33 +44,26 @@ type RegisteredAvsOperatorDiff struct { Registered bool } -// SlotId represents a unique identifier for a slot. -type SlotId string - -func NewSlotId(avs string, operator string) SlotId { - return SlotId(fmt.Sprintf("%s_%s", avs, operator)) +func NewSlotID(avs string, operator string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", avs, operator)) } // EigenState model for AVS operators that implements IEigenStateModel. type AvsOperatorsModel struct { base.BaseEigenState StateTransitions types.StateTransitions[AccumulatedStateChange] - Db *gorm.DB - Network config.Network - Environment config.Environment + DB *gorm.DB logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*AccumulatedStateChange + stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange } -// Create new instance of AvsOperatorsModel state model. +// NewAvsOperators creates a new AvsOperatorsModel. func NewAvsOperators( esm *stateManager.EigenStateManager, grm *gorm.DB, - Network config.Network, - Environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*AvsOperatorsModel, error) { @@ -81,13 +71,11 @@ func NewAvsOperators( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*AccumulatedStateChange), + stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), } esm.RegisterState(s, 0) return s, nil @@ -132,21 +120,21 @@ func (a *AvsOperatorsModel) GetStateTransitions() (types.StateTransitions[Accumu registered = uint64(val.(float64)) == 1 } - slotId := NewSlotId(avs, operator) - record, ok := a.stateAccumulator[log.BlockNumber][slotId] + slotID := NewSlotID(avs, operator) + record, ok := a.stateAccumulator[log.BlockNumber][slotID] if !ok { record = &AccumulatedStateChange{ Avs: avs, Operator: operator, BlockNumber: log.BlockNumber, } - a.stateAccumulator[log.BlockNumber][slotId] = record + a.stateAccumulator[log.BlockNumber][slotID] = record } if registered == false && ok { // In this situation, we've encountered a register and unregister in the same block // which functionally results in no state change at all so we want to remove the record // from the accumulated state. - delete(a.stateAccumulator[log.BlockNumber], slotId) + delete(a.stateAccumulator[log.BlockNumber], slotID) return nil, nil } record.Registered = registered @@ -184,7 +172,7 @@ func (a *AvsOperatorsModel) IsInterestingLog(log *storage.TransactionLog) bool { } func (a *AvsOperatorsModel) InitBlockProcessing(blockNumber uint64) error { - a.stateAccumulator[blockNumber] = make(map[SlotId]*AccumulatedStateChange) + a.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) return nil } @@ -222,7 +210,7 @@ func (a *AvsOperatorsModel) clonePreviousBlocksToNewBlock(blockNumber uint64) er from registered_avs_operators where block_number = @previousBlock ` - res := a.Db.Exec(query, + res := a.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -274,7 +262,7 @@ func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error { } for _, record := range recordsToDelete { - res := a.Db.Delete(&RegisteredAvsOperators{}, "avs = ? and operator = ? and block_number = ?", record.Avs, record.Operator, record.BlockNumber) + res := a.DB.Delete(&RegisteredAvsOperators{}, "avs = ? and operator = ? and block_number = ?", record.Avs, record.Operator, record.BlockNumber) if res.Error != nil { a.logger.Sugar().Errorw("Failed to delete record", zap.Error(res.Error), @@ -286,7 +274,7 @@ func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error { } } if len(recordsToInsert) > 0 { - res := a.Db.Model(&RegisteredAvsOperators{}).Clauses(clause.Returning{}).Create(&recordsToInsert) + res := a.DB.Model(&RegisteredAvsOperators{}).Clauses(clause.Returning{}).Create(&recordsToInsert) if res.Error != nil { a.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error)) return res.Error @@ -307,9 +295,9 @@ func (a *AvsOperatorsModel) GenerateStateRoot(blockNumber uint64) (types.StateRo return "", err } - combinedResults := make([]RegisteredAvsOperatorDiff, 0) + combinedResults := make([]*RegisteredAvsOperatorDiff, 0) for _, record := range inserts { - combinedResults = append(combinedResults, RegisteredAvsOperatorDiff{ + combinedResults = append(combinedResults, &RegisteredAvsOperatorDiff{ Avs: record.Avs, Operator: record.Operator, BlockNumber: record.BlockNumber, @@ -317,7 +305,7 @@ func (a *AvsOperatorsModel) GenerateStateRoot(blockNumber uint64) (types.StateRo }) } for _, record := range deletes { - combinedResults = append(combinedResults, RegisteredAvsOperatorDiff{ + combinedResults = append(combinedResults, &RegisteredAvsOperatorDiff{ Avs: record.Avs, Operator: record.Operator, BlockNumber: record.BlockNumber, @@ -325,73 +313,29 @@ func (a *AvsOperatorsModel) GenerateStateRoot(blockNumber uint64) (types.StateRo }) } - fullTree, err := a.merkelizeState(blockNumber, combinedResults) + inputs := a.sortValuesForMerkleTree(combinedResults) + + fullTree, err := a.MerkleizeState(blockNumber, inputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (a *AvsOperatorsModel) merkelizeState(blockNumber uint64, avsOperators []RegisteredAvsOperatorDiff) (*merkletree.MerkleTree, error) { - // Avs -> operator:registered - om := orderedmap.New[string, *orderedmap.OrderedMap[string, bool]]() - - for _, result := range avsOperators { - existingAvs, found := om.Get(result.Avs) - if !found { - existingAvs = orderedmap.New[string, bool]() - om.Set(result.Avs, existingAvs) - - prev := om.GetPair(result.Avs).Prev() - if prev != nil && strings.Compare(prev.Key, result.Avs) >= 0 { - om.Delete(result.Avs) - return nil, fmt.Errorf("avs not in order") - } - } - existingAvs.Set(result.Operator, result.Registered) - - prev := existingAvs.GetPair(result.Operator).Prev() - if prev != nil && strings.Compare(prev.Key, result.Operator) >= 0 { - existingAvs.Delete(result.Operator) - return nil, fmt.Errorf("operator not in order") - } - } - - avsLeaves := a.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - - for avs := om.Oldest(); avs != nil; avs = avs.Next() { - operatorLeafs := make([][]byte, 0) - for operator := avs.Value.Oldest(); operator != nil; operator = operator.Next() { - operatorAddr := operator.Key - registered := operator.Value - operatorLeafs = append(operatorLeafs, encodeOperatorLeaf(operatorAddr, registered)) - } - - avsTree, err := merkletree.NewTree( - merkletree.WithData(operatorLeafs), - merkletree.WithHashType(keccak256.New()), - ) - if err != nil { - return nil, err - } - - avsLeaves = append(avsLeaves, encodeAvsLeaf(avs.Key, avsTree.Root())) +func (a *AvsOperatorsModel) sortValuesForMerkleTree(diffs []*RegisteredAvsOperatorDiff) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) + for _, diff := range diffs { + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: NewSlotID(diff.Avs, diff.Operator), + Value: []byte(fmt.Sprintf("%t", diff.Registered)), + }) } - - return merkletree.NewTree( - merkletree.WithData(avsLeaves), - merkletree.WithHashType(keccak256.New()), - ) + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + return inputs } func (a *AvsOperatorsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return a.BaseEigenState.DeleteState("registered_avs_operators", startBlockNumber, endBlockNumber, a.Db) -} - -func encodeOperatorLeaf(operator string, registered bool) []byte { - return []byte(fmt.Sprintf("%s:%t", operator, registered)) -} - -func encodeAvsLeaf(avs string, avsOperatorRoot []byte) []byte { - return append([]byte(avs), avsOperatorRoot...) + return a.BaseEigenState.DeleteState("registered_avs_operators", startBlockNumber, endBlockNumber, a.DB) } diff --git a/internal/eigenState/avsOperators/avsOperators_test.go b/internal/eigenState/avsOperators/avsOperators_test.go index 686b15e8..87d09eff 100644 --- a/internal/eigenState/avsOperators/avsOperators_test.go +++ b/internal/eigenState/avsOperators/avsOperators_test.go @@ -37,8 +37,8 @@ func setup() ( } func teardown(model *AvsOperatorsModel) { - model.Db.Exec("delete from avs_operator_changes") - model.Db.Exec("delete from registered_avs_operators") + model.DB.Exec("delete from avs_operator_changes") + model.DB.Exec("delete from registered_avs_operators") } func Test_AvsOperatorState(t *testing.T) { @@ -50,7 +50,7 @@ func Test_AvsOperatorState(t *testing.T) { t.Run("Should create a new AvsOperatorState", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - avsOperatorState, err := NewAvsOperators(esm, grm, cfg.Network, cfg.Environment, l, cfg) + avsOperatorState, err := NewAvsOperators(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, avsOperatorState) }) @@ -71,7 +71,7 @@ func Test_AvsOperatorState(t *testing.T) { DeletedAt: time.Time{}, } - avsOperatorState, err := NewAvsOperators(esm, grm, cfg.Network, cfg.Environment, l, cfg) + avsOperatorState, err := NewAvsOperators(esm, grm, l, cfg) assert.Equal(t, true, avsOperatorState.IsInterestingLog(&log)) @@ -102,7 +102,7 @@ func Test_AvsOperatorState(t *testing.T) { DeletedAt: time.Time{}, } - avsOperatorState, err := NewAvsOperators(esm, grm, cfg.Network, cfg.Environment, l, cfg) + avsOperatorState, err := NewAvsOperators(esm, grm, l, cfg) assert.Nil(t, err) assert.Equal(t, true, avsOperatorState.IsInterestingLog(&log)) @@ -118,7 +118,7 @@ func Test_AvsOperatorState(t *testing.T) { assert.Nil(t, err) states := []RegisteredAvsOperators{} - statesRes := avsOperatorState.Db. + statesRes := avsOperatorState.DB. Model(&RegisteredAvsOperators{}). Raw("select * from registered_avs_operators where block_number = @blockNumber", sql.Named("blockNumber", blockNumber)). Scan(&states) @@ -170,7 +170,7 @@ func Test_AvsOperatorState(t *testing.T) { }, } - avsOperatorState, err := NewAvsOperators(esm, grm, cfg.Network, cfg.Environment, l, cfg) + avsOperatorState, err := NewAvsOperators(esm, grm, l, cfg) assert.Nil(t, err) for _, log := range logs { @@ -187,7 +187,7 @@ func Test_AvsOperatorState(t *testing.T) { assert.Nil(t, err) states := []RegisteredAvsOperators{} - statesRes := avsOperatorState.Db. + statesRes := avsOperatorState.DB. Model(&RegisteredAvsOperators{}). Raw("select * from registered_avs_operators where block_number = @blockNumber", sql.Named("blockNumber", log.BlockNumber)). Scan(&states) diff --git a/internal/eigenState/base/baseEigenState.go b/internal/eigenState/base/baseEigenState.go index b05f3892..f672d1aa 100644 --- a/internal/eigenState/base/baseEigenState.go +++ b/internal/eigenState/base/baseEigenState.go @@ -3,7 +3,12 @@ package base import ( "database/sql" "encoding/json" + "errors" "fmt" + "github.com/Layr-Labs/go-sidecar/internal/eigenState/types" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" + orderedmap "github.com/wk8/go-ordered-map/v2" "slices" "strings" @@ -71,7 +76,7 @@ func (b *BaseEigenState) DeleteState(tableName string, startBlockNumber uint64, zap.Uint64("startBlockNumber", startBlockNumber), zap.Uint64("endBlockNumber", endBlockNumber), ) - return fmt.Errorf("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber") + return errors.New("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber") } // tokenizing the table name apparently doesnt work, so we need to use Sprintf to include it. @@ -92,3 +97,43 @@ func (b *BaseEigenState) DeleteState(tableName string, startBlockNumber uint64, } return nil } + +type MerkleTreeInput struct { + SlotID types.SlotID + Value []byte +} + +// MerkleizeState creates a merkle tree from the given inputs. +// +// Each input includes a SlotID and a byte representation of the state that changed +func (b *BaseEigenState) MerkleizeState(blockNumber uint64, inputs []*MerkleTreeInput) (*merkletree.MerkleTree, error) { + om := orderedmap.New[types.SlotID, []byte]() + + for _, input := range inputs { + _, found := om.Get(input.SlotID) + if !found { + om.Set(input.SlotID, input.Value) + + prev := om.GetPair(input.SlotID).Prev() + if prev != nil && prev.Key > input.SlotID { + om.Delete(input.SlotID) + return nil, errors.New("slotIDs are not in order") + } + } else { + return nil, errors.New(fmt.Sprintf("duplicate slotID %s", input.SlotID)) + } + } + + leaves := b.InitializeMerkleTreeBaseStateWithBlock(blockNumber) + for rootIndex := om.Oldest(); rootIndex != nil; rootIndex = rootIndex.Next() { + leaves = append(leaves, encodeMerkleLeaf(rootIndex.Key, rootIndex.Value)) + } + return merkletree.NewTree( + merkletree.WithData(leaves), + merkletree.WithHashType(keccak256.New()), + ) +} + +func encodeMerkleLeaf(slotID types.SlotID, value []byte) []byte { + return append([]byte(slotID), value...) +} diff --git a/internal/eigenState/eigenstate_test.go b/internal/eigenState/eigenstate_test.go index ebf05d7f..79ce5820 100644 --- a/internal/eigenState/eigenstate_test.go +++ b/internal/eigenState/eigenstate_test.go @@ -53,11 +53,11 @@ func Test_EigenStateManager(t *testing.T) { }) t.Run("Should create a state root with states from models", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - avsOperatorsModel, err := avsOperators.NewAvsOperators(esm, grm, cfg.Network, cfg.Environment, l, cfg) + avsOperatorsModel, err := avsOperators.NewAvsOperators(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, avsOperatorsModel) - operatorSharesModel, err := operatorShares.NewOperatorSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + operatorSharesModel, err := operatorShares.NewOperatorSharesModel(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, operatorSharesModel) diff --git a/internal/eigenState/operatorShares/operatorShares.go b/internal/eigenState/operatorShares/operatorShares.go index c68ea6be..47455e39 100644 --- a/internal/eigenState/operatorShares/operatorShares.go +++ b/internal/eigenState/operatorShares/operatorShares.go @@ -17,9 +17,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/types/numbers" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -51,32 +48,25 @@ type OperatorSharesDiff struct { IsNew bool } -// SlotId is a unique identifier for an operator's shares in a strategy. -type SlotId string - -func NewSlotId(operator string, strategy string) SlotId { - return SlotId(fmt.Sprintf("%s_%s", operator, strategy)) +func NewSlotID(operator string, strategy string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", operator, strategy)) } // Implements IEigenStateModel. type OperatorSharesModel struct { base.BaseEigenState StateTransitions types.StateTransitions[AccumulatedStateChange] - Db *gorm.DB - Network config.Network - Environment config.Environment + DB *gorm.DB logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*AccumulatedStateChange + stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange } func NewOperatorSharesModel( esm *stateManager.EigenStateManager, grm *gorm.DB, - Network config.Network, - Environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*OperatorSharesModel, error) { @@ -84,12 +74,10 @@ func NewOperatorSharesModel( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*AccumulatedStateChange), + stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), } esm.RegisterState(model, 1) @@ -154,8 +142,8 @@ func (osm *OperatorSharesModel) GetStateTransitions() (types.StateTransitions[Ac shares = shares.Mul(shares, big.NewInt(-1)) } - slotId := NewSlotId(operator, outputData.Strategy) - record, ok := osm.stateAccumulator[log.BlockNumber][slotId] + slotID := NewSlotID(operator, outputData.Strategy) + record, ok := osm.stateAccumulator[log.BlockNumber][slotID] if !ok { record = &AccumulatedStateChange{ Operator: operator, @@ -163,7 +151,7 @@ func (osm *OperatorSharesModel) GetStateTransitions() (types.StateTransitions[Ac Shares: shares, BlockNumber: log.BlockNumber, } - osm.stateAccumulator[log.BlockNumber][slotId] = record + osm.stateAccumulator[log.BlockNumber][slotID] = record } else { record.Shares = record.Shares.Add(record.Shares, shares) } @@ -200,7 +188,7 @@ func (osm *OperatorSharesModel) IsInterestingLog(log *storage.TransactionLog) bo } func (osm *OperatorSharesModel) InitBlockProcessing(blockNumber uint64) error { - osm.stateAccumulator[blockNumber] = make(map[SlotId]*AccumulatedStateChange) + osm.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) return nil } @@ -221,7 +209,7 @@ func (osm *OperatorSharesModel) HandleStateChange(log *storage.TransactionLog) ( return change, nil } } - return nil, nil + return nil, nil //nolint:nilnil } func (osm *OperatorSharesModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error { @@ -235,7 +223,7 @@ func (osm *OperatorSharesModel) clonePreviousBlocksToNewBlock(blockNumber uint64 from operator_shares where block_number = @previousBlock ` - res := osm.Db.Exec(query, + res := osm.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -258,9 +246,9 @@ func (osm *OperatorSharesModel) prepareState(blockNumber uint64) ([]OperatorShar return nil, err } - slotIds := make([]SlotId, 0) - for slotId := range accumulatedState { - slotIds = append(slotIds, slotId) + slotIds := make([]types.SlotID, 0) + for slotID := range accumulatedState { + slotIds = append(slotIds, slotID) } // Find only the records from the previous block, that are modified in this block @@ -275,7 +263,7 @@ func (osm *OperatorSharesModel) prepareState(blockNumber uint64) ([]OperatorShar and concat(operator, '_', strategy) in @slotIds ` existingRecords := make([]OperatorShares, 0) - res := osm.Db.Model(&OperatorShares{}). + res := osm.DB.Model(&OperatorShares{}). Raw(query, sql.Named("previousBlock", blockNumber-1), sql.Named("slotIds", slotIds), @@ -288,16 +276,16 @@ func (osm *OperatorSharesModel) prepareState(blockNumber uint64) ([]OperatorShar } // Map the existing records to a map for easier lookup - mappedRecords := make(map[SlotId]OperatorShares) + mappedRecords := make(map[types.SlotID]OperatorShares) for _, record := range existingRecords { fmt.Printf("Existing OperatorShares %+v\n", record) - slotId := NewSlotId(record.Operator, record.Strategy) - mappedRecords[slotId] = record + slotID := NewSlotID(record.Operator, record.Strategy) + mappedRecords[slotID] = record } // Loop over our new state changes. // If the record exists in the previous block, add the shares to the existing shares - for slotId, newState := range accumulatedState { + for slotID, newState := range accumulatedState { prepared := OperatorSharesDiff{ Operator: newState.Operator, Strategy: newState.Strategy, @@ -306,7 +294,7 @@ func (osm *OperatorSharesModel) prepareState(blockNumber uint64) ([]OperatorShar IsNew: false, } - if existingRecord, ok := mappedRecords[slotId]; ok { + if existingRecord, ok := mappedRecords[slotID]; ok { existingShares, success := numbers.NewBig257().SetString(existingRecord.Shares, 10) if !success { osm.logger.Sugar().Errorw("Failed to convert existing shares to big.Int", @@ -359,7 +347,7 @@ func (osm *OperatorSharesModel) CommitFinalState(blockNumber uint64) error { // Batch insert new records if len(newRecords) > 0 { - res := osm.Db.Model(&OperatorShares{}).Clauses(clause.Returning{}).Create(&newRecords) + res := osm.DB.Model(&OperatorShares{}).Clauses(clause.Returning{}).Create(&newRecords) if res.Error != nil { osm.logger.Sugar().Errorw("Failed to create new operator_shares records", zap.Error(res.Error)) return res.Error @@ -368,7 +356,7 @@ func (osm *OperatorSharesModel) CommitFinalState(blockNumber uint64) error { // Update existing records that were cloned from the previous block if len(updateRecords) > 0 { for _, record := range updateRecords { - res := osm.Db.Model(&OperatorShares{}). + res := osm.DB.Model(&OperatorShares{}). Where("operator = ? and strategy = ? and block_number = ?", record.Operator, record.Strategy, record.BlockNumber). Updates(map[string]interface{}{ "shares": record.Shares, @@ -394,75 +382,29 @@ func (osm *OperatorSharesModel) GenerateStateRoot(blockNumber uint64) (types.Sta return "", err } - fullTree, err := osm.merkelizeState(blockNumber, diffs) + inputs := osm.sortValuesForMerkleTree(diffs) + + fullTree, err := osm.MerkleizeState(blockNumber, inputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (osm *OperatorSharesModel) merkelizeState(blockNumber uint64, diffs []OperatorSharesDiff) (*merkletree.MerkleTree, error) { - // Create a merkle tree with the structure: - // strategy: map[operators]: shares - om := orderedmap.New[string, *orderedmap.OrderedMap[string, string]]() - +func (osm *OperatorSharesModel) sortValuesForMerkleTree(diffs []OperatorSharesDiff) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) for _, diff := range diffs { - existingStrategy, found := om.Get(diff.Strategy) - if !found { - existingStrategy = orderedmap.New[string, string]() - om.Set(diff.Strategy, existingStrategy) - - prev := om.GetPair(diff.Strategy).Prev() - if prev != nil && strings.Compare(prev.Key, diff.Strategy) >= 0 { - om.Delete(diff.Strategy) - return nil, fmt.Errorf("strategy not in order") - } - } - existingStrategy.Set(diff.Operator, diff.Shares.String()) - - prev := existingStrategy.GetPair(diff.Operator).Prev() - if prev != nil && strings.Compare(prev.Key, diff.Operator) >= 0 { - existingStrategy.Delete(diff.Operator) - return nil, fmt.Errorf("operator not in order") - } + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: NewSlotID(diff.Operator, diff.Strategy), + Value: diff.Shares.Bytes(), + }) } - - leaves := osm.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - for strat := om.Oldest(); strat != nil; strat = strat.Next() { - operatorLeaves := make([][]byte, 0) - for operator := strat.Value.Oldest(); operator != nil; operator = operator.Next() { - operatorAddr := operator.Key - shares := operator.Value - operatorLeaves = append(operatorLeaves, encodeOperatorSharesLeaf(operatorAddr, shares)) - } - - stratTree, err := merkletree.NewTree( - merkletree.WithData(operatorLeaves), - merkletree.WithHashType(keccak256.New()), - ) - if err != nil { - return nil, err - } - leaves = append(leaves, encodeStratTree(strat.Key, stratTree.Root())) - } - return merkletree.NewTree( - merkletree.WithData(leaves), - merkletree.WithHashType(keccak256.New()), - ) -} - -func encodeOperatorSharesLeaf(operator string, shares string) []byte { - operatorBytes := []byte(operator) - sharesBytes := []byte(shares) - - return append(operatorBytes, sharesBytes...) -} - -func encodeStratTree(strategy string, operatorTreeRoot []byte) []byte { - strategyBytes := []byte(strategy) - return append(strategyBytes, operatorTreeRoot...) + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + return inputs } func (osm *OperatorSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return osm.BaseEigenState.DeleteState("operator_shares", startBlockNumber, endBlockNumber, osm.Db) + return osm.BaseEigenState.DeleteState("operator_shares", startBlockNumber, endBlockNumber, osm.DB) } diff --git a/internal/eigenState/operatorShares/operatorShares_test.go b/internal/eigenState/operatorShares/operatorShares_test.go index 3a559a73..fa7cc080 100644 --- a/internal/eigenState/operatorShares/operatorShares_test.go +++ b/internal/eigenState/operatorShares/operatorShares_test.go @@ -39,8 +39,8 @@ func setup() ( } func teardown(model *OperatorSharesModel) { - model.Db.Exec("delete from operator_share_changes") - model.Db.Exec("delete from operator_shares") + model.DB.Exec("delete from operator_share_changes") + model.DB.Exec("delete from operator_shares") } func Test_OperatorSharesState(t *testing.T) { @@ -52,7 +52,7 @@ func Test_OperatorSharesState(t *testing.T) { t.Run("Should create a new OperatorSharesState", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewOperatorSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewOperatorSharesModel(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, model) }) @@ -73,7 +73,7 @@ func Test_OperatorSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewOperatorSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewOperatorSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -101,7 +101,7 @@ func Test_OperatorSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewOperatorSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewOperatorSharesModel(esm, grm, l, cfg) assert.Nil(t, err) err = model.InitBlockProcessing(blockNumber) @@ -115,7 +115,7 @@ func Test_OperatorSharesState(t *testing.T) { assert.Nil(t, err) states := []OperatorShares{} - statesRes := model.Db. + statesRes := model.DB. Model(&OperatorShares{}). Raw("select * from operator_shares where block_number = @blockNumber", sql.Named("blockNumber", blockNumber)). Scan(&states) @@ -150,7 +150,7 @@ func Test_OperatorSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewOperatorSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewOperatorSharesModel(esm, grm, l, cfg) assert.Nil(t, err) err = model.InitBlockProcessing(blockNumber) diff --git a/internal/eigenState/rewardSubmissions/rewardSubmissions.go b/internal/eigenState/rewardSubmissions/rewardSubmissions.go index 119d4408..dbf72505 100644 --- a/internal/eigenState/rewardSubmissions/rewardSubmissions.go +++ b/internal/eigenState/rewardSubmissions/rewardSubmissions.go @@ -16,9 +16,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/types/numbers" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -50,30 +47,26 @@ type RewardSubmissions struct { Submissions []*RewardSubmission } -type SlotId string - -func NewSlotId(rewardHash string, strategy string) SlotId { - return SlotId(fmt.Sprintf("%s_%s", rewardHash, strategy)) +func NewSlotID(rewardHash string, strategy string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", rewardHash, strategy)) } type RewardSubmissionsModel struct { base.BaseEigenState StateTransitions types.StateTransitions[RewardSubmission] - Db *gorm.DB + DB *gorm.DB Network config.Network Environment config.Environment logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*RewardSubmission + stateAccumulator map[uint64]map[types.SlotID]*RewardSubmission } func NewRewardSubmissionsModel( esm *stateManager.EigenStateManager, grm *gorm.DB, - Network config.Network, - Environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*RewardSubmissionsModel, error) { @@ -81,12 +74,10 @@ func NewRewardSubmissionsModel( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*RewardSubmission), + stateAccumulator: make(map[uint64]map[types.SlotID]*RewardSubmission), } esm.RegisterState(model, 5) @@ -189,7 +180,7 @@ func (rs *RewardSubmissionsModel) GetStateTransitions() (types.StateTransitions[ } for _, rewardSubmission := range rewardSubmissions.Submissions { - slotId := NewSlotId(rewardSubmission.RewardHash, rewardSubmission.Strategy) + slotId := NewSlotID(rewardSubmission.RewardHash, rewardSubmission.Strategy) _, ok := rs.stateAccumulator[log.BlockNumber][slotId] if ok { @@ -235,7 +226,7 @@ func (rs *RewardSubmissionsModel) IsInterestingLog(log *storage.TransactionLog) } func (rs *RewardSubmissionsModel) InitBlockProcessing(blockNumber uint64) error { - rs.stateAccumulator[blockNumber] = make(map[SlotId]*RewardSubmission) + rs.stateAccumulator[blockNumber] = make(map[types.SlotID]*RewardSubmission) return nil } @@ -278,7 +269,7 @@ func (rs *RewardSubmissionsModel) clonePreviousBlocksToNewBlock(blockNumber uint from reward_submissions where block_number = @previousBlock ` - res := rs.Db.Exec(query, + res := rs.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -300,7 +291,7 @@ func (rs *RewardSubmissionsModel) prepareState(blockNumber uint64) ([]*RewardSub } currentBlock := &storage.Block{} - err := rs.Db.Where("number = ?", blockNumber).First(currentBlock).Error + err := rs.DB.Where("number = ?", blockNumber).First(currentBlock).Error if err != nil { rs.logger.Sugar().Errorw("Failed to fetch block", zap.Error(err), zap.Uint64("blockNumber", blockNumber)) return nil, nil, err @@ -328,7 +319,7 @@ func (rs *RewardSubmissionsModel) prepareState(blockNumber uint64) ([]*RewardSub block_number = @previousBlock and end_timestamp <= @blockTime ` - res := rs.Db. + res := rs.DB. Model(&RewardSubmission{}). Raw(query, sql.Named("previousBlock", blockNumber-1), @@ -364,7 +355,7 @@ func (rs *RewardSubmissionsModel) CommitFinalState(blockNumber uint64) error { } for _, record := range recordsToDelete { - res := rs.Db.Delete(&RewardSubmission{}, "reward_hash = ? and strategy = ? and block_number = ?", record.RewardSubmission.RewardHash, record.RewardSubmission.Strategy, blockNumber) + res := rs.DB.Delete(&RewardSubmission{}, "reward_hash = ? and strategy = ? and block_number = ?", record.RewardSubmission.RewardHash, record.RewardSubmission.Strategy, blockNumber) if res.Error != nil { rs.logger.Sugar().Errorw("Failed to delete record", zap.Error(res.Error), @@ -378,7 +369,7 @@ func (rs *RewardSubmissionsModel) CommitFinalState(blockNumber uint64) error { if len(recordsToInsert) > 0 { // records := make([]RewardSubmission, 0) for _, record := range recordsToInsert { - res := rs.Db.Model(&RewardSubmission{}).Clauses(clause.Returning{}).Create(&record.RewardSubmission) + res := rs.DB.Model(&RewardSubmission{}).Clauses(clause.Returning{}).Create(&record.RewardSubmission) if res.Error != nil { rs.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error)) fmt.Printf("\n\n%+v\n\n", record.RewardSubmission) @@ -409,118 +400,36 @@ func (rs *RewardSubmissionsModel) GenerateStateRoot(blockNumber uint64) (types.S combinedResults = append(combinedResults, record) } - fullTree, err := rs.merkelizeState(blockNumber, combinedResults) + inputs := rs.sortValuesForMerkleTree(combinedResults) + + fullTree, err := rs.MerkleizeState(blockNumber, inputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (rs *RewardSubmissionsModel) sortRewardSubmissionsForMerkelization(submissions []*RewardSubmissionDiff) []*RewardSubmissionDiff { - mappedByAvs := make(map[string][]*RewardSubmissionDiff) +func (rs *RewardSubmissionsModel) sortValuesForMerkleTree(submissions []*RewardSubmissionDiff) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) for _, submission := range submissions { - if _, ok := mappedByAvs[submission.RewardSubmission.Avs]; !ok { - mappedByAvs[submission.RewardSubmission.Avs] = make([]*RewardSubmissionDiff, 0) + slotID := NewSlotID(submission.RewardSubmission.RewardHash, submission.RewardSubmission.Strategy) + value := "added" + if submission.IsNoLongerActive { + value = "removed" } - mappedByAvs[submission.RewardSubmission.Avs] = append(mappedByAvs[submission.RewardSubmission.Avs], submission) - } - - for _, sub := range mappedByAvs { - slices.SortFunc(sub, func(i, j *RewardSubmissionDiff) int { - iSlotId := NewSlotId(i.RewardSubmission.RewardHash, i.RewardSubmission.Strategy) - jSlotId := NewSlotId(j.RewardSubmission.RewardHash, j.RewardSubmission.Strategy) - - return strings.Compare(string(iSlotId), string(jSlotId)) + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: slotID, + Value: []byte(value), }) } - avsAddresses := make([]string, 0) - for key := range mappedByAvs { - avsAddresses = append(avsAddresses, key) - } - - sort.Slice(avsAddresses, func(i, j int) bool { - return avsAddresses[i] < avsAddresses[j] + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) }) - sorted := make([]*RewardSubmissionDiff, 0) - for _, avs := range avsAddresses { - sorted = append(sorted, mappedByAvs[avs]...) - } - return sorted -} - -func (rs *RewardSubmissionsModel) merkelizeState(blockNumber uint64, rewardSubmissions []*RewardSubmissionDiff) (*merkletree.MerkleTree, error) { - // Avs -> slot_id -> string (added/removed) - om := orderedmap.New[string, *orderedmap.OrderedMap[SlotId, string]]() - - rewardSubmissions = rs.sortRewardSubmissionsForMerkelization(rewardSubmissions) - - for _, result := range rewardSubmissions { - existingAvs, found := om.Get(result.RewardSubmission.Avs) - if !found { - existingAvs = orderedmap.New[SlotId, string]() - om.Set(result.RewardSubmission.Avs, existingAvs) - - prev := om.GetPair(result.RewardSubmission.Avs).Prev() - if prev != nil && strings.Compare(prev.Key, result.RewardSubmission.Avs) >= 0 { - om.Delete(result.RewardSubmission.Avs) - return nil, fmt.Errorf("avs not in order") - } - } - slotId := NewSlotId(result.RewardSubmission.RewardHash, result.RewardSubmission.Strategy) - var state string - if result.IsNew { - state = "added" - } else if result.IsNoLongerActive { - state = "removed" - } else { - return nil, fmt.Errorf("invalid state change") - } - existingAvs.Set(slotId, state) - - prev := existingAvs.GetPair(slotId).Prev() - if prev != nil && strings.Compare(string(prev.Key), string(slotId)) >= 0 { - existingAvs.Delete(slotId) - return nil, fmt.Errorf("operator not in order") - } - } - - avsLeaves := rs.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - - for avs := om.Oldest(); avs != nil; avs = avs.Next() { - submissionLeafs := make([][]byte, 0) - for submission := avs.Value.Oldest(); submission != nil; submission = submission.Next() { - slotId := submission.Key - state := submission.Value - submissionLeafs = append(submissionLeafs, encodeSubmissionLeaf(slotId, state)) - } - - avsTree, err := merkletree.NewTree( - merkletree.WithData(submissionLeafs), - merkletree.WithHashType(keccak256.New()), - ) - if err != nil { - return nil, err - } - - avsLeaves = append(avsLeaves, encodeAvsLeaf(avs.Key, avsTree.Root())) - } - - return merkletree.NewTree( - merkletree.WithData(avsLeaves), - merkletree.WithHashType(keccak256.New()), - ) -} - -func encodeSubmissionLeaf(slotId SlotId, state string) []byte { - return []byte(fmt.Sprintf("%s:%s", slotId, state)) -} - -func encodeAvsLeaf(avs string, avsSubmissionRoot []byte) []byte { - return append([]byte(avs), avsSubmissionRoot...) + return inputs } func (rs *RewardSubmissionsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return rs.BaseEigenState.DeleteState("registered_avs_operators", startBlockNumber, endBlockNumber, rs.Db) + return rs.BaseEigenState.DeleteState("registered_avs_operators", startBlockNumber, endBlockNumber, rs.DB) } diff --git a/internal/eigenState/rewardSubmissions/rewardSubmissions_test.go b/internal/eigenState/rewardSubmissions/rewardSubmissions_test.go index 11e56db6..f119df42 100644 --- a/internal/eigenState/rewardSubmissions/rewardSubmissions_test.go +++ b/internal/eigenState/rewardSubmissions/rewardSubmissions_test.go @@ -44,7 +44,7 @@ func teardown(model *RewardSubmissionsModel) { `delete from blocks`, } for _, query := range queries { - res := model.Db.Exec(query) + res := model.DB.Exec(query) if res.Error != nil { fmt.Printf("Failed to run query: %v\n", res.Error) } @@ -57,7 +57,7 @@ func createBlock(model *RewardSubmissionsModel, blockNumber uint64) error { Hash: "some hash", BlockTime: time.Now().Add(time.Hour * time.Duration(blockNumber)), } - res := model.Db.Model(&storage.Block{}).Create(block) + res := model.DB.Model(&storage.Block{}).Create(block) if res.Error != nil { return res.Error } @@ -74,7 +74,7 @@ func Test_RewardSubmissions(t *testing.T) { t.Run("Test each event type", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewRewardSubmissionsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewRewardSubmissionsModel(esm, grm, l, cfg) submissionCounter := 0 @@ -140,7 +140,7 @@ func Test_RewardSubmissions(t *testing.T) { rewards := make([]*RewardSubmission, 0) query := `select * from reward_submissions where block_number = ?` - res := model.Db.Raw(query, blockNumber).Scan(&rewards) + res := model.DB.Raw(query, blockNumber).Scan(&rewards) assert.Nil(t, res.Error) assert.Equal(t, len(strategiesAndMultipliers), len(rewards)) @@ -215,7 +215,7 @@ func Test_RewardSubmissions(t *testing.T) { rewards := make([]*RewardSubmission, 0) query := `select * from reward_submissions where block_number = ?` - res := model.Db.Raw(query, blockNumber).Scan(&rewards) + res := model.DB.Raw(query, blockNumber).Scan(&rewards) assert.Nil(t, res.Error) assert.Equal(t, len(strategiesAndMultipliers), len(rewards)) @@ -287,7 +287,7 @@ func Test_RewardSubmissions(t *testing.T) { rewards := make([]*RewardSubmission, 0) query := `select * from reward_submissions where block_number = ?` - res := model.Db.Raw(query, blockNumber).Scan(&rewards) + res := model.DB.Raw(query, blockNumber).Scan(&rewards) assert.Nil(t, res.Error) assert.Equal(t, len(strategiesAndMultipliers), len(rewards)) @@ -360,7 +360,7 @@ func Test_RewardSubmissions(t *testing.T) { rewards := make([]*RewardSubmission, 0) query := `select * from reward_submissions where block_number = ?` - res := model.Db.Raw(query, blockNumber).Scan(&rewards) + res := model.DB.Raw(query, blockNumber).Scan(&rewards) assert.Nil(t, res.Error) assert.Equal(t, len(strategiesAndMultipliers), len(rewards)) @@ -384,7 +384,7 @@ func Test_RewardSubmissions(t *testing.T) { t.Run("multi-block test", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewRewardSubmissionsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewRewardSubmissionsModel(esm, grm, l, cfg) submissionCounter := 0 @@ -424,7 +424,7 @@ func Test_RewardSubmissions(t *testing.T) { query := `select count(*) from reward_submissions where block_number = ?` var count int - res := model.Db.Raw(query, blockNumber).Scan(&count) + res := model.DB.Raw(query, blockNumber).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, submissionCounter, count) @@ -476,7 +476,7 @@ func Test_RewardSubmissions(t *testing.T) { assert.True(t, len(stateRoot) > 0) query = `select count(*) from reward_submissions where block_number = ?` - res = model.Db.Raw(query, blockNumber).Scan(&count) + res = model.DB.Raw(query, blockNumber).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, submissionCounter, count) @@ -522,7 +522,7 @@ func Test_RewardSubmissions(t *testing.T) { assert.True(t, len(stateRoot) > 0) query = `select count(*) from reward_submissions where block_number = ?` - res = model.Db.Raw(query, blockNumber).Scan(&count) + res = model.DB.Raw(query, blockNumber).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, submissionCounter, count) @@ -568,7 +568,7 @@ func Test_RewardSubmissions(t *testing.T) { assert.True(t, len(stateRoot) > 0) query = `select count(*) from reward_submissions where block_number = ?` - res = model.Db.Raw(query, blockNumber).Scan(&count) + res = model.DB.Raw(query, blockNumber).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, submissionCounter, count) @@ -581,7 +581,7 @@ func Test_RewardSubmissions(t *testing.T) { t.Run("single block, multiple events", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewRewardSubmissionsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewRewardSubmissionsModel(esm, grm, l, cfg) submissionCounter := 0 @@ -658,7 +658,7 @@ func Test_RewardSubmissions(t *testing.T) { // check that we're starting with 0 rows query := `select count(*) from reward_submissions` var count int - res := model.Db.Raw(query).Scan(&count) + res := model.DB.Raw(query).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, 0, count) @@ -674,7 +674,7 @@ func Test_RewardSubmissions(t *testing.T) { // Verify we have the right number of rows query = `select count(*) from reward_submissions where block_number = ?` - res = model.Db.Raw(query, blockNumber).Scan(&count) + res = model.DB.Raw(query, blockNumber).Scan(&count) assert.Nil(t, res.Error) assert.Equal(t, submissionCounter, count) diff --git a/internal/eigenState/stakerDelegations/stakerDelegations.go b/internal/eigenState/stakerDelegations/stakerDelegations.go index 7afed13f..38476f43 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations.go @@ -14,9 +14,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/eigenState/types" "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -39,24 +36,19 @@ type AccumulatedStateChange struct { Delegated bool } -// SlotId represents a unique identifier for a staker/operator pair. -type SlotId string - -func NewSlotId(staker string, operator string) SlotId { - return SlotId(fmt.Sprintf("%s_%s", staker, operator)) +func NewSlotID(staker string, operator string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", staker, operator)) } type StakerDelegationsModel struct { base.BaseEigenState StateTransitions types.StateTransitions[AccumulatedStateChange] - Db *gorm.DB - Network config.Network - Environment config.Environment + DB *gorm.DB logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*AccumulatedStateChange + stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange } type DelegatedStakersDiff struct { @@ -69,8 +61,6 @@ type DelegatedStakersDiff struct { func NewStakerDelegationsModel( esm *stateManager.EigenStateManager, grm *gorm.DB, - Network config.Network, - Environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*StakerDelegationsModel, error) { @@ -78,12 +68,10 @@ func NewStakerDelegationsModel( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*AccumulatedStateChange), + stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), } esm.RegisterState(model, 2) @@ -111,8 +99,8 @@ func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[A staker := strings.ToLower(arguments[0].Value.(string)) operator := strings.ToLower(arguments[1].Value.(string)) - slotId := NewSlotId(staker, operator) - record, ok := s.stateAccumulator[log.BlockNumber][slotId] + slotID := NewSlotID(staker, operator) + record, ok := s.stateAccumulator[log.BlockNumber][slotID] if !ok { // if the record doesn't exist, create a new one record = &AccumulatedStateChange{ @@ -120,15 +108,15 @@ func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[A Operator: operator, BlockNumber: log.BlockNumber, } - s.stateAccumulator[log.BlockNumber][slotId] = record + s.stateAccumulator[log.BlockNumber][slotID] = record } if log.EventName == "StakerUndelegated" { if ok { // In this situation, we've encountered a delegate and undelegate in the same block // which functionally results in no state change at all so we want to remove the record // from the accumulated state. - delete(s.stateAccumulator[log.BlockNumber], slotId) - return nil, nil + delete(s.stateAccumulator[log.BlockNumber], slotID) + return nil, nil //nolint:nilnil } record.Delegated = false } else if log.EventName == "StakerDelegated" { @@ -166,9 +154,9 @@ func (s *StakerDelegationsModel) IsInterestingLog(log *storage.TransactionLog) b return s.BaseEigenState.IsInterestingLog(addresses, log) } -// StartBlockProcessing Initialize state accumulator for the block. +// InitBlockProcessing initialize state accumulator for the block. func (s *StakerDelegationsModel) InitBlockProcessing(blockNumber uint64) error { - s.stateAccumulator[blockNumber] = make(map[SlotId]*AccumulatedStateChange) + s.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) return nil } @@ -189,7 +177,7 @@ func (s *StakerDelegationsModel) HandleStateChange(log *storage.TransactionLog) return change, nil } } - return nil, nil + return nil, nil //nolint:nilnil } func (s *StakerDelegationsModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error { @@ -202,7 +190,7 @@ func (s *StakerDelegationsModel) clonePreviousBlocksToNewBlock(blockNumber uint6 from delegated_stakers where block_number = @previousBlock ` - res := s.Db.Exec(query, + res := s.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -258,7 +246,7 @@ func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error { // TODO(seanmcgary): should probably wrap the operations of this function in a db transaction for _, record := range recordsToDelete { - res := s.Db.Delete(&DelegatedStakers{}, "staker = ? and operator = ? and block_number = ?", record.Staker, record.Operator, blockNumber) + res := s.DB.Delete(&DelegatedStakers{}, "staker = ? and operator = ? and block_number = ?", record.Staker, record.Operator, blockNumber) if res.Error != nil { s.logger.Sugar().Errorw("Failed to delete staker delegation", zap.Error(res.Error), @@ -270,7 +258,7 @@ func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error { } } if len(recordsToInsert) > 0 { - res := s.Db.Model(&DelegatedStakers{}).Clauses(clause.Returning{}).Create(&recordsToInsert) + res := s.DB.Model(&DelegatedStakers{}).Clauses(clause.Returning{}).Create(&recordsToInsert) if res.Error != nil { s.logger.Sugar().Errorw("Failed to insert staker delegations", zap.Error(res.Error)) return res.Error @@ -312,75 +300,29 @@ func (s *StakerDelegationsModel) GenerateStateRoot(blockNumber uint64) (types.St }) } - fullTree, err := s.merkelizeState(blockNumber, combinedResults) + inputs := s.sortValuesForMerkleTree(combinedResults) + + fullTree, err := s.MerkleizeState(blockNumber, inputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -// merkelizeState generates a merkle tree for the given block number and delegated stakers. -// Changes are stored in the following format: -// Operator -> staker:delegated. -func (s *StakerDelegationsModel) merkelizeState(blockNumber uint64, delegatedStakers []DelegatedStakersDiff) (*merkletree.MerkleTree, error) { - om := orderedmap.New[string, *orderedmap.OrderedMap[string, bool]]() - - for _, result := range delegatedStakers { - existingOperator, found := om.Get(result.Operator) - if !found { - existingOperator = orderedmap.New[string, bool]() - om.Set(result.Operator, existingOperator) - - prev := om.GetPair(result.Operator).Prev() - if prev != nil && strings.Compare(prev.Key, result.Operator) >= 0 { - om.Delete(result.Operator) - return nil, fmt.Errorf("operators not in order") - } - } - existingOperator.Set(result.Staker, result.Delegated) - - prev := existingOperator.GetPair(result.Staker).Prev() - if prev != nil && strings.Compare(prev.Key, result.Staker) >= 0 { - existingOperator.Delete(result.Staker) - return nil, fmt.Errorf("stakers not in order") - } - } - - operatorLeaves := s.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - - for op := om.Oldest(); op != nil; op = op.Next() { - stakerLeafs := make([][]byte, 0) - for staker := op.Value.Oldest(); staker != nil; staker = staker.Next() { - operatorAddr := staker.Key - delegated := staker.Value - stakerLeafs = append(stakerLeafs, encodeStakerLeaf(operatorAddr, delegated)) - } - - avsTree, err := merkletree.NewTree( - merkletree.WithData(stakerLeafs), - merkletree.WithHashType(keccak256.New()), - ) - if err != nil { - return nil, err - } - - operatorLeaves = append(operatorLeaves, encodeOperatorLeaf(op.Key, avsTree.Root())) +func (s *StakerDelegationsModel) sortValuesForMerkleTree(diffs []DelegatedStakersDiff) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) + for _, diff := range diffs { + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: NewSlotID(diff.Staker, diff.Operator), + Value: []byte(fmt.Sprintf("%t", diff.Delegated)), + }) } - - return merkletree.NewTree( - merkletree.WithData(operatorLeaves), - merkletree.WithHashType(keccak256.New()), - ) -} - -func encodeStakerLeaf(staker string, delegated bool) []byte { - return []byte(fmt.Sprintf("%s:%t", staker, delegated)) -} - -func encodeOperatorLeaf(operator string, operatorStakersRoot []byte) []byte { - return append([]byte(operator), operatorStakersRoot...) + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + return inputs } func (s *StakerDelegationsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return s.BaseEigenState.DeleteState("delegated_stakers", startBlockNumber, endBlockNumber, s.Db) + return s.BaseEigenState.DeleteState("delegated_stakers", startBlockNumber, endBlockNumber, s.DB) } diff --git a/internal/eigenState/stakerDelegations/stakerDelegations_test.go b/internal/eigenState/stakerDelegations/stakerDelegations_test.go index 85c58534..f4f6e539 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations_test.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations_test.go @@ -37,8 +37,8 @@ func setup() ( } func teardown(model *StakerDelegationsModel) { - model.Db.Exec("delete from staker_delegation_changes") - model.Db.Exec("delete from delegated_stakers") + model.DB.Exec("delete from staker_delegation_changes") + model.DB.Exec("delete from delegated_stakers") } func Test_DelegatedStakersState(t *testing.T) { @@ -50,7 +50,7 @@ func Test_DelegatedStakersState(t *testing.T) { t.Run("Should create a new StakerDelegationsModel", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewStakerDelegationsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerDelegationsModel(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, model) }) @@ -71,7 +71,7 @@ func Test_DelegatedStakersState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerDelegationsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerDelegationsModel(esm, grm, l, cfg) assert.Equal(t, true, model.IsInterestingLog(&log)) @@ -106,7 +106,7 @@ func Test_DelegatedStakersState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerDelegationsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerDelegationsModel(esm, grm, l, cfg) assert.Nil(t, err) assert.Equal(t, true, model.IsInterestingLog(&log)) @@ -126,7 +126,7 @@ func Test_DelegatedStakersState(t *testing.T) { assert.Nil(t, err) states := []DelegatedStakers{} - statesRes := model.Db. + statesRes := model.DB. Model(&DelegatedStakers{}). Raw("select * from delegated_stakers where block_number = @blockNumber", sql.Named("blockNumber", blockNumber)). Scan(&states) @@ -178,7 +178,7 @@ func Test_DelegatedStakersState(t *testing.T) { }, } - model, err := NewStakerDelegationsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerDelegationsModel(esm, grm, l, cfg) assert.Nil(t, err) for _, log := range logs { @@ -195,7 +195,7 @@ func Test_DelegatedStakersState(t *testing.T) { assert.Nil(t, err) states := []DelegatedStakers{} - statesRes := model.Db. + statesRes := model.DB. Model(&DelegatedStakers{}). Raw("select * from delegated_stakers where block_number = @blockNumber", sql.Named("blockNumber", log.BlockNumber)). Scan(&states) diff --git a/internal/eigenState/stakerShares/stakerShares.go b/internal/eigenState/stakerShares/stakerShares.go index 588895be..f09ae9f8 100644 --- a/internal/eigenState/stakerShares/stakerShares.go +++ b/internal/eigenState/stakerShares/stakerShares.go @@ -18,9 +18,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/types/numbers" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -50,41 +47,33 @@ type StakerSharesDiff struct { IsNew bool } -type SlotId string - -func NewSlotId(staker string, strategy string) SlotId { - return SlotId(fmt.Sprintf("%s_%s", staker, strategy)) +func NewSlotID(staker string, strategy string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", staker, strategy)) } type StakerSharesModel struct { base.BaseEigenState StateTransitions types.StateTransitions[AccumulatedStateChange] - Db *gorm.DB - Network config.Network - Environment config.Environment + DB *gorm.DB logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*AccumulatedStateChange + stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange } func NewStakerSharesModel( esm *stateManager.EigenStateManager, grm *gorm.DB, - network config.Network, - environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*StakerSharesModel, error) { model := &StakerSharesModel{ BaseEigenState: base.BaseEigenState{}, - Db: grm, - Network: network, - Environment: environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*AccumulatedStateChange), + stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), } esm.RegisterState(model, 3) @@ -284,7 +273,7 @@ func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.Tran and staker = (select staker from migration) ` logs := make([]storage.TransactionLog, 0) - res := ss.Db. + res := ss.DB. Raw(query, sql.Named("strategyManagerAddress", ss.globalConfig.GetContractsMapForEnvAndNetwork().StrategyManager), sql.Named("logBlockNumber", log.BlockNumber), @@ -422,7 +411,7 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum if parsedRecord == nil { continue } - slotId := NewSlotId(parsedRecord.Staker, parsedRecord.Strategy) + slotId := NewSlotID(parsedRecord.Staker, parsedRecord.Strategy) record, ok := ss.stateAccumulator[log.BlockNumber][slotId] if !ok { record = parsedRecord @@ -471,7 +460,7 @@ func (ss *StakerSharesModel) IsInterestingLog(log *storage.TransactionLog) bool } func (ss *StakerSharesModel) InitBlockProcessing(blockNumber uint64) error { - ss.stateAccumulator[blockNumber] = make(map[SlotId]*AccumulatedStateChange) + ss.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) return nil } @@ -506,7 +495,7 @@ func (ss *StakerSharesModel) clonePreviousBlocksToNewBlock(blockNumber uint64) e from staker_shares where block_number = @previousBlock ` - res := ss.Db.Exec(query, + res := ss.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -529,7 +518,7 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]StakerSharesDif return nil, err } - slotIds := make([]SlotId, 0) + slotIds := make([]types.SlotID, 0) for slotId := range accumulatedState { slotIds = append(slotIds, slotId) } @@ -546,7 +535,7 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]StakerSharesDif and concat(staker, '_', strategy) in @slotIds ` existingRecords := make([]StakerShares, 0) - res := ss.Db.Model(&StakerShares{}). + res := ss.DB.Model(&StakerShares{}). Raw(query, sql.Named("previousBlock", blockNumber-1), sql.Named("slotIds", slotIds), @@ -559,9 +548,9 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]StakerSharesDif } // Map the existing records to a map for easier lookup - mappedRecords := make(map[SlotId]StakerShares) + mappedRecords := make(map[types.SlotID]StakerShares) for _, record := range existingRecords { - slotId := NewSlotId(record.Staker, record.Strategy) + slotId := NewSlotID(record.Staker, record.Strategy) mappedRecords[slotId] = record } @@ -629,7 +618,7 @@ func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error { // Batch insert new records if len(newRecords) > 0 { - res := ss.Db.Model(&StakerShares{}).Clauses(clause.Returning{}).Create(&newRecords) + res := ss.DB.Model(&StakerShares{}).Clauses(clause.Returning{}).Create(&newRecords) if res.Error != nil { ss.logger.Sugar().Errorw("Failed to create new operator_shares records", zap.Error(res.Error)) return res.Error @@ -638,7 +627,7 @@ func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error { // Update existing records that were cloned from the previous block if len(updateRecords) > 0 { for _, record := range updateRecords { - res := ss.Db.Model(&StakerShares{}). + res := ss.DB.Model(&StakerShares{}). Where("staker = ? and strategy = ? and block_number = ?", record.Staker, record.Strategy, record.BlockNumber). Updates(map[string]interface{}{ "shares": record.Shares, @@ -664,75 +653,30 @@ func (ss *StakerSharesModel) GenerateStateRoot(blockNumber uint64) (types.StateR return "", err } - fullTree, err := ss.merkelizeState(blockNumber, diffs) + inputs := ss.sortValuesForMerkleTree(diffs) + + fullTree, err := ss.MerkleizeState(blockNumber, inputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (ss *StakerSharesModel) merkelizeState(blockNumber uint64, diffs []StakerSharesDiff) (*merkletree.MerkleTree, error) { - // Create a merkle tree with the structure: - // strategy: map[staker]: shares - om := orderedmap.New[string, *orderedmap.OrderedMap[string, string]]() - +func (ss *StakerSharesModel) sortValuesForMerkleTree(diffs []StakerSharesDiff) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) for _, diff := range diffs { - existingStrategy, found := om.Get(diff.Strategy) - if !found { - existingStrategy = orderedmap.New[string, string]() - om.Set(diff.Strategy, existingStrategy) - - prev := om.GetPair(diff.Strategy).Prev() - if prev != nil && strings.Compare(prev.Key, diff.Strategy) >= 0 { - om.Delete(diff.Strategy) - return nil, fmt.Errorf("strategy not in order") - } - } - existingStrategy.Set(diff.Staker, diff.Shares.String()) - - prev := existingStrategy.GetPair(diff.Staker).Prev() - if prev != nil && strings.Compare(prev.Key, diff.Staker) >= 0 { - existingStrategy.Delete(diff.Staker) - return nil, fmt.Errorf("operator not in order") - } + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: NewSlotID(diff.Staker, diff.Strategy), + Value: diff.Shares.Bytes(), + }) } + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) - leaves := ss.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - for strat := om.Oldest(); strat != nil; strat = strat.Next() { - stakerLeaves := make([][]byte, 0) - for staker := strat.Value.Oldest(); staker != nil; staker = staker.Next() { - stakerAddr := staker.Key - shares := staker.Value - stakerLeaves = append(stakerLeaves, encodeStakerSharesLeaf(stakerAddr, shares)) - } - - stratTree, err := merkletree.NewTree( - merkletree.WithData(stakerLeaves), - merkletree.WithHashType(keccak256.New()), - ) - if err != nil { - return nil, err - } - leaves = append(leaves, encodeStratTree(strat.Key, stratTree.Root())) - } - return merkletree.NewTree( - merkletree.WithData(leaves), - merkletree.WithHashType(keccak256.New()), - ) -} - -func encodeStakerSharesLeaf(staker string, shares string) []byte { - stakerBytes := []byte(staker) - sharesBytes := []byte(shares) - - return append(stakerBytes, sharesBytes...) -} - -func encodeStratTree(strategy string, stakerTreeRoot []byte) []byte { - strategyBytes := []byte(strategy) - return append(strategyBytes, stakerTreeRoot...) + return inputs } func (ss *StakerSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return ss.BaseEigenState.DeleteState("staker_shares", startBlockNumber, endBlockNumber, ss.Db) + return ss.BaseEigenState.DeleteState("staker_shares", startBlockNumber, endBlockNumber, ss.DB) } diff --git a/internal/eigenState/stakerShares/stakerShares_test.go b/internal/eigenState/stakerShares/stakerShares_test.go index 503db133..3d721b0b 100644 --- a/internal/eigenState/stakerShares/stakerShares_test.go +++ b/internal/eigenState/stakerShares/stakerShares_test.go @@ -46,8 +46,7 @@ func teardown(model *StakerSharesModel) { `delete from transaction_logs`, } for _, query := range queries { - - model.Db.Raw(query) + model.DB.Raw(query) } } @@ -60,7 +59,7 @@ func Test_StakerSharesState(t *testing.T) { t.Run("Should create a new OperatorSharesState", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) assert.Nil(t, err) assert.NotNil(t, model) }) @@ -81,7 +80,7 @@ func Test_StakerSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -118,7 +117,7 @@ func Test_StakerSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -154,7 +153,7 @@ func Test_StakerSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -190,7 +189,7 @@ func Test_StakerSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -287,7 +286,7 @@ func Test_StakerSharesState(t *testing.T) { DeletedAt: time.Time{}, } - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) err = model.InitBlockProcessing(blockNumber) assert.Nil(t, err) @@ -316,7 +315,7 @@ func Test_StakerSharesState(t *testing.T) { query := `select * from staker_shares where block_number = ?` results := []*StakerShares{} - res = model.Db.Raw(query, blockNumber).Scan(&results) + res = model.DB.Raw(query, blockNumber).Scan(&results) assert.Nil(t, res.Error) assert.Equal(t, 1, len(results)) @@ -324,7 +323,7 @@ func Test_StakerSharesState(t *testing.T) { }) t.Run("Should handle an M1 withdrawal and migration to M2 correctly", func(t *testing.T) { esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewStakerSharesModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewStakerSharesModel(esm, grm, l, cfg) assert.Nil(t, err) originBlockNumber := uint64(101) @@ -384,7 +383,7 @@ func Test_StakerSharesState(t *testing.T) { assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) assert.Equal(t, "-246393621132195985", typedChange.Changes[0].Shares.String()) - slotId := NewSlotId(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) + slotId := NewSlotID(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) accumulatedState, ok := model.stateAccumulator[originBlockNumber][slotId] assert.True(t, ok) @@ -423,7 +422,7 @@ func Test_StakerSharesState(t *testing.T) { // verify the M1 withdrawal was processed correctly query := `select * from staker_shares where block_number = ?` results := []*StakerShares{} - res = model.Db.Raw(query, originBlockNumber).Scan(&results) + res = model.DB.Raw(query, originBlockNumber).Scan(&results) assert.Nil(t, res.Error) assert.Equal(t, 1, len(results)) @@ -486,7 +485,7 @@ func Test_StakerSharesState(t *testing.T) { assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) assert.Equal(t, "246393621132195985", typedChange.Changes[0].Shares.String()) - slotId = NewSlotId(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) + slotId = NewSlotID(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) accumulatedState, ok = model.stateAccumulator[blockNumber][slotId] assert.True(t, ok) @@ -504,7 +503,7 @@ func Test_StakerSharesState(t *testing.T) { where block_number = ? ` results = []*StakerShares{} - res = model.Db.Raw(query, blockNumber).Scan(&results) + res = model.DB.Raw(query, blockNumber).Scan(&results) assert.Nil(t, res.Error) assert.Equal(t, 1, len(results)) diff --git a/internal/eigenState/stateManager/stateManager.go b/internal/eigenState/stateManager/stateManager.go index 0504f092..200eb48f 100644 --- a/internal/eigenState/stateManager/stateManager.go +++ b/internal/eigenState/stateManager/stateManager.go @@ -26,14 +26,14 @@ type StateRoot struct { type EigenStateManager struct { StateModels map[int]types.IEigenStateModel logger *zap.Logger - Db *gorm.DB + DB *gorm.DB } func NewEigenStateManager(logger *zap.Logger, grm *gorm.DB) *EigenStateManager { return &EigenStateManager{ StateModels: make(map[int]types.IEigenStateModel), logger: logger, - Db: grm, + DB: grm, } } @@ -138,7 +138,7 @@ func (e *EigenStateManager) WriteStateRoot( StateRoot: string(stateroot), } - result := e.Db.Model(&StateRoot{}).Clauses(clause.Returning{}).Create(&root) + result := e.DB.Model(&StateRoot{}).Clauses(clause.Returning{}).Create(&root) if result.Error != nil { return nil, result.Error } @@ -147,7 +147,7 @@ func (e *EigenStateManager) WriteStateRoot( func (e *EigenStateManager) GetStateRootForBlock(blockNumber uint64) (*StateRoot, error) { root := &StateRoot{} - result := e.Db.Model(&StateRoot{}).Where("eth_block_number = ?", blockNumber).First(&root) + result := e.DB.Model(&StateRoot{}).Where("eth_block_number = ?", blockNumber).First(&root) if result.Error != nil { return nil, result.Error } @@ -173,10 +173,10 @@ func (e *EigenStateManager) GetSortedModelIndexes() []int { func (e *EigenStateManager) GetLatestStateRoot() (*StateRoot, error) { root := &StateRoot{} - result := e.Db.Model(&StateRoot{}).Order("eth_block_number desc").First(&root) + result := e.DB.Model(&StateRoot{}).Order("eth_block_number desc").First(&root) if result.Error != nil { if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, nil + return root, nil } return nil, result.Error } diff --git a/internal/eigenState/stateManager/stateManager_test.go b/internal/eigenState/stateManager/stateManager_test.go index 2c13c876..0ada129f 100644 --- a/internal/eigenState/stateManager/stateManager_test.go +++ b/internal/eigenState/stateManager/stateManager_test.go @@ -38,7 +38,7 @@ func setup() ( } func teardown(model *EigenStateManager) { - model.Db.Exec("delete from state_roots") + model.DB.Exec("delete from state_roots") } func Test_StateManager(t *testing.T) { diff --git a/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots.go b/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots.go index 6ec996be..9ac4e176 100644 --- a/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots.go +++ b/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots.go @@ -16,9 +16,6 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/eigenState/types" "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/utils" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" - orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" @@ -36,30 +33,24 @@ type SubmittedDistributionRoots struct { CreatedAtBlockNumber uint64 } -type SlotId string - -func NewSlotId(root string, rootIndex uint64) SlotId { - return SlotId(fmt.Sprintf("%s_%d", root, rootIndex)) +func NewSlotID(root string, rootIndex uint64) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%d", root, rootIndex)) } type SubmittedDistributionRootsModel struct { base.BaseEigenState StateTransitions types.StateTransitions[SubmittedDistributionRoots] - Db *gorm.DB - Network config.Network - Environment config.Environment + DB *gorm.DB logger *zap.Logger globalConfig *config.Config // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[SlotId]*SubmittedDistributionRoots + stateAccumulator map[uint64]map[types.SlotID]*SubmittedDistributionRoots } func NewSubmittedDistributionRootsModel( esm *stateManager.EigenStateManager, grm *gorm.DB, - Network config.Network, - Environment config.Environment, logger *zap.Logger, globalConfig *config.Config, ) (*SubmittedDistributionRootsModel, error) { @@ -67,12 +58,10 @@ func NewSubmittedDistributionRootsModel( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, + DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[SlotId]*SubmittedDistributionRoots), + stateAccumulator: make(map[uint64]map[types.SlotID]*SubmittedDistributionRoots), } esm.RegisterState(model, 4) @@ -157,7 +146,7 @@ func (sdr *SubmittedDistributionRootsModel) GetStateTransitions() (types.StateTr activatedAt := outputData.ActivatedAt - slotId := NewSlotId(root, rootIndex) + slotId := NewSlotID(root, rootIndex) record, ok := sdr.stateAccumulator[log.BlockNumber][slotId] if ok { err := xerrors.Errorf("Duplicate distribution root submitted for slot %s at block %d", slotId, log.BlockNumber) @@ -208,7 +197,7 @@ func (sdr *SubmittedDistributionRootsModel) IsInterestingLog(log *storage.Transa } func (sdr *SubmittedDistributionRootsModel) InitBlockProcessing(blockNumber uint64) error { - sdr.stateAccumulator[blockNumber] = make(map[SlotId]*SubmittedDistributionRoots) + sdr.stateAccumulator[blockNumber] = make(map[types.SlotID]*SubmittedDistributionRoots) return nil } @@ -247,7 +236,7 @@ func (sdr *SubmittedDistributionRootsModel) clonePreviousBlocksToNewBlock(blockN from submitted_distribution_roots where block_number = @previousBlock ` - res := sdr.Db.Exec(query, + res := sdr.DB.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) @@ -270,7 +259,7 @@ func (sdr *SubmittedDistributionRootsModel) prepareState(blockNumber uint64) ([] return nil, err } - slotIds := make([]SlotId, 0) + slotIds := make([]types.SlotID, 0) for slotId := range accumulatedState { slotIds = append(slotIds, slotId) } @@ -291,7 +280,7 @@ func (sdr *SubmittedDistributionRootsModel) prepareState(blockNumber uint64) ([] and concat(root, '_', root_index) in @slotIds ` existingRecords := make([]SubmittedDistributionRoots, 0) - res := sdr.Db.Model(&SubmittedDistributionRoots{}). + res := sdr.DB.Model(&SubmittedDistributionRoots{}). Raw(query, sql.Named("currentBlock", blockNumber), sql.Named("slotIds", slotIds), @@ -337,7 +326,7 @@ func (sdr *SubmittedDistributionRootsModel) CommitFinalState(blockNumber uint64) } if len(records) > 0 { - res := sdr.Db.Model(&SubmittedDistributionRoots{}).Clauses(clause.Returning{}).Create(&records) + res := sdr.DB.Model(&SubmittedDistributionRoots{}).Clauses(clause.Returning{}).Create(&records) if res.Error != nil { sdr.logger.Sugar().Errorw("Failed to create new submitted_distribution_roots records", zap.Error(res.Error)) return res.Error @@ -352,54 +341,36 @@ func (sdr *SubmittedDistributionRootsModel) ClearAccumulatedState(blockNumber ui return nil } +func (sdr *SubmittedDistributionRootsModel) sortValuesForMerkleTree(inputs []SubmittedDistributionRoots) []*base.MerkleTreeInput { + slices.SortFunc(inputs, func(i, j SubmittedDistributionRoots) int { + return int(i.RootIndex - j.RootIndex) + }) + + values := make([]*base.MerkleTreeInput, 0) + for _, input := range inputs { + values = append(values, &base.MerkleTreeInput{ + SlotID: NewSlotID(input.Root, input.RootIndex), + Value: []byte(input.Root), + }) + } + return values +} + func (sdr *SubmittedDistributionRootsModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { diffs, err := sdr.prepareState(blockNumber) if err != nil { return "", err } - fullTree, err := sdr.merkelizeState(blockNumber, diffs) + sortedInputs := sdr.sortValuesForMerkleTree(diffs) + + fullTree, err := sdr.MerkleizeState(blockNumber, sortedInputs) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (sdr *SubmittedDistributionRootsModel) merkelizeState(blockNumber uint64, diffs []SubmittedDistributionRoots) (*merkletree.MerkleTree, error) { - // Create a merkle tree with the structure: - // rootIndex: root - om := orderedmap.New[uint64, string]() - - for _, diff := range diffs { - _, found := om.Get(diff.RootIndex) - if !found { - om.Set(diff.RootIndex, diff.Root) - - prev := om.GetPair(diff.RootIndex).Prev() - if prev != nil && prev.Key > diff.RootIndex { - om.Delete(diff.RootIndex) - return nil, fmt.Errorf("root indexes not in order") - } - } else { - return nil, fmt.Errorf("duplicate root index %d", diff.RootIndex) - } - } - - leaves := sdr.InitializeMerkleTreeBaseStateWithBlock(blockNumber) - for rootIndex := om.Oldest(); rootIndex != nil; rootIndex = rootIndex.Next() { - leaves = append(leaves, encodeRootIndexLeaf(rootIndex.Key, rootIndex.Value)) - } - return merkletree.NewTree( - merkletree.WithData(leaves), - merkletree.WithHashType(keccak256.New()), - ) -} - -func encodeRootIndexLeaf(rootIndex uint64, root string) []byte { - rootIndexBytes := []byte(fmt.Sprintf("%d", rootIndex)) - return append(rootIndexBytes, []byte(root)...) -} - func (sdr *SubmittedDistributionRootsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { - return sdr.BaseEigenState.DeleteState("submitted_distribution_roots", startBlockNumber, endBlockNumber, sdr.Db) + return sdr.BaseEigenState.DeleteState("submitted_distribution_roots", startBlockNumber, endBlockNumber, sdr.DB) } diff --git a/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots_test.go b/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots_test.go index f7ff1cc3..43a9a991 100644 --- a/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots_test.go +++ b/internal/eigenState/submittedDistributionRoots/submittedDistributionRoots_test.go @@ -41,7 +41,7 @@ func teardown(model *SubmittedDistributionRootsModel) { `delete from submitted_distribution_roots`, } for _, query := range queries { - model.Db.Raw(query) + model.DB.Raw(query) } } @@ -53,7 +53,7 @@ func Test_SubmittedDistributionRoots(t *testing.T) { } esm := stateManager.NewEigenStateManager(l, grm) - model, err := NewSubmittedDistributionRootsModel(esm, grm, cfg.Network, cfg.Environment, l, cfg) + model, err := NewSubmittedDistributionRootsModel(esm, grm, l, cfg) insertedRoots := make([]*SubmittedDistributionRoots, 0) @@ -99,7 +99,7 @@ func Test_SubmittedDistributionRoots(t *testing.T) { query := `SELECT * FROM submitted_distribution_roots WHERE block_number = ?` var roots []*SubmittedDistributionRoots - res := model.Db.Raw(query, blockNumber).Scan(&roots) + res := model.DB.Raw(query, blockNumber).Scan(&roots) assert.Nil(t, res.Error) assert.Equal(t, 1, len(roots)) @@ -152,7 +152,7 @@ func Test_SubmittedDistributionRoots(t *testing.T) { query := `SELECT * FROM submitted_distribution_roots WHERE block_number = ?` var roots []*SubmittedDistributionRoots - res := model.Db.Raw(query, blockNumber).Scan(&roots) + res := model.DB.Raw(query, blockNumber).Scan(&roots) assert.Nil(t, res.Error) assert.Equal(t, 2, len(roots)) diff --git a/internal/eigenState/types/types.go b/internal/eigenState/types/types.go index 12e58964..44067185 100644 --- a/internal/eigenState/types/types.go +++ b/internal/eigenState/types/types.go @@ -48,3 +48,5 @@ type IEigenStateModel interface { // StateTransitions // Map of block number to function that will transition the state to the next block. type StateTransitions[T interface{}] map[uint64]func(log *storage.TransactionLog) (*T, error) + +type SlotID string diff --git a/internal/pipeline/pipeline_integration_test.go b/internal/pipeline/pipeline_integration_test.go index 938cf231..4b652e04 100644 --- a/internal/pipeline/pipeline_integration_test.go +++ b/internal/pipeline/pipeline_integration_test.go @@ -98,22 +98,22 @@ func setup() ( sm := stateManager.NewEigenStateManager(l, grm) - if _, err := avsOperators.NewAvsOperators(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := avsOperators.NewAvsOperators(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create AvsOperatorsModel", zap.Error(err)) } - if _, err := operatorShares.NewOperatorSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := operatorShares.NewOperatorSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create OperatorSharesModel", zap.Error(err)) } - if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerDelegations.NewStakerDelegationsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerDelegationsModel", zap.Error(err)) } - if _, err := stakerShares.NewStakerSharesModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := stakerShares.NewStakerSharesModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create StakerSharesModel", zap.Error(err)) } - if _, err := submittedDistributionRoots.NewSubmittedDistributionRootsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := submittedDistributionRoots.NewSubmittedDistributionRootsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create SubmittedDistributionRootsModel", zap.Error(err)) } - if _, err := rewardSubmissions.NewRewardSubmissionsModel(sm, grm, cfg.Network, cfg.Environment, l, cfg); err != nil { + if _, err := rewardSubmissions.NewRewardSubmissionsModel(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to create RewardSubmissionsModel", zap.Error(err)) } diff --git a/internal/sidecar/blockIndexer.go b/internal/sidecar/blockIndexer.go index 0f84b87a..a7be0308 100644 --- a/internal/sidecar/blockIndexer.go +++ b/internal/sidecar/blockIndexer.go @@ -104,6 +104,8 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { shouldShutdown := false + // Spin up a goroutine that listens on a channel for a shutdown signal. + // When the signal is received, set shouldShutdown to true and return. go func() { for { select { @@ -114,6 +116,9 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { } }() + // Every 30 seconds, check to see if the current tip has changed while the backfill/sync + // process is still running. If it has changed, update the value which will extend the loop + // to include the newly discovered blocks. go func() { for { time.Sleep(time.Second * 30) @@ -140,6 +145,7 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { runningAvg := 0 totalDurationMs := 0 lastBlockParsed := latestBlock + for i := latestBlock; i <= int64(ct.Get()); i++ { if shouldShutdown { s.Logger.Sugar().Infow("Shutting down block processor")