diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim
index ed415cdaa1..515bdae636 100644
--- a/apps/networkmonitor/networkmonitor.nim
+++ b/apps/networkmonitor/networkmonitor.nim
@@ -463,7 +463,12 @@ proc initAndStartApp(
   nodeBuilder.withNodeKey(key)
   nodeBuilder.withRecord(record)
   nodeBuilder.withSwitchConfiguration(maxConnections = some(MaxConnectedPeers))
-  nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true)
+
+  nodeBuilder.withPeerManagerConfig(
+    maxConnections = MaxConnectedPeers,
+    relayServiceRatio = "13.33:86.67",
+    shardAware = true,
+  )
   let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
   if res.isErr():
     return err("node building error" & $res.error)
diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim
index 8e006848df..333433dc23 100644
--- a/tests/test_peer_manager.nim
+++ b/tests/test_peer_manager.nim
@@ -938,8 +938,8 @@ procSuite "Peer Manager":
   test "peer manager cant have more max connections than peerstore size":
     # Peerstore size can't be smaller than max connections
-    let peerStoreSize = 5
-    let maxConnections = 10
+    let peerStoreSize = 20
+    let maxConnections = 25

     expect(Defect):
       let pm = PeerManager.new(
@@ -962,54 +962,61 @@ procSuite "Peer Manager":
        .withRng(rng)
        .withMplex()
        .withNoise()
-        .withPeerStore(10)
-        .withMaxConnections(5)
+        .withPeerStore(25)
+        .withMaxConnections(20)
        .build(),
      maxFailedAttempts = 1,
-      maxRelayPeers = some(5),
      storage = nil,
    )

-    # Create 15 peers and add them to the peerstore
-    let peers = toSeq(1 .. 15)
+    # Create 30 peers and add them to the peerstore
+    let peers = toSeq(1 .. 30)
      .mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
      .filterIt(it.isOk())
      .mapIt(it.value)
    for p in peers:
      pm.addPeer(p)

-    # Check that we have 15 peers in the peerstore
+    # Check that we have 30 peers in the peerstore
    check:
-      pm.wakuPeerStore.peers.len == 15
+      pm.wakuPeerStore.peers.len == 30

-    # fake that some peers failed to connected
+    # fake that some peers failed to connect
    pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2
    pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2
    pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2
+    pm.wakuPeerStore[NumberFailedConnBook][peers[3].peerId] = 2
+    pm.wakuPeerStore[NumberFailedConnBook][peers[4].peerId] = 2

    # fake that some peers are connected
    pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected
    pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected
-    pm.wakuPeerStore[ConnectionBook][peers[10].peerId] = Connected
-    pm.wakuPeerStore[ConnectionBook][peers[12].peerId] = Connected
+    pm.wakuPeerStore[ConnectionBook][peers[15].peerId] = Connected
+    pm.wakuPeerStore[ConnectionBook][peers[18].peerId] = Connected
+    pm.wakuPeerStore[ConnectionBook][peers[24].peerId] = Connected
+    pm.wakuPeerStore[ConnectionBook][peers[29].peerId] = Connected

-    # Prune the peerstore (current=15, target=5)
+    # Prune the peerstore (current=30, target=25)
    pm.prunePeerStore()

    check:
      # ensure peerstore was pruned
-      pm.wakuPeerStore.peers.len == 10
+      pm.wakuPeerStore.peers.len == 25

      # ensure connected peers were not pruned
      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId)
      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId)
-      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[10].peerId)
-      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[12].peerId)
+      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[15].peerId)
+      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[18].peerId)
+      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[24].peerId)
+      pm.wakuPeerStore.peers.anyIt(it.peerId == peers[29].peerId)

      # ensure peers that failed were the first to be pruned
      not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId)
      not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId)
      not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId)
+      not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[3].peerId)
+      not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[4].peerId)

  asyncTest "canBeConnected() returns correct value":
    let pm = PeerManager.new(
@@ -1018,14 +1025,13 @@ procSuite "Peer Manager":
      .withRng(rng)
      .withMplex()
      .withNoise()
-      .withPeerStore(10)
-      .withMaxConnections(5)
+      .withPeerStore(25)
+      .withMaxConnections(20)
      .build(),
      initialBackoffInSec = 1,
        # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8 secs.
      backoffFactor = 2,
      maxFailedAttempts = 10,
-      maxRelayPeers = some(5),
      storage = nil,
    )
    var p1: PeerId
@@ -1075,10 +1081,9 @@ procSuite "Peer Manager":
      .withRng(rng)
      .withMplex()
      .withNoise()
-      .withPeerStore(10)
-      .withMaxConnections(5)
+      .withPeerStore(25)
+      .withMaxConnections(20)
      .build(),
-      maxRelayPeers = some(5),
      maxFailedAttempts = 150,
      storage = nil,
    )
@@ -1091,11 +1096,10 @@ procSuite "Peer Manager":
      .withRng(rng)
      .withMplex()
      .withNoise()
-      .withPeerStore(10)
-      .withMaxConnections(5)
+      .withPeerStore(25)
+      .withMaxConnections(20)
      .build(),
      maxFailedAttempts = 10,
-      maxRelayPeers = some(5),
      storage = nil,
    )
@@ -1105,11 +1109,10 @@ procSuite "Peer Manager":
      .withRng(rng)
      .withMplex()
      .withNoise()
-      .withPeerStore(10)
-      .withMaxConnections(5)
+      .withPeerStore(25)
+      .withMaxConnections(20)
      .build(),
      maxFailedAttempts = 5,
-      maxRelayPeers = some(5),
      storage = nil,
    )
diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim
index b148684fe0..2213b7f8ee 100644
--- a/tests/test_wakunode.nim
+++ b/tests/test_wakunode.nim
@@ -106,49 +106,52 @@ suite "WakuNode":
    await allFutures([node1.stop(), node2.stop()])

-  asyncTest "Maximum connections can be configured":
+  asyncTest "Maximum connections can be configured with 20 nodes":
    let
-      maxConnections = 2
+      maxConnections = 20
      nodeKey1 = generateSecp256k1Key()
      node1 = newTestWakuNode(
        nodeKey1,
-        parseIpAddress("0.0.0.0"),
+        parseIpAddress("127.0.0.1"),
        Port(60010),
        maxConnections = maxConnections,
      )
-      nodeKey2 = generateSecp256k1Key()
-      node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(60012))
-      nodeKey3 = generateSecp256k1Key()
-      node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(60013))

-    check:
-      # Sanity check, to verify config was applied
-      node1.switch.connManager.inSema.size == maxConnections
-
-    # Node with connection limit set to 1
+    # Initialize and start node1
    await node1.start()
    await node1.mountRelay()

-    # Remote node 1
-    await node2.start()
-    await node2.mountRelay()
-
-    # Remote node 2
-    await node3.start()
-    await node3.mountRelay()
-
-    discard
-      await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
-    await sleepAsync(3.seconds)
-    discard
-      await node1.peerManager.connectPeer(node3.switch.peerInfo.toRemotePeerInfo())
-
+    # Create an array to hold the other nodes
+    var otherNodes: seq[WakuNode] = @[]
+
+    # Create and start 21 other nodes (one more than maxConnections)
+    for i in 0 ..< maxConnections + 1:
+      let
+        nodeKey = generateSecp256k1Key()
+        port = 60012 + i * 2 # Ensure unique ports for each node
+        node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port))
+      await node.start()
+      await node.mountRelay()
+      otherNodes.add(node)
+
+    # Connect all other nodes to node1
+    for node in otherNodes:
+      discard
+        await node1.peerManager.connectPeer(node.switch.peerInfo.toRemotePeerInfo())
+      await sleepAsync(2.seconds) # Small delay to avoid hammering the connection process
+
+    # Check that node1 accepted connections only up to maxConnections
    check:
-      # Verify that only the first connection succeeded
-      node1.switch.isConnected(node2.switch.peerInfo.peerId)
-      node1.switch.isConnected(node3.switch.peerInfo.peerId) == false
-
-    await allFutures([node1.stop(), node2.stop(), node3.stop()])
+      node1.switch.isConnected(otherNodes[0].switch.peerInfo.peerId)
+      node1.switch.isConnected(otherNodes[8].switch.peerInfo.peerId)
+      node1.switch.isConnected(otherNodes[14].switch.peerInfo.peerId)
+      node1.switch.isConnected(otherNodes[20].switch.peerInfo.peerId) == false
+
+    # Stop all nodes
+    var stopFutures = @[node1.stop()]
+    for node in otherNodes:
+      stopFutures.add(node.stop())
+    await allFutures(stopFutures)

  asyncTest "Messages fails with wrong key path":
    let nodeKey1 = generateSecp256k1Key()
diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim
index 8cacf5317c..1c9b8ec831 100644
--- a/tests/testlib/wakunode.nim
+++ b/tests/testlib/wakunode.nim
@@ -36,6 +36,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
    dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
    nat: "any",
    maxConnections: 50,
+    relayServiceRatio: "60:40",
    maxMessageSize: "1024 KiB",
    clusterId: DefaultClusterId,
    shards: @[DefaultShardId],
diff --git a/waku/common/utils/parse_size_units.nim b/waku/common/utils/parse_size_units.nim
index df2f5dac4e..14f41a9334 100644
--- a/waku/common/utils/parse_size_units.nim
+++ b/waku/common/utils/parse_size_units.nim
@@ -1,4 +1,4 @@
-import std/strutils, results, regex
+import std/[strutils, math], results, regex

 proc parseMsgSize*(input: string): Result[uint64, string] =
   ## Parses size strings such as "1.2 KiB" or "3Kb" and returns the equivalent number of bytes
@@ -49,3 +49,26 @@ proc parseCorrectMsgSize*(input: string): uint64 =
   let ret = parseMsgSize(input).valueOr:
     return 0
   return ret
+
+proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] =
+  ## Parses a relay:service ratio string into a pair of fractions.
+  ## The two percentages must sum to 100.
+  ## e.g., parseRelayServiceRatio("40:60").get() == (0.4, 0.6)
+  let elements = ratio.split(":")
+  if elements.len != 2:
+    return err("expected format 'X:Y', ratio = " & ratio)
+
+  var relayRatio, serviceRatio: float
+  try:
+    relayRatio = parseFloat(elements[0])
+    serviceRatio = parseFloat(elements[1])
+  except ValueError:
+    return err("failed to parse ratio numbers: " & ratio)
+
+  if relayRatio < 0 or serviceRatio < 0:
+    return err("relay service ratio must be non-negative, ratio = " & ratio)
+
+  let total = relayRatio + serviceRatio
+  # Round before comparing so fractional ratios such as "13.33:86.67" are not
+  # rejected by float truncation.
+  if round(total) != 100.0:
+    return err("total ratio should be 100, total = " & $total)
+
+  ok((relayRatio / 100.0, serviceRatio / 100.0))
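Note: the snippet below is an illustrative usage sketch of the new parser, not part of the patch; the import path simply mirrors the module location above.

  import waku/common/utils/parse_size_units

  # Valid ratios parse into fractions that sum to 1.0:
  assert parseRelayServiceRatio("40:60").get() == (0.4, 0.6)
  assert parseRelayServiceRatio("60:40").get() == (0.6, 0.4)
  # Malformed inputs are rejected with a descriptive error:
  assert parseRelayServiceRatio("40:50").isErr() # sums to 90, not 100
  assert parseRelayServiceRatio("a:b").isErr()   # not numeric
  assert parseRelayServiceRatio("100").isErr()   # missing ':' separator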
diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim
index abd347b848..e896ecdbb5 100644
--- a/waku/factory/builder.nim
+++ b/waku/factory/builder.nim
@@ -1,7 +1,7 @@
 {.push raises: [].}

 import
-  std/[options, net],
+  std/[options, net, math],
   results,
   chronicles,
   libp2p/crypto/crypto,
@@ -15,7 +15,8 @@ import
   ../discovery/waku_discv5,
   ../waku_node,
   ../node/peer_manager,
-  ../common/rate_limit/setting
+  ../common/rate_limit/setting,
+  ../common/utils/parse_size_units

 type WakuNodeBuilder* = object # General
@@ -29,7 +30,8 @@
   peerStorageCapacity: Option[int]

   # Peer manager config
-  maxRelayPeers: Option[int]
+  maxRelayPeers: int
+  maxServicePeers: int
   colocationLimit: int
   shardAware: bool
@@ -108,9 +110,17 @@ proc withPeerStorage*(
   builder.peerStorageCapacity = capacity

 proc withPeerManagerConfig*(
-    builder: var WakuNodeBuilder, maxRelayPeers = none(int), shardAware = false
+    builder: var WakuNodeBuilder,
+    maxConnections: int,
+    relayServiceRatio: string,
+    shardAware = false,
 ) =
-  builder.maxRelayPeers = maxRelayPeers
+  let (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()
+  var relayPeers = int(ceil(float(maxConnections) * relayRatio))
+  var servicePeers = int(floor(float(maxConnections) * serviceRatio))
+
+  builder.maxServicePeers = servicePeers
+  builder.maxRelayPeers = relayPeers
   builder.shardAware = shardAware

 proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
@@ -190,7 +200,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
   let peerManager = PeerManager.new(
     switch = switch,
     storage = builder.peerStorage.get(nil),
-    maxRelayPeers = builder.maxRelayPeers,
+    maxRelayPeers = some(builder.maxRelayPeers),
+    maxServicePeers = some(builder.maxServicePeers),
     colocationLimit = builder.colocationLimit,
     shardedPeerManagement = builder.shardAware,
   )
diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim
index fc45389f51..3ba24d54d8 100644
--- a/waku/factory/external_config.nim
+++ b/waku/factory/external_config.nim
@@ -197,7 +197,20 @@ type WakuNodeConf* = object
     desc: "Maximum allowed number of libp2p connections.",
     defaultValue: 50,
     name: "max-connections"
-  .}: uint16
+  .}: int
+
+  maxRelayPeers* {.
+    desc:
+      "Deprecated. Use relay-service-ratio instead. Maximum allowed number of relay peers.",
+    name: "max-relay-peers"
+  .}: Option[int]
+
+  relayServiceRatio* {.
+    desc:
+      "Percentage ratio of relay peers to service peers. For example, 60:40 means 60% of max-connections is used for the relay protocol and the remaining 40% is reserved for service protocols (e.g., filter, lightpush, store, metadata).",
+    name: "relay-service-ratio",
+    defaultValue: "60:40" # 60:40 ratio of relay to service peers
+  .}: string

   colocationLimit* {.
     desc:
@@ -206,10 +219,6 @@
     name: "ip-colocation-limit"
   .}: int

-  maxRelayPeers* {.
-    desc: "Maximum allowed number of relay peers.", name: "max-relay-peers"
-  .}: Option[int]
-
   peerStoreCapacity* {.
     desc: "Maximum stored peers in the peerstore.", name: "peer-store-capacity"
   .}: Option[int]
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index d427a3d76b..33e84f8dee 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -102,9 +102,27 @@ proc initNode(
       agentString = some(conf.agentString),
     )
   builder.withColocationLimit(conf.colocationLimit)
-  builder.withPeerManagerConfig(
-    maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
-  )
+
+  if conf.maxRelayPeers.isSome():
+    let
+      maxRelayPeers = conf.maxRelayPeers.get()
+      maxConnections = conf.maxConnections
+      # Calculate the ratio as percentages
+      relayRatio = (maxRelayPeers.float / maxConnections.float) * 100
+      serviceRatio = 100 - relayRatio
+
+    builder.withPeerManagerConfig(
+      maxConnections = conf.maxConnections,
+      relayServiceRatio = $relayRatio & ":" & $serviceRatio,
+      shardAware = conf.relayShardedPeerManagement,
+    )
+    error "maxRelayPeers is deprecated. Use relayServiceRatio instead; the ratio has been derived from maxConnections and maxRelayPeers."
+  else:
+    builder.withPeerManagerConfig(
+      maxConnections = conf.maxConnections,
+      relayServiceRatio = conf.relayServiceRatio,
+      shardAware = conf.relayShardedPeerManagement,
+    )

   builder.withRateLimit(conf.rateLimits)
   builder.withCircuitRelay(relay)
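Note: a worked example of the deprecated-flag fallback above, with illustrative values only (not part of the patch):

  # With --max-connections=50 and the deprecated --max-relay-peers=30,
  # initNode derives the ratio exactly as the code above does:
  let
    maxRelayPeers = 30
    maxConnections = 50
    relayRatio = (maxRelayPeers.float / maxConnections.float) * 100 # 60.0
    serviceRatio = 100 - relayRatio # 40.0
  # The builder then receives relayServiceRatio = "60.0:40.0", which
  # parseRelayServiceRatio maps back to the fractions (0.6, 0.4).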
Shards", shard = shard diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 385da471f1..171b4c6fde 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[options, sets, sequtils, times, strutils, math, random], + std/[options, sets, sequtils, times, strformat, strutils, math, random], chronos, chronicles, metrics, @@ -13,8 +13,10 @@ import import ../../common/nimchronos, ../../common/enr, + ../../common/utils/parse_size_units, ../../waku_core, ../../waku_relay, + ../../waku_relay/protocol, ../../waku_enr/sharding, ../../waku_enr/capabilities, ../../waku_metadata, @@ -84,7 +86,9 @@ type PeerManager* = ref object of RootObj maxFailedAttempts*: int storage*: PeerStorage serviceSlots*: Table[string, RemotePeerInfo] + relayServiceRatio*: string maxRelayPeers*: int + maxServicePeers*: int outRelayPeersTarget: int inRelayPeersTarget: int ipTable*: Table[string, seq[PeerId]] @@ -265,6 +269,12 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str warn "Can't add relay peer to service peers slots" return + # Check if the number of service peers has reached the maximum limit + if pm.serviceSlots.len >= pm.maxServicePeers: + warn "Maximum number of service peers reached. Cannot add more.", + peerId = remotePeerInfo.peerId, service = proto + return + info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]]) @@ -970,6 +980,8 @@ proc new*( switch: Switch, wakuMetadata: WakuMetadata = nil, maxRelayPeers: Option[int] = none(int), + maxServicePeers: Option[int] = none(int), + relayServiceRatio: string = "60:40", storage: PeerStorage = nil, initialBackoffInSec = InitialBackoffInSec, backoffFactor = BackoffFactor, @@ -986,23 +998,26 @@ proc new*( Defect, "Max number of connections can't be greater than PeerManager capacity" ) - var maxRelayPeersValue = 0 - if maxRelayPeers.isSome(): - if maxRelayPeers.get() > maxConnections: - error "Max number of relay peers can't be greater than the max amount of connections", - maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() - raise newException( - Defect, - "Max number of relay peers can't be greater than the max amount of connections", - ) + var relayRatio: float64 + var serviceRatio: float64 + (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get() - if maxRelayPeers.get() == maxConnections: - warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers", - maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() - maxRelayPeersValue = maxRelayPeers.get() - else: - # Leave by default 20% of connections for service peers - maxRelayPeersValue = maxConnections - (maxConnections div 5) + var relayPeers = int(ceil(float(maxConnections) * relayRatio)) + var servicePeers = int(floor(float(maxConnections) * serviceRatio)) + + let minRelayPeers = WakuRelay.getDHigh() + + if relayPeers < minRelayPeers: + let errorMsg = + fmt"""Doesn't fulfill minimum criteria for relay (which increases the chance of the node becoming isolated.) 
+    let errorMsg =
+      fmt"""Doesn't fulfill the minimum criteria for relay, which increases the chance of the node becoming isolated.
+      relayPeers: {relayPeers} should be greater than or equal to minRelayPeers: {minRelayPeers}
+      relayServiceRatio: {relayServiceRatio}
+      maxConnections: {maxConnections}"""
+    error "Wrong relay peers config", error = errorMsg
+    return
+
+  let outRelayPeersTarget = relayPeers div 3
+  let inRelayPeersTarget = relayPeers - outRelayPeersTarget

   # attempt to calculate max backoff to prevent potential overflows or unreasonably high values
   let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
@@ -1010,8 +1025,6 @@
     error "Max backoff time can't be over 1 week", maxBackoff = backoff
     raise newException(Defect, "Max backoff time can't be over 1 week")

-  let outRelayPeersTarget = maxRelayPeersValue div 3
-
   let pm = PeerManager(
     switch: switch,
     wakuMetadata: wakuMetadata,
@@ -1019,9 +1032,10 @@
     storage: storage,
     initialBackoffInSec: initialBackoffInSec,
     backoffFactor: backoffFactor,
+    maxRelayPeers: relayPeers,
+    maxServicePeers: servicePeers,
     outRelayPeersTarget: outRelayPeersTarget,
-    inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
-    maxRelayPeers: maxRelayPeersValue,
+    inRelayPeersTarget: inRelayPeersTarget,
     maxFailedAttempts: maxFailedAttempts,
     colocationLimit: colocationLimit,
     shardedPeerManagement: shardedPeerManagement,
diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index 1d79e73366..080f12edfb 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -314,6 +314,9 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
   ## Observes when a message is sent/received from the GossipSub PoV
   procCall GossipSub(w).addObserver(observer)

+proc getDHigh*(T: type WakuRelay): int =
+  return GossipsubParameters.dHigh
+
 proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
   ## Returns the number of peers in a mesh defined by the passed pubsub topic.
   ## The 'mesh' attribute is defined in the GossipSub ref object.
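Note: a worked example of the connection budget computed in PeerManager.new, assuming the default "60:40" ratio and maxConnections = 50 (illustrative, not part of the patch):

  import std/math

  let
    maxConnections = 50
    (relayRatio, serviceRatio) = (0.6, 0.4) # parseRelayServiceRatio("60:40").get()
    relayPeers = int(ceil(float(maxConnections) * relayRatio))      # 30 relay peers
    servicePeers = int(floor(float(maxConnections) * serviceRatio)) # 20 service peers
    outRelayPeersTarget = relayPeers div 3                # 10 outbound relay connections
    inRelayPeersTarget = relayPeers - outRelayPeersTarget # 20 inbound relay connections
  # relayPeers must be at least WakuRelay.getDHigh() (gossipsub's dHigh mesh bound);
  # otherwise PeerManager.new logs "Wrong relay peers config" and returns early.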