From c94bf22b6f2ab269a6dddc3a63e232aea79cd269 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 5 Feb 2024 14:21:58 +0400 Subject: [PATCH] Stability improvements (#3938) --- .github/workflows/publish-docker-node.yaml | 2 +- build.sbt | 13 +- .../events/BlockchainUpdates.scala | 7 +- .../com/wavesplatform/crypto/Curve25519.scala | 7 +- .../src/test/resources/logback-test.xml | 14 -- .../wavesplatform/report/QaseReporter.scala | 11 +- .../tests/src/test/resources/logback-test.xml | 14 -- node-it/build.sbt | 4 +- .../test/BlockchainGenerator.scala | 8 +- node/src/main/resources/application.conf | 6 +- .../scala/com/wavesplatform/Explorer.scala | 86 ++++++++++ .../wavesplatform/GenesisBlockGenerator.scala | 2 +- .../scala/com/wavesplatform/Importer.scala | 15 +- .../api/common/AddressTransactions.scala | 69 ++++---- .../api/common/CommonAccountsApi.scala | 2 +- .../api/common/lease/AddressLeaseInfo.scala | 8 +- .../common/lease/LeaseByAddressIterator.scala | 11 +- .../wavesplatform/api/common/package.scala | 8 +- .../com/wavesplatform/database/Caches.scala | 16 +- .../wavesplatform/database/DBResource.scala | 12 +- .../wavesplatform/database/KeyHelpers.scala | 5 +- .../com/wavesplatform/database/Keys.scala | 36 +++-- .../com/wavesplatform/database/RDB.scala | 32 ++-- .../database/RocksDBWriter.scala | 147 ++++++++++-------- .../com/wavesplatform/database/package.scala | 14 +- .../history/StorageFactory.scala | 2 +- .../wavesplatform/settings/DBSettings.scala | 5 +- .../settings/RocksDBSettings.scala | 4 +- node/src/test/resources/application.conf | 3 +- .../database/TestStorageFactory.scala | 1 - .../com/wavesplatform/db/InterferableDB.scala | 9 +- .../wavesplatform/db/TxBloomFilterSpec.scala | 34 ++++ .../com/wavesplatform/db/WithState.scala | 4 +- .../history/BlockchainUpdaterNFTTest.scala | 3 +- .../com/wavesplatform/history/Domain.scala | 2 +- .../wavesplatform/state/DataKeyRollback.scala | 60 +++++++ project/Dependencies.scala | 20 ++- project/plugins.sbt | 2 +- repl/jvm/src/test/logback-test.xml | 14 -- .../database/rocksdb/KeyTags.scala | 1 - 40 files changed, 457 insertions(+), 256 deletions(-) delete mode 100644 lang/testkit/src/test/resources/logback-test.xml delete mode 100644 lang/tests/src/test/resources/logback-test.xml create mode 100644 node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala create mode 100644 node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala delete mode 100644 repl/jvm/src/test/logback-test.xml diff --git a/.github/workflows/publish-docker-node.yaml b/.github/workflows/publish-docker-node.yaml index 804c39d8b5c..8fddb878c95 100644 --- a/.github/workflows/publish-docker-node.yaml +++ b/.github/workflows/publish-docker-node.yaml @@ -31,7 +31,7 @@ jobs: - name: Build sources run: | - sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt buildTarballsForDocker + sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt ;buildTarballsForDocker;buildRIDERunnerForDocker - name: Setup Docker buildx uses: docker/setup-buildx-action@v2 diff --git a/build.sbt b/build.sbt index fbe44e4f3a8..f56a44abf22 100644 --- a/build.sbt +++ b/build.sbt @@ -84,10 +84,7 @@ lazy val repl = crossProject(JSPlatform, JVMPlatform) libraryDependencies ++= Dependencies.protobuf.value ++ Dependencies.langCompilerPlugins.value ++ - Dependencies.circe.value ++ - Seq( - "org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0" - ), + Dependencies.circe.value, inConfig(Compile)( Seq( PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value, @@ -109,6 +106,9 @@ lazy val `repl-jvm` = repl.jvm ) lazy val `repl-js` = repl.js.dependsOn(`lang-js`) + .settings( + libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1" + ) lazy val `curve25519-test` = project.dependsOn(node) @@ -198,6 +198,10 @@ buildTarballsForDocker := { (`grpc-server` / Universal / packageZipTarball).value, baseDirectory.value / "docker" / "target" / "waves-grpc-server.tgz" ) +} + +lazy val buildRIDERunnerForDocker = taskKey[Unit]("Package RIDE Runner tarball and copy it to docker/target") +buildRIDERunnerForDocker := { IO.copyFile( (`ride-runner` / Universal / packageZipTarball).value, (`ride-runner` / baseDirectory).value / "docker" / "target" / s"${(`ride-runner` / name).value}.tgz" @@ -244,7 +248,6 @@ lazy val buildDebPackages = taskKey[Unit]("Build debian packages") buildDebPackages := { (`grpc-server` / Debian / packageBin).value (node / Debian / packageBin).value - (`ride-runner` / Debian / packageBin).value } def buildPackages: Command = Command("buildPackages")(_ => Network.networkParser) { (state, args) => diff --git a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala index 166f41d5a47..270a99986ff 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala @@ -2,7 +2,6 @@ package com.wavesplatform.events import com.wavesplatform.block.{Block, MicroBlock} import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.database.RDB import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc import com.wavesplatform.events.settings.BlockchainUpdatesSettings import com.wavesplatform.extensions.{Context, Extension} @@ -14,6 +13,7 @@ import io.grpc.{Metadata, Server, ServerStreamTracer, Status} import monix.execution.schedulers.SchedulerService import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter} import net.ceedubs.ficus.Ficus.* +import org.rocksdb.RocksDB import java.net.InetSocketAddress import java.util.concurrent.TimeUnit @@ -31,9 +31,8 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco ) private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates") - // todo: no need to open column families here - private[this] val rdb = RDB.open(context.settings.dbSettings.copy(directory = context.settings.directory + "/blockchain-updates")) - private[this] val repo = new Repo(rdb.db, context.blocksApi) + private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates") + private[this] val repo = new Repo(rdb, context.blocksApi) private[this] val grpcServer: Server = NettyServerBuilder .forAddress(new InetSocketAddress("0.0.0.0", settings.grpcPort)) diff --git a/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala b/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala index 40f876471ff..5c31009e71a 100644 --- a/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala +++ b/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala @@ -21,6 +21,9 @@ object Curve25519 { def sign(privateKey: Array[Byte], message: Array[Byte]): Array[Byte] = provider.calculateSignature(provider.getRandom(SignatureLength), privateKey, message) - def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean = provider.verifySignature(publicKey, message, signature) - + def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean = + signature != null && signature.length == SignatureLength && + publicKey != null && publicKey.length == KeyLength && + message != null && + provider.verifySignature(publicKey, message, signature) } diff --git a/lang/testkit/src/test/resources/logback-test.xml b/lang/testkit/src/test/resources/logback-test.xml deleted file mode 100644 index 471a19efaff..00000000000 --- a/lang/testkit/src/test/resources/logback-test.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - %date %-5level [%.15thread] %logger{26} - %msg%n - - - - - - - - - diff --git a/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala b/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala index fc3dbc834c6..92d692a1a7f 100644 --- a/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala +++ b/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala @@ -1,9 +1,10 @@ package com.wavesplatform.report import com.wavesplatform.report.QaseReporter.{CaseIdPattern, QaseProjects, TestResult} -import io.qase.api.QaseClient +import io.qase.api.config.QaseConfig import io.qase.api.utils.IntegrationUtils import io.qase.client.model.ResultCreate +import org.aeonbits.owner.ConfigFactory import org.scalatest.Reporter import org.scalatest.events.* import play.api.libs.json.{Format, Json} @@ -45,7 +46,7 @@ class QaseReporter extends Reporter { msgOpt: Option[String], duration: Option[Long] ): Unit = - if (QaseClient.isEnabled) { + if (QaseReporter.isEnabled) { val errMsg = msgOpt.map(msg => s"\n\n**Error**\n$msg").getOrElse("") val comment = s"$testName$errMsg" val stacktrace = throwable.map(IntegrationUtils.getStacktrace) @@ -55,7 +56,7 @@ class QaseReporter extends Reporter { } private def saveRunResults(): Unit = - if (QaseClient.isEnabled) { + if (QaseReporter.isEnabled) { results.asScala.foreach { case (projectCode, results) => if (results.nonEmpty) { val writer = new FileWriter(s"./$projectCode-${System.currentTimeMillis()}") @@ -73,6 +74,10 @@ class QaseReporter extends Reporter { } object QaseReporter { + // this hack prevents QaseClient class from being initialized, which in turn initializes Logback with malformed config + // and prints a warning about unused appender to stdout + private[QaseReporter] val isEnabled = ConfigFactory.create(classOf[QaseConfig]).isEnabled + val RunIdKeyPrefix = "QASE_RUN_ID_" val CheckPRRunIdKey = "CHECKPR_RUN_ID" val QaseProjects = Seq("NODE", "RIDE", "BU", "SAPI") diff --git a/lang/tests/src/test/resources/logback-test.xml b/lang/tests/src/test/resources/logback-test.xml deleted file mode 100644 index b1f394b2e41..00000000000 --- a/lang/tests/src/test/resources/logback-test.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - %date %-5level [%.15thread] %logger{26} - %msg%n - - - - - - - - - diff --git a/node-it/build.sbt b/node-it/build.sbt index d2d0af510ca..7e736881401 100644 --- a/node-it/build.sbt +++ b/node-it/build.sbt @@ -11,5 +11,5 @@ inTask(docker)( ) ) -val packageAll = taskKey[Unit]("build all packages") -docker := docker.dependsOn(LocalProject("waves-node") / packageAll).value +val buildTarballsForDocker = taskKey[Unit]("build all packages") +docker := docker.dependsOn(LocalProject("waves-node") / buildTarballsForDocker).value diff --git a/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala b/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala index d66ce80616a..a8932b09bc2 100644 --- a/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala +++ b/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala @@ -64,10 +64,10 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging { private val settings: WavesSettings = wavesSettings.copy(minerSettings = wavesSettings.minerSettings.copy(quorum = 0)) - def generateDb(genBlocks: Seq[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit = + def generateDb(genBlocks: Iterator[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit = generateBlockchain(genBlocks, settings.dbSettings.copy(directory = dbDirPath)) - def generateBinaryFile(genBlocks: Seq[GenBlock]): Unit = { + def generateBinaryFile(genBlocks: Iterator[GenBlock]): Unit = { val targetHeight = genBlocks.size + 1 log.info(s"Exporting to $targetHeight") val outputFilename = s"blockchain-$targetHeight" @@ -94,7 +94,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging { } } - private def generateBlockchain(genBlocks: Seq[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = { + private def generateBlockchain(genBlocks: Iterator[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = { val scheduler = Schedulers.singleThread("appender") val time = new Time { val startTime: Long = settings.blockchainSettings.genesisSettings.timestamp @@ -185,7 +185,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging { } case Left(err) => log.error(s"Error appending block: $err") } - } + }.get } private def correctTxTimestamp(genTx: GenTx, time: Time): Transaction = diff --git a/node/src/main/resources/application.conf b/node/src/main/resources/application.conf index f642aa6f81b..8501dbf4b1b 100644 --- a/node/src/main/resources/application.conf +++ b/node/src/main/resources/application.conf @@ -23,7 +23,6 @@ waves { max-cache-size = 100000 max-rollback-depth = 2000 - remember-blocks = 3h # Delete old history entries (Data, WAVES and Asset balances) in this interval before a safe rollback height. # Comment to disable. @@ -40,15 +39,16 @@ waves { # AA Asset balance history for address. # cleanup-interval = 500 # Optimal for Xmx2G - use-bloom-filter = false - rocksdb { main-cache-size = 512M tx-cache-size = 16M tx-meta-cache-size = 16M tx-snapshot-cache-size = 16M + api-cache-size=16M write-buffer-size = 128M enable-statistics = false + # When enabled, after writing every SST file of the default column family, reopen it and read all the keys. + paranoid-checks = off } } diff --git a/node/src/main/scala/com/wavesplatform/Explorer.scala b/node/src/main/scala/com/wavesplatform/Explorer.scala index 96bae8692fc..b51f99e5e97 100644 --- a/node/src/main/scala/com/wavesplatform/Explorer.scala +++ b/node/src/main/scala/com/wavesplatform/Explorer.scala @@ -390,6 +390,92 @@ object Explorer extends ScorexLogging { log.info(s"Load meta for $id") val meta = rdb.db.get(Keys.transactionMetaById(TransactionId(ByteStr.decodeBase58(id).get), rdb.txMetaHandle)) log.info(s"Meta: $meta") + case "DH" => + val address = Address.fromString(argument(1, "address")).explicitGet() + val key = argument(2, "key") + val requestedHeight = argument(3, "height").toInt + log.info(s"Loading address ID for $address") + val addressId = rdb.db.get(Keys.addressId(address)).get + log.info(s"Collecting data history for key $key on $address ($addressId)") + val currentEntry = rdb.db.get(Keys.data(addressId, key)) + log.info(s"Current entry: $currentEntry") + val problematicEntry = rdb.db.get(Keys.dataAt(addressId, key)(requestedHeight)) + log.info(s"Entry at $requestedHeight: $problematicEntry") + case "DHC" => + log.info("Looking for data entry history corruptions") + var thisAddressId = 0L + var prevHeight = 0 + var key = "" + var addressCount = 0 + rdb.db.iterateOver(KeyTags.DataHistory.prefixBytes, None) { e => + val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10)) + val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4)) + val keyFromKey = new String(e.getKey.drop(10).dropRight(4), "utf-8") + if (addressIdFromKey != thisAddressId) { + thisAddressId = addressIdFromKey + key = keyFromKey + addressCount += 1 + } else if (key != keyFromKey) { + key = keyFromKey + } else { + val node = readDataNode(key)(e.getValue) + if (node.prevHeight != prevHeight) { + val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId))) + log.warn(s"$address/$key@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight") + + } + } + prevHeight = heightFromKey + } + log.info(s"Checked $addressCount addresses") + case "ABHC" => + log.info("Looking for asset balance history corruptions") + var thisAddressId = 0L + var prevHeight = 0 + var key = IssuedAsset(ByteStr(new Array[Byte](32))) + var addressCount = 0 + rdb.db.iterateOver(KeyTags.AssetBalanceHistory.prefixBytes, None) { e => + val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(34, 42)) + val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4)) + val keyFromKey = IssuedAsset(ByteStr(e.getKey.slice(2, 34))) + if (keyFromKey != key) { + thisAddressId = addressIdFromKey + key = keyFromKey + addressCount += 1 + } else if (thisAddressId != addressIdFromKey) { + thisAddressId = addressIdFromKey + } else { + val node = readBalanceNode(e.getValue) + if (node.prevHeight != prevHeight) { + val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId))) + log.warn(s"$key/$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight") + + } + } + prevHeight = heightFromKey + } + log.info(s"Checked $addressCount assets") + case "BHC" => + log.info("Looking for balance history corruptions") + var thisAddressId = 0L + var prevHeight = 0 + var addressCount = 0 + rdb.db.iterateOver(KeyTags.WavesBalanceHistory.prefixBytes, None) { e => + val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10)) + val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4)) + if (addressIdFromKey != thisAddressId) { + thisAddressId = addressIdFromKey + addressCount += 1 + } else { + val node = readBalanceNode(e.getValue) + if (node.prevHeight != prevHeight) { + val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId))) + log.warn(s"$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight") + } + } + prevHeight = heightFromKey + } + log.info(s"Checked $addressCount addresses") } } finally { reader.close() diff --git a/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala b/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala index f41d5874a4b..51444fddf50 100644 --- a/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala +++ b/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala @@ -96,7 +96,7 @@ object GenesisBlockGenerator { .headOption .map(new File(_).getAbsoluteFile.ensuring(f => !f.isDirectory && f.getParentFile.isDirectory || f.getParentFile.mkdirs())) - val settings = parseSettings(ConfigFactory.parseFile(inputConfFile)) + val settings = parseSettings(ConfigFactory.parseFile(inputConfFile).resolve()) val confBody = createConfig(settings) outputConfFile.foreach(ocf => Files.write(ocf.toPath, confBody.utf8Bytes)) } diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 7f20c716e23..2b959f8030e 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import cats.implicits.catsSyntaxOption import cats.syntax.apply.* import com.google.common.io.ByteStreams -import com.google.common.primitives.Ints +import com.google.common.primitives.{Ints, Longs} import com.wavesplatform.Exporter.Formats import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonBlocksApi, CommonTransactionsApi} import com.wavesplatform.block.{Block, BlockHeader} @@ -217,6 +217,8 @@ object Importer extends ScorexLogging { val maxSize = importOptions.maxQueueSize val queue = new mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])](maxSize) + val CurrentTS = System.currentTimeMillis() + @tailrec def readBlocks(queue: mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])], remainCount: Int, maxCount: Int): Unit = { if (remainCount == 0) () @@ -247,11 +249,12 @@ object Importer extends ScorexLogging { if (blocksToSkip > 0) { blocksToSkip -= 1 } else { - val blockV5 = blockchain.isFeatureActivated(BlockchainFeatures.BlockV5, blockchain.height + (maxCount - remainCount) + 1) val rideV6 = blockchain.isFeatureActivated(BlockchainFeatures.RideV6, blockchain.height + (maxCount - remainCount) + 1) lazy val parsedProtoBlock = PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true) - - val block = (if (!blockV5) Block.parseBytes(blockBytes) else parsedProtoBlock).orElse(parsedProtoBlock).get + val block = (if (1 < blockBytes.head && blockBytes.head < 5 && Longs.fromByteArray(blockBytes.slice(1, 9)) < CurrentTS) + Block.parseBytes(blockBytes).orElse(parsedProtoBlock) + else + parsedProtoBlock).get val blockSnapshot = snapshotsBytes.map { bytes => BlockSnapshotResponse( block.id(), @@ -308,7 +311,7 @@ object Importer extends ScorexLogging { case _ => counter = counter + 1 } - } else { + } else if (!quit){ log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") } } @@ -360,7 +363,7 @@ object Importer extends ScorexLogging { val blocksFileOffset = importOptions.format match { case Formats.Binary => - var blocksOffset = 0 + var blocksOffset = 0L rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e => e.getKey match { case Array(_, _, 0, 0, 0, 1) => // Skip genesis diff --git a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala index 4fe340eac38..bfbc56bd260 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala @@ -33,28 +33,33 @@ object AddressTransactions { } .toSeq - private def loadInvokeScriptResult(resource: DBResource, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[InvokeScriptResult] = + private def loadInvokeScriptResult( + resource: DBResource, + txMetaHandle: RDB.TxMetaHandle, + apiHandle: RDB.ApiHandle, + txId: ByteStr + ): Option[InvokeScriptResult] = for { tm <- resource.get(Keys.transactionMetaById(TransactionId(txId), txMetaHandle)) - scriptResult <- resource.get(Keys.invokeScriptResult(tm.height, TxNum(tm.num.toShort))) + scriptResult <- resource.get(Keys.invokeScriptResult(tm.height, TxNum(tm.num.toShort), apiHandle)) } yield scriptResult - def loadInvokeScriptResult(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[InvokeScriptResult] = - db.withResource(r => loadInvokeScriptResult(r, txMetaHandle, txId)) + def loadInvokeScriptResult(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, apiHandle: RDB.ApiHandle, txId: ByteStr): Option[InvokeScriptResult] = + db.withResource(r => loadInvokeScriptResult(r, txMetaHandle, apiHandle, txId)) - def loadInvokeScriptResult(db: RocksDB, height: Height, txNum: TxNum): Option[InvokeScriptResult] = - db.get(Keys.invokeScriptResult(height, txNum)) + def loadInvokeScriptResult(db: RocksDB, apiHandle: RDB.ApiHandle, height: Height, txNum: TxNum): Option[InvokeScriptResult] = + db.get(Keys.invokeScriptResult(height, txNum, apiHandle)) - def loadEthereumMetadata(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[EthereumTransactionMeta] = db.withResource { - resource => + def loadEthereumMetadata(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, apiHandle: RDB.ApiHandle, txId: ByteStr): Option[EthereumTransactionMeta] = + db.withResource { resource => for { tm <- resource.get(Keys.transactionMetaById(TransactionId(txId), txMetaHandle)) - m <- resource.get(Keys.ethereumTransactionMeta(Height(tm.height), TxNum(tm.num.toShort))) + m <- resource.get(Keys.ethereumTransactionMeta(Height(tm.height), TxNum(tm.num.toShort), apiHandle)) } yield m - } + } - def loadEthereumMetadata(db: RocksDB, height: Height, txNum: TxNum): Option[EthereumTransactionMeta] = - db.get(Keys.ethereumTransactionMeta(height, txNum)) + def loadEthereumMetadata(db: RocksDB, apiHandle: RDB.ApiHandle, height: Height, txNum: TxNum): Option[EthereumTransactionMeta] = + db.get(Keys.ethereumTransactionMeta(height, txNum, apiHandle)) def allAddressTransactions( rdb: RDB, @@ -82,24 +87,25 @@ object AddressTransactions { sender: Option[Address], types: Set[Transaction.Type], fromId: Option[ByteStr] - ): Observable[(TxMeta, Transaction, Option[TxNum])] = rdb.db.resourceObservable.flatMap { dbResource => - dbResource - .get(Keys.addressId(subject)) - .fold(Observable.empty[(TxMeta, Transaction, Option[TxNum])]) { addressId => - val (maxHeight, maxTxNum) = - fromId - .flatMap(id => rdb.db.get(Keys.transactionMetaById(TransactionId(id), rdb.txMetaHandle))) - .fold[(Height, TxNum)](Height(Int.MaxValue) -> TxNum(Short.MaxValue)) { tm => - Height(tm.height) -> TxNum(tm.num.toShort) - } + ): Observable[(TxMeta, Transaction, Option[TxNum])] = + rdb.db.resourceObservable(rdb.apiHandle.handle).flatMap { dbResource => + dbResource + .get(Keys.addressId(subject)) + .fold(Observable.empty[(TxMeta, Transaction, Option[TxNum])]) { addressId => + val (maxHeight, maxTxNum) = + fromId + .flatMap(id => rdb.db.get(Keys.transactionMetaById(TransactionId(id), rdb.txMetaHandle))) + .fold[(Height, TxNum)](Height(Int.MaxValue) -> TxNum(Short.MaxValue)) { tm => + Height(tm.height) -> TxNum(tm.num.toShort) + } - Observable - .fromIterator( - Task(new TxByAddressIterator(dbResource, rdb.txHandle, addressId, maxHeight, maxTxNum, sender, types).asScala) - ) - .concatMapIterable(identity) - } - } + Observable + .fromIterator( + Task(new TxByAddressIterator(dbResource, rdb.txHandle, rdb.apiHandle, addressId, maxHeight, maxTxNum, sender, types).asScala) + ) + .concatMapIterable(identity) + } + } private def transactionsFromSnapshot( maybeSnapshot: Option[(Height, StateSnapshot)], @@ -121,14 +127,15 @@ object AddressTransactions { private class TxByAddressIterator( db: DBResource, txHandle: RDB.TxHandle, + apiHandle: RDB.ApiHandle, addressId: AddressId, maxHeight: Int, maxTxNum: Int, sender: Option[Address], types: Set[Transaction.Type] ) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] { - private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId)) - db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr).keyBytes))() + private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId, apiHandle)) + db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))() final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator => val keysBuffer = new ArrayBuffer[Key[Option[(TxMeta, Transaction)]]]() diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala index 812e38d4304..4faaf1b6c13 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala @@ -94,7 +94,7 @@ object CommonAccountsApi { } override def nftList(address: Address, after: Option[IssuedAsset]): Observable[Seq[(IssuedAsset, AssetDescription)]] = { - rdb.db.resourceObservable.flatMap { resource => + rdb.db.resourceObservable(rdb.apiHandle.handle).flatMap { resource => Observable .fromIterator(Task(nftIterator(resource, address, compositeBlockchain().snapshot, after, blockchain.assetDescription))) } diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala index 9bb9d621846..85b6e9cd9e1 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala @@ -39,15 +39,15 @@ object AddressLeaseInfo { private def leasesFromDb(rdb: RDB, subject: Address): Observable[LeaseInfo] = for { - dbResource <- rdb.db.resourceObservable + dbResource <- rdb.db.resourceObservable(rdb.apiHandle.handle) (leaseId, details) <- dbResource .get(Keys.addressId(subject)) - .map(fromLeaseDbIterator(dbResource, _)) + .map(fromLeaseDbIterator(dbResource, rdb.apiHandle, _)) .getOrElse(Observable.empty) } yield LeaseInfo.fromLeaseDetails(leaseId, details) - private def fromLeaseDbIterator(dbResource: DBResource, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] = + private def fromLeaseDbIterator(dbResource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] = Observable - .fromIterator(Task(new LeaseByAddressIterator(dbResource, addressId).asScala)) + .fromIterator(Task(new LeaseByAddressIterator(dbResource, apiHandle, addressId).asScala)) .concatMapIterable(identity) } diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala index 6f18da0c217..f9821a7d214 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala @@ -3,18 +3,19 @@ package com.wavesplatform.api.common.lease import com.google.common.collect.AbstractIterator import com.wavesplatform.common.state.ByteStr import com.wavesplatform.database -import com.wavesplatform.database.{AddressId, DBResource, Keys} +import com.wavesplatform.database.{AddressId, DBResource, Keys, RDB} import com.wavesplatform.state.LeaseDetails -private class LeaseByAddressIterator(resource: DBResource, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] { - private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId)) - resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr).keyBytes))() +private class LeaseByAddressIterator(resource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId) + extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] { + private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId, apiHandle)) + resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))() final override def computeNext(): Seq[(ByteStr, LeaseDetails)] = resource.withSafePrefixIterator { iterator => if (iterator.isValid) { val details = for { - id <- database.readLeaseIdSeq(iterator.value()) + id <- database.readLeaseIdSeq(iterator.value()) details <- database.loadLease(resource, id) if details.isActive } yield (id, details) iterator.prev() diff --git a/node/src/main/scala/com/wavesplatform/api/common/package.scala b/node/src/main/scala/com/wavesplatform/api/common/package.scala index 6b2e1e20437..a1d0dc60478 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/package.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/package.scala @@ -28,12 +28,12 @@ package object common { def loadISR(t: Transaction) = maybeDiff .flatMap { case (_, diff) => diff.scriptResults.get(t.id()) } - .orElse(txNumOpt.flatMap(loadInvokeScriptResult(rdb.db, m.height, _))) + .orElse(txNumOpt.flatMap(loadInvokeScriptResult(rdb.db, rdb.apiHandle, m.height, _))) def loadETM(t: Transaction) = maybeDiff .flatMap { case (_, diff) => diff.ethereumTransactionMeta.get(t.id()) } - .orElse(txNumOpt.flatMap(loadEthereumMetadata(rdb.db, m.height, _))) + .orElse(txNumOpt.flatMap(loadEthereumMetadata(rdb.db, rdb.apiHandle, m.height, _))) TransactionMeta.create( m.height, @@ -90,11 +90,11 @@ package object common { ist => maybeSnapshot .flatMap { case (_, s) => s.scriptResults.get(ist.id()) } - .orElse(loadInvokeScriptResult(rdb.db, rdb.txMetaHandle, ist.id())), + .orElse(loadInvokeScriptResult(rdb.db, rdb.txMetaHandle, rdb.apiHandle, ist.id())), et => maybeSnapshot .flatMap { case (_, s) => s.ethereumTransactionMeta.get(et.id()) } - .orElse(loadEthereumMetadata(rdb.db, rdb.txMetaHandle, et.id())) + .orElse(loadEthereumMetadata(rdb.db, rdb.txMetaHandle, rdb.apiHandle, et.id())) ) } } diff --git a/node/src/main/scala/com/wavesplatform/database/Caches.scala b/node/src/main/scala/com/wavesplatform/database/Caches.scala index 38a51d6f37d..5129ce7c6b2 100644 --- a/node/src/main/scala/com/wavesplatform/database/Caches.scala +++ b/node/src/main/scala/com/wavesplatform/database/Caches.scala @@ -128,7 +128,7 @@ abstract class Caches extends Blockchain with Storage { VolumeAndFee(curVf.volume, curVf.fee) } - private val memMeter = MemoryMeter.builder().build() + protected val memMeter = MemoryMeter.builder().build() private val scriptCache: LoadingCache[Address, Option[AccountScriptInfo]] = CacheBuilder @@ -183,7 +183,7 @@ abstract class Caches extends Blockchain with Storage { protected def discardAccountData(addressWithKey: (Address, String)): Unit = accountDataCache.invalidate(addressWithKey) protected def loadAccountData(acc: Address, key: String): CurrentData - protected def loadEntryHeights(keys: Iterable[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] + protected def loadEntryHeights(keys: Seq[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] private[database] def addressId(address: Address): Option[AddressId] = addressIdCache.get(address) private[database] def addressIds(addresses: Seq[Address]): Map[Address, Option[AddressId]] = @@ -299,15 +299,15 @@ abstract class Caches extends Blockchain with Storage { (key, entry) <- entries } yield ((address, key), entry) - val cachedEntries = accountDataCache.getAllPresent(newEntries.keys.asJava).asScala - val loadedPrevEntries = loadEntryHeights(newEntries.keys.filterNot(cachedEntries.contains), addressIdWithFallback(_, newAddressIds)) + val cachedEntries = accountDataCache.getAllPresent(newEntries.keys.asJava).asScala + val loadedPrevEntryHeights = loadEntryHeights(newEntries.keys.filterNot(cachedEntries.contains).toSeq, addressIdWithFallback(_, newAddressIds)) val updatedDataWithNodes = (for { - (k, currentEntry) <- cachedEntries.view.mapValues(_.height) ++ loadedPrevEntries - newEntry <- newEntries.get(k) + (k, heightOfPreviousEntry) <- cachedEntries.view.mapValues(_.height) ++ loadedPrevEntryHeights + newEntry <- newEntries.get(k) } yield k -> ( - CurrentData(newEntry, Height(height), currentEntry), - DataNode(newEntry, currentEntry) + CurrentData(newEntry, Height(height), heightOfPreviousEntry), + DataNode(newEntry, heightOfPreviousEntry) )).toMap val orderFillsWithNodes = for { diff --git a/node/src/main/scala/com/wavesplatform/database/DBResource.scala b/node/src/main/scala/com/wavesplatform/database/DBResource.scala index 5217cbbcffb..7d3f17515b8 100644 --- a/node/src/main/scala/com/wavesplatform/database/DBResource.scala +++ b/node/src/main/scala/com/wavesplatform/database/DBResource.scala @@ -1,6 +1,6 @@ package com.wavesplatform.database -import org.rocksdb.{ReadOptions, RocksDB, RocksIterator} +import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator} import scala.collection.View import scala.collection.mutable.ArrayBuffer @@ -18,9 +18,9 @@ trait DBResource extends AutoCloseable { } object DBResource { - def apply(db: RocksDB): DBResource = new DBResource { + def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource { private[this] val snapshot = db.getSnapshot - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) + private[this] val readOptions = new ReadOptions().setSnapshot(snapshot) override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) @@ -35,9 +35,11 @@ object DBResource { def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = db.multiGet(readOptions, keys, valBufferSize) - override lazy val prefixIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) + override lazy val prefixIterator: RocksIterator = + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) - override lazy val fullIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(true)) + override lazy val fullIterator: RocksIterator = + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true)) override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed diff --git a/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala b/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala index 74bd3fe903f..3176c591661 100644 --- a/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala +++ b/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala @@ -3,6 +3,7 @@ package com.wavesplatform.database import com.google.common.primitives.{Bytes, Ints, Longs, Shorts} import com.wavesplatform.state import com.wavesplatform.state.{Height, TxNum} +import org.rocksdb.ColumnFamilyHandle import java.nio.ByteBuffer @@ -32,8 +33,8 @@ object KeyHelpers { Ints.toByteArray ) - def bytesSeqNr(keyTag: KeyTags.KeyTag, suffix: Array[Byte], default: Int = 0): Key[Int] = - Key(keyTag, suffix, v => if (v != null && v.length >= Ints.BYTES) Ints.fromByteArray(v) else default, Ints.toByteArray) + def bytesSeqNr(keyTag: KeyTags.KeyTag, suffix: Array[Byte], default: Int = 0, cfh: Option[ColumnFamilyHandle] = None): Key[Int] = + Key(keyTag, suffix, v => if (v != null && v.length >= Ints.BYTES) Ints.fromByteArray(v) else default, Ints.toByteArray, cfh) def unsupported[A](message: String): A => Array[Byte] = _ => throw new UnsupportedOperationException(message) } diff --git a/node/src/main/scala/com/wavesplatform/database/Keys.scala b/node/src/main/scala/com/wavesplatform/database/Keys.scala index 064366ace27..0323a236dac 100644 --- a/node/src/main/scala/com/wavesplatform/database/Keys.scala +++ b/node/src/main/scala/com/wavesplatform/database/Keys.scala @@ -167,7 +167,7 @@ object Keys { Some(cfHandle.handle) ) - def transactionStateSnapshotAt(height: Height, n: TxNum, cfHandle: RDB.TxHandle): Key[Option[TransactionStateSnapshot]] = + def transactionStateSnapshotAt(height: Height, n: TxNum, cfHandle: RDB.TxSnapshotHandle): Key[Option[TransactionStateSnapshot]] = Key.opt[TransactionStateSnapshot]( NthTransactionStateSnapshotAtHeight, hNum(height, n), @@ -176,26 +176,28 @@ object Keys { Some(cfHandle.handle) ) - def addressTransactionSeqNr(addressId: AddressId): Key[Int] = - bytesSeqNr(AddressTransactionSeqNr, addressId.toByteArray) + def addressTransactionSeqNr(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] = + bytesSeqNr(AddressTransactionSeqNr, addressId.toByteArray, cfh = Some(cfh.handle)) - def addressTransactionHN(addressId: AddressId, seqNr: Int): Key[Option[(Height, Seq[(Byte, TxNum, Int)])]] = + def addressTransactionHN(addressId: AddressId, seqNr: Int, cfh: RDB.ApiHandle): Key[Option[(Height, Seq[(Byte, TxNum, Int)])]] = Key.opt( AddressTransactionHeightTypeAndNums, hBytes(addressId.toByteArray, seqNr), readTransactionHNSeqAndType, - writeTransactionHNSeqAndType + writeTransactionHNSeqAndType, + Some(cfh.handle) ) - def addressLeaseSeqNr(addressId: AddressId): Key[Int] = - bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray) + def addressLeaseSeqNr(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] = + bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray, cfh = Some(cfh.handle)) - def addressLeaseSeq(addressId: AddressId, seqNr: Int): Key[Option[Seq[ByteStr]]] = + def addressLeaseSeq(addressId: AddressId, seqNr: Int, cfh: RDB.ApiHandle): Key[Option[Seq[ByteStr]]] = Key.opt( AddressLeaseInfoSeq, hBytes(addressId.toByteArray, seqNr), readLeaseIdSeq, - writeLeaseIdSeq + writeLeaseIdSeq, + Some(cfh.handle) ) def transactionMetaById(txId: TransactionId, cfh: RDB.TxMetaHandle): Key[Option[TransactionMeta]] = @@ -207,8 +209,8 @@ object Keys { Some(cfh.handle) ) - def invokeScriptResult(height: Int, txNum: TxNum): Key[Option[InvokeScriptResult]] = - Key.opt(InvokeScriptResultTag, hNum(height, txNum), InvokeScriptResult.fromBytes, InvokeScriptResult.toBytes) + def invokeScriptResult(height: Int, txNum: TxNum, cfh: RDB.ApiHandle): Key[Option[InvokeScriptResult]] = + Key.opt(InvokeScriptResultTag, hNum(height, txNum), InvokeScriptResult.fromBytes, InvokeScriptResult.toBytes, Some(cfh.handle)) val disabledAliases: Key[Set[Alias]] = Key( DisabledAliases, @@ -223,11 +225,11 @@ object Keys { def assetStaticInfo(addr: ERC20Address): Key[Option[StaticAssetInfo]] = Key.opt(AssetStaticInfo, addr.arr, StaticAssetInfo.parseFrom, _.toByteArray) - def nftCount(addressId: AddressId): Key[Int] = - Key(NftCount, addressId.toByteArray, Option(_).fold(0)(Ints.fromByteArray), Ints.toByteArray) + def nftCount(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] = + Key(NftCount, addressId.toByteArray, Option(_).fold(0)(Ints.fromByteArray), Ints.toByteArray, Some(cfh.handle)) - def nftAt(addressId: AddressId, index: Int, assetId: IssuedAsset): Key[Option[Unit]] = - Key.opt(NftPossession, addressId.toByteArray ++ Longs.toByteArray(index) ++ assetId.id.arr, _ => (), _ => Array.emptyByteArray) + def nftAt(addressId: AddressId, index: Int, assetId: IssuedAsset, cfh: RDB.ApiHandle): Key[Option[Unit]] = + Key.opt(NftPossession, addressId.toByteArray ++ Longs.toByteArray(index) ++ assetId.id.arr, _ => (), _ => Array.emptyByteArray, Some(cfh.handle)) def stateHash(height: Int): Key[Option[StateHash]] = Key.opt(StateHash, h(height), readStateHash, writeStateHash) @@ -235,8 +237,8 @@ object Keys { def blockStateHash(height: Int): Key[ByteStr] = Key(BlockStateHash, h(height), Option(_).fold(TxStateSnapshotHashBuilder.InitStateHash)(ByteStr(_)), _.arr) - def ethereumTransactionMeta(height: Height, txNum: TxNum): Key[Option[EthereumTransactionMeta]] = - Key.opt(EthereumTransactionMetaTag, hNum(height, txNum), EthereumTransactionMeta.parseFrom, _.toByteArray) + def ethereumTransactionMeta(height: Height, txNum: TxNum, cfh: RDB.ApiHandle): Key[Option[EthereumTransactionMeta]] = + Key.opt(EthereumTransactionMetaTag, hNum(height, txNum), EthereumTransactionMeta.parseFrom, _.toByteArray, Some(cfh.handle)) def maliciousMinerBanHeights(addressBytes: Array[Byte]): Key[Seq[Int]] = historyKey(MaliciousMinerBanHeights, addressBytes) diff --git a/node/src/main/scala/com/wavesplatform/database/RDB.scala b/node/src/main/scala/com/wavesplatform/database/RDB.scala index 00a3e70077e..7c9bb13a791 100644 --- a/node/src/main/scala/com/wavesplatform/database/RDB.scala +++ b/node/src/main/scala/com/wavesplatform/database/RDB.scala @@ -1,7 +1,7 @@ package com.wavesplatform.database import com.typesafe.scalalogging.StrictLogging -import com.wavesplatform.database.RDB.{TxHandle, TxMetaHandle} +import com.wavesplatform.database.RDB.{ApiHandle, TxHandle, TxMetaHandle, TxSnapshotHandle} import com.wavesplatform.settings.DBSettings import com.wavesplatform.utils.* import org.rocksdb.* @@ -16,7 +16,8 @@ final class RDB( val db: RocksDB, val txMetaHandle: TxMetaHandle, val txHandle: TxHandle, - val txSnapshotHandle: TxHandle, + val txSnapshotHandle: TxSnapshotHandle, + val apiHandle: ApiHandle, acquiredResources: Seq[RocksObject] ) extends AutoCloseable { override def close(): Unit = { @@ -28,6 +29,9 @@ final class RDB( object RDB extends StrictLogging { final class TxMetaHandle private[RDB] (val handle: ColumnFamilyHandle) final class TxHandle private[RDB] (val handle: ColumnFamilyHandle) + final class TxSnapshotHandle private[RDB] (val handle: ColumnFamilyHandle) + final class ApiHandle private[RDB] (val handle: ColumnFamilyHandle) + case class OptionsWithResources[A](options: A, resources: Seq[RocksObject]) def open(settings: DBSettings): RDB = { @@ -36,18 +40,15 @@ object RDB extends StrictLogging { logger.debug(s"Open DB at ${settings.directory}") val dbOptions = createDbOptions(settings) - - val dbDir = file.getAbsoluteFile + val dbDir = file.getAbsoluteFile dbDir.getParentFile.mkdirs() - val handles = new util.ArrayList[ColumnFamilyHandle]() - val defaultCfOptions = newColumnFamilyOptions(12.0, 16 << 10, settings.rocksdb.mainCacheSize, 0.6, settings.rocksdb.writeBufferSize) - val defaultCfCompressionForLevels = CompressionType.NO_COMPRESSION :: // Disable compaction for L0, because it is predictable and small - List.fill(defaultCfOptions.options.numLevels() - 1)(CompressionType.LZ4_COMPRESSION) - + val handles = new util.ArrayList[ColumnFamilyHandle]() + val defaultCfOptions = newColumnFamilyOptions(12.0, 16 << 10, settings.rocksdb.mainCacheSize, 0.6, settings.rocksdb.writeBufferSize) val txMetaCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txMetaCacheSize, 0.9, settings.rocksdb.writeBufferSize) val txCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txCacheSize, 0.9, settings.rocksdb.writeBufferSize) val txSnapshotCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txSnapshotCacheSize, 0.9, settings.rocksdb.writeBufferSize) + val apiCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.apiCacheSize, 0.9, settings.rocksdb.writeBufferSize) val db = RocksDB.open( dbOptions.options, settings.directory, @@ -56,7 +57,6 @@ object RDB extends StrictLogging { RocksDB.DEFAULT_COLUMN_FAMILY, defaultCfOptions.options .setMaxWriteBufferNumber(3) - .setCompressionPerLevel(defaultCfCompressionForLevels.asJava) .setCfPaths(Seq(new DbPath(new File(dbDir, "default").toPath, 0L)).asJava) ), new ColumnFamilyDescriptor( @@ -75,6 +75,11 @@ object RDB extends StrictLogging { "tx-snapshot".utf8Bytes, txSnapshotCfOptions.options .setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava) + ), + new ColumnFamilyDescriptor( + "api".utf8Bytes, + apiCfOptions.options + .setCfPaths(Seq(new DbPath(new File(dbDir, "api").toPath, 0L)).asJava) ) ).asJava, handles @@ -84,7 +89,8 @@ object RDB extends StrictLogging { db, new TxMetaHandle(handles.get(1)), new TxHandle(handles.get(2)), - new TxHandle(handles.get(3)), + new TxSnapshotHandle(handles.get(3)), + new ApiHandle(handles.get(4)), dbOptions.resources ++ defaultCfOptions.resources ++ txMetaCfOptions.resources ++ txCfOptions.resources ++ txSnapshotCfOptions.resources ) } @@ -109,7 +115,6 @@ object RDB extends StrictLogging { .setPinL0FilterAndIndexBlocksInCache(true) .setFormatVersion(5) .setBlockSize(blockSize) - .setChecksumType(ChecksumType.kNoChecksum) .setBlockCache(blockCache) .setCacheIndexAndFilterBlocksWithHighPriority(true) .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) @@ -132,12 +137,13 @@ object RDB extends StrictLogging { private def createDbOptions(settings: DBSettings): OptionsWithResources[DBOptions] = { val dbOptions = new DBOptions() .setCreateIfMissing(true) - .setParanoidChecks(true) + .setParanoidChecks(settings.rocksdb.paranoidChecks) .setIncreaseParallelism(6) .setBytesPerSync(2 << 20) .setCreateMissingColumnFamilies(true) .setMaxOpenFiles(100) .setMaxSubcompactions(2) // Write stalls expected without this option. Can lead to max_background_jobs * max_subcompactions background threads + .setMaxManifestFileSize(200 << 20) if (settings.rocksdb.enableStatistics) { val statistics = new Statistics() diff --git a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala index aab88daa932..c2f8361fceb 100644 --- a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala +++ b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory import sun.nio.ch.Util import java.nio.ByteBuffer +import java.time.Duration import java.util import java.util.concurrent.* import scala.annotation.tailrec @@ -115,14 +116,12 @@ object RocksDBWriter extends ScorexLogging { settings: BlockchainSettings, dbSettings: DBSettings, isLightMode: Boolean, - bfBlockInsertions: Int = 10000, forceCleanupExecutorService: Option[ExecutorService] = None ): RocksDBWriter = new RocksDBWriter( rdb, settings, dbSettings, isLightMode, - bfBlockInsertions, dbSettings.cleanupInterval match { case None => MoreExecutors.newDirectExecutorService() // We don't care if disabled case Some(_) => @@ -147,7 +146,6 @@ class RocksDBWriter( val settings: BlockchainSettings, val dbSettings: DBSettings, isLightMode: Boolean, - bfBlockInsertions: Int = 10000, cleanupExecutorService: ExecutorService ) extends Caches with AutoCloseable { @@ -215,20 +213,18 @@ class RocksDBWriter( writableDB.get(Keys.data(addressId, key)) } - override protected def loadEntryHeights(keys: Iterable[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] = { - val keyBufs = database.getKeyBuffersFromKeys(keys.map { case (addr, k) => Keys.data(addressIdOf(addr), k) }.toVector) - val valBufs = database.getValueBuffers(keys.size, 8) - val valueBuf = new Array[Byte](8) + override protected def loadEntryHeights(keys: Seq[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] = { + val keyBufs = database.getKeyBuffersFromKeys(keys.view.map { case (addr, k) => Keys.data(addressIdOf(addr), k) }.toVector) + val valBufs = database.getValueBuffers(keys.size, 4) val result = rdb.db .multiGetByteBuffers(keyBufs.asJava, valBufs.asJava) .asScala .view .zip(keys) - .map { case (status, k @ (_, key)) => + .map { case (status, k) => if (status.status.getCode == Status.Code.Ok) { - status.value.get(valueBuf) - k -> readCurrentData(key)(valueBuf).height + k -> Height(status.value.getInt) } else k -> Height(0) } .toMap @@ -399,11 +395,11 @@ class RocksDBWriter( } for ((addressId, nftIds) <- updatedNftLists.asMap().asScala) { - val kCount = Keys.nftCount(AddressId(addressId.toLong)) + val kCount = Keys.nftCount(AddressId(addressId.toLong), rdb.apiHandle) val previousNftCount = rw.get(kCount) rw.put(kCount, previousNftCount + nftIds.size()) for ((id, idx) <- nftIds.asScala.zipWithIndex) { - rw.put(Keys.nftAt(AddressId(addressId.toLong), previousNftCount + idx, id), Some(())) + rw.put(Keys.nftAt(AddressId(addressId.toLong), previousNftCount + idx, id, rdb.apiHandle), Some(())) } } @@ -430,32 +426,52 @@ class RocksDBWriter( } } - // todo: instead of fixed-size block batches, store fixed-time batches - private val BlockStep = 200 - private def mkFilter() = BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), BlockStep * bfBlockInsertions, 0.01f) - private def initFilters(): (BloomFilter[Array[Byte]], BloomFilter[Array[Byte]]) = { - def loadFilter(heights: Seq[Int]): BloomFilter[Array[Byte]] = { - val filter = mkFilter() - heights.filter(_ > 0).foreach { h => - loadTransactions(Height(h), rdb).foreach { case (_, tx) => filter.put(tx.id().arr) } + private var TxFilterResetTs = lastBlock.fold(0L)(_.header.timestamp) + private def mkFilter() = BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), 1_000_000, 0.001f) + private var currentTxFilter = mkFilter() + private var prevTxFilter = lastBlock match { + case Some(b) => + TxFilterResetTs = b.header.timestamp + val prevFilter = mkFilter() + + var fromHeight = height + Using(writableDB.newIterator()) { iter => + iter.seek(Keys.blockMetaAt(Height(height)).keyBytes) + var lastBlockTs = TxFilterResetTs + + while ( + iter.isValid && + iter.key().startsWith(KeyTags.BlockInfoAtHeight.prefixBytes) && + (TxFilterResetTs - lastBlockTs) < settings.functionalitySettings.maxTransactionTimeBackOffset.toMillis * 2 + ) { + lastBlockTs = readBlockMeta(iter.value()).getHeader.timestamp + fromHeight = Ints.fromByteArray(iter.key().drop(2)) + iter.prev() + } } - filter - } - val lastFilterStart = (height / BlockStep) * BlockStep + 1 - val prevFilterStart = lastFilterStart - BlockStep - val (bf0Heights, bf1Heights) = if ((height / BlockStep) % 2 == 0) { - (lastFilterStart to height, prevFilterStart until lastFilterStart) - } else { - (prevFilterStart until lastFilterStart, lastFilterStart to height) - } - (loadFilter(bf0Heights), loadFilter(bf1Heights)) - } + Using(writableDB.newIterator(rdb.txHandle.handle)) { iter => + var counter = 0 + iter.seek(Keys.transactionAt(Height(fromHeight), TxNum(0.toShort), rdb.txHandle).keyBytes) + while ( + iter.isValid && + iter.key().startsWith(KeyTags.NthTransactionInfoAtHeight.prefixBytes) && + Ints.fromByteArray(iter.key().slice(2, 6)) <= height + ) { + counter += 1 + prevFilter.put(readTransaction(Height(0))(iter.value())._2.id().arr) + iter.next() + } + log.debug(s"Loaded $counter tx IDs from [$fromHeight, $height]. Filter size is ${memMeter.measureDeep(prevFilter)} bytes") + } - private var (bf0, bf1) = initFilters() + prevFilter + case None => + mkFilter() + } override def containsTransaction(tx: Transaction): Boolean = - (bf0.mightContain(tx.id().arr) || bf1.mightContain(tx.id().arr)) && { + (prevTxFilter.mightContain(tx.id().arr) || currentTxFilter.mightContain(tx.id().arr)) && { writableDB.get(Keys.transactionMetaById(TransactionId(tx.id()), rdb.txMetaHandle)).isDefined } @@ -586,14 +602,13 @@ class RocksDBWriter( rw.put(Keys.assetScript(asset)(height), Some(script)) } - if (height % BlockStep == 1) { - if ((height / BlockStep) % 2 == 0) { - bf0 = mkFilter() - } else { - bf1 = mkFilter() - } + if (blockMeta.getHeader.timestamp - TxFilterResetTs > settings.functionalitySettings.maxTransactionTimeBackOffset.toMillis * 2) { + log.trace(s"Rotating filter at $height, prev ts = $TxFilterResetTs, new ts = ${blockMeta.getHeader.timestamp}, interval = ${Duration + .ofMillis(blockMeta.getHeader.timestamp - TxFilterResetTs)}") + TxFilterResetTs = blockMeta.getHeader.timestamp + prevTxFilter = currentTxFilter + currentTxFilter = mkFilter() } - val targetBf = if ((height / BlockStep) % 2 == 0) bf0 else bf1 val transactionsWithSize = snapshot.transactions.zipWithIndex.map { case ((id, txInfo), i) => @@ -608,14 +623,14 @@ class RocksDBWriter( Some(PBSnapshots.toProtobuf(txInfo.snapshot, txInfo.status)) ) rw.put(Keys.transactionMetaById(txId, rdb.txMetaHandle), Some(TransactionMeta(height, num, tx.tpe.id, meta.status.protobuf, 0, size))) - targetBf.put(id.arr) + currentTxFilter.put(id.arr) txId -> (num, tx, size) }.toMap if (dbSettings.storeTransactionsByAddress) { val addressTxs = addressTransactions.asScala.toSeq.map { case (aid, txIds) => - (aid, txIds, Keys.addressTransactionSeqNr(aid)) + (aid, txIds, Keys.addressTransactionSeqNr(aid, rdb.apiHandle)) } rw.multiGetInts(addressTxs.view.map(_._3).toVector) .zip(addressTxs) @@ -625,7 +640,7 @@ class RocksDBWriter( val (num, tx, size) = transactionsWithSize(txId) (tx.tpe.id.toByte, num, size) }.toSeq - rw.put(Keys.addressTransactionHN(addressId, nextSeqNr), Some((Height(height), txTypeNumSeq.sortBy(-_._2)))) + rw.put(Keys.addressTransactionHN(addressId, nextSeqNr, rdb.apiHandle), Some((Height(height), txTypeNumSeq.sortBy(-_._2)))) rw.put(txSeqNrKey, nextSeqNr) } } @@ -637,13 +652,15 @@ class RocksDBWriter( address <- Seq(details.recipientAddress, details.sender.toAddress) addressId = this.addressIdWithFallback(address, newAddresses) } yield (addressId, leaseId) - val leaseIdsByAddressId = addressIdWithLeaseIds.groupMap { case (addressId, _) => (addressId, Keys.addressLeaseSeqNr(addressId)) }(_._2).toSeq + val leaseIdsByAddressId = addressIdWithLeaseIds.groupMap { case (addressId, _) => + (addressId, Keys.addressLeaseSeqNr(addressId, rdb.apiHandle)) + }(_._2).toSeq rw.multiGetInts(leaseIdsByAddressId.view.map(_._1._2).toVector) .zip(leaseIdsByAddressId) .foreach { case (prevSeqNr, ((addressId, leaseSeqKey), leaseIds)) => val nextSeqNr = prevSeqNr.getOrElse(0) + 1 - rw.put(Keys.addressLeaseSeq(addressId, nextSeqNr), Some(leaseIds)) + rw.put(Keys.addressLeaseSeq(addressId, nextSeqNr, rdb.apiHandle), Some(leaseIds)) rw.put(leaseSeqKey, nextSeqNr) } } @@ -698,7 +715,7 @@ class RocksDBWriter( }) .getOrElse(throw new IllegalArgumentException(s"Couldn't find transaction height and num: $txId")) - try rw.put(Keys.invokeScriptResult(txHeight, txNum), Some(result)) + try rw.put(Keys.invokeScriptResult(txHeight, txNum, rdb.apiHandle), Some(result)) catch { case NonFatal(e) => throw new RuntimeException(s"Error storing invoke script result for $txId: $result", e) @@ -707,7 +724,7 @@ class RocksDBWriter( for ((txId, pbMeta) <- snapshot.ethereumTransactionMeta) { val txNum = transactionsWithSize(TransactionId @@ txId)._1 - val key = Keys.ethereumTransactionMeta(Height(height), txNum) + val key = Keys.ethereumTransactionMeta(Height(height), txNum, rdb.apiHandle) rw.put(key, Some(pbMeta)) } @@ -975,11 +992,9 @@ class RocksDBWriter( for ((addressId, address) <- changedAddresses) { for (k <- rw.get(Keys.changedDataKeys(currentHeight, addressId))) { - log.trace(s"Discarding $k for $address at $currentHeight") accountDataToInvalidate += (address -> k) - rw.delete(Keys.dataAt(addressId, k)(currentHeight)) - rollbackDataHistory(rw, Keys.data(addressId, k), Keys.dataAt(addressId, k)(_), currentHeight) + rollbackDataEntry(rw, k, address, addressId, currentHeight) } rw.delete(Keys.changedDataKeys(currentHeight, addressId)) @@ -993,9 +1008,9 @@ class RocksDBWriter( discardLeaseBalance(address) if (dbSettings.storeTransactionsByAddress) { - val kTxSeqNr = Keys.addressTransactionSeqNr(addressId) + val kTxSeqNr = Keys.addressTransactionSeqNr(addressId, rdb.apiHandle) val txSeqNr = rw.get(kTxSeqNr) - val kTxHNSeq = Keys.addressTransactionHN(addressId, txSeqNr) + val kTxHNSeq = Keys.addressTransactionHN(addressId, txSeqNr, rdb.apiHandle) rw.get(kTxHNSeq).collect { case (`currentHeight`, _) => rw.delete(kTxHNSeq) @@ -1004,9 +1019,9 @@ class RocksDBWriter( } if (dbSettings.storeLeaseStatesByAddress) { - val leaseSeqNrKey = Keys.addressLeaseSeqNr(addressId) + val leaseSeqNrKey = Keys.addressLeaseSeqNr(addressId, rdb.apiHandle) val leaseSeqNr = rw.get(leaseSeqNrKey) - val leaseSeqKey = Keys.addressLeaseSeq(addressId, leaseSeqNr) + val leaseSeqKey = Keys.addressLeaseSeq(addressId, leaseSeqNr, rdb.apiHandle) rw.get(leaseSeqKey) .flatMap(_.headOption) .flatMap(leaseDetails) @@ -1054,7 +1069,7 @@ class RocksDBWriter( case _: DataTransaction => // see changed data keys removal case _: InvokeScriptTransaction | _: InvokeExpressionTransaction => - rw.delete(Keys.invokeScriptResult(currentHeight, num)) + rw.delete(Keys.invokeScriptResult(currentHeight, num, rdb.apiHandle)) case tx: CreateAliasTransaction => rw.delete(Keys.addressIdOfAlias(tx.alias)) @@ -1063,7 +1078,7 @@ class RocksDBWriter( ordersToInvalidate += rollbackOrderFill(rw, tx.buyOrder.id(), currentHeight) ordersToInvalidate += rollbackOrderFill(rw, tx.sellOrder.id(), currentHeight) case _: EthereumTransaction => - rw.delete(Keys.ethereumTransactionMeta(currentHeight, num)) + rw.delete(Keys.ethereumTransactionMeta(currentHeight, num, rdb.apiHandle)) } if (tx.tpe != TransactionType.Genesis) { @@ -1132,14 +1147,20 @@ class RocksDBWriter( discardedBlocks.reverse } - private def rollbackDataHistory(rw: RW, currentDataKey: Key[CurrentData], dataNodeKey: Height => Key[DataNode], currentHeight: Height): Unit = { - val currentData = rw.get(currentDataKey) + private def rollbackDataEntry(rw: RW, key: String, address: Address, addressId: AddressId, currentHeight: Height): Unit = { + val currentDataKey = Keys.data(addressId, key) + val currentData = rw.get(currentDataKey) + rw.delete(Keys.dataAt(addressId, key)(currentHeight)) if (currentData.height == currentHeight) { - val prevDataNode = rw.get(dataNodeKey(currentData.prevHeight)) - rw.delete(dataNodeKey(currentHeight)) - prevDataNode.entry match { - case _: EmptyDataEntry => rw.delete(currentDataKey) - case _ => rw.put(currentDataKey, CurrentData(prevDataNode.entry, currentData.prevHeight, prevDataNode.prevHeight)) + if (currentData.prevHeight > 0) { + val prevDataNode = rw.get(Keys.dataAt(addressId, key)(currentData.prevHeight)) + log.trace( + s"PUT $address($addressId)/$key: ${currentData.entry}@$currentHeight => ${prevDataNode.entry}@${currentData.prevHeight}>${prevDataNode.prevHeight}" + ) + rw.put(currentDataKey, CurrentData(prevDataNode.entry, currentData.prevHeight, prevDataNode.prevHeight)) + } else { + log.trace(s"DEL $address($addressId)/$key: ${currentData.entry}@$currentHeight => EMPTY@${currentData.prevHeight}") + rw.delete(currentDataKey) } } } diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala index b147da9fa5a..b0fc6bc5400 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -423,7 +423,7 @@ package object database { def withReadOptions[A](f: ReadOptions => A): A = { val snapshot = db.getSnapshot - val ro = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) + val ro = new ReadOptions().setSnapshot(snapshot) try f(ro) finally { ro.close() @@ -536,7 +536,11 @@ package object database { } finally iterator.close() } - def resourceObservable: Observable[DBResource] = Observable.resource(Task(DBResource(db)))(r => Task(r.close())) + def resourceObservable: Observable[DBResource] = + Observable.resource(Task(DBResource(db, None)))(r => Task(r.close())) + + def resourceObservable(iteratorCfHandle: ColumnFamilyHandle): Observable[DBResource] = + Observable.resource(Task(DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close())) def withResource[A](f: DBResource => A): A = { val resource = DBResource(db) @@ -544,6 +548,12 @@ package object database { finally resource.close() } + def withResource[A](iteratorCfHandle: ColumnFamilyHandle)(f: DBResource => A): A = { + val resource = DBResource(db, Some(iteratorCfHandle)) + try f(resource) + finally resource.close() + } + private def multiGetOpt[A]( readOptions: ReadOptions, keys: collection.IndexedSeq[Key[Option[A]]], diff --git a/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala b/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala index 3c3ece9bf70..9fd841ebb93 100644 --- a/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala +++ b/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala @@ -9,7 +9,7 @@ import com.wavesplatform.utils.{ScorexLogging, Time, UnsupportedFeature, forceSt import org.rocksdb.RocksDB object StorageFactory extends ScorexLogging { - private val StorageVersion = 1 + private val StorageVersion = 2 def apply( settings: WavesSettings, diff --git a/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala b/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala index 511bd6861d6..906e085557c 100644 --- a/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala @@ -1,5 +1,4 @@ package com.wavesplatform.settings -import scala.concurrent.duration.FiniteDuration case class DBSettings( directory: String, @@ -10,7 +9,5 @@ case class DBSettings( maxCacheSize: Int, maxRollbackDepth: Int, cleanupInterval: Option[Int] = None, - rememberBlocks: FiniteDuration, - useBloomFilter: Boolean, - rocksdb: RocksDBSettings + rocksdb: RocksDBSettings, ) diff --git a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala index a31434526b2..30ad8d172a7 100644 --- a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala @@ -5,6 +5,8 @@ case class RocksDBSettings( txCacheSize: SizeInBytes, txMetaCacheSize: SizeInBytes, txSnapshotCacheSize: SizeInBytes, + apiCacheSize: SizeInBytes, writeBufferSize: SizeInBytes, - enableStatistics: Boolean + enableStatistics: Boolean, + paranoidChecks: Boolean ) diff --git a/node/src/test/resources/application.conf b/node/src/test/resources/application.conf index 1604353306e..98841a94ec5 100644 --- a/node/src/test/resources/application.conf +++ b/node/src/test/resources/application.conf @@ -4,8 +4,7 @@ waves { wallet.password = "some string as password" db { - cleanup-interval = null # Disable in tests by default - + max-cache-size = 1 rocksdb { main-cache-size = 1K tx-cache-size = 1K diff --git a/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala b/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala index 5bc42e0dfb6..b8aab174712 100644 --- a/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala +++ b/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala @@ -18,7 +18,6 @@ object TestStorageFactory { settings.blockchainSettings, settings.dbSettings, settings.enableLightMode, - 100, Some(MoreExecutors.newDirectExecutorService()) ) ( diff --git a/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala b/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala index 58bd629cef3..5e07e7de21b 100644 --- a/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala +++ b/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala @@ -1,10 +1,17 @@ package com.wavesplatform.db -import org.rocksdb.{ReadOptions, RocksDB, RocksIterator} +import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator} import java.util.concurrent.locks.Lock case class InterferableDB(db: RocksDB, startRead: Lock) extends RocksDB(db.getNativeHandle) { + override def getDefaultColumnFamily: ColumnFamilyHandle = db.getDefaultColumnFamily + + override def newIterator(columnFamilyHandle: ColumnFamilyHandle, readOptions: ReadOptions): RocksIterator = { + startRead.lock() + db.newIterator(columnFamilyHandle, readOptions) + } + override def newIterator(options: ReadOptions): RocksIterator = { startRead.lock() db.newIterator(options) diff --git a/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala b/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala new file mode 100644 index 00000000000..fd43e9b3e21 --- /dev/null +++ b/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala @@ -0,0 +1,34 @@ +package com.wavesplatform.db + +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.settings.WavesSettings +import com.wavesplatform.test.* +import com.wavesplatform.transaction.TxHelpers + +class TxBloomFilterSpec extends PropSpec with SharedDomain { + private val richAccount = TxHelpers.signer(1200) + + override def settings: WavesSettings = DomainPresets.TransactionStateSnapshot + + override def genesisBalances: Seq[AddrWithBalance] = Seq(AddrWithBalance(richAccount.toAddress, 10000.waves)) + + property("Filter rotation works") { + val transfer = TxHelpers.transfer(richAccount, TxHelpers.address(1201), 10.waves) + 1 to 8 foreach { _ => domain.appendBlock() } + domain.blockchain.height shouldEqual 9 + domain.appendBlock(transfer) // transfer at height 10 + domain.appendBlock() // height = 11 + domain.appendBlock() // solid state height = 11, filters are rotated + domain.appendBlockE(transfer) should produce("AlreadyInTheState") + + domain.appendBlock() + val tf2 = TxHelpers.transfer(richAccount, TxHelpers.address(1202), 20.waves) + domain.appendBlock(tf2) + 1 to 20 foreach { _ => + withClue(s"height = ${domain.blockchain.height}") { + domain.appendBlockE(tf2) should produce("AlreadyInTheState") + } + domain.appendBlock() + } + } +} diff --git a/node/src/test/scala/com/wavesplatform/db/WithState.scala b/node/src/test/scala/com/wavesplatform/db/WithState.scala index 05446794346..cab873284ee 100644 --- a/node/src/test/scala/com/wavesplatform/db/WithState.scala +++ b/node/src/test/scala/com/wavesplatform/db/WithState.scala @@ -72,7 +72,7 @@ trait WithState extends BeforeAndAfterAll with DBCacheSettings with Matchers wit ) Using.resource(rdw)(test) } finally { - Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txMetaHandle.handle).foreach { cfh => + Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txMetaHandle.handle, rdb.apiHandle.handle).foreach { cfh => rdb.db.deleteRange(cfh, MinKey, MaxKey) } } @@ -395,7 +395,7 @@ trait WithDomain extends WithState { _: Suite => try { val wrappedDb = wrapDB(rdb.db) assert(wrappedDb.getNativeHandle == rdb.db.getNativeHandle, "wrap function should not create new database instance") - domain = Domain(new RDB(wrappedDb, rdb.txMetaHandle, rdb.txHandle, rdb.txSnapshotHandle, Seq.empty), bcu, blockchain, settings) + domain = Domain(new RDB(wrappedDb, rdb.txMetaHandle, rdb.txHandle, rdb.txSnapshotHandle, rdb.apiHandle, Seq.empty), bcu, blockchain, settings) val genesis = balances.map { case AddrWithBalance(address, amount) => TxHelpers.genesis(address, amount) } diff --git a/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala b/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala index 0b9ff018b22..bee4b3eb7fa 100644 --- a/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala +++ b/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala @@ -89,7 +89,7 @@ class BlockchainUpdaterNFTTest extends PropSpec with DomainScenarioDrivenPropert val persistedNfts = Seq.newBuilder[IssuedAsset] d.rdb.db.readOnly { ro => val addressId = ro.get(Keys.addressId(firstAccount)).get - ro.iterateOver(KeyTags.NftPossession.prefixBytes ++ addressId.toByteArray) { e => + ro.iterateOver(KeyTags.NftPossession.prefixBytes ++ addressId.toByteArray, Some(d.rdb.apiHandle.handle)) { e => persistedNfts += IssuedAsset(ByteStr(e.getKey.takeRight(32))) } } @@ -99,7 +99,6 @@ class BlockchainUpdaterNFTTest extends PropSpec with DomainScenarioDrivenPropert val settings = settingsWithFeatures(BlockchainFeatures.NG, BlockchainFeatures.ReduceNFTFee) withDomain(settings)(assert) - withDomain(settings.copy(dbSettings = settings.dbSettings.copy(useBloomFilter = true)))(assert) } } diff --git a/node/src/test/scala/com/wavesplatform/history/Domain.scala b/node/src/test/scala/com/wavesplatform/history/Domain.scala index 58def9c43cc..ff6b3155941 100644 --- a/node/src/test/scala/com/wavesplatform/history/Domain.scala +++ b/node/src/test/scala/com/wavesplatform/history/Domain.scala @@ -197,7 +197,7 @@ case class Domain(rdb: RDB, blockchainUpdater: BlockchainUpdaterImpl, rocksDBWri def balance(address: Address): Long = blockchainUpdater.balance(address) def balance(address: Address, asset: Asset): Long = blockchainUpdater.balance(address, asset) - def nftList(address: Address): Seq[(IssuedAsset, AssetDescription)] = rdb.db.withResource { resource => + def nftList(address: Address): Seq[(IssuedAsset, AssetDescription)] = rdb.db.withResource(rdb.apiHandle.handle) { resource => AddressPortfolio .nftIterator(resource, address, blockchainUpdater.bestLiquidSnapshot.orEmpty, None, blockchainUpdater.assetDescription) .toSeq diff --git a/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala b/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala new file mode 100644 index 00000000000..074940ba3a9 --- /dev/null +++ b/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala @@ -0,0 +1,60 @@ +package com.wavesplatform.state + +import com.wavesplatform.db.WithState +import com.wavesplatform.db.WithState.AddrWithBalance +import com.wavesplatform.lang.directives.values.V7 +import com.wavesplatform.lang.v1.compiler.TestCompiler +import com.wavesplatform.settings.WavesSettings +import com.wavesplatform.test.* +import com.wavesplatform.transaction.TxHelpers + +class DataKeyRollback extends PropSpec with SharedDomain { + private val richAccount = TxHelpers.signer(1500) + + override def genesisBalances: Seq[WithState.AddrWithBalance] = Seq(AddrWithBalance(richAccount.toAddress, 10_000_000.waves)) + override def settings: WavesSettings = DomainPresets.TransactionStateSnapshot + + property("check new entries") { + val oracleAccount = TxHelpers.signer(1501) + val dappAccount = TxHelpers.signer(1502) + + val dataSenderCount = 5 + val dataEntryCount = 5 + + val dataSenders = IndexedSeq.tabulate(dataSenderCount)(i => TxHelpers.signer(1550 + i)) + domain.appendBlock( + TxHelpers + .massTransfer( + richAccount, + dataSenders.map(kp => kp.toAddress -> 100.waves) ++ + Seq(oracleAccount.toAddress -> 100.waves, dappAccount.toAddress -> 10.waves), + fee = 0.05.waves + ), + TxHelpers.setScript( + dappAccount, + TestCompiler(V7).compileContract(s""" + let oracleAddress = Address(base58'${oracleAccount.toAddress}') + @Callable(i) + func default() = [ + IntegerEntry("loadedHeight_" + height.toString() + i.transactionId.toBase58String(), oracleAddress.getIntegerValue("lastUpdatedBlock")) + ] + """) + ), + TxHelpers.data(oracleAccount, Seq(IntegerDataEntry("lastUpdatedBlock", 2))) + ) + domain.appendBlock(dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 501)), 0.01.waves))*) + domain.appendBlock(dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 503)), 0.01.waves))*) + domain.appendBlock( + (dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 504)), 0.01.waves)) ++ + Seq( + TxHelpers.invoke(dappAccount.toAddress, invoker = richAccount), + TxHelpers.data(oracleAccount, Seq(IntegerDataEntry("lastUpdatedBlock", 5))) + ))* + ) + domain.appendBlock() + val discardedBlocks = domain.rollbackTo(domain.blockchain.blockId(domain.blockchain.height - 2).get) + discardedBlocks.foreach { case (block, _, _) => + domain.appendBlock(block) + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8cce403945c..e169b401214 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -32,19 +32,19 @@ object Dependencies { val janino = "org.codehaus.janino" % "janino" % "3.1.11" val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.12.3" val curve25519 = "com.wavesplatform" % "curve25519-java" % "0.6.6" - val nettyHandler = "io.netty" % "netty-handler" % "4.1.104.Final" + val nettyHandler = "io.netty" % "netty-handler" % "4.1.106.Final" val shapeless = Def.setting("com.chuusai" %%% "shapeless" % "2.3.10") - val playJson = "com.typesafe.play" %% "play-json" % "2.10.3" // 2.10.x and later is built for Java 11 + val playJson = "com.typesafe.play" %% "play-json" % "2.10.4" val scalaTest = "org.scalatest" %% "scalatest" % "3.2.17" % Test val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.2" % Test) - val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.1" // 3.6.x and later is built for Java 11 - val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.1" + val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.2" + val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.2" - val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk15on" % "1.70" + val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk18on" % "1.77" val console = Seq("com.github.scopt" %% "scopt" % "4.1.0") @@ -69,7 +69,7 @@ object Dependencies { curve25519, bouncyCastleProvider, "com.wavesplatform" % "zwaves" % "0.2.1", - web3jModule("crypto") + web3jModule("crypto").excludeAll(ExclusionRule("org.bouncycastle", "bcprov-jdk15on")), ) ++ langCompilerPlugins.value ++ scalapbRuntime.value ++ protobuf.value ) @@ -100,7 +100,7 @@ object Dependencies { akkaModule("slf4j") % Runtime ) - private val rocksdb = "org.rocksdb" % "rocksdbjni" % "8.9.1" + private val rocksdb = "org.rocksdb" % "rocksdbjni" % "8.10.0" lazy val node = Def.setting( Seq( @@ -127,10 +127,10 @@ object Dependencies { monixModule("reactive").value, nettyHandler, "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", - "eu.timepit" %% "refined" % "0.11.0" exclude ("org.scala-lang.modules", "scala-xml_2.13"), + "eu.timepit" %% "refined" % "0.11.1" exclude ("org.scala-lang.modules", "scala-xml_2.13"), "com.esaulpaugh" % "headlong" % "10.0.2", "com.github.jbellis" % "jamm" % "0.4.0", // Weighing caches - web3jModule("abi"), + web3jModule("abi").excludeAll(ExclusionRule("org.bouncycastle", "bcprov-jdk15on")), akkaModule("testkit") % Test, akkaHttpModule("akka-http-testkit") % Test ) ++ test ++ console ++ logDeps ++ protobuf.value ++ langCompilerPlugins.value @@ -161,8 +161,6 @@ object Dependencies { lazy val rideRunner = Def.setting( Seq( rocksdb, - // https://github.com/netty/netty/wiki/Native-transports - // "io.netty" % "netty-transport-native-epoll" % "4.1.79.Final" classifier "linux-x86_64", "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8", "net.logstash.logback" % "logstash-logback-encoder" % "7.4" % Runtime, kamonModule("caffeine"), diff --git a/project/plugins.sbt b/project/plugins.sbt index 7cb18557cca..9c052045cf2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,7 +6,7 @@ resolvers ++= Seq( // Should go before Scala.js addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") -libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.14" +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.15" Seq( "com.eed3si9n" % "sbt-assembly" % "2.1.5", diff --git a/repl/jvm/src/test/logback-test.xml b/repl/jvm/src/test/logback-test.xml deleted file mode 100644 index 8b4e22b2f22..00000000000 --- a/repl/jvm/src/test/logback-test.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - %date %-5level [%.15thread] %logger{26} - %msg%n - - - - - - - - - diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala index 919ddd1bd63..fdd474f5b53 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala @@ -53,7 +53,6 @@ object KeyTags extends Enumeration { AssetStaticInfo, NftCount, NftPossession, - BloomFilterChecksum, IssuedAssets, UpdatedAssets, SponsoredAssets,