diff --git a/node/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala b/node/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala index 834f06fee3..f1451c943e 100644 --- a/node/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala +++ b/node/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala @@ -1,8 +1,5 @@ package com.wavesplatform.network -import java.util -import java.util.concurrent.{ConcurrentMap, TimeUnit} - import com.wavesplatform.network.Handshake.InvalidHandshakeException import com.wavesplatform.utils.ScorexLogging import io.netty.buffer.ByteBuf @@ -13,6 +10,9 @@ import io.netty.handler.codec.ReplayingDecoder import io.netty.util.AttributeKey import io.netty.util.concurrent.ScheduledFuture +import java.net.InetSocketAddress +import java.util +import java.util.concurrent.{ConcurrentMap, TimeUnit} import scala.concurrent.duration.FiniteDuration class HandshakeDecoder(peerDatabase: PeerDatabase) extends ReplayingDecoder[Void] with ScorexLogging { @@ -80,16 +80,30 @@ abstract class HandshakeHandler( import HandshakeHandler.* + private def suspendOrClose(msg: => String, verifiedRemoteAddress: Option[InetSocketAddress], ctx: ChannelHandlerContext): Unit = { + log.debug(s"${id(ctx)} $msg") + verifiedRemoteAddress.foreach(peerDatabase.suspend) + ctx.close() + } + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { case remoteHandshake: Handshake => + val verifiedDeclaredAddress = remoteHandshake.declaredAddress.filter(_ == ctx.channel().remoteAddress()) + if (localHandshake.applicationName != remoteHandshake.applicationName) - peerDatabase.blacklistAndClose( - ctx.channel(), - s"Remote application name ${remoteHandshake.applicationName} does not match local ${localHandshake.applicationName}" + suspendOrClose( + s"Remote application name ${remoteHandshake.applicationName} does not match local ${localHandshake.applicationName}", + verifiedDeclaredAddress, + ctx ) else if (!versionIsSupported(remoteHandshake.applicationVersion)) - peerDatabase.blacklistAndClose(ctx.channel(), s"Remote application version ${remoteHandshake.applicationVersion} is not supported") + suspendOrClose(s"Remote application version ${remoteHandshake.applicationVersion} is not supported", verifiedDeclaredAddress, ctx) else { + verifiedDeclaredAddress.foreach { vda => + ctx.channel().attr(NodeDeclaredAddressAttributeKey).set(vda) + peerDatabase.touch(vda) + } + PeerKey(ctx, remoteHandshake.nodeNonce) match { case None => log.warn(s"Can't get PeerKey from ${id(ctx)}") @@ -139,9 +153,11 @@ abstract class HandshakeHandler( object HandshakeHandler { - val NodeNameAttributeKey: AttributeKey[String] = AttributeKey.newInstance[String]("name") - val NodeVersionAttributeKey: AttributeKey[(Int, Int, Int)] = AttributeKey.newInstance[(Int, Int, Int)]("version") - private val ConnectionStartAttributeKey = AttributeKey.newInstance[Long]("connectionStart") + val NodeNameAttributeKey: AttributeKey[String] = AttributeKey.newInstance[String]("name") + val NodeVersionAttributeKey: AttributeKey[(Int, Int, Int)] = AttributeKey.newInstance[(Int, Int, Int)]("version") + val NodeDeclaredAddressAttributeKey: AttributeKey[InetSocketAddress] = AttributeKey.newInstance[InetSocketAddress]("declaredAddress") + + private val ConnectionStartAttributeKey = AttributeKey.newInstance[Long]("connectionStart") def versionIsSupported(remoteVersion: (Int, Int, Int)): Boolean = (remoteVersion._1 == 0 && remoteVersion._2 >= 13) || (remoteVersion._1 == 1 && remoteVersion._2 >= 0) diff --git a/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala b/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala index 921ceface7..b95f6eff97 100644 --- a/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala +++ b/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala @@ -210,7 +210,7 @@ object NetworkServer extends ScorexLogging { if (outgoingChannels.size() < networkSettings.maxOutboundConnections) { val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress]) peerDatabase - .randomPeer(excluded = excludedAddresses ++ all) + .nextCandidate(excluded = excludedAddresses ++ all) .foreach(doConnect) } diff --git a/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala b/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala index 5f7d68e282..9b44d7260d 100644 --- a/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala +++ b/node/src/main/scala/com/wavesplatform/network/PeerDatabase.scala @@ -7,30 +7,26 @@ import io.netty.channel.Channel trait PeerDatabase extends AutoCloseable { def addCandidate(socketAddress: InetSocketAddress): Boolean - def touch(socketAddress: InetSocketAddress): Unit - def blacklist(host: InetAddress, reason: String): Unit + def livePeers: Set[InetSocketAddress] - def knownPeers: Map[InetSocketAddress, Long] + def nextCandidate(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] + def blacklist(host: InetAddress, reason: String): Unit + def blacklistAndClose(channel: Channel, reason: String): Unit def isBlacklisted(address: InetAddress): Boolean + def clearBlacklist(): Unit - def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] + def knownPeers: Map[InetSocketAddress, Long] def detailedBlacklist: Map[InetAddress, (Long, String)] - def detailedSuspended: Map[InetAddress, Long] - def clearBlacklist(): Unit - - def suspend(host: InetSocketAddress): Unit - def blacklistAndClose(channel: Channel, reason: String): Unit + def suspend(host: InetSocketAddress): Unit def suspendAndClose(channel: Channel): Unit - - def livePeers: Set[InetSocketAddress] } object PeerDatabase { @@ -44,7 +40,7 @@ object PeerDatabase { override def knownPeers: Map[InetSocketAddress, Long] = Map.empty - override def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None + override def nextCandidate(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None override def detailedBlacklist: Map[InetAddress, (Long, String)] = Map.empty diff --git a/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala b/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala index 3be7a19419..6425b4e3a6 100644 --- a/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala +++ b/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala @@ -71,9 +71,8 @@ class PeerDatabaseImpl(settings: NetworkSettings, ticker: Ticker = Ticker.system override def suspend(socketAddress: InetSocketAddress): Unit = getAddress(socketAddress).foreach { address => unverifiedPeers.synchronized { - unverifiedPeers.removeIf { x => - Option(x.getAddress).contains(address) - } + log.trace(s"Suspending $socketAddress") + unverifiedPeers.removeIf(_ == socketAddress) suspension.put(address, ticker.read()) } } @@ -97,7 +96,7 @@ class PeerDatabaseImpl(settings: NetworkSettings, ticker: Ticker = Ticker.system override def detailedSuspended: immutable.Map[InetAddress, Long] = suspension.asMap().asScala.view.mapValues(_.toLong).toMap - override def randomPeer(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized { + override def nextCandidate(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized { def excludeAddress(isa: InetSocketAddress): Boolean = { excluded(isa) || isBlacklisted(isa.getAddress) || isSuspended(isa.getAddress) } diff --git a/node/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala b/node/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala index 9fb59e48be..75a9d6c97a 100644 --- a/node/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala +++ b/node/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala @@ -4,15 +4,12 @@ import com.wavesplatform.utils.ScorexLogging import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} -import java.net.InetSocketAddress import scala.concurrent.duration.FiniteDuration class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDuration) extends ChannelInboundHandlerAdapter with ScorexLogging { - private var peersRequested = false - // declared address is not empty only when this is an outgoing channel, and its declared address matches remote address - private var declaredAddress = Option.empty[InetSocketAddress] + private var peersRequested = false - def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) { + private def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) { peersRequested = true ctx.writeAndFlush(GetPeers) @@ -22,24 +19,9 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu } override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { - declaredAddress.foreach(peerDatabase.touch) + Option(ctx.channel().attr(HandshakeHandler.NodeDeclaredAddressAttributeKey).get()).foreach(peerDatabase.touch) msg match { - case hs: Handshake => - val rda = for { - rda <- hs.declaredAddress - rdaAddress <- Option(rda.getAddress) - ctxAddress <- ctx.remoteAddress.map(_.getAddress) - if rdaAddress == ctxAddress - } yield rda - - rda match { - case None => log.debug(s"${id(ctx)} Declared address $rda does not match actual remote address ${ctx.remoteAddress.map(_.getAddress)}") - case Some(x) => - log.trace(s"${id(ctx)} Touching declared address") - peerDatabase.touch(x) - declaredAddress = Some(x) - } - + case _: Handshake => requestPeers(ctx) super.channelRead(ctx, msg) case GetPeers => diff --git a/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala b/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala index f98c575ba4..be14a40127 100644 --- a/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala @@ -55,7 +55,7 @@ class PeerDatabaseImplSpecification extends FreeSpec { "new candidate should be returned by randomPeer, but should not be added to knownPeers" in withDatabase(settings1) { database => database.knownPeers shouldBe empty database.addCandidate(address1) - database.randomPeer(Set()) should contain(address1) + database.nextCandidate(Set()) should contain(address1) database.knownPeers shouldBe empty } @@ -72,7 +72,7 @@ class PeerDatabaseImplSpecification extends FreeSpec { database.knownPeers.keys should contain(address1) sleepLong() database.knownPeers shouldBe empty - database.randomPeer(Set()) shouldBe empty + database.nextCandidate(Set()) shouldBe empty } "touching peer prevent it from obsoleting" in withDatabase(settings1) { database => @@ -94,10 +94,10 @@ class PeerDatabaseImplSpecification extends FreeSpec { database.knownPeers.keys should not contain address1 database.knownPeers should be(empty) - database.randomPeer(Set()) should contain(address2) + database.nextCandidate(Set()) should contain(address2) database.blacklist(address2.getAddress, "") - database.randomPeer(Set()) should not contain address2 - database.randomPeer(Set()) should be(empty) + database.nextCandidate(Set()) should not contain address2 + database.nextCandidate(Set()) should be(empty) } "random peer should return peers from both from database and buffer" in withDatabase(settings2) { database2 => @@ -107,7 +107,7 @@ class PeerDatabaseImplSpecification extends FreeSpec { keys should contain(address1) keys should not contain address2 - val set = (1 to 10).flatMap(i => database2.randomPeer(Set())).toSet + val set = (1 to 10).flatMap(i => database2.nextCandidate(Set())).toSet set should contain(address1) set should contain(address2) @@ -118,16 +118,14 @@ class PeerDatabaseImplSpecification extends FreeSpec { database.addCandidate(address1) database.addCandidate(address2) - database.randomPeer(Set(address1)) should contain(address2) + database.nextCandidate(Set(address1)) should contain(address2) } "filters out wildcard addresses" in withDatabase(settings1) { database => database.addCandidate(new InetSocketAddress("0.0.0.0", 6863)) - database.randomPeer(Set(address1, address2)) shouldBe None + database.nextCandidate(Set(address1, address2)) shouldBe None } - "" - "should not add nodes to the blacklist if blacklisting is disabled" in { val config = ConfigFactory .parseString(s"""waves.network {