From b474a5aa7c4efc544ecd2b7e1893d8e08be9084f Mon Sep 17 00:00:00 2001 From: clearly <910372762@qq.com> Date: Wed, 8 May 2024 17:05:26 +0800 Subject: [PATCH 1/2] fix --- eth/backend.go | 2 ++ p2p/consensus_dialed.go | 4 ++-- p2p/dial.go | 11 +++++++++++ p2p/server.go | 11 +++++++++++ 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index a218757e76..1e4effe384 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -648,6 +648,8 @@ func (s *Ethereum) Start() error { func (s *Ethereum) Stop() error { s.ethDialCandidates.Close() s.snapDialCandidates.Close() + s.p2pServer.CloseConsensusDial() + s.p2pServer.CloseDiscovery() s.handler.Stop() // Then stop everything else. diff --git a/p2p/consensus_dialed.go b/p2p/consensus_dialed.go index ad7ac4a7a2..4b88d2dc07 100644 --- a/p2p/consensus_dialed.go +++ b/p2p/consensus_dialed.go @@ -111,7 +111,7 @@ func (tasks *dialedTasks) size() int { } // clear queue -/*func (tasks *dialedTasks) clear() bool { +func (tasks *dialedTasks) clear() bool { if tasks.isEmpty() { log.Info("queue is empty!") return false @@ -121,7 +121,7 @@ func (tasks *dialedTasks) size() int { } tasks.queue = nil return true -}*/ +} // whether the queue is empty func (tasks *dialedTasks) isEmpty() bool { diff --git a/p2p/dial.go b/p2p/dial.go index bccbbf4b10..32a498c2f3 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -108,6 +108,7 @@ type dialScheduler struct { addconsensus chan *enode.Node removeconsensus chan *enode.Node + clearConsensus chan struct{} // Everything below here belongs to loop and // should only be accessed by code on the loop goroutine. @@ -185,6 +186,7 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF remPeerCh: make(chan *conn), addconsensus: make(chan *enode.Node), removeconsensus: make(chan *enode.Node), + clearConsensus: make(chan struct{}), updateConsensusPeersCh: make(chan int), consensusPool: NewDialedTasks(config.MaxConsensusPeers*2, nil), } @@ -239,6 +241,13 @@ func (d *dialScheduler) removeConsensus(n *enode.Node) { } } +func (d *dialScheduler) closeConsensusDial() { + select { + case d.clearConsensus <- struct{}{}: + case <-d.ctx.Done(): + } +} + func (d *dialScheduler) removeConsensusFromQueue(n *enode.Node) { d.history.remove(string(n.ID().Bytes())) } @@ -346,6 +355,8 @@ loop: d.consensusPool.AddTask(newDialTask(node, dynDialedConn|consensusDialedConn)) case node := <-d.removeconsensus: d.consensusPool.RemoveTask(node.ID()) + case <-d.clearConsensus: + d.consensusPool.clear() case num := <-d.updateConsensusPeersCh: d.log.Debug("update added consensus peers num", "num", num) d.consensusPeers = num diff --git a/p2p/server.go b/p2p/server.go index 79cf364571..35343393e6 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -399,6 +399,11 @@ func (srv *Server) RemoveConsensusPeer(node *enode.Node) { } } +func (srv *Server) CloseConsensusDial() { + srv.dialsched.closeConsensusDial() + srv.log.Info("Close consensus Dial") +} + // AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. func (srv *Server) AddTrustedPeer(node *enode.Node) { @@ -1267,6 +1272,12 @@ func (srv *Server) PeersInfo() []*PeerInfo { return infos } +func (srv *Server) CloseDiscovery() { + srv.ntab.Close() + srv.discmix.Close() + srv.log.Info("Close ntab and discmix") +} + func (srv *Server) StartWatching(eventMux *event.TypeMux) { srv.eventMux = eventMux go srv.watching() From 4111119ff9060195f230eb1ad5b28248aabeed33 Mon Sep 17 00:00:00 2001 From: clearly <910372762@qq.com> Date: Wed, 8 May 2024 17:10:05 +0800 Subject: [PATCH 2/2] fix --- eth/backend.go | 1 - p2p/server.go | 6 ------ 2 files changed, 7 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 1e4effe384..e3c598be0b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -649,7 +649,6 @@ func (s *Ethereum) Stop() error { s.ethDialCandidates.Close() s.snapDialCandidates.Close() s.p2pServer.CloseConsensusDial() - s.p2pServer.CloseDiscovery() s.handler.Stop() // Then stop everything else. diff --git a/p2p/server.go b/p2p/server.go index 35343393e6..1ce06becf9 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -1272,12 +1272,6 @@ func (srv *Server) PeersInfo() []*PeerInfo { return infos } -func (srv *Server) CloseDiscovery() { - srv.ntab.Close() - srv.discmix.Close() - srv.log.Info("Close ntab and discmix") -} - func (srv *Server) StartWatching(eventMux *event.TypeMux) { srv.eventMux = eventMux go srv.watching()