Skip to content

Commit

Permalink
merge 1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
benbaley committed Nov 8, 2023
2 parents 6acb557 + ff9b138 commit c87e9b5
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 95 deletions.
39 changes: 26 additions & 13 deletions consensus/cbft/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,16 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even

isValidatorAfter := vp.isValidator(epoch, vp.nodeID)

nodes := make(map[enode.ID]struct{})
for _, validator := range vp.currentValidators.Nodes {
nodes[validator.NodeID] = struct{}{}
if isValidatorBefore || isValidatorAfter {
nodes := make(map[enode.ID]struct{})
for _, validator := range vp.currentValidators.Nodes {
nodes[validator.NodeID] = struct{}{}
}
eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes})
}
eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes})

removes := make([]*enode.Node, 0)
adds := make([]*enode.Node, 0)

if isValidatorBefore {
// If we are still a consensus node, that adding
Expand All @@ -435,21 +440,22 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even
if isValidatorAfter {
for nodeID, vnode := range vp.prevValidators.Nodes {
if node, _ := vp.currentValidators.FindNodeByID(nodeID); node == nil {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String())
removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID, "isValidatorAfter")
}
}

for nodeID, vnode := range vp.currentValidators.Nodes {
if node, _ := vp.prevValidators.FindNodeByID(nodeID); node == nil {
eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String())
adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to AddValidatorEvent", "nodeID", nodeID)
}
}

} else {
for nodeID, vnode := range vp.prevValidators.Nodes {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String())
removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID)
}
}
} else {
Expand All @@ -459,12 +465,19 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even
// with other validators in the consensus stages.
if isValidatorAfter {
for nodeID, vnode := range vp.currentValidators.Nodes {
eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)})
log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String())
adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0))
log.Trace("add to AddValidatorEvent", "nodeID", nodeID)
}
}
}

if len(removes) > 0 {
eventMux.Post(cbfttypes.RemoveValidatorEvent{Nodes: removes})
log.Trace("Post RemoveValidatorEvent", "num", len(removes), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter)
}
if len(adds) > 0 {
eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds})
log.Trace("Post AddValidatorEvent", "num", len(adds), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/cbfttypes/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type CbftResult struct {
}

type AddValidatorEvent struct {
Node *enode.Node
Nodes []*enode.Node
}

type RemoveValidatorEvent struct {
Node *enode.Node
Nodes []*enode.Node
}

type UpdateValidatorEvent struct {
Expand Down
5 changes: 5 additions & 0 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ package rawdb

import (
"bytes"
"encoding/binary"
"fmt"
"github.com/davecgh/go-spew/spew"
"math/rand"
"os"
"path/filepath"
"reflect"
"sync/atomic"
"testing"
"testing/quick"
"time"

"github.com/PlatONnetwork/PlatON-Go/metrics"
Expand Down
46 changes: 25 additions & 21 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,10 @@ func answerGetPPOSStorageMsgQueryV2(backend Backend, head uint64, peer *Peer) er
f := func(baseBlock uint64, iter iterator.Iterator, blocks []rlp.RawValue) error {
peer.Log().Debug("begin answerGetPPOSStorageMsgQueryV2", "blocks", len(blocks), "baseBlock", baseBlock)
var (
byteSize int
ps PposStoragePacket
count int
byteSize int
ps PposStoragePacket
count int
pposStorageRlps []rlp.RawValue
)
ps.KVs = make([][2][]byte, 0)
for iter.Next() {
Expand Down Expand Up @@ -629,29 +630,32 @@ func answerGetPPOSStorageMsgQueryV2(backend Backend, head uint64, peer *Peer) er
return fmt.Errorf("send last ppos storage message fail,%v", err)
}

var (
pposStorageRlps []rlp.RawValue
)
byteSize = 0

for _, encoded := range blocks {
if byteSize >= softResponseLimit {
id := rand.Uint64()
if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil {
return fmt.Errorf("reply ppos storage v2 message fail,%v", err)
}
pposStorageRlps = []rlp.RawValue{}
byteSize = 0
} else {
pposStorageRlps = append(pposStorageRlps, encoded)
byteSize += len(encoded)
}
}
if len(pposStorageRlps) > 0 {
if head == baseBlock {
id := rand.Uint64()
if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil {
return fmt.Errorf("reply last ppos storage v2 message fail,%v", err)
}
} else {
for _, encoded := range blocks {
if byteSize >= softResponseLimit {
id := rand.Uint64()
if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil {
return fmt.Errorf("reply ppos storage v2 message fail,%v", err)
}
pposStorageRlps = []rlp.RawValue{}
byteSize = 0
} else {
pposStorageRlps = append(pposStorageRlps, encoded)
byteSize += len(encoded)
}
}
if len(pposStorageRlps) > 0 {
id := rand.Uint64()
if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil {
return fmt.Errorf("reply last ppos storage v2 message fail,%v", err)
}
}
}
peer.Log().Debug("end answerGetPPOSStorageMsgQueryV2", "blocks", len(blocks), "baseBlock", baseBlock)
return nil
Expand Down
11 changes: 3 additions & 8 deletions p2p/consensus_dialed.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package p2p

import (
"fmt"
"strings"

"github.com/PlatONnetwork/PlatON-Go/log"
"github.com/PlatONnetwork/PlatON-Go/p2p/enode"
)
Expand Down Expand Up @@ -68,8 +65,6 @@ func (tasks *dialedTasks) RemoveTask(NodeID enode.ID) error {
}

func (tasks *dialedTasks) ListTask() []*dialTask {

log.Info("[after list]Consensus dialed task list after ListTask operation", "task queue", tasks.description())
return tasks.queue
}

Expand Down Expand Up @@ -138,10 +133,10 @@ func (tasks *dialedTasks) isEmpty() bool {
return false
}

func (tasks *dialedTasks) description() string {
func (tasks *dialedTasks) description() []string {
var description []string
for _, t := range tasks.queue {
description = append(description, fmt.Sprintf("%x", t.dest.ID().TerminalString()))
description = append(description, t.dest.ID().TerminalString())
}
return strings.Join(description, ",")
return description
}
5 changes: 5 additions & 0 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ func (d *dialScheduler) startConsensusDials(n int) (started int) {
n = 3
}
}
if n <= 0 {
return
}

log.Debug("startConsensusDials", "maxConsensusPeers", d.MaxConsensusPeers, "consensusPeers", d.consensusPeers, "n", n, "task queue", d.consensusPool.description())

// Create dials for consensus nodes if they are not connected.
for _, t := range d.consensusPool.ListTask() {
Expand Down
46 changes: 4 additions & 42 deletions p2p/discover/v4wire/v4wire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package v4wire

import (
"bytes"
"encoding/hex"
"net"
"reflect"
Expand All @@ -35,7 +34,7 @@ var testPackets = []struct {
wantPacket Packet
}{
/*{
input: "96e7a55f265b738379447058e63d7bad33b05b1e3e93d9537f05be9975090bd72dbe25ab1d57c3de65e0b089f6b884c2273d29b343211483557f7c46f3df0b2650779f3459fabbf34261f113b4f29f6d5f540ea7d3fe5e35b24264517f8529110001ea04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355",
input: "86b1419e9db40e9b03fcebf50a247e8b0b0cb4f6000e8de56c33336fb54c01fff20d1818b4e7bca98129b72f3551c507705addc662a7eefa0000a51c9a39d691577bfb0b4320b6421cbafda7001dc033776ffad4a330ca74027ef38fb84c00990101eb04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355c0",
wantPacket: &Ping{
Version: 4,
From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544},
Expand All @@ -44,14 +43,15 @@ var testPackets = []struct {
},
},*/
{
input: "b045fd6e610bfe8e51393adb7aa058d60259a744cbcbceed6078b006857a7e881e32b9760ea0fd6c07fd9fbd9e73cda03c5ffa5d58c5b1cce8fc309ad2a702fc3f5a496f0e008bba521b77d5486151d391a76690d095b27adfb6a3e77d0193400101eb04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a35503",
input: "54fa00ece4514b03f0ce6380e31c6f882b644ddb3a1c8ec6fb511dd5c8b0d9853c0ec90ddf84859ae4bedef069a12db12ca895fcb8358483bc9c4935b7f7c5ef4e49d6242a82520456c1a788864eb1c510074a928b5151fcab397220ae8855090101ee04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355c1030102",
wantPacket: &Ping{
Version: 4,
From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544},
To: Endpoint{net.ParseIP("::1"), 2222, 3333},
Expiration: 1136239445,
ForkID: []rlp.RawValue{{0x03}},
//ENRSeq: 1,
ENRSeq: 1,
Rest: []rlp.RawValue{{0x02}},
},
},
{
Expand Down Expand Up @@ -121,44 +121,6 @@ func TestForwardCompatibility(t *testing.T) {
}
}

func TestPingEncode(t *testing.T) {
packet, err := rlp.EncodeToBytes(&Ping{
Version: 4,
From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544},
To: Endpoint{net.ParseIP("::1"), 2222, 3333},
Expiration: 1136239445,
ForkID: []rlp.RawValue{{0x03}},
})
if err != nil {
t.Error(err)
}
packet2, err2 := rlp.EncodeToBytes(&Ping{
Version: 4,
From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544},
To: Endpoint{net.ParseIP("::1"), 2222, 3333},
Expiration: 1136239445,
ForkID: []rlp.RawValue{{0x03}},
ENRSeq: 1,
})
if err2 != nil {
t.Error(err2)
}
packet3, err3 := rlp.EncodeToBytes(&PingV1{
Version: 4,
From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544},
To: Endpoint{net.ParseIP("::1"), 2222, 3333},
Expiration: 1136239445,
Rest: []rlp.RawValue{{0x03}},
})
if err3 != nil {
t.Error(err3)
}

if !bytes.Equal(packet, packet2) && !bytes.Equal(packet3, packet2) {
t.Error("should be same")
}
}

func hexPubkey(h string) (ret Pubkey) {
b, err := hex.DecodeString(h)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,7 @@ running:
// This channel is used by RemoveConsensusNode to remove an enode
// from the consensus node set.
srv.log.Trace("Removing consensus node", "node", n.ID())
id := n.ID()
if srv.localnode.ID() == id {
if srv.localnode.ID() == n.ID() {
srv.log.Debug("We are not an consensus node")
srv.consensus = false
}
Expand Down Expand Up @@ -1276,11 +1275,16 @@ func (srv *Server) watching() {

switch data := ev.Data.(type) {
case cbfttypes.AddValidatorEvent:
srv.log.Trace("Received AddValidatorEvent", "nodeID", data.Node.ID())
srv.AddConsensusPeer(data.Node)
srv.log.Trace("Received AddValidatorEvent", "num", len(data.Nodes))
for _, node := range data.Nodes {
srv.AddConsensusPeer(node)
}

case cbfttypes.RemoveValidatorEvent:
srv.log.Trace("Received RemoveValidatorEvent", "nodeID", data.Node.ID())
srv.RemoveConsensusPeer(data.Node)
srv.log.Trace("Received RemoveValidatorEvent", "num", len(data.Nodes))
for _, node := range data.Nodes {
srv.RemoveConsensusPeer(node)
}
case cbfttypes.UpdateValidatorEvent:
consensusPeer := 0
if _, ok := data.Nodes[srv.localnode.ID()]; ok {
Expand Down
8 changes: 5 additions & 3 deletions x/plugin/staking_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,16 @@ func (sk *StakingPlugin) Confirmed(nodeId enode.IDv0, block *types.Block) error
}

func (sk *StakingPlugin) addConsensusNode(nodes staking.ValidatorQueue) {
adds := make([]*enode.Node, 0)
for _, node := range nodes {
pub, err := node.NodeId.Pubkey()
if err != nil {
panic(err)
}
if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(pub, nil, 0, 0)}); nil != err {
log.Error("post AddValidatorEvent failed", "nodeId", node.NodeId.TerminalString(), "err", err)
}
adds = append(adds, enode.NewV4(pub, nil, 0, 0))
}
if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}); nil != err {
log.Error("post AddValidatorEvent failed", "num", len(adds), "err", err)
}
}

Expand Down

0 comments on commit c87e9b5

Please sign in to comment.