Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stability improvements #3938

Merged
merged 24 commits
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-docker-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:

- name: Build sources
run: |
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt buildTarballsForDocker
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt ;buildTarballsForDocker;buildRIDERunnerForDocker

- name: Setup Docker buildx
uses: docker/setup-buildx-action@v2
Expand Down
13 changes: 8 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ lazy val repl = crossProject(JSPlatform, JVMPlatform)
libraryDependencies ++=
Dependencies.protobuf.value ++
Dependencies.langCompilerPlugins.value ++
Dependencies.circe.value ++
Seq(
"org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0"
),
Dependencies.circe.value,
inConfig(Compile)(
Seq(
PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value,
Expand All @@ -109,6 +106,9 @@ lazy val `repl-jvm` = repl.jvm
)

lazy val `repl-js` = repl.js.dependsOn(`lang-js`)
.settings(
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
)

lazy val `curve25519-test` = project.dependsOn(node)

Expand Down Expand Up @@ -198,6 +198,10 @@ buildTarballsForDocker := {
(`grpc-server` / Universal / packageZipTarball).value,
baseDirectory.value / "docker" / "target" / "waves-grpc-server.tgz"
)
}

lazy val buildRIDERunnerForDocker = taskKey[Unit]("Package RIDE Runner tarball and copy it to docker/target")
buildRIDERunnerForDocker := {
IO.copyFile(
(`ride-runner` / Universal / packageZipTarball).value,
(`ride-runner` / baseDirectory).value / "docker" / "target" / s"${(`ride-runner` / name).value}.tgz"
Expand Down Expand Up @@ -244,7 +248,6 @@ lazy val buildDebPackages = taskKey[Unit]("Build debian packages")
buildDebPackages := {
(`grpc-server` / Debian / packageBin).value
(node / Debian / packageBin).value
(`ride-runner` / Debian / packageBin).value
}

def buildPackages: Command = Command("buildPackages")(_ => Network.networkParser) { (state, args) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.wavesplatform.events

import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.RDB
import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.extensions.{Context, Extension}
Expand All @@ -14,6 +13,7 @@ import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.Ficus.*
import org.rocksdb.RocksDB

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
Expand All @@ -31,9 +31,8 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
)

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
// todo: no need to open column families here
private[this] val rdb = RDB.open(context.settings.dbSettings.copy(directory = context.settings.directory + "/blockchain-updates"))
private[this] val repo = new Repo(rdb.db, context.blocksApi)
private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates")
private[this] val repo = new Repo(rdb, context.blocksApi)

private[this] val grpcServer: Server = NettyServerBuilder
.forAddress(new InetSocketAddress("0.0.0.0", settings.grpcPort))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ object Curve25519 {
// Signs `message` with `privateKey`; the first argument to calculateSignature is
// SignatureLength bytes of provider randomness (presumably a per-signature nonce —
// NOTE(review): confirm against the provider's contract).
def sign(privateKey: Array[Byte], message: Array[Byte]): Array[Byte] =
provider.calculateSignature(provider.getRandom(SignatureLength), privateKey, message)

def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean = provider.verifySignature(publicKey, message, signature)

// Verifies `signature` over `message` with `publicKey`. Null or wrongly-sized
// signature/key, or a null message, short-circuit to false before the provider
// is consulted, so malformed input can never reach provider.verifySignature.
def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean =
signature != null && signature.length == SignatureLength &&
publicKey != null && publicKey.length == KeyLength &&
message != null &&
provider.verifySignature(publicKey, message, signature)
}
14 changes: 0 additions & 14 deletions lang/testkit/src/test/resources/logback-test.xml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.wavesplatform.report

import com.wavesplatform.report.QaseReporter.{CaseIdPattern, QaseProjects, TestResult}
import io.qase.api.QaseClient
import io.qase.api.config.QaseConfig
import io.qase.api.utils.IntegrationUtils
import io.qase.client.model.ResultCreate
import org.aeonbits.owner.ConfigFactory
import org.scalatest.Reporter
import org.scalatest.events.*
import play.api.libs.json.{Format, Json}
Expand Down Expand Up @@ -45,7 +46,7 @@ class QaseReporter extends Reporter {
msgOpt: Option[String],
duration: Option[Long]
): Unit =
if (QaseClient.isEnabled) {
if (QaseReporter.isEnabled) {
val errMsg = msgOpt.map(msg => s"\n\n**Error**\n$msg").getOrElse("")
val comment = s"$testName$errMsg"
val stacktrace = throwable.map(IntegrationUtils.getStacktrace)
Expand All @@ -55,7 +56,7 @@ class QaseReporter extends Reporter {
}

private def saveRunResults(): Unit =
if (QaseClient.isEnabled) {
if (QaseReporter.isEnabled) {
results.asScala.foreach { case (projectCode, results) =>
if (results.nonEmpty) {
val writer = new FileWriter(s"./$projectCode-${System.currentTimeMillis()}")
Expand All @@ -73,6 +74,10 @@ class QaseReporter extends Reporter {
}

object QaseReporter {
// this hack prevents QaseClient class from being initialized, which in turn initializes Logback with malformed config
// and prints a warning about unused appender to stdout
private[QaseReporter] val isEnabled = ConfigFactory.create(classOf[QaseConfig]).isEnabled

val RunIdKeyPrefix = "QASE_RUN_ID_"
val CheckPRRunIdKey = "CHECKPR_RUN_ID"
val QaseProjects = Seq("NODE", "RIDE", "BU", "SAPI")
Expand Down
14 changes: 0 additions & 14 deletions lang/tests/src/test/resources/logback-test.xml

This file was deleted.

4 changes: 2 additions & 2 deletions node-it/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ inTask(docker)(
)
)

val packageAll = taskKey[Unit]("build all packages")
docker := docker.dependsOn(LocalProject("waves-node") / packageAll).value
val buildTarballsForDocker = taskKey[Unit]("build all packages")
docker := docker.dependsOn(LocalProject("waves-node") / buildTarballsForDocker).value
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {

private val settings: WavesSettings = wavesSettings.copy(minerSettings = wavesSettings.minerSettings.copy(quorum = 0))

def generateDb(genBlocks: Seq[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
def generateDb(genBlocks: Iterator[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
generateBlockchain(genBlocks, settings.dbSettings.copy(directory = dbDirPath))

def generateBinaryFile(genBlocks: Seq[GenBlock]): Unit = {
def generateBinaryFile(genBlocks: Iterator[GenBlock]): Unit = {
val targetHeight = genBlocks.size + 1
log.info(s"Exporting to $targetHeight")
val outputFilename = s"blockchain-$targetHeight"
Expand All @@ -94,7 +94,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
}

private def generateBlockchain(genBlocks: Seq[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
private def generateBlockchain(genBlocks: Iterator[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
val scheduler = Schedulers.singleThread("appender")
val time = new Time {
val startTime: Long = settings.blockchainSettings.genesisSettings.timestamp
Expand Down Expand Up @@ -185,7 +185,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
case Left(err) => log.error(s"Error appending block: $err")
}
}
}.get
}

private def correctTxTimestamp(genTx: GenTx, time: Time): Transaction =
Expand Down
6 changes: 3 additions & 3 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ waves {
max-cache-size = 100000

max-rollback-depth = 2000
remember-blocks = 3h

# Delete old history entries (Data, WAVES and Asset balances) in this interval before a safe rollback height.
# Comment to disable.
Expand All @@ -40,15 +39,16 @@ waves {
# AA Asset balance history for address.
# cleanup-interval = 500 # Optimal for Xmx2G

use-bloom-filter = false

rocksdb {
main-cache-size = 512M
tx-cache-size = 16M
tx-meta-cache-size = 16M
tx-snapshot-cache-size = 16M
api-cache-size=16M
write-buffer-size = 128M
enable-statistics = false
# When enabled, after writing every SST file of the default column family, reopen it and read all the keys.
paranoid-checks = off
}
}

Expand Down
86 changes: 86 additions & 0 deletions node/src/main/scala/com/wavesplatform/Explorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,92 @@ object Explorer extends ScorexLogging {
log.info(s"Load meta for $id")
val meta = rdb.db.get(Keys.transactionMetaById(TransactionId(ByteStr.decodeBase58(id).get), rdb.txMetaHandle))
log.info(s"Meta: $meta")
case "DH" =>
val address = Address.fromString(argument(1, "address")).explicitGet()
val key = argument(2, "key")
val requestedHeight = argument(3, "height").toInt
log.info(s"Loading address ID for $address")
val addressId = rdb.db.get(Keys.addressId(address)).get
log.info(s"Collecting data history for key $key on $address ($addressId)")
val currentEntry = rdb.db.get(Keys.data(addressId, key))
log.info(s"Current entry: $currentEntry")
val problematicEntry = rdb.db.get(Keys.dataAt(addressId, key)(requestedHeight))
log.info(s"Entry at $requestedHeight: $problematicEntry")
case "DHC" =>
log.info("Looking for data entry history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var key = ""
var addressCount = 0
rdb.db.iterateOver(KeyTags.DataHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
val keyFromKey = new String(e.getKey.drop(10).dropRight(4), "utf-8")
if (addressIdFromKey != thisAddressId) {
thisAddressId = addressIdFromKey
key = keyFromKey
addressCount += 1
} else if (key != keyFromKey) {
key = keyFromKey
} else {
val node = readDataNode(key)(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$address/$key@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")

}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount addresses")
case "ABHC" =>
log.info("Looking for asset balance history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var key = IssuedAsset(ByteStr(new Array[Byte](32)))
var addressCount = 0
rdb.db.iterateOver(KeyTags.AssetBalanceHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(34, 42))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
val keyFromKey = IssuedAsset(ByteStr(e.getKey.slice(2, 34)))
if (keyFromKey != key) {
thisAddressId = addressIdFromKey
key = keyFromKey
addressCount += 1
} else if (thisAddressId != addressIdFromKey) {
thisAddressId = addressIdFromKey
} else {
val node = readBalanceNode(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$key/$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")

}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount assets")
case "BHC" =>
log.info("Looking for balance history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var addressCount = 0
rdb.db.iterateOver(KeyTags.WavesBalanceHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
if (addressIdFromKey != thisAddressId) {
thisAddressId = addressIdFromKey
addressCount += 1
} else {
val node = readBalanceNode(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")
}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount addresses")
}
} finally {
reader.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object GenesisBlockGenerator {
.headOption
.map(new File(_).getAbsoluteFile.ensuring(f => !f.isDirectory && f.getParentFile.isDirectory || f.getParentFile.mkdirs()))

val settings = parseSettings(ConfigFactory.parseFile(inputConfFile))
val settings = parseSettings(ConfigFactory.parseFile(inputConfFile).resolve())
val confBody = createConfig(settings)
outputConfFile.foreach(ocf => Files.write(ocf.toPath, confBody.utf8Bytes))
}
Expand Down
15 changes: 9 additions & 6 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import cats.implicits.catsSyntaxOption
import cats.syntax.apply.*
import com.google.common.io.ByteStreams
import com.google.common.primitives.Ints
import com.google.common.primitives.{Ints, Longs}
import com.wavesplatform.Exporter.Formats
import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonBlocksApi, CommonTransactionsApi}
import com.wavesplatform.block.{Block, BlockHeader}
Expand Down Expand Up @@ -217,6 +217,8 @@ object Importer extends ScorexLogging {
val maxSize = importOptions.maxQueueSize
val queue = new mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])](maxSize)

val CurrentTS = System.currentTimeMillis()

@tailrec
def readBlocks(queue: mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])], remainCount: Int, maxCount: Int): Unit = {
if (remainCount == 0) ()
Expand Down Expand Up @@ -247,11 +249,12 @@ object Importer extends ScorexLogging {
if (blocksToSkip > 0) {
blocksToSkip -= 1
} else {
val blockV5 = blockchain.isFeatureActivated(BlockchainFeatures.BlockV5, blockchain.height + (maxCount - remainCount) + 1)
val rideV6 = blockchain.isFeatureActivated(BlockchainFeatures.RideV6, blockchain.height + (maxCount - remainCount) + 1)
lazy val parsedProtoBlock = PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)

val block = (if (!blockV5) Block.parseBytes(blockBytes) else parsedProtoBlock).orElse(parsedProtoBlock).get
val block = (if (1 < blockBytes.head && blockBytes.head < 5 && Longs.fromByteArray(blockBytes.slice(1, 9)) < CurrentTS)
Block.parseBytes(blockBytes).orElse(parsedProtoBlock)
else
parsedProtoBlock).get
val blockSnapshot = snapshotsBytes.map { bytes =>
BlockSnapshotResponse(
block.id(),
Expand Down Expand Up @@ -308,7 +311,7 @@ object Importer extends ScorexLogging {
case _ =>
counter = counter + 1
}
} else {
} else if (!quit){
log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}")
}
}
Expand Down Expand Up @@ -360,7 +363,7 @@ object Importer extends ScorexLogging {
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
var blocksOffset = 0
var blocksOffset = 0L
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
Expand Down
Loading
Loading