Skip to content

Commit

Permalink
feat: making dns discovery async (#3175)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Dec 3, 2024
1 parent b8ad6c1 commit d7d00bf
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 42 deletions.
2 changes: 1 addition & 1 deletion apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =

var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
if wakuDnsDiscovery.isOk:
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
let discoveredPeers = await wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
discoveredNodes = discoveredPeers.get()
Expand Down
25 changes: 14 additions & 11 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ proc crawlNetwork(

proc retrieveDynamicBootstrapNodes(
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
): Result[seq[RemotePeerInfo], string] =
): Future[Result[seq[RemotePeerInfo], string]] {.async.} =
## Retrieve dynamic bootstrap nodes (DNS discovery)

if dnsDiscovery and dnsDiscoveryUrl != "":
# DNS discovery
debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl
Expand All @@ -369,14 +371,15 @@ proc retrieveDynamicBootstrapNodes(
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain = domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
if resolved.len > 0:
return resolved[0] # Use only first answer

var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
if wakuDnsDiscovery.isOk():
return wakuDnsDiscovery.get().findPeers().mapErr(
proc(e: cstring): string =
$e
)
return (await wakuDnsDiscovery.get().findPeers()).mapErr(
proc(e: cstring): string =
$e
)
else:
warn "Failed to init Waku DNS discovery"

Expand All @@ -385,11 +388,11 @@ proc retrieveDynamicBootstrapNodes(

proc getBootstrapFromDiscDns(
conf: NetworkMonitorConf
): Result[seq[enr.Record], string] =
): Future[Result[seq[enr.Record], string]] {.async.} =
try:
let dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
let dynamicBootstrapNodesRes =
retrieveDynamicBootstrapNodes(true, conf.dnsDiscoveryUrl, dnsNameServers)
await retrieveDynamicBootstrapNodes(true, conf.dnsDiscoveryUrl, dnsNameServers)
if not dynamicBootstrapNodesRes.isOk():
error("failed discovering peers from DNS")
let dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
Expand All @@ -412,7 +415,7 @@ proc getBootstrapFromDiscDns(

proc initAndStartApp(
conf: NetworkMonitorConf
): Result[(WakuNode, WakuDiscoveryV5), string] =
): Future[Result[(WakuNode, WakuDiscoveryV5), string]] {.async.} =
let bindIp =
try:
parseIpAddress("0.0.0.0")
Expand Down Expand Up @@ -472,7 +475,7 @@ proc initAndStartApp(
else:
nodeRes.get()

var discv5BootstrapEnrsRes = getBootstrapFromDiscDns(conf)
var discv5BootstrapEnrsRes = await getBootstrapFromDiscDns(conf)
if discv5BootstrapEnrsRes.isErr():
error("failed discovering peers from DNS")
var discv5BootstrapEnrs = discv5BootstrapEnrsRes.get()
Expand Down Expand Up @@ -604,7 +607,7 @@ when isMainModule:
let restClient = clientRest.get()

# start waku node
let nodeRes = initAndStartApp(conf)
let nodeRes = waitFor initAndStartApp(conf)
if nodeRes.isErr():
error "could not start node"
quit 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ proc destroyShared(self: ptr DiscoveryRequest) =

proc retrieveBootstrapNodes(
enrTreeUrl: string, ipDnsServer: string
): Result[seq[string], string] =
): Future[Result[seq[string], string]] {.async.} =
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
let discoveredPeers: seq[RemotePeerInfo] = retrieveDynamicBootstrapNodes(
true, enrTreeUrl, dnsNameServers
let discoveredPeers: seq[RemotePeerInfo] = (
await retrieveDynamicBootstrapNodes(true, enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)

Expand Down Expand Up @@ -126,7 +126,9 @@ proc process*(

return ok("discv5 stopped correctly")
of GET_BOOTSTRAP_NODES:
let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr:
let nodes = (
await retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer)
).valueOr:
error "GET_BOOTSTRAP_NODES failed", error = error
return err($error)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_dnsdisc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ suite "Waku DNS Discovery":

var wakuDnsDisc = WakuDnsDiscovery.init(location, resolver).get()

let res = wakuDnsDisc.findPeers()
let res = await wakuDnsDisc.findPeers()

check:
# We have discovered all three nodes
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-dnsdisc
10 changes: 10 additions & 0 deletions waku/discovery/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -448,5 +448,15 @@ proc updateBootstrapRecords*(
return err("wrong enr given: " & enrWithoutQuotes)

self.protocol.bootstrapRecords = newRecords
self.protocol.seedTable()

return ok()

proc updateBootstrapRecords*(
self: var WakuDiscoveryV5, updatedRecords: seq[enr.Record]
): void =
self.protocol.bootstrapRecords = updatedRecords

# If we're updating the table with nodes that already existed, it will log an error when trying
# to add a bootstrap node that was already there. That's ok.
self.protocol.seedTable()
24 changes: 13 additions & 11 deletions waku/discovery/waku_dnsdisc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459

import
std/[options, net],
std/[options, net, sequtils],
chronicles,
chronos,
metrics,
Expand Down Expand Up @@ -40,22 +40,23 @@ proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} =
debug "Empty resolver called", domain = domain
return ""

proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] =
proc findPeers*(
wdd: WakuDnsDiscovery
): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} =
## Find peers to connect to using DNS based discovery

info "Finding peers using Waku DNS discovery"

# Synchronise client tree using configured resolver
var tree: Tree
try:
tree = wdd.client.getTree(wdd.resolver)
# @TODO: this is currently a blocking operation to not violate memory safety
tree = (await syncTree(wdd.resolver, wdd.client.loc)).tryGet()
except Exception:
error "Failed to synchronise client tree"
waku_dnsdisc_errors.inc(labelValues = ["tree_sync_failure"])
return err("Node discovery failed")

let discoveredEnr = wdd.client.getNodeRecords()
let discoveredEnr = tree.getNodes().mapIt(it.record)

if discoveredEnr.len > 0:
info "Successfully discovered ENR", count = discoveredEnr.len
Expand Down Expand Up @@ -97,7 +98,7 @@ proc init*(

proc retrieveDynamicBootstrapNodes*(
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
): Result[seq[RemotePeerInfo], string] =
): Future[Result[seq[RemotePeerInfo], string]] {.async.} =
## Retrieve dynamic bootstrap nodes (DNS discovery)

if dnsDiscovery and dnsDiscoveryUrl != "":
Expand All @@ -113,14 +114,15 @@ proc retrieveDynamicBootstrapNodes*(
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain = domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
if resolved.len > 0:
return resolved[0] # Use only first answer

var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
if wakuDnsDiscovery.isOk():
return wakuDnsDiscovery.get().findPeers().mapErr(
proc(e: cstring): string =
$e
)
return (await wakuDnsDiscovery.get().findPeers()).mapErr(
proc(e: cstring): string =
$e
)
else:
warn "Failed to init Waku DNS discovery"

Expand Down
68 changes: 55 additions & 13 deletions waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Waku* = ref object

wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
dnsRetryLoopHandle: Future[void]
discoveryMngr: DiscoveryManager

node*: WakuNode
Expand Down Expand Up @@ -215,17 +216,6 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
return err("Failed to generate key: " & $keyRes.error)
confCopy.nodekey = some(keyRes.get())

debug "Retrieve dynamic bootstrap nodes"
let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes(
confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers
)
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
return err(
"Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error
)

var relay = newCircuitRelay(confCopy.isRelayClient)

let nodeRes = setupNode(confCopy, rng, relay)
Expand Down Expand Up @@ -255,7 +245,6 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
rng: rng,
key: confCopy.nodekey.get(),
node: node,
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
deliveryMonitor: deliveryMonitor,
)

Expand Down Expand Up @@ -351,7 +340,57 @@ proc updateWaku(waku: ptr Waku): Result[void, string] =

return ok()

proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} =
proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
while true:
await sleepAsync(30.seconds)
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
waku.conf.dnsDiscovery, waku.conf.dnsDiscoveryUrl,
waku.conf.dnsDiscoveryNameServers,
)
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
continue

waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()

if not waku[].wakuDiscv5.isNil():
let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get().toUri())
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in dynamicBootstrapEnrs:
addBootstrapNode(enrUri, discv5BootstrapEnrs)

waku[].wakuDiscv5.updateBootstrapRecords(
waku[].wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs
)

info "Connecting to dynamic bootstrap peers"
try:
await connectToNodes(
waku[].node, waku[].dynamicBootstrapNodes, "dynamic bootstrap"
)
except CatchableError:
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return

proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
debug "Retrieve dynamic bootstrap nodes"

let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
waku.conf.dnsDiscovery, waku.conf.dnsDiscoveryUrl, waku.conf.dnsDiscoveryNameServers
)

if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
# Start Dns Discovery retry loop
waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
else:
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()

if not waku[].conf.discv5Only:
(await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr:
return err("error while calling startNode: " & $error)
Expand Down Expand Up @@ -400,3 +439,6 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =

if not waku.node.isNil():
await waku.node.stop()

if not waku.dnsRetryLoopHandle.isNil():
await waku.dnsRetryLoopHandle.cancelAndWait()

0 comments on commit d7d00bf

Please sign in to comment.