Skip to content

Commit

Permalink
feat: crosschain batch tx (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 authored Mar 20, 2023
1 parent cbdf182 commit e2bfd6a
Show file tree
Hide file tree
Showing 36 changed files with 1,068 additions and 505 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
.idea/*
build/*
.local
config/local/*
integrationtest/config/*
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ lint-fix:
@$(golangci_lint_cmd) run --fix --out-format=tab --issues-exit-code=0

format:
@go install mvdan.cc/gofumpt@latest
@go install github.com/golangci/golangci-lint/cmd/golangci-lint@$(golangci_version)
find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" -not -path "./client/docs/statik/statik.go" -not -path "./tests/mocks/*" -not -name "*.pb.go" -not -name "*.pb.gw.go" -not -name "*.pulsar.go" -not -path "./crypto/keys/secp256k1/*" | xargs gofumpt -w -l
golangci-lint run --fix
bash scripts/format.sh

.PHONY: lint lint-fix format
4 changes: 3 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func NewApp(cfg *config.Config) *App {
model.InitBSCTables(db)
model.InitGreenfieldTables(db)
model.InitVoteTables(db)
model.InitSequenceTable(db)

greenfieldDao := dao.NewGreenfieldDao(db)
bscDao := dao.NewBSCDao(db)
voteDao := dao.NewVoteDao(db)
daoManager := dao.NewDaoManager(greenfieldDao, bscDao, voteDao)
seqDao := dao.NewSequenceDao(db)
daoManager := dao.NewDaoManager(greenfieldDao, bscDao, voteDao, seqDao)

greenfieldExecutor := executor.NewGreenfieldExecutor(cfg)
bscExecutor := executor.NewBSCExecutor(cfg)
Expand Down
177 changes: 97 additions & 80 deletions assembler/bsc_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package assembler

import (
"encoding/hex"
"errors"
"fmt"
"time"

"github.com/bnb-chain/greenfield-relayer/common"
"github.com/bnb-chain/greenfield-relayer/config"
"github.com/bnb-chain/greenfield-relayer/db"
"github.com/bnb-chain/greenfield-relayer/db/dao"
"github.com/bnb-chain/greenfield-relayer/db/model"
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/logging"
"github.com/bnb-chain/greenfield-relayer/types"
Expand Down Expand Up @@ -40,125 +41,141 @@ func (a *BSCAssembler) AssemblePackagesAndClaimLoop() {
}

func (a *BSCAssembler) assemblePackagesAndClaimForOracleChannel(channelId types.ChannelId) {
for {
ticker := time.NewTicker(common.RetryInterval)
for range ticker.C {
if err := a.process(channelId); err != nil {
logging.Logger.Errorf("encounter error when relaying packages, err=%s ", err.Error())
time.Sleep(common.RetryInterval)
}
}
}

func (a *BSCAssembler) process(channelId types.ChannelId) error {
nextSequence, err := a.bscExecutor.GetNextDeliveryOracleSequence()
inturnRelayer, err := a.greenfieldExecutor.GetInturnRelayer()
if err != nil {
return err
}
isInturnRelyer := inturnRelayer.BlsPubKey == a.blsPubKey
var startSequence uint64
if isInturnRelyer {
seq, err := a.daoManager.SequenceDao.GetByChannelId(uint8(channelId))
if err != nil {
return err
}
startSequence = uint64(seq.Sequence)

pkgs, err := a.daoManager.BSCDao.GetPackagesByOracleSequenceAndStatus(nextSequence, db.AllVoted)
// in-turn relayer get the start sequence from chain first time, it starts to relay after the sequence gets updated
now := time.Now().Unix()
timeDiff := now - int64(inturnRelayer.RelayInterval.Start)

if timeDiff < a.config.RelayConfig.GreenfieldSequenceUpdateLatency {
if timeDiff < 0 {
return fmt.Errorf("blockchain time and relayer time is not consistent, now %d should be after %d", now, inturnRelayer.RelayInterval.Start)
}
time.Sleep(time.Duration(timeDiff) * time.Second)
startSequence, err = a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
}
if err = a.daoManager.SequenceDao.Upsert(uint8(channelId), startSequence); err != nil {
return err
}
}
logging.Logger.Debug("bsc relay as in-turn relayer")
} else {
// non-inturn relayer retries every 10 second, gets the sequence from chain
time.Sleep(time.Duration(a.config.RelayConfig.GreenfieldSequenceUpdateLatency) * time.Second)
startSequence, err = a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
}
logging.Logger.Debug("bsc relay as out-turn relayer")
if err := a.daoManager.BSCDao.UpdateBatchPackagesStatusToDelivered(startSequence); err != nil {
return err
}
}
endSequence, err := a.daoManager.BSCDao.GetLatestOracleSequenceByStatus(db.AllVoted)
if err != nil {
logging.Logger.Errorf("failed to get all validator voted tx with channel id %d and sequence : %d", channelId, nextSequence)
return err
}
if len(pkgs) == 0 {
if endSequence == -1 {
return nil
}
nonce, err := a.greenfieldExecutor.GetNonce()
if err != nil {
return err
}

var pkgIds []int64
for _, p := range pkgs {
pkgIds = append(pkgIds, p.Id)
for i := startSequence; i <= uint64(endSequence); i++ {
pkgs, err := a.daoManager.BSCDao.GetPackagesByOracleSequence(i)
if err != nil {
return err
}
if len(pkgs) == 0 {
return nil
}
status := pkgs[0].Status
pkgTime := pkgs[0].TxTime

if status != db.AllVoted && status != db.Delivered {
return fmt.Errorf("packages with oracle sequence %d does not get enough votes yet", i)
}

// non-inturn relayer can not relay tx within the timeout of in-turn relayer
if !isInturnRelyer && time.Now().Unix() < pkgTime+a.config.RelayConfig.BSCToGreenfieldInturnRelayerTimeout {
return nil
}
if err := a.processPkgs(pkgs, uint8(channelId), i, nonce, isInturnRelyer); err != nil {
return err
}
logging.Logger.Infof("relayed packages with oracle sequence %d ", i)

nonce++
}
return nil
}

func (a *BSCAssembler) processPkgs(pkgs []*model.BscRelayPackage, channelId uint8, sequence uint64, nonce uint64, isInturnRelyer bool) error {
// Get votes result for a packages, which are already validated and qualified to aggregate sig
votes, err := a.daoManager.VoteDao.GetVotesByChannelIdAndSequence(uint8(channelId), nextSequence)
votes, err := a.daoManager.VoteDao.GetVotesByChannelIdAndSequence(channelId, sequence)
if err != nil {
logging.Logger.Errorf("failed to get votes result for packages for channel %d and sequence %d", channelId, nextSequence)
logging.Logger.Errorf("failed to get votes result for packages for channel %d and sequence %d", channelId, sequence)
return err
}

validators, err := a.greenfieldExecutor.QueryCachedLatestValidators()
if err != nil {
return err
}

aggregatedSignature, valBitSet, err := vote.AggregateSignatureAndValidatorBitSet(votes, validators)
if err != nil {
return err
}

relayerPubKeys, err := a.greenfieldExecutor.GetValidatorsBlsPublicKey()
txHash, err := a.greenfieldExecutor.ClaimPackages(votes[0].ClaimPayload, aggregatedSignature, valBitSet.Bytes(), pkgs[0].TxTime, sequence, nonce)
if err != nil {
return err
}
logging.Logger.Infof("claimed transaction with txHash %s", txHash)

// packages for same oracle sequence share one timestamp
pkgTs := pkgs[0].TxTime

relayerIdx := util.IndexOf(a.blsPubKey, relayerPubKeys)
if relayerIdx == -1 {
return errors.New(" relayer's bls pub key not found. ")
}

firstInturnRelayerIdx := int(pkgTs) % len(relayerPubKeys)
packagesRelayStartTime := pkgTs + a.config.RelayConfig.BSCToGreenfieldRelayingDelayTime
logging.Logger.Infof("packages will be relayed starting at %d", packagesRelayStartTime)

var indexDiff int
if relayerIdx >= firstInturnRelayerIdx {
indexDiff = relayerIdx - firstInturnRelayerIdx
} else {
indexDiff = len(relayerPubKeys) - (firstInturnRelayerIdx - relayerIdx)
}

curRelayerRelayingStartTime := int64(0)
if indexDiff == 0 {
curRelayerRelayingStartTime = packagesRelayStartTime
} else {
curRelayerRelayingStartTime = packagesRelayStartTime + a.config.RelayConfig.FirstInTurnRelayerRelayingWindow + int64(indexDiff-1)*a.config.RelayConfig.InTurnRelayerRelayingWindow
var pkgIds []int64
for _, p := range pkgs {
pkgIds = append(pkgIds, p.Id)
}
logging.Logger.Infof("current relayer starts relaying from %d", curRelayerRelayingStartTime)

filled := make(chan struct{})
errC := make(chan error)
ticker := time.NewTicker(common.RetryInterval)

go a.validateSequenceFilled(filled, errC, nextSequence)

for {
select {
case err = <-errC:
if !isInturnRelyer {
if err = a.daoManager.BSCDao.UpdateBatchPackagesClaimedTxHash(pkgIds, txHash); err != nil {
return err
case <-filled:
if err = a.daoManager.BSCDao.UpdateBatchPackagesStatus(pkgIds, db.Delivered); err != nil {
logging.Logger.Errorf("failed to update packages status to 'Delivered', package Ids =%v", pkgIds)
return err
}
return nil
case <-ticker.C:
if time.Now().Unix() >= curRelayerRelayingStartTime {
txHash, err := a.greenfieldExecutor.ClaimPackages(votes[0].ClaimPayload, aggregatedSignature, valBitSet.Bytes(), pkgTs)
if err != nil {
return err
}
logging.Logger.Infof("claimed transaction with txHash %s", txHash)
if err = a.daoManager.BSCDao.UpdateBatchPackagesClaimedTxHash(pkgIds, txHash); err != nil {
return err
}
time.Sleep(executor.GnfdSequenceUpdateLatency)
return nil
}
}
return nil
}
}

func (a *BSCAssembler) validateSequenceFilled(filled chan struct{}, errC chan error, sequence uint64) {
ticker := time.NewTicker(common.RetrieveSequenceInterval)
defer ticker.Stop()
for range ticker.C {
nextDeliverySequence, err := a.bscExecutor.GetNextDeliveryOracleSequence()
if err != nil {
errC <- err
}
if sequence < nextDeliverySequence {
logging.Logger.Infof("Oracle sequence %d has been filled ", sequence)
filled <- struct{}{}
}
if err = a.daoManager.BSCDao.UpdateBatchPackagesStatusAndClaimedTxHash(pkgIds, db.Delivered, txHash); err != nil {
logging.Logger.Errorf("failed to update packages to 'Delivered', error=%s", err.Error())
return err
}
if err = a.daoManager.SequenceDao.Upsert(channelId, sequence+1); err != nil {
return err
}

return nil
}
Loading

0 comments on commit e2bfd6a

Please sign in to comment.