diff --git a/consensus/bft_mock.go b/consensus/bft_mock.go index 1cdd3bd71c..95fc8b0874 100644 --- a/consensus/bft_mock.go +++ b/consensus/bft_mock.go @@ -287,6 +287,10 @@ func (bm *BftMock) ConsensusNodes() ([]enode.ID, error) { return nil, nil } +func (bm *BftMock) ConsensusValidators() []*cbfttypes.ValidateNode { + return nil +} + // ShouldSeal returns whether the current node is out of the block func (bm *BftMock) ShouldSeal(curTime time.Time) (bool, error) { return true, nil diff --git a/consensus/cbft/cbft.go b/consensus/cbft/cbft.go index 6e60c17662..02ddf1c5c6 100644 --- a/consensus/cbft/cbft.go +++ b/consensus/cbft/cbft.go @@ -1185,6 +1185,16 @@ func (cbft *Cbft) ConsensusNodes() ([]enode.ID, error) { return cbft.validatorPool.ValidatorList(cbft.state.Epoch()), nil } +// ConsensusNodes returns to the list of consensus nodes. +func (cbft *Cbft) ConsensusValidators() []*cbfttypes.ValidateNode { + vs := cbft.validatorPool.Validators(cbft.state.Epoch()) + nodeList := make([]*cbfttypes.ValidateNode, 0) + for _, node := range vs.Nodes { + nodeList = append(nodeList, node) + } + return nodeList +} + // ShouldSeal check if we can seal block. func (cbft *Cbft) ShouldSeal(curTime time.Time) (bool, error) { if cbft.isLoading() || !cbft.isStart() || !cbft.running() { diff --git a/consensus/cbft/validator/validator.go b/consensus/cbft/validator/validator.go index 909ee05ecc..1579c04644 100644 --- a/consensus/cbft/validator/validator.go +++ b/consensus/cbft/validator/validator.go @@ -419,11 +419,16 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even isValidatorAfter := vp.isValidator(epoch, vp.nodeID) - nodes := make(map[enode.ID]struct{}) - for _, validator := range vp.currentValidators.Nodes { - nodes[validator.NodeID] = struct{}{} + if isValidatorBefore || isValidatorAfter { + nodes := make(map[enode.ID]struct{}) + for _, validator := range vp.currentValidators.Nodes { + nodes[validator.NodeID] = struct{}{} + } + eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes}) } - eventMux.Post(cbfttypes.UpdateValidatorEvent{Nodes: nodes}) + + removes := make([]*enode.Node, 0) + adds := make([]*enode.Node, 0) if isValidatorBefore { // If we are still a consensus node, that adding @@ -433,23 +438,24 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even // in the consensus stages. Also we are not needed // to keep connect with old validators. if isValidatorAfter { - for nodeID, vnode := range vp.currentValidators.Nodes { - if node, _ := vp.prevValidators.FindNodeByID(nodeID); node == nil { - eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String()) + for nodeID, vnode := range vp.prevValidators.Nodes { + if node, _ := vp.currentValidators.FindNodeByID(nodeID); node == nil { + removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID, "isValidatorAfter") } } - for nodeID, vnode := range vp.prevValidators.Nodes { - if node, _ := vp.currentValidators.FindNodeByID(nodeID); node == nil { - eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String()) + for nodeID, vnode := range vp.currentValidators.Nodes { + if node, _ := vp.prevValidators.FindNodeByID(nodeID); node == nil { + adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to AddValidatorEvent", "nodeID", nodeID) } } + } else { for nodeID, vnode := range vp.prevValidators.Nodes { - eventMux.Post(cbfttypes.RemoveValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post RemoveValidatorEvent", "nodeID", nodeID.String()) + removes = append(removes, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to RemoveValidatorEvent", "nodeID", nodeID) } } } else { @@ -459,12 +465,19 @@ func (vp *ValidatorPool) Update(blockNumber uint64, epoch uint64, eventMux *even // with other validators in the consensus stages. if isValidatorAfter { for nodeID, vnode := range vp.currentValidators.Nodes { - eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(vnode.PubKey, nil, 0, 0)}) - log.Trace("Post AddValidatorEvent", "nodeID", nodeID.String()) + adds = append(adds, enode.NewV4(vnode.PubKey, nil, 0, 0)) + log.Trace("add to AddValidatorEvent", "nodeID", nodeID) } } } - + if len(removes) > 0 { + eventMux.Post(cbfttypes.RemoveValidatorEvent{Nodes: removes}) + log.Trace("Post RemoveValidatorEvent", "num", len(removes), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter) + } + if len(adds) > 0 { + eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}) + log.Trace("Post AddValidatorEvent", "num", len(adds), "isValidatorBefore", isValidatorBefore, "isValidatorAfter", isValidatorAfter) + } return nil } diff --git a/consensus/consensus.go b/consensus/consensus.go index eff698936a..f91ccff918 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -180,6 +180,8 @@ type Bft interface { // Returns the current consensus node address list. ConsensusNodes() ([]enode.ID, error) + ConsensusValidators() []*cbfttypes.ValidateNode + // Returns whether the current node is out of the block ShouldSeal(curTime time.Time) (bool, error) diff --git a/core/cbfttypes/type.go b/core/cbfttypes/type.go index fd50b15f39..206a3f2e97 100644 --- a/core/cbfttypes/type.go +++ b/core/cbfttypes/type.go @@ -47,11 +47,11 @@ type CbftResult struct { } type AddValidatorEvent struct { - Node *enode.Node + Nodes []*enode.Node } type RemoveValidatorEvent struct { - Node *enode.Node + Nodes []*enode.Node } type UpdateValidatorEvent struct { diff --git a/eth/backend.go b/eth/backend.go index b42925b6f7..b29e9256fd 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -653,13 +653,13 @@ func (s *Ethereum) Start() error { // Start the networking layer and the light server if requested s.handler.Start(maxPeers) - //log.Debug("node start", "srvr.Config.PrivateKey", srvr.Config.PrivateKey) if cbftEngine, ok := s.engine.(consensus.Bft); ok { if flag := cbftEngine.IsConsensusNode(); flag { - for _, n := range s.blockchain.Config().Cbft.InitialNodes { - // todo: Mock point. + for _, n := range cbftEngine.ConsensusValidators() { if !node.FakeNetEnable { - s.p2pServer.AddConsensusPeer(n.Node) + enode := enode.NewV4(n.PubKey, nil, 0, 0) + log.Trace("PlatON start, adding consensus node", "nodeID", enode.IDv0()) + s.p2pServer.AddConsensusPeer(enode) } } } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index e2c00560c1..79d6d33c1e 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -597,9 +597,10 @@ func answerGetPPOSStorageMsgQueryV2(backend Backend, head uint64, peer *Peer) er f := func(baseBlock uint64, iter iterator.Iterator, blocks []rlp.RawValue) error { peer.Log().Debug("begin answerGetPPOSStorageMsgQueryV2", "blocks", len(blocks), "baseBlock", baseBlock) var ( - byteSize int - ps PposStoragePacket - count int + byteSize int + ps PposStoragePacket + count int + pposStorageRlps []rlp.RawValue ) ps.KVs = make([][2][]byte, 0) for iter.Next() { @@ -629,29 +630,32 @@ func answerGetPPOSStorageMsgQueryV2(backend Backend, head uint64, peer *Peer) er return fmt.Errorf("send last ppos storage message fail,%v", err) } - var ( - pposStorageRlps []rlp.RawValue - ) byteSize = 0 - - for _, encoded := range blocks { - if byteSize >= softResponseLimit { - id := rand.Uint64() - if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil { - return fmt.Errorf("reply ppos storage v2 message fail,%v", err) - } - pposStorageRlps = []rlp.RawValue{} - byteSize = 0 - } else { - pposStorageRlps = append(pposStorageRlps, encoded) - byteSize += len(encoded) - } - } - if len(pposStorageRlps) > 0 { + if head == baseBlock { id := rand.Uint64() if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil { return fmt.Errorf("reply last ppos storage v2 message fail,%v", err) } + } else { + for _, encoded := range blocks { + if byteSize >= softResponseLimit { + id := rand.Uint64() + if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil { + return fmt.Errorf("reply ppos storage v2 message fail,%v", err) + } + pposStorageRlps = []rlp.RawValue{} + byteSize = 0 + } else { + pposStorageRlps = append(pposStorageRlps, encoded) + byteSize += len(encoded) + } + } + if len(pposStorageRlps) > 0 { + id := rand.Uint64() + if err := peer.ReplyPPOSStorageV2(id, baseBlock, pposStorageRlps); err != nil { + return fmt.Errorf("reply last ppos storage v2 message fail,%v", err) + } + } } peer.Log().Debug("end answerGetPPOSStorageMsgQueryV2", "blocks", len(blocks), "baseBlock", baseBlock) return nil diff --git a/go.mod b/go.mod index ea1e1258f3..8c90d7d5e8 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,6 @@ require ( require ( github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c - github.com/herumi/bls v1.37.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 golang.org/x/net v0.17.0 ) diff --git a/go.sum b/go.sum index a6e308a2f6..febe08a485 100644 --- a/go.sum +++ b/go.sum @@ -134,12 +134,11 @@ github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs= github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/herumi/bls v1.37.0 h1:EKPaFujxWsxSMlfN1NeR9GTfVOeAsAaNRGbdBfn9lBE= -github.com/herumi/bls v1.37.0/go.mod h1:CnmR5QZ/QBnBE8Z55O+OtmUc6ICUdrOW9fwSRQwz5Bo= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= -github.com/holiman/uint256 v1.1.1 h1:4JywC80b+/hSfljFlEBLHrrh+CIONLDz9NuFl0af4Mw= github.com/holiman/uint256 v1.1.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= +github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM= +github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v0.0.0-20161224104101-679507af18f3/go.mod h1:MZ2ZmwcBpvOoJ22IJsc7va19ZwoheaBk43rKg12SKag= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88 h1:bcAj8KroPf552TScjFPIakjH2/tdIrIH8F+cc4v4SRo= diff --git a/p2p/consensus_dialed.go b/p2p/consensus_dialed.go index 459a87882f..b06b853b19 100644 --- a/p2p/consensus_dialed.go +++ b/p2p/consensus_dialed.go @@ -1,9 +1,6 @@ package p2p import ( - "fmt" - "strings" - "github.com/PlatONnetwork/PlatON-Go/log" "github.com/PlatONnetwork/PlatON-Go/p2p/enode" ) @@ -68,8 +65,6 @@ func (tasks *dialedTasks) RemoveTask(NodeID enode.ID) error { } func (tasks *dialedTasks) ListTask() []*dialTask { - - log.Info("[after list]Consensus dialed task list after ListTask operation", "task queue", tasks.description()) return tasks.queue } @@ -138,10 +133,10 @@ func (tasks *dialedTasks) isEmpty() bool { return false } -func (tasks *dialedTasks) description() string { +func (tasks *dialedTasks) description() []string { var description []string for _, t := range tasks.queue { - description = append(description, fmt.Sprintf("%x", t.dest.ID().TerminalString())) + description = append(description, t.dest.ID().TerminalString()) } - return strings.Join(description, ",") + return description } diff --git a/p2p/dial.go b/p2p/dial.go index 5ee758b51f..bccbbf4b10 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -342,7 +342,7 @@ loop: } } case node := <-d.addconsensus: - log.Warn("dial adding consensus node", "node", node) + log.Warn("dial adding consensus node", "node", node.ID()) d.consensusPool.AddTask(newDialTask(node, dynDialedConn|consensusDialedConn)) case node := <-d.removeconsensus: d.consensusPool.RemoveTask(node.ID()) @@ -484,6 +484,11 @@ func (d *dialScheduler) startConsensusDials(n int) (started int) { n = 3 } } + if n <= 0 { + return + } + + log.Debug("startConsensusDials", "maxConsensusPeers", d.MaxConsensusPeers, "consensusPeers", d.consensusPeers, "n", n, "task queue", d.consensusPool.description()) // Create dials for consensus nodes if they are not connected. for _, t := range d.consensusPool.ListTask() { diff --git a/p2p/discover/v4wire/v4wire_test.go b/p2p/discover/v4wire/v4wire_test.go index 6ec57903e8..bc1780b4f7 100644 --- a/p2p/discover/v4wire/v4wire_test.go +++ b/p2p/discover/v4wire/v4wire_test.go @@ -17,7 +17,6 @@ package v4wire import ( - "bytes" "encoding/hex" "net" "reflect" @@ -35,7 +34,7 @@ var testPackets = []struct { wantPacket Packet }{ /*{ - input: "96e7a55f265b738379447058e63d7bad33b05b1e3e93d9537f05be9975090bd72dbe25ab1d57c3de65e0b089f6b884c2273d29b343211483557f7c46f3df0b2650779f3459fabbf34261f113b4f29f6d5f540ea7d3fe5e35b24264517f8529110001ea04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355", + input: "86b1419e9db40e9b03fcebf50a247e8b0b0cb4f6000e8de56c33336fb54c01fff20d1818b4e7bca98129b72f3551c507705addc662a7eefa0000a51c9a39d691577bfb0b4320b6421cbafda7001dc033776ffad4a330ca74027ef38fb84c00990101eb04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355c0", wantPacket: &Ping{ Version: 4, From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544}, @@ -44,14 +43,15 @@ var testPackets = []struct { }, },*/ { - input: "b045fd6e610bfe8e51393adb7aa058d60259a744cbcbceed6078b006857a7e881e32b9760ea0fd6c07fd9fbd9e73cda03c5ffa5d58c5b1cce8fc309ad2a702fc3f5a496f0e008bba521b77d5486151d391a76690d095b27adfb6a3e77d0193400101eb04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a35503", + input: "54fa00ece4514b03f0ce6380e31c6f882b644ddb3a1c8ec6fb511dd5c8b0d9853c0ec90ddf84859ae4bedef069a12db12ca895fcb8358483bc9c4935b7f7c5ef4e49d6242a82520456c1a788864eb1c510074a928b5151fcab397220ae8855090101ee04cb847f000001820cfa8215a8d790000000000000000000000000000000018208ae820d058443b9a355c1030102", wantPacket: &Ping{ Version: 4, From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544}, To: Endpoint{net.ParseIP("::1"), 2222, 3333}, Expiration: 1136239445, ForkID: []rlp.RawValue{{0x03}}, - //ENRSeq: 1, + ENRSeq: 1, + Rest: []rlp.RawValue{{0x02}}, }, }, { @@ -121,44 +121,6 @@ func TestForwardCompatibility(t *testing.T) { } } -func TestPingEncode(t *testing.T) { - packet, err := rlp.EncodeToBytes(&Ping{ - Version: 4, - From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544}, - To: Endpoint{net.ParseIP("::1"), 2222, 3333}, - Expiration: 1136239445, - ForkID: []rlp.RawValue{{0x03}}, - }) - if err != nil { - t.Error(err) - } - packet2, err2 := rlp.EncodeToBytes(&Ping{ - Version: 4, - From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544}, - To: Endpoint{net.ParseIP("::1"), 2222, 3333}, - Expiration: 1136239445, - ForkID: []rlp.RawValue{{0x03}}, - ENRSeq: 1, - }) - if err2 != nil { - t.Error(err2) - } - packet3, err3 := rlp.EncodeToBytes(&PingV1{ - Version: 4, - From: Endpoint{net.ParseIP("127.0.0.1").To4(), 3322, 5544}, - To: Endpoint{net.ParseIP("::1"), 2222, 3333}, - Expiration: 1136239445, - Rest: []rlp.RawValue{{0x03}}, - }) - if err3 != nil { - t.Error(err3) - } - - if !bytes.Equal(packet, packet2) && !bytes.Equal(packet3, packet2) { - t.Error("should be same") - } -} - func hexPubkey(h string) (ret Pubkey) { b, err := hex.DecodeString(h) if err != nil { diff --git a/p2p/server.go b/p2p/server.go index b140895696..7903c04efb 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -683,7 +683,7 @@ func (srv *Server) setupDialScheduler() { srv.doPeerOp(func(peers map[enode.ID]*Peer) { srv.dialsched.removeConsensusFromQueue(node) if p, ok := peers[node.ID()]; ok { - p.Disconnect(DiscRequested) + p.rw.set(consensusDialedConn, false) } }) } @@ -778,7 +778,7 @@ running: case n := <-srv.addconsensus: // This channel is used by AddConsensusNode to add an enode // to the consensus node set. - srv.log.Trace("Adding consensus node", "node", n) + srv.log.Trace("Adding consensus node", "node", n.ID()) id := n.ID() if srv.localnode.ID() == id { srv.log.Debug("We are become an consensus node") @@ -795,9 +795,8 @@ running: case n := <-srv.removeconsensus: // This channel is used by RemoveConsensusNode to remove an enode // from the consensus node set. - srv.log.Trace("Removing consensus node", "node", n) - id := n.ID() - if srv.localnode.ID() == id { + srv.log.Trace("Removing consensus node", "node", n.ID()) + if srv.localnode.ID() == n.ID() { srv.log.Debug("We are not an consensus node") srv.consensus = false } @@ -817,7 +816,7 @@ running: case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add a node // to the trusted node set. - srv.log.Trace("Adding trusted node", "node", n) + srv.log.Trace("Adding trusted node", "node", n.ID()) trusted[n.ID()] = true if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, true) @@ -826,7 +825,7 @@ running: case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove a node // from the trusted node set. - srv.log.Trace("Removing trusted node", "node", n) + srv.log.Trace("Removing trusted node", "node", n.ID()) delete(trusted, n.ID()) if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, false) @@ -1276,12 +1275,18 @@ func (srv *Server) watching() { switch data := ev.Data.(type) { case cbfttypes.AddValidatorEvent: - srv.log.Trace("Received AddValidatorEvent", "nodeID", data.Node.ID().String()) - srv.AddConsensusPeer(data.Node) + srv.log.Trace("Received AddValidatorEvent", "num", len(data.Nodes)) + for _, node := range data.Nodes { + srv.AddConsensusPeer(node) + } + case cbfttypes.RemoveValidatorEvent: - srv.log.Trace("Received RemoveValidatorEvent", "nodeID", data.Node.ID().String()) - srv.RemoveConsensusPeer(data.Node) + srv.log.Trace("Received RemoveValidatorEvent", "num", len(data.Nodes)) + for _, node := range data.Nodes { + srv.RemoveConsensusPeer(node) + } case cbfttypes.UpdateValidatorEvent: + consensusPeer := 0 if _, ok := data.Nodes[srv.localnode.ID()]; ok { srv.doPeerOp(func(peers map[enode.ID]*Peer) { for id, peer := range peers { @@ -1293,7 +1298,7 @@ func (srv *Server) watching() { } srv.log.Debug("We are become an consensus node") srv.consensus = true - srv.dialsched.updateConsensusNun(srv.numConsensusPeer(peers)) + consensusPeer = srv.numConsensusPeer(peers) }) } else { srv.doPeerOp(func(peers map[enode.ID]*Peer) { @@ -1302,9 +1307,9 @@ func (srv *Server) watching() { } srv.log.Debug("We are not an consensus node") srv.consensus = false - srv.dialsched.updateConsensusNun(0) }) } + srv.dialsched.updateConsensusNun(consensusPeer) default: srv.log.Error("Received unexcepted event") } diff --git a/x/plugin/staking_plugin.go b/x/plugin/staking_plugin.go index 3b22286584..457899f69f 100644 --- a/x/plugin/staking_plugin.go +++ b/x/plugin/staking_plugin.go @@ -390,14 +390,16 @@ func (sk *StakingPlugin) Confirmed(nodeId enode.IDv0, block *types.Block) error } func (sk *StakingPlugin) addConsensusNode(nodes staking.ValidatorQueue) { + adds := make([]*enode.Node, 0) for _, node := range nodes { pub, err := node.NodeId.Pubkey() if err != nil { panic(err) } - if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Node: enode.NewV4(pub, nil, 0, 0)}); nil != err { - log.Error("post AddValidatorEvent failed", "nodeId", node.NodeId.TerminalString(), "err", err) - } + adds = append(adds, enode.NewV4(pub, nil, 0, 0)) + } + if err := sk.eventMux.Post(cbfttypes.AddValidatorEvent{Nodes: adds}); nil != err { + log.Error("post AddValidatorEvent failed", "num", len(adds), "err", err) } }