From 8cbd332c764cb716414e1cd9dd8b0914e4a32d7c Mon Sep 17 00:00:00 2001 From: Ivan Mashonskii Date: Thu, 12 Oct 2023 14:26:09 +0300 Subject: [PATCH] NODE-2617 Return asset distribution route --- .../wavesplatform/it/api/AsyncHttpApi.scala | 7 ++- .../wavesplatform/it/api/SyncHttpApi.scala | 5 ++- .../it/asset/IssueReissueBurnAssetSuite.scala | 8 ++-- .../it/sync/AssetDistributionSuite.scala | 44 +++++++++++++++++++ .../main/resources/swagger-ui/openapi.yaml | 32 ++++++++++++++ .../scala/com/wavesplatform/Application.scala | 6 +-- .../api/http/assets/AssetsApiRoute.scala | 39 ++++++++++++++-- .../api/http/CustomJsonMarshallerSpec.scala | 1 + .../wavesplatform/http/AssetsRouteSpec.scala | 1 + 9 files changed, 130 insertions(+), 13 deletions(-) diff --git a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala index 0dacbcb697..d2d26afa7b 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala @@ -23,7 +23,7 @@ import com.wavesplatform.lang.v1.FunctionHeader import com.wavesplatform.lang.v1.compiler.Terms import com.wavesplatform.lang.v1.compiler.Terms.FUNCTION_CALL import com.wavesplatform.state.DataEntry.Format -import com.wavesplatform.state.{AssetDistributionPage, DataEntry, EmptyDataEntry, LeaseBalance, Portfolio} +import com.wavesplatform.state.{AssetDistribution, AssetDistributionPage, DataEntry, EmptyDataEntry, LeaseBalance, Portfolio} import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves} import com.wavesplatform.transaction.assets.* import com.wavesplatform.transaction.assets.exchange.{Order, ExchangeTransaction as ExchangeTx} @@ -338,6 +338,11 @@ object AsyncHttpApi extends Assertions { get(url, amountsAsStrings).as[AssetDistributionPage](amountsAsStrings) } + def assetDistribution(asset: String, amountsAsStrings: Boolean = false): Future[AssetDistribution] = { + val req = s"/assets/$asset/distribution" + get(req, amountsAsStrings).as[AssetDistribution](amountsAsStrings) + } + def effectiveBalance(address: String, confirmations: Option[Int] = None, amountsAsStrings: Boolean = false): Future[Balance] = { val maybeConfirmations = confirmations.fold("")(a => s"/$a") get(s"/addresses/effectiveBalance/$address$maybeConfirmations", amountsAsStrings).as[Balance](amountsAsStrings) diff --git a/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala b/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala index 2a090fa609..e9ab3c5ecb 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala @@ -14,7 +14,7 @@ import com.wavesplatform.it.Node import com.wavesplatform.it.sync.* import com.wavesplatform.lang.script.v1.ExprScript import com.wavesplatform.lang.v1.compiler.Terms -import com.wavesplatform.state.{AssetDistributionPage, DataEntry} +import com.wavesplatform.state.{AssetDistribution, AssetDistributionPage, DataEntry} import com.wavesplatform.transaction.assets.exchange.Order import com.wavesplatform.transaction.lease.{LeaseCancelTransaction, LeaseTransaction} import com.wavesplatform.transaction.smart.InvokeScriptTransaction @@ -261,6 +261,9 @@ object SyncHttpApi extends Assertions with matchers.should.Matchers { ): AssetDistributionPage = sync(async(n).assetDistributionAtHeight(asset, height, limit, maybeAfter, amountsAsStrings)) + def assetDistribution(asset: String): AssetDistribution = + sync(async(n).assetDistribution(asset)) + def broadcastIssue( source: KeyPair, name: String, diff --git a/node-it/src/test/scala/com/wavesplatform/it/asset/IssueReissueBurnAssetSuite.scala b/node-it/src/test/scala/com/wavesplatform/it/asset/IssueReissueBurnAssetSuite.scala index 227c2b1325..355adaa7d9 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/asset/IssueReissueBurnAssetSuite.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/asset/IssueReissueBurnAssetSuite.scala @@ -250,15 +250,15 @@ class IssueReissueBurnAssetSuite extends BaseFreeSpec { val acc = createDapp(script(simpleReissuableAsset)) val asset = issueValidated(acc, simpleReissuableAsset) invokeScript(acc, "transferAndBurn", assetId = asset, count = 100) - val height1 = nodes.waitForHeightArise() - sender.assetDistributionAtHeight(asset, height1 - 1, 10).items.map { case (a, v) => a.toString -> v } shouldBe Map( + nodes.waitForHeightArise() + sender.assetDistribution(asset).map { case (a, v) => a.toString -> v } shouldBe Map( miner.address -> 100L, acc.toAddress.toString -> (simpleReissuableAsset.quantity - 200) ) reissue(acc, CallableMethod, asset, 400, reissuable = false) invokeScript(acc, "transferAndBurn", assetId = asset, count = 100) - val height2 = nodes.waitForHeightArise() - sender.assetDistributionAtHeight(asset, height2 - 1, 10).items.map { case (a, v) => a.toString -> v } shouldBe Map( + nodes.waitForHeightArise() + sender.assetDistribution(asset).map { case (a, v) => a.toString -> v } shouldBe Map( miner.address -> 200L, acc.toAddress.toString -> simpleReissuableAsset.quantity ) diff --git a/node-it/src/test/scala/com/wavesplatform/it/sync/AssetDistributionSuite.scala b/node-it/src/test/scala/com/wavesplatform/it/sync/AssetDistributionSuite.scala index 2355523ffa..39cd533c6b 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/sync/AssetDistributionSuite.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/sync/AssetDistributionSuite.scala @@ -8,6 +8,8 @@ import com.wavesplatform.state.AssetDistributionPage import com.wavesplatform.transaction.transfer.MassTransferTransaction import org.scalatest.CancelAfterFailure +import scala.concurrent.duration.* + class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailure { lazy val node: Node = nodes.head @@ -47,6 +49,8 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur val issuerAssetDis = assetDis.view.filterKeys(_ == issuer.toAddress).values + assetDis should be equals node.assetDistribution(issueTx) + issuerAssetDis.size shouldBe 1 issuerAssetDis.head shouldBe (issueAmount - addresses.length * transferAmount) @@ -68,6 +72,30 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur ) } + test("'Asset distribution' works properly") { + val receivers = for (i <- 0 until 10) yield KeyPair(s"receiver#$i".getBytes("UTF-8")) + + val issueTx = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, false, issueFee, waitForTx = true).id + + node + .massTransfer( + issuer, + receivers.map(rc => MassTransferTransaction.Transfer(rc.toAddress.toString, 10)).toList, + minFee + minFee * receivers.length, + assetId = Some(issueTx), + waitForTx = true + ) + + nodes.waitForHeightArise() + + val distribution = node.assetDistribution(issueTx) + + distribution.size shouldBe (receivers.size + 1) + distribution(issuer.toAddress) shouldBe (issueAmount - 10 * receivers.length) + + assert(receivers.forall(rc => distribution(rc.toAddress) == 10), "Distribution correct") + } + test("Correct last page and entry count") { val receivers = for (i <- 0 until 50) yield KeyPair(s"receiver#$i".getBytes("UTF-8")) @@ -96,6 +124,22 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur assert(pages.map(_.items.size).sum == 51) } + test("Unlimited list") { + val receivers = for (i <- 0 until 2000) yield KeyPair(s"receiver#$i".getBytes("UTF-8")) + + val assetId = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, false, issueFee, waitForTx = true).id + + receivers.foreach { receiver => + node.transfer(issuer, receiver.toAddress.toString, 10, assetId = Some(assetId)) + } + + node.waitFor("empty utx")(_.utxSize, (_: Int) == 0, 1 second) + nodes.waitForHeightArise() + + val list = node.assetDistribution(assetId) + list should have size 2001 + } + def distributionPages(asset: String, height: Int, limit: Int): List[AssetDistributionPage] = { def _load(acc: List[AssetDistributionPage], maybeAfter: Option[String]): List[AssetDistributionPage] = { val page = node.assetDistributionAtHeight(asset, height, limit, maybeAfter) diff --git a/node/src/main/resources/swagger-ui/openapi.yaml b/node/src/main/resources/swagger-ui/openapi.yaml index 6ff380ed17..8d8d8b4eba 100644 --- a/node/src/main/resources/swagger-ui/openapi.yaml +++ b/node/src/main/resources/swagger-ui/openapi.yaml @@ -2470,6 +2470,38 @@ paths: type: string balance: type: string + '/assets/{assetId}/distribution': + get: + tags: + - assets + summary: Asset balance distribution + description: Get asset balance distribution by addresses + operationId: getAssetDistributionOld + parameters: + - $ref: '#/components/parameters/assetId' + responses: + '200': + description: successful operation + content: + application/json: + schema: + type: object + additionalProperties: + type: integer + format: int64 + description: map of assetId <-> balance + example: + 2eEUvypDSivnzPiLrbYEW39SM8yMZ1aq4eJuiKfs4sEY: 15 + 3PPqZ623dAfbmxmnpTjwV6yD5GA5s3PJiUG: 25 + application/json;large-significand-format=string: + schema: + type: object + additionalProperties: + type: string + description: map of assetId <-> balance + example: + 2eEUvypDSivnzPiLrbYEW39SM8yMZ1aq4eJuiKfs4sEY: "15" + 3PPqZ623dAfbmxmnpTjwV6yD5GA5s3PJiUG: "25" '/assets/{assetId}/distribution/{height}/limit/{limit}': get: tags: diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index 02cf0a6b82..b371f345c2 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -375,9 +375,8 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con else heavyRequestExecutor ) - val routeTimeout = new RouteTimeout( - FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS) - )(heavyRequestScheduler) + val serverRequestTimeout = FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS) + val routeTimeout = new RouteTimeout(serverRequestTimeout)(heavyRequestScheduler) val apiRoutes = Seq( new EthRpcRoute(blockchainUpdater, extensionContext.transactionsApi, time), @@ -441,6 +440,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con ), AssetsApiRoute( settings.restAPISettings, + serverRequestTimeout, wallet, transactionPublisher, blockchainUpdater, diff --git a/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala b/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala index 64a01f9952..24107f6d09 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala @@ -18,7 +18,13 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi} import com.wavesplatform.api.http.* import com.wavesplatform.api.http.ApiError.* import com.wavesplatform.api.http.StreamSerializerUtils.* -import com.wavesplatform.api.http.assets.AssetsApiRoute.{AssetDetails, AssetInfo, DistributionParams, assetDetailsSerializer} +import com.wavesplatform.api.http.assets.AssetsApiRoute.{ + AssetDetails, + AssetInfo, + DistributionParams, + assetDetailsSerializer, + assetDistributionSerializer +} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.lang.ValidationError import com.wavesplatform.network.TransactionPublisher @@ -39,9 +45,11 @@ import play.api.libs.json.* import java.util.concurrent.* import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration case class AssetsApiRoute( settings: RestAPISettings, + serverRequestTimeout: FiniteDuration, wallet: Wallet, transactionPublisher: TransactionPublisher, blockchain: Blockchain, @@ -65,6 +73,8 @@ case class AssetsApiRoute( ) ) + private val assetDistRouteTimeout = new RouteTimeout(serverRequestTimeout)(distributionTaskScheduler) + override lazy val route: Route = pathPrefix("assets") { pathPrefix("balance" / AddrSegment) { address => @@ -97,9 +107,10 @@ case class AssetsApiRoute( (path("nft" / AddrSegment / "limit" / IntNumber) & parameter("after".as[String].?)) { (address, limit, maybeAfter) => nft(address, limit, maybeAfter) } ~ pathPrefix(AssetId / "distribution") { assetId => - (path(IntNumber / "limit" / IntNumber) & parameter("after".?)) { (height, limit, maybeAfter) => - balanceDistributionAtHeight(assetId, height, limit, maybeAfter) - } + pathEndOrSingleSlash(balanceDistribution(assetId)) ~ + (path(IntNumber / "limit" / IntNumber) & parameter("after".?)) { (height, limit, maybeAfter) => + balanceDistributionAtHeight(assetId, height, limit, maybeAfter) + } } } } @@ -180,6 +191,16 @@ case class AssetsApiRoute( } } + def balanceDistribution(assetId: IssuedAsset): Route = { + implicit val jsonStreamingSupport: ToResponseMarshaller[Source[(Address, Long), NotUsed]] = + jacksonStreamMarshaller(prefix = "{", suffix = "}")(assetDistributionSerializer) + + assetDistRouteTimeout.executeFromObservable( + commonAssetsApi + .assetDistribution(assetId, blockchain.height, None) + ) + } + def balanceDistributionAtHeight(assetId: IssuedAsset, heightParam: Int, limitParam: Int, afterParam: Option[String]): Route = optionalHeaderValueByType(Accept) { accept => val paramsEi: Either[ValidationError, DistributionParams] = @@ -511,4 +532,14 @@ object AssetsApiRoute { gen.writeEndObject() } } + + def assetDistributionSerializer(numbersAsString: Boolean): JsonSerializer[(Address, Long)] = + (value: (Address, Long), gen: JsonGenerator, _: SerializerProvider) => { + val (address, balance) = value + if (numbersAsString) { + gen.writeStringField(address.toString, balance.toString) + } else { + gen.writeNumberField(address.toString, balance) + } + } } diff --git a/node/src/test/scala/com/wavesplatform/api/http/CustomJsonMarshallerSpec.scala b/node/src/test/scala/com/wavesplatform/api/http/CustomJsonMarshallerSpec.scala index f266896a79..9f1a6b67ed 100644 --- a/node/src/test/scala/com/wavesplatform/api/http/CustomJsonMarshallerSpec.scala +++ b/node/src/test/scala/com/wavesplatform/api/http/CustomJsonMarshallerSpec.scala @@ -115,6 +115,7 @@ class CustomJsonMarshallerSpec private val assetsRoute = AssetsApiRoute( restAPISettings, + 60.seconds, testWallet, publisher, blockchain, diff --git a/node/src/test/scala/com/wavesplatform/http/AssetsRouteSpec.scala b/node/src/test/scala/com/wavesplatform/http/AssetsRouteSpec.scala index a1e2a8ab0c..6484726d1e 100644 --- a/node/src/test/scala/com/wavesplatform/http/AssetsRouteSpec.scala +++ b/node/src/test/scala/com/wavesplatform/http/AssetsRouteSpec.scala @@ -57,6 +57,7 @@ class AssetsRouteSpec seal( AssetsApiRoute( restAPISettings, + 60.seconds, testWallet, DummyTransactionPublisher.accepting, d.blockchain,