Skip to content

Commit

Permalink
Support domain names in known peers list (#3947)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Jun 3, 2024
1 parent f60dcca commit 74d62e6
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 103 deletions.
3 changes: 2 additions & 1 deletion node-it/src/test/scala/com/wavesplatform/it/Docker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ class Docker(

private def getNodeInfo(containerId: String, settings: WavesSettings): NodeInfo = {
val restApiPort = settings.restAPISettings.port
val networkPort = settings.networkSettings.bindAddress.getPort
// assume test nodes always have an open port
val networkPort = settings.networkSettings.bindAddress.get.getPort

val containerInfo = inspectContainer(containerId)
val wavesIpAddress = containerInfo.networkSettings().networks().get(wavesNetwork.name()).ipAddress()
Expand Down
10 changes: 5 additions & 5 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,18 @@ waves {
# Peers and blacklist storage file
file = ${waves.directory}"/peers.dat"

# If defined, the node will bind to this address and accept the incoming connections. When commented out, the node
# will not accept any incoming connections and will only establish outgoing ones. If you're using UPnP for port
# mapping, make sure to specify the correct address here.
bind-address = "0.0.0.0"

# String with IP address and port to send as external address during handshake. Could be set automatically if UPnP
# is enabled.
#
# If `declared-address` is set, which is the common scenario for nodes running in the cloud, the node will just
# listen to incoming connections on `bind-address:port` and broadcast its `declared-address` to its peers. UPnP
# is supposed to be disabled in this scenario.
#
# If declared address is not set and UPnP is not enabled, the node will not listen to incoming connections at all.
#
# If declared address is not set and UPnP is enabled, the node will attempt to connect to an IGD, retrieve its
# external IP address and configure the gateway to allow traffic through. If the node succeeds, the IGD's external
# IP address becomes the node's declared address.
Expand All @@ -79,9 +82,6 @@ waves {
# to `bind-address:port`. Please note, however, that this setup is not recommended.
# declared-address = "1.2.3.4:6863"

# Network address
bind-address = "0.0.0.0"

# Port number
port = 6863

Expand Down
17 changes: 3 additions & 14 deletions node/src/main/resources/network-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ waves.defaults {
port = 6863

known-peers = [
"159.69.126.149:6868"
"94.130.105.239:6868"
"159.69.126.153:6868"
"94.130.172.201:6868"
"35.157.247.122:6868"
"peers-testnet.wavesnodes.com:6868"
]
}
}
Expand Down Expand Up @@ -140,10 +136,7 @@ waves.defaults {
# node-name = "My MAINNET node"

known-peers = [
"168.119.116.189:6868"
"135.181.87.72:6868"
"162.55.39.115:6868"
"168.119.155.201:6868"
"peers.wavesnodes.com:6868"
]
}

Expand All @@ -160,11 +153,7 @@ waves.defaults {
port = 6862

known-peers = [
"88.99.185.128:6868"
"95.216.205.3:6868"
"49.12.15.166:6868"
"88.198.179.16:6868"
"52.58.254.101:6868"
"peers-stagenet.wavesnodes.com:6868"
]
}
}
Expand Down
72 changes: 40 additions & 32 deletions node/src/main/scala/com/wavesplatform/network/NetworkServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.wavesplatform.Version
import com.wavesplatform.metrics.Metrics
import com.wavesplatform.network.MessageObserver.Messages
import com.wavesplatform.settings.*
import com.wavesplatform.state.Cast
import com.wavesplatform.transaction.*
import com.wavesplatform.utils.ScorexLogging
import io.netty.bootstrap.{Bootstrap, ServerBootstrap}
Expand All @@ -16,7 +17,7 @@ import io.netty.util.concurrent.DefaultThreadFactory
import monix.reactive.Observable
import org.influxdb.dto.Point

import java.net.{InetSocketAddress, NetworkInterface}
import java.net.{InetSocketAddress, NetworkInterface, SocketAddress}
import java.nio.channels.ClosedChannelException
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration.*
Expand Down Expand Up @@ -59,17 +60,17 @@ object NetworkServer extends ScorexLogging {
val trafficLogger = new TrafficLogger(settings.networkSettings.trafficLogger)
val messageCodec = new MessageCodec(peerDatabase)

val excludedAddresses: Set[InetSocketAddress] = {
val bindAddress = settings.networkSettings.bindAddress
val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress)
val localAddresses = if (isLocal) {
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort)))
.toSet
} else Set(bindAddress)
val excludedAddresses: Set[InetSocketAddress] =
settings.networkSettings.bindAddress.fold(Set.empty[InetSocketAddress]) { bindAddress =>
val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress)
val localAddresses = if (isLocal) {
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort)))
.toSet
} else Set(bindAddress)

localAddresses ++ settings.networkSettings.declaredAddress.toSet
}
localAddresses ++ settings.networkSettings.declaredAddress.toSet
}

val lengthFieldPrepender = new LengthFieldPrepender(4)

Expand Down Expand Up @@ -113,7 +114,7 @@ object NetworkServer extends ScorexLogging {
fatalErrorHandler
)

val serverChannel = settings.networkSettings.declaredAddress.map { _ =>
val serverChannel = settings.networkSettings.bindAddress.map { bindAddress =>
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])
Expand All @@ -128,7 +129,7 @@ object NetworkServer extends ScorexLogging {
) ++ pipelineTail
)
)
.bind(settings.networkSettings.bindAddress)
.bind(bindAddress)
.channel()
}

Expand Down Expand Up @@ -166,6 +167,8 @@ object NetworkServer extends ScorexLogging {
s"Channel closed: ${Option(closeFuture.cause()).map(_.getMessage).getOrElse("no message")}"
)
)

logConnections()
}

def handleConnectionAttempt(remoteAddress: InetSocketAddress)(thisConnFuture: ChannelFuture): Unit = {
Expand All @@ -188,6 +191,7 @@ object NetworkServer extends ScorexLogging {
case other => log.debug(formatOutgoingChannelEvent(thisConnFuture.channel(), other.getMessage))
}
}
logConnections()
}

def doConnect(remoteAddress: InetSocketAddress): Unit =
Expand All @@ -201,36 +205,40 @@ object NetworkServer extends ScorexLogging {
}
)

def scheduleConnectTask(): Unit = if (!shutdownInitiated) {
val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) +
(Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously
log.trace(s"Next connection attempt in $delay")
def logConnections(): Unit = {
def mkAddressString(addresses: IterableOnce[SocketAddress]) =
addresses.iterator.map(_.toString).toVector.sorted.mkString("[", ",", "]")

workerGroup.schedule(delay) {
val outgoing = outgoingChannels.keySet.iterator().asScala.toVector
val incoming = peerInfo.values().asScala.view.map(_.remoteAddress).filterNot(outgoingChannels.containsKey)

def outgoingStr = outgoing.map(_.toString).sorted.mkString("[", ", ", "]")
lazy val incomingStr = mkAddressString(incoming)
lazy val outgoingStr = mkAddressString(outgoingChannels.keySet.iterator().asScala)

val all = peerInfo.values().iterator().asScala.flatMap(_.declaredAddress).toVector
val incoming = all.filterNot(outgoing.contains)
val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress])

def incomingStr = incoming.map(_.toString).sorted.mkString("[", ", ", "]")
log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr")

log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr")
Metrics.write(
Point
.measurement("connections")
.addField("outgoing", outgoingStr)
.addField("incoming", incomingStr)
.addField("n", all.size)
)
}

def scheduleConnectTask(): Unit = if (!shutdownInitiated) {
val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) +
(Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously

workerGroup.schedule(delay) {
if (outgoingChannels.size() < settings.networkSettings.maxOutboundConnections) {
val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress])
peerDatabase
.randomPeer(excluded = excludedAddresses ++ all)
.foreach(doConnect)
}

Metrics.write(
Point
.measurement("connections")
.addField("outgoing", outgoingStr)
.addField("incoming", incomingStr)
.addField("n", all.size)
)

scheduleConnectTask()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.wavesplatform.network

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, RemovalNotification}
import com.google.common.collect.EvictingQueue
import com.wavesplatform.settings.NetworkSettings
import com.wavesplatform.utils.{JsonFileStorage, ScorexLogging}
import io.netty.channel.Channel
import io.netty.channel.socket.nio.NioSocketChannel

import scala.jdk.CollectionConverters.*
import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.*
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.util.Random
import scala.util.control.NonFatal

Expand All @@ -36,23 +36,12 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor
}

private type PeersPersistenceType = Set[String]
private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime, Some(nonExpiringKnownPeers))
private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime)
private val blacklist = cache[InetAddress](settings.blackListResidenceTime)
private val suspension = cache[InetAddress](settings.suspensionResidenceTime)
private val reasons = mutable.Map.empty[InetAddress, String]
private val unverifiedPeers = EvictingQueue.create[InetSocketAddress](settings.maxUnverifiedPeers)

private val knownPeersAddresses = settings.knownPeers.map(inetSocketAddress(_, 6863))

private def nonExpiringKnownPeers(n: PeerRemoved[InetSocketAddress]): Unit =
if (n.wasEvicted() && knownPeersAddresses.contains(n.getKey))
peersPersistence.put(n.getKey, n.getValue)

for (a <- knownPeersAddresses) {
// add peers from config with max timestamp so they never get evicted from the list of known peers
doTouch(a, Long.MaxValue)
}

for (f <- settings.file if f.exists()) try {
JsonFileStorage.load[PeersPersistenceType](f.getCanonicalPath).foreach(a => touch(inetSocketAddress(a, 6863)))
log.info(s"Loaded ${peersPersistence.size} known peer(s) from ${f.getName}")
Expand All @@ -62,7 +51,7 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor

override def addCandidate(socketAddress: InetSocketAddress): Boolean = unverifiedPeers.synchronized {
val r = !socketAddress.getAddress.isAnyLocalAddress &&
!(socketAddress.getAddress.isLoopbackAddress && socketAddress.getPort == settings.bindAddress.getPort) &&
!(socketAddress.getAddress.isLoopbackAddress && settings.bindAddress.exists(_.getPort == socketAddress.getPort)) &&
Option(peersPersistence.getIfPresent(socketAddress)).isEmpty &&
!unverifiedPeers.contains(socketAddress)
if (r) unverifiedPeers.add(socketAddress)
Expand Down Expand Up @@ -112,26 +101,29 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor
override def suspendedHosts: immutable.Set[InetAddress] = suspension.asMap().asScala.keys.toSet

override def detailedBlacklist: immutable.Map[InetAddress, (Long, String)] =
blacklist.asMap().asScala.view.mapValues(_.toLong).map { case ((h, t)) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap
blacklist.asMap().asScala.view.mapValues(_.toLong).map { case (h, t) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap

override def detailedSuspended: immutable.Map[InetAddress, Long] = suspension.asMap().asScala.view.mapValues(_.toLong).toMap

override def randomPeer(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized {
def excludeAddress(isa: InetSocketAddress): Boolean = {
excluded(isa) || Option(isa.getAddress).exists(blacklistedHosts) || suspendedHosts(isa.getAddress)
}
// excluded only contains local addresses, our declared address, and external declared addresses we already have
// connection to, so it's safe to filter out all matching candidates
unverifiedPeers.removeIf(excluded(_))
val unverified = Option(unverifiedPeers.peek()).filterNot(excludeAddress)
val verified = Random.shuffle(knownPeers.keySet.diff(excluded).toSeq).headOption.filterNot(excludeAddress)

(unverified, verified) match {
case (Some(_), v @ Some(_)) => if (Random.nextBoolean()) Some(unverifiedPeers.poll()) else v
case (Some(_), None) => Some(unverifiedPeers.poll())
case (None, v @ Some(_)) => v
case _ => None

@tailrec
def nextUnverified(): Option[InetSocketAddress] = {
unverifiedPeers.poll() match {
case null => None
case nonNull =>
if (!excludeAddress(nonNull)) Some(nonNull) else nextUnverified()
}
}

nextUnverified() orElse Random
.shuffle(
(knownPeers.keySet ++ settings.knownPeers.map(p => inetSocketAddress(p, 6868))).filterNot(excludeAddress)
)
.headOption
}

def clearBlacklist(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class UPnPSettings(enable: Boolean, gatewayTimeout: FiniteDuration, discove

case class NetworkSettings(
file: Option[File],
bindAddress: InetSocketAddress,
bindAddress: Option[InetSocketAddress],
declaredAddress: Option[InetSocketAddress],
nodeName: String,
nonce: Long,
Expand Down Expand Up @@ -48,7 +48,7 @@ object NetworkSettings {

private[this] def fromConfig(config: Config): NetworkSettings = {
val file = config.getAs[File]("file")
val bindAddress = new InetSocketAddress(config.as[String]("bind-address"), config.as[Int]("port"))
val bindAddress = config.getAs[String]("bind-address").map(addr => new InetSocketAddress(addr, config.as[Int]("port")))
val nonce = config.getOrElse("nonce", randomNonce)
val nodeName = config.getOrElse("node-name", s"Node-$nonce")
require(nodeName.utf8Bytes.length <= MaxNodeNameBytesLength, s"Node name should have length less than $MaxNodeNameBytesLength bytes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ class PeerDatabaseImplSpecification extends FreeSpec {
.resolve()
private val settings2 = config2.as[NetworkSettings]("waves.network")

private val config3 = ConfigFactory
.parseString(s"""waves.network {
| file = null
| known-peers = ["$host1:1"]
| peers-data-residence-time = 2s
| enable-peers-exchange = no
|}""".stripMargin)
.withFallback(ConfigFactory.load())
.resolve()
private val settings3 = config3.as[NetworkSettings]("waves.network")

private def withDatabase(settings: NetworkSettings)(f: PeerDatabase => Unit): Unit = {
val pdb = new PeerDatabaseImpl(settings)
f(pdb)
Expand Down Expand Up @@ -78,14 +67,6 @@ class PeerDatabaseImplSpecification extends FreeSpec {
database.randomPeer(Set()) shouldBe empty
}

"known-peers should be always in database" in withDatabase(settings3) { database3 =>
database3.knownPeers.keys should contain(address1)
sleepLong()
database3.knownPeers.keys should contain(address1)
sleepShort()
database3.knownPeers.keys should contain(address1)
}

"touching peer prevent it from obsoleting" in withDatabase(settings1) { database =>
database.addCandidate(address1)
database.touch(address1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class NetworkSettingsSpecification extends FlatSpec {
|}""".stripMargin))
val networkSettings = config.as[NetworkSettings]("waves.network")

networkSettings.bindAddress should be(new InetSocketAddress("127.0.0.1", 6868))
networkSettings.bindAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868)))
networkSettings.nodeName should be("default-node-name")
networkSettings.declaredAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868)))
networkSettings.nonce should be(0)
Expand Down

0 comments on commit 74d62e6

Please sign in to comment.