chore: capping mechanism for relay and service connections #3184

Merged (20 commits) on Jan 21, 2025
Changes from 16 commits
7 changes: 6 additions & 1 deletion apps/networkmonitor/networkmonitor.nim
@@ -463,7 +463,12 @@ proc initAndStartApp(
nodeBuilder.withNodeKey(key)
nodeBuilder.withRecord(record)
nodeBUilder.withSwitchConfiguration(maxConnections = some(MaxConnectedPeers))
nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true)

nodeBuilder.withPeerManagerConfig(
maxConnections = MaxConnectedPeers,
relayServiceRatio = "13.33:86.67",
shardAware = true,
)
let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
if res.isErr():
return err("node building error" & $res.error)
57 changes: 30 additions & 27 deletions tests/test_peer_manager.nim
@@ -938,8 +938,8 @@ procSuite "Peer Manager":

test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections
let peerStoreSize = 5
let maxConnections = 10
let peerStoreSize = 20
let maxConnections = 25

expect(Defect):
let pm = PeerManager.new(
@@ -962,54 +962,61 @@
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = some(5),
storage = nil,
)

# Create 15 peers and add them to the peerstore
let peers = toSeq(1 .. 15)
# Create 30 peers and add them to the peerstore
let peers = toSeq(1 .. 30)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
.filterIt(it.isOk())
.mapIt(it.value)
for p in peers:
pm.addPeer(p)

# Check that we have 15 peers in the peerstore
# Check that we have 30 peers in the peerstore
check:
pm.wakuPeerStore.peers.len == 15
pm.wakuPeerStore.peers.len == 30

# fake that some peers failed to connected
pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[3].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[4].peerId] = 2

# fake that some peers are connected
pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[10].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[12].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[15].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[18].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[24].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[29].peerId] = Connected

# Prune the peerstore (current=15, target=5)
# Prune the peerstore (current=30, target=25)
pm.prunePeerStore()

check:
# ensure peerstore was pruned
pm.wakuPeerStore.peers.len == 10
pm.wakuPeerStore.peers.len == 25

# ensure connected peers were not pruned
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[10].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[12].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[15].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[18].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[24].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[29].peerId)

# ensure peers that failed were the first to be pruned
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[3].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[4].peerId)

asyncTest "canBeConnected() returns correct value":
let pm = PeerManager.new(
@@ -1018,14 +1025,13 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
initialBackoffInSec = 1,
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
var p1: PeerId
@@ -1075,10 +1081,9 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil,
)
@@ -1091,11 +1096,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)

@@ -1105,11 +1109,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = some(5),
storage = nil,
)

65 changes: 34 additions & 31 deletions tests/test_wakunode.nim
@@ -106,49 +106,52 @@ suite "WakuNode":

await allFutures([node1.stop(), node2.stop()])

asyncTest "Maximum connections can be configured":
asyncTest "Maximum connections can be configured with 20 nodes":
let
maxConnections = 2
maxConnections = 20
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(
nodeKey1,
parseIpAddress("0.0.0.0"),
parseIpAddress("127.0.0.1"),
Port(60010),
maxConnections = maxConnections,
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(60012))
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(60013))

check:
# Sanity check, to verify config was applied
node1.switch.connManager.inSema.size == maxConnections

# Node with connection limit set to 1
# Initialize and start node1
await node1.start()
await node1.mountRelay()

# Remote node 1
await node2.start()
await node2.mountRelay()

# Remote node 2
await node3.start()
await node3.mountRelay()

discard
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(3.seconds)
discard
await node1.peerManager.connectPeer(node3.switch.peerInfo.toRemotePeerInfo())

# Create an array to hold the other nodes
var otherNodes: seq[WakuNode] = @[]

# Create and start 20 other nodes
for i in 0 ..< maxConnections + 1:
let
nodeKey = generateSecp256k1Key()
port = 60012 + i * 2 # Ensure unique ports for each node
node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port))
await node.start()
await node.mountRelay()
otherNodes.add(node)

# Connect all other nodes to node1
for node in otherNodes:
discard
await node1.peerManager.connectPeer(node.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(2.seconds) # Small delay to avoid hammering the connection process

# Check that the number of connections matches the maxConnections
check:
# Verify that only the first connection succeeded
node1.switch.isConnected(node2.switch.peerInfo.peerId)
node1.switch.isConnected(node3.switch.peerInfo.peerId) == false

await allFutures([node1.stop(), node2.stop(), node3.stop()])
node1.switch.isConnected(otherNodes[0].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[8].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[14].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[20].switch.peerInfo.peerId) == false

# Stop all nodes
var stopFutures = @[node1.stop()]
for node in otherNodes:
stopFutures.add(node.stop())
await allFutures(stopFutures)

asyncTest "Messages fails with wrong key path":
let nodeKey1 = generateSecp256k1Key()
1 change: 1 addition & 0 deletions tests/testlib/wakunode.nim
@@ -36,6 +36,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
relayServiceRatio: "60:40",
maxMessageSize: "1024 KiB",
clusterId: DefaultClusterId,
shards: @[DefaultShardId],
27 changes: 26 additions & 1 deletion waku/common/utils/parse_size_units.nim
@@ -1,4 +1,4 @@
import std/strutils, results, regex
import std/[strutils, math], results, regex

proc parseMsgSize*(input: string): Result[uint64, string] =
## Parses size strings such as "1.2 KiB" or "3Kb" and returns the equivalent number of bytes
@@ -49,3 +49,28 @@ proc parseCorrectMsgSize*(input: string): uint64 =
let ret = parseMsgSize(input).valueOr:
return 0
return ret

proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] =
## Parses a relay/service ratio string to [ float, float ]. The total should sum 100%
Contributor (suggested change):

proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] =
proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] {.raises: [ValueError].} =

Let's not hide the exception, so the compiler can help using the function properly.

## e.g., (0.4, 0.6) == parseRelayServiceRatio("40:60")
try:
let elements = ratio.split(":")
if elements.len != 2:
raise newException(ValueError, "expected format 'X:Y', ratio = " & ratio)

Collaborator: Oh, sorry that I didn't mention this before. Would it be possible to avoid raising an exception, please? This applies elsewhere too. We already have the Result return type to handle the error conditions, unless I'm missing something :)

Contributor Author: Got it.

let
relayRatio = parseFloat(elements[0])
serviceRatio = parseFloat(elements[1])

if relayRatio < 0 or serviceRatio < 0:
raise newException(
ValueError, "Relay service ratio must be non-negative, ratio = " & ratio
)

let total = relayRatio + serviceRatio
if int(total) != 100:
raise newException(ValueError, "Total ratio should be 100, total = " & $total)

return ok((relayRatio / 100.0, serviceRatio / 100.0))
except ValueError as e:
raise newException(ValueError, "Invalid Format Error : " & e.msg)
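
To make the direction of the review thread above concrete, here is a minimal exception-free sketch of the parser. It is illustrative only, not part of this diff, and assumes the ok/err helpers from the results package plus split/parseFloat from std/strutils:

import std/strutils, results

proc parseRelayServiceRatioSketch(ratio: string): Result[(float, float), string] =
  # Split "X:Y" and validate the shape without raising.
  let elements = ratio.split(":")
  if elements.len != 2:
    return err("expected format 'X:Y', ratio = " & ratio)
  var relayRatio, serviceRatio: float
  try:
    # parseFloat itself raises, so the exception is converted to an error value here.
    relayRatio = parseFloat(elements[0])
    serviceRatio = parseFloat(elements[1])
  except ValueError:
    return err("invalid number in ratio = " & ratio)
  if relayRatio < 0 or serviceRatio < 0:
    return err("relay:service ratio must be non-negative, ratio = " & ratio)
  if int(relayRatio + serviceRatio) != 100:
    return err("total ratio should be 100, total = " & $(relayRatio + serviceRatio))
  return ok((relayRatio / 100.0, serviceRatio / 100.0))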
28 changes: 21 additions & 7 deletions waku/factory/builder.nim
@@ -1,7 +1,7 @@
{.push raises: [].}

import
std/[options, net],
std/[options, net, math],
results,
chronicles,
libp2p/crypto/crypto,
@@ -15,7 +15,8 @@ import
../discovery/waku_discv5,
../waku_node,
../node/peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../common/utils/parse_size_units

type
WakuNodeBuilder* = object # General
@@ -29,7 +30,8 @@ type
peerStorageCapacity: Option[int]

# Peer manager config
maxRelayPeers: Option[int]
maxRelayPeers: int
maxServicePeers: int
colocationLimit: int
shardAware: bool

@@ -108,10 +110,21 @@ proc withPeerStorage*(
builder.peerStorageCapacity = capacity

proc withPeerManagerConfig*(
builder: var WakuNodeBuilder, maxRelayPeers = none(int), shardAware = false
builder: var WakuNodeBuilder,
maxConnections: int,
relayServiceRatio: string,
shardAware = false,
) =
builder.maxRelayPeers = maxRelayPeers
builder.shardAware = shardAware
try:
Collaborator: Maybe it is not needed to handle the exception here now?

Contributor Author: Yes! We already handle it in the function.

let (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()
var relayPeers = int(ceil(float(maxConnections) * relayRatio))
var servicePeers = int(floor(float(maxConnections) * serviceRatio))

builder.maxServicePeers = servicePeers
builder.maxRelayPeers = relayPeers
builder.shardAware = shardAware
except ValueError as e:
error "Error : ", error = e.msg

proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
builder.colocationLimit = colocationLimit
@@ -190,7 +203,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = builder.maxRelayPeers,
maxRelayPeers = some(builder.maxRelayPeers),
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
)
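
For reference, a small hedged example of how withPeerManagerConfig's ceil/floor split behaves (the numbers below are illustrative, not taken from this diff):

import std/math

let
  maxConnections = 50
  (relayRatio, serviceRatio) = (0.6, 0.4)  # i.e. relayServiceRatio = "60:40"
  relayPeers = int(ceil(float(maxConnections) * relayRatio))      # 30
  servicePeers = int(floor(float(maxConnections) * serviceRatio)) # 20

# ceil for relay and floor for service keep the two caps summing to maxConnections
# even when the ratio does not divide it evenly, e.g. 7 connections at 60:40
# give ceil(4.2) = 5 relay peers and floor(2.8) = 2 service peers.
doAssert relayPeers + servicePeers == maxConnections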
19 changes: 14 additions & 5 deletions waku/factory/external_config.nim
@@ -197,7 +197,20 @@ type WakuNodeConf* = object
desc: "Maximum allowed number of libp2p connections.",
defaultValue: 50,
name: "max-connections"
.}: uint16
.}: int

maxRelayPeers* {.
desc:
"Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
Contributor: While checking the changes, it seems to me that "deprecated" here means we will not use the option at all. In other words, if one does not move to the new relayServiceRatio setting right away, the default (60:40) will be applied regardless of maxRelayPeers, without notice. Maybe I'm missing something here, but if not, it sounds like a breaking change anyway. Can't we find a balance until we phase this option out completely? Maybe it is OK like this, I cannot decide. But I can imagine a solution where relayServiceRatio has no default, so we can tell the configurator's intent: if neither of the two settings is set we apply 60:40, but when maxRelayPeers is set we derive the ratio from it. WDYT?

Contributor Author (@darshankabariya, Jan 16, 2025): Thanks for your insight. We've decided to adopt relayServiceRatio without providing backward compatibility for maxRelayPeers. We will log a deprecation notice to inform users of this change. This decision simplifies our configuration and aligns with our long-term support strategy. Given that the previous configuration was incorrect and complicated, we are transitioning directly to a clearer approach. If users continue to use maxRelayPeers, we will notify them of its deprecation. @Ivansete-status, right?

Contributor: I buy into this idea. All I wanted to emphasize is not to hide the unintended/old usage of maxRelayPeers behind a log warning, --help, or docs. Maybe it is better to stop the node early and notify the user about the bad config, IDK.

Contributor Author: That's a great idea! Users typically don't check logs unless there's an error. The node will stop after detecting a wrong config.

Contributor Author (replying to the first comment above): Now I understand your concern. I've made it backward compatible with maxRelayPeers, ensuring that users who aren't aware of the deprecation are covered as well. At the time, I didn't fully grasp your point.

name: "max-relay-peers"
.}: Option[int]

relayServiceRatio* {.
desc:
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
name: "relay-service-ratio",
defaultValue: "60:40" # 60:40 ratio of relay to service peers
.}: string

colocationLimit* {.
desc:
@@ -206,10 +219,6 @@ type WakuNodeConf* = object
name: "ip-colocation-limit"
.}: int

maxRelayPeers* {.
desc: "Maximum allowed number of relay peers.", name: "max-relay-peers"
.}: Option[int]

Comment on lines -209 to -212

Collaborator: We cannot directly remove this optional parameter. Instead, we should state that we are deprecating it and that it will be completely removed after two releases. Therefore, we now just need to "tag" it as deprecated:

maxRelayPeers* {.
  desc: "Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.", name: "max-relay-peers"
.}: Option[int]

Contributor Author: Good approach! Will revert this deletion.

peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore.", name: "peer-store-capacity"
.}: Option[int]
9 changes: 8 additions & 1 deletion waku/factory/node_factory.nim
@@ -101,8 +101,15 @@ proc initNode(
agentString = some(conf.agentString),
)
builder.withColocationLimit(conf.colocationLimit)

# Backword compatibility for maxRelayPeers
if conf.maxRelayPeers.isSome():
Collaborator: If this condition is true, then we need to infer the relayServiceRatio based on the given conf.maxConnections and conf.maxRelayPeers. And we need to keep this temporarily for two consecutive versions.

Contributor Author: Thanks for your review and patience. It's now backward compatible, and the parse function no longer raises exceptions.

error "maxRelayPeers is deprecated, using relayServiceRatio instead ( recommendation ), if relayServiceRatio is not set, by default it will be 60:40, maxRelayPeers and maxServicePeer is calculated accordingly"

builder.withPeerManagerConfig(
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
maxConnections = conf.maxConnections,
relayServiceRatio = conf.relayServiceRatio,
shardAware = conf.relayShardedPeerManagement,
)
builder.withRateLimit(conf.rateLimits)
builder.withCircuitRelay(relay)
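
As a rough illustration of the backward-compatibility path discussed in the thread above, a ratio could be inferred from the deprecated setting roughly like this (hypothetical helper, not the code merged in this PR):

import std/[options, strutils]

# Hypothetical: derive a "relay:service" ratio string from the deprecated
# max-relay-peers value and max-connections.
proc inferRelayServiceRatio(maxConnections: int, maxRelayPeers: Option[int]): string =
  if maxRelayPeers.isNone():
    return "60:40"  # documented default
  let relayPct = min(100.0, float(maxRelayPeers.get()) / float(maxConnections) * 100.0)
  let servicePct = 100.0 - relayPct
  return formatFloat(relayPct, ffDecimal, 2) & ":" & formatFloat(servicePct, ffDecimal, 2)

echo inferRelayServiceRatio(50, some(20))  # "40.00:60.00"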