Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Sep 16, 2024
1 parent ce814ef commit b04fa2d
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.wavesplatform.network

import java.util
import java.util.concurrent.{ConcurrentMap, TimeUnit}

import com.wavesplatform.network.Handshake.InvalidHandshakeException
import com.wavesplatform.utils.ScorexLogging
import io.netty.buffer.ByteBuf
Expand All @@ -13,6 +10,9 @@ import io.netty.handler.codec.ReplayingDecoder
import io.netty.util.AttributeKey
import io.netty.util.concurrent.ScheduledFuture

import java.net.InetSocketAddress
import java.util
import java.util.concurrent.{ConcurrentMap, TimeUnit}
import scala.concurrent.duration.FiniteDuration

class HandshakeDecoder(peerDatabase: PeerDatabase) extends ReplayingDecoder[Void] with ScorexLogging {
Expand Down Expand Up @@ -80,16 +80,30 @@ abstract class HandshakeHandler(

import HandshakeHandler.*

private def suspendOrClose(msg: => String, verifiedRemoteAddress: Option[InetSocketAddress], ctx: ChannelHandlerContext): Unit = {
log.debug(s"${id(ctx)} $msg")
verifiedRemoteAddress.foreach(peerDatabase.suspend)
ctx.close()
}

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
case remoteHandshake: Handshake =>
val verifiedDeclaredAddress = remoteHandshake.declaredAddress.filter(_ == ctx.channel().remoteAddress())

if (localHandshake.applicationName != remoteHandshake.applicationName)
peerDatabase.blacklistAndClose(
ctx.channel(),
s"Remote application name ${remoteHandshake.applicationName} does not match local ${localHandshake.applicationName}"
suspendOrClose(
s"Remote application name ${remoteHandshake.applicationName} does not match local ${localHandshake.applicationName}",
verifiedDeclaredAddress,
ctx
)
else if (!versionIsSupported(remoteHandshake.applicationVersion))
peerDatabase.blacklistAndClose(ctx.channel(), s"Remote application version ${remoteHandshake.applicationVersion} is not supported")
suspendOrClose(s"Remote application version ${remoteHandshake.applicationVersion} is not supported", verifiedDeclaredAddress, ctx)
else {
verifiedDeclaredAddress.foreach { vda =>
ctx.channel().attr(NodeDeclaredAddressAttributeKey).set(vda)
peerDatabase.touch(vda)
}

PeerKey(ctx, remoteHandshake.nodeNonce) match {
case None =>
log.warn(s"Can't get PeerKey from ${id(ctx)}")
Expand Down Expand Up @@ -139,9 +153,11 @@ abstract class HandshakeHandler(

object HandshakeHandler {

val NodeNameAttributeKey: AttributeKey[String] = AttributeKey.newInstance[String]("name")
val NodeVersionAttributeKey: AttributeKey[(Int, Int, Int)] = AttributeKey.newInstance[(Int, Int, Int)]("version")
private val ConnectionStartAttributeKey = AttributeKey.newInstance[Long]("connectionStart")
val NodeNameAttributeKey: AttributeKey[String] = AttributeKey.newInstance[String]("name")
val NodeVersionAttributeKey: AttributeKey[(Int, Int, Int)] = AttributeKey.newInstance[(Int, Int, Int)]("version")
val NodeDeclaredAddressAttributeKey: AttributeKey[InetSocketAddress] = AttributeKey.newInstance[InetSocketAddress]("declaredAddress")

private val ConnectionStartAttributeKey = AttributeKey.newInstance[Long]("connectionStart")

def versionIsSupported(remoteVersion: (Int, Int, Int)): Boolean =
(remoteVersion._1 == 0 && remoteVersion._2 >= 13) || (remoteVersion._1 == 1 && remoteVersion._2 >= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ object NetworkServer extends ScorexLogging {
if (outgoingChannels.size() < networkSettings.maxOutboundConnections) {
val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress])
peerDatabase
.randomPeer(excluded = excludedAddresses ++ all)
.nextCandidate(excluded = excludedAddresses ++ all)
.foreach(doConnect)
}

Expand Down
20 changes: 8 additions & 12 deletions node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,26 @@ import io.netty.channel.Channel
trait PeerDatabase extends AutoCloseable {

def addCandidate(socketAddress: InetSocketAddress): Boolean

def touch(socketAddress: InetSocketAddress): Unit

def blacklist(host: InetAddress, reason: String): Unit
def livePeers: Set[InetSocketAddress]

def knownPeers: Map[InetSocketAddress, Long]
def nextCandidate(excluded: Set[InetSocketAddress]): Option[InetSocketAddress]

def blacklist(host: InetAddress, reason: String): Unit
def blacklistAndClose(channel: Channel, reason: String): Unit
def isBlacklisted(address: InetAddress): Boolean
def clearBlacklist(): Unit

def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress]
def knownPeers: Map[InetSocketAddress, Long]

def detailedBlacklist: Map[InetAddress, (Long, String)]

def detailedSuspended: Map[InetAddress, Long]

def clearBlacklist(): Unit

def suspend(host: InetSocketAddress): Unit

def blacklistAndClose(channel: Channel, reason: String): Unit

def suspend(host: InetSocketAddress): Unit
def suspendAndClose(channel: Channel): Unit

def livePeers: Set[InetSocketAddress]
}

object PeerDatabase {
Expand All @@ -44,7 +40,7 @@ object PeerDatabase {

override def knownPeers: Map[InetSocketAddress, Long] = Map.empty

override def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None
override def nextCandidate(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None

override def detailedBlacklist: Map[InetAddress, (Long, String)] = Map.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ class PeerDatabaseImpl(settings: NetworkSettings, ticker: Ticker = Ticker.system

override def suspend(socketAddress: InetSocketAddress): Unit = getAddress(socketAddress).foreach { address =>
unverifiedPeers.synchronized {
unverifiedPeers.removeIf { x =>
Option(x.getAddress).contains(address)
}
log.trace(s"Suspending $socketAddress")
unverifiedPeers.removeIf(_ == socketAddress)
suspension.put(address, ticker.read())
}
}
Expand All @@ -97,7 +96,7 @@ class PeerDatabaseImpl(settings: NetworkSettings, ticker: Ticker = Ticker.system

override def detailedSuspended: immutable.Map[InetAddress, Long] = suspension.asMap().asScala.view.mapValues(_.toLong).toMap

override def randomPeer(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized {
override def nextCandidate(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized {
def excludeAddress(isa: InetSocketAddress): Boolean = {
excluded(isa) || isBlacklisted(isa.getAddress) || isSuspended(isa.getAddress)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ import com.wavesplatform.utils.ScorexLogging
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

import java.net.InetSocketAddress
import scala.concurrent.duration.FiniteDuration

class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDuration) extends ChannelInboundHandlerAdapter with ScorexLogging {
private var peersRequested = false
// declared address is not empty only when this is an outgoing channel, and its declared address matches remote address
private var declaredAddress = Option.empty[InetSocketAddress]
private var peersRequested = false

def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) {
private def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) {
peersRequested = true
ctx.writeAndFlush(GetPeers)

Expand All @@ -22,24 +19,9 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu
}

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
declaredAddress.foreach(peerDatabase.touch)
Option(ctx.channel().attr(HandshakeHandler.NodeDeclaredAddressAttributeKey).get()).foreach(peerDatabase.touch)
msg match {
case hs: Handshake =>
val rda = for {
rda <- hs.declaredAddress
rdaAddress <- Option(rda.getAddress)
ctxAddress <- ctx.remoteAddress.map(_.getAddress)
if rdaAddress == ctxAddress
} yield rda

rda match {
case None => log.debug(s"${id(ctx)} Declared address $rda does not match actual remote address ${ctx.remoteAddress.map(_.getAddress)}")
case Some(x) =>
log.trace(s"${id(ctx)} Touching declared address")
peerDatabase.touch(x)
declaredAddress = Some(x)
}

case _: Handshake =>
requestPeers(ctx)
super.channelRead(ctx, msg)
case GetPeers =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class PeerDatabaseImplSpecification extends FreeSpec {
"new candidate should be returned by randomPeer, but should not be added to knownPeers" in withDatabase(settings1) { database =>
database.knownPeers shouldBe empty
database.addCandidate(address1)
database.randomPeer(Set()) should contain(address1)
database.nextCandidate(Set()) should contain(address1)
database.knownPeers shouldBe empty
}

Expand All @@ -72,7 +72,7 @@ class PeerDatabaseImplSpecification extends FreeSpec {
database.knownPeers.keys should contain(address1)
sleepLong()
database.knownPeers shouldBe empty
database.randomPeer(Set()) shouldBe empty
database.nextCandidate(Set()) shouldBe empty
}

"touching peer prevent it from obsoleting" in withDatabase(settings1) { database =>
Expand All @@ -94,10 +94,10 @@ class PeerDatabaseImplSpecification extends FreeSpec {
database.knownPeers.keys should not contain address1
database.knownPeers should be(empty)

database.randomPeer(Set()) should contain(address2)
database.nextCandidate(Set()) should contain(address2)
database.blacklist(address2.getAddress, "")
database.randomPeer(Set()) should not contain address2
database.randomPeer(Set()) should be(empty)
database.nextCandidate(Set()) should not contain address2
database.nextCandidate(Set()) should be(empty)
}

"random peer should return peers from both from database and buffer" in withDatabase(settings2) { database2 =>
Expand All @@ -107,7 +107,7 @@ class PeerDatabaseImplSpecification extends FreeSpec {
keys should contain(address1)
keys should not contain address2

val set = (1 to 10).flatMap(i => database2.randomPeer(Set())).toSet
val set = (1 to 10).flatMap(i => database2.nextCandidate(Set())).toSet

set should contain(address1)
set should contain(address2)
Expand All @@ -118,16 +118,14 @@ class PeerDatabaseImplSpecification extends FreeSpec {
database.addCandidate(address1)
database.addCandidate(address2)

database.randomPeer(Set(address1)) should contain(address2)
database.nextCandidate(Set(address1)) should contain(address2)
}

"filters out wildcard addresses" in withDatabase(settings1) { database =>
database.addCandidate(new InetSocketAddress("0.0.0.0", 6863))
database.randomPeer(Set(address1, address2)) shouldBe None
database.nextCandidate(Set(address1, address2)) shouldBe None
}

""

"should not add nodes to the blacklist if blacklisting is disabled" in {
val config = ConfigFactory
.parseString(s"""waves.network {
Expand Down

0 comments on commit b04fa2d

Please sign in to comment.