From 0f8cfb0028471896388e707ce97b1e60f84e550d Mon Sep 17 00:00:00 2001 From: jangko Date: Mon, 23 Dec 2024 12:41:39 +0700 Subject: [PATCH] Refactor TxPool: leaner and simpler --- .../nodocker/engine/engine_env.nim | 14 +- .../nodocker/graphql/graphql_sim.nim | 7 +- hive_integration/nodocker/rpc/test_env.nim | 4 - nimbus/beacon/beacon_engine.nim | 4 - nimbus/core/tx_pool.nim | 577 ++---------------- nimbus/core/tx_pool/tx_desc.nim | 378 +++++++----- nimbus/core/tx_pool/tx_info.nim | 110 ---- nimbus/core/tx_pool/tx_item.nim | 235 +++---- nimbus/core/tx_pool/tx_packer.nim | 75 +-- nimbus/core/tx_pool/tx_tabs.nim | 431 +++---------- nimbus/core/tx_pool/tx_tabs/tx_rank.nim | 158 ----- nimbus/core/tx_pool/tx_tabs/tx_sender.nim | 432 ------------- nimbus/core/tx_pool/tx_tabs/tx_status.nim | 240 -------- nimbus/core/tx_pool/tx_tasks/tx_add.nim | 223 ------- nimbus/core/tx_pool/tx_tasks/tx_bucket.nim | 172 ------ nimbus/core/tx_pool/tx_tasks/tx_classify.nim | 214 ------- nimbus/core/tx_pool/tx_tasks/tx_dispose.nim | 117 ---- nimbus/core/tx_pool/tx_tasks/tx_head.nim | 59 -- nimbus/core/tx_pool/tx_tasks/tx_recover.nim | 75 --- nimbus/graphql/ethapi.nim | 11 +- nimbus/nimbus_execution_client.nim | 2 - nimbus/rpc/server_api.nim | 9 +- tests/test_engine_api.nim | 4 - tests/test_ledger.nim | 21 +- tests/test_rpc.nim | 44 +- tests/test_txpool/helpers.nim | 49 +- tests/test_txpool2.nim | 57 +- 27 files changed, 553 insertions(+), 3169 deletions(-) delete mode 100644 nimbus/core/tx_pool/tx_info.nim delete mode 100644 nimbus/core/tx_pool/tx_tabs/tx_rank.nim delete mode 100644 nimbus/core/tx_pool/tx_tabs/tx_sender.nim delete mode 100644 nimbus/core/tx_pool/tx_tabs/tx_status.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_add.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_bucket.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_classify.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_dispose.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_head.nim delete mode 100644 nimbus/core/tx_pool/tx_tasks/tx_recover.nim diff --git a/hive_integration/nodocker/engine/engine_env.nim b/hive_integration/nodocker/engine/engine_env.nim index 496357d1a7..071dd2af06 100644 --- a/hive_integration/nodocker/engine/engine_env.nim +++ b/hive_integration/nodocker/engine/engine_env.nim @@ -92,10 +92,6 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E chain, txPool) - # txPool must be informed of active head - # so it can know the latest account state - doAssert txPool.smartHead(chain.latestHeader) - var key: JwtSharedKey key.fromHex(jwtSecret).isOkOr: echo "JWT SECRET ERROR: ", error @@ -178,14 +174,12 @@ proc peer*(env: EngineEnv): Peer = proc getTxsInPool*(env: EngineEnv, txHashes: openArray[common.Hash32]): seq[Transaction] = result = newSeqOfCap[Transaction](txHashes.len) for txHash in txHashes: - let res = env.txPool.getItem(txHash) - if res.isErr: continue - let item = res.get - if item.reject == txInfoOk: - result.add item.tx + let item = env.txPool.getItem(txHash).valueOr: + continue + result.add item.tx proc numTxsInPool*(env: EngineEnv): int = - env.txPool.numTxs + env.txPool.len func version*(env: EngineEnv, time: EthTime): Version = if env.com.isPragueOrLater(time): diff --git a/hive_integration/nodocker/graphql/graphql_sim.nim b/hive_integration/nodocker/graphql/graphql_sim.nim index 3178837216..93e17d3bbb 100644 --- a/hive_integration/nodocker/graphql/graphql_sim.nim +++ b/hive_integration/nodocker/graphql/graphql_sim.nim @@ -85,18 +85,13 @@ proc main() = ) chain = ForkedChainRef.init(com) txPool = TxPoolRef.new(chain) - + discard importRlpBlock(blocksFile, com) let ctx = setupGraphqlContext(com, ethNode, txPool) var stat: SimStat let start = getTime() - # txPool must be informed of active head - # so it can know the latest account state - # e.g. "sendRawTransaction Nonce too low" case - doAssert txPool.smartHead(chain.latestHeader) - for fileName in walkDirRec( caseFolder, yieldFilter = {pcFile,pcLinkToFile}): if not fileName.endsWith(".json"): diff --git a/hive_integration/nodocker/rpc/test_env.nim b/hive_integration/nodocker/rpc/test_env.nim index 74de5c856d..da6768aa00 100644 --- a/hive_integration/nodocker/rpc/test_env.nim +++ b/hive_integration/nodocker/rpc/test_env.nim @@ -90,10 +90,6 @@ proc setupEnv*(taskPool: Taskpool): TestEnv = let chain = ForkedChainRef.init(com) let txPool = TxPoolRef.new(chain) - # txPool must be informed of active head - # so it can know the latest account state - doAssert txPool.smartHead(chain.latestHeader) - let rpcServer = setupRpcServer(ethCtx, com, ethNode, txPool, conf, chain) let rpcClient = newRpcHttpClient() waitFor rpcClient.connect("127.0.0.1", Port(8545), false) diff --git a/nimbus/beacon/beacon_engine.nim b/nimbus/beacon/beacon_engine.nim index 3ab4169f92..78db29ce3a 100644 --- a/nimbus/beacon/beacon_engine.nim +++ b/nimbus/beacon/beacon_engine.nim @@ -155,10 +155,6 @@ proc generateExecutionBundle*(ben: BeaconEngineRef, pos.setWithdrawals(attrs) - if headBlock.blockHash != xp.head.blockHash: - # reorg - discard xp.smartHead(headBlock) - if pos.timestamp <= headBlock.timestamp: return err "timestamp must be strictly later than parent" diff --git a/nimbus/core/tx_pool.nim b/nimbus/core/tx_pool.nim index ba0a880a79..387332f2ed 100644 --- a/nimbus/core/tx_pool.nim +++ b/nimbus/core/tx_pool.nim @@ -8,450 +8,79 @@ # at your option. This file may not be copied, modified, or distributed except # according to those terms. -## TODO: -## ===== -## * Impose a size limit to the bucket database. Which items would be removed? -## -## * There is a conceivable problem with the per-account optimisation. The -## algorithm chooses an account and does not stop packing until all txs -## of the account are packed or the block is full. In the latter case, -## there might be some txs left unpacked from the account which might be -## the most lucrative ones. Should this be tackled (see also next item)? -## -## * The classifier throws out all txs with negative gas tips. This implies -## that all subsequent txs must also be suspended for this account even -## though these following txs might be extraordinarily profitable so that -## packing the whole account might be woth wile. Should this be considered, -## somehow (see also previous item)? -## -## -## Transaction Pool -## ================ -## -## The transaction pool collects transactions and holds them in a database. -## This database consists of the three buckets *pending*, *staged*, and -## *packed* and a *waste basket*. These database entities are discussed in -## more detail, below. -## -## At some point, there will be some transactions in the *staged* bucket. -## Upon request, the pool will pack as many of those transactions as possible -## into to *packed* bucket which will subsequently be used to generate a -## new Ethereum block. -## -## When packing transactions from *staged* into *packed* bucked, the staged -## transactions are sorted by *sender account* and *nonce*. The *sender -## account* values are ordered by a *ranking* function (highest ranking first) -## and the *nonce* values by their natural integer order. Then, transactions -## are greedily picked from the ordered set until there are enough -## transactions in the *packed* bucket. Some boundary condition applies which -## roughly says that for a given account, all the transactions packed must -## leave no gaps between nonce values when sorted. -## -## The rank function applied to the *sender account* sorting is chosen as a -## guess for higher profitability which goes with a higher rank account. -## -## -## Rank calculator -## --------------- -## Let *tx()* denote the mapping -## :: -## tx: (account,nonce) -> tx -## -## from an index pair *(account,nonce)* to a transaction *tx*. Also, for some -## external parameter *baseFee*, let -## :: -## maxProfit: (tx,baseFee) -> tx.effectiveGasTip(baseFee) * tx.gasLimit -## -## be the maximal tip a single transation can achieve (where unit of the -## *effectiveGasTip()* is a *price* and *gasLimit* is a *commodity value*.). -## Then the rank function -## :: -## rank(account) = Σ maxProfit(tx(account,ν),baseFee) / Σ tx(account,ν).gasLimit -## ν ν -## -## is a *price* estimate of the maximal avarage tip per gas unit over all -## transactions for the given account. The nonces `ν` for the summation -## run over all transactions from the *staged* and *packed* bucket. -## -## -## -## -## Pool database: -## -------------- -## :: -## . . -## . . -## . . +----------+ -## add() ----+---------------------------------> | | -## | . +-----------+ . | disposed | -## +-----------> | pending | ------> | | -## . +-----------+ . | | -## . | ^ ^ . | waste | -## . v | | . | basket | -## . +----------+ | . | | -## . | staged | | . | | -## . +----------+ | . | | -## . | | ^ | . | | -## . | v | | . | | -## . | +----------+ . | | -## . | | packed | -------> | | -## . | +----------+ . | | -## . +----------------------> | | -## . . +----------+ -## -## The three columns *Batch queue*, *State bucket*, and *Terminal state* -## represent three different accounting (or database) systems. The pool -## database is continuosly updated while new transactions are added. -## Transactions are bundled with meta data which holds the full datanbase -## state in addition to other cached information like the sender account. -## -## -## New transactions -## ---------------- -## When entering the pool, new transactions are bundled with meta data and -## appended to the batch queue. These bundles are called *items* which are -## forwarded to one of the following entites: -## -## * the *staged* bucket if the transaction is valid and match some constraints -## on expected minimum mining fees (or a semblance of that for *non-PoW* -## networks) -## * the *pending* bucket if the transaction is valid but is not subject to be -## held in the *staged* bucket -## * the *waste basket* if the transaction is invalid -## -## If a valid transaction item supersedes an existing one, the existing -## item is moved to the waste basket and the new transaction replaces the -## existing one in the current bucket if the gas price of the transaction is -## at least `priceBump` per cent higher (see adjustable parameters, below.) -## -## Status buckets -## -------------- -## The term *bucket* is a nickname for a set of *items* (i.e. transactions -## bundled with meta data as mentioned earlier) all labelled with the same -## `status` symbol and not marked *waste*. In particular, bucket membership -## for an item is encoded as -## -## * the `status` field indicates the particular *bucket* membership -## * the `reject` field is reset/unset and has zero-equivalent value -## -## The following boundary conditions hold for the union of all buckets: -## -## * *Unique index:* -## Let **T** be the union of all buckets and **Q** be the -## set of *(sender,nonce)* pairs derived from the items of **T**. Then -## **T** and **Q** are isomorphic, i.e. for each pair *(sender,nonce)* -## from **Q** there is exactly one item from **T**, and vice versa. -## -## * *Consecutive nonces:* -## For each *(sender0,nonce0)* of **Q**, either -## *(sender0,nonce0-1)* is in **Q** or *nonce0* is the current nonce as -## registered with the *sender account* (implied by the block chain), -## -## The *consecutive nonces* requirement involves the *sender account* -## which depends on the current state of the block chain as represented by the -## internally cached head (i.e. insertion point where a new block is to be -## appended.) -## -## The following notation describes sets of *(sender,nonce)* pairs for -## per-bucket items. It will be used for boundary conditions similar to the -## ones above. -## -## * **Pending** denotes the set of *(sender,nonce)* pairs for the -## *pending* bucket -## -## * **Staged** denotes the set of *(sender,nonce)* pairs for the -## *staged* bucket -## -## * **Packed** denotes the set of *(sender,nonce)* pairs for the -## *packed* bucket -## -## The pending bucket -## ^^^^^^^^^^^^^^^^^^ -## Items in this bucket hold valid transactions that are not in any of the -## other buckets. All itmes might be promoted form here into other buckets if -## the current state of the block chain as represented by the internally cached -## head changes. -## -## The staged bucket -## ^^^^^^^^^^^^^^^^^ -## Items in this bucket are ready to be added to a new block. They typycally -## imply some expected minimum reward when mined on PoW networks. Some -## boundary condition holds: -## -## * *Consecutive nonces:* -## For any *(sender0,nonce0)* pair from **Staged**, the pair -## *(sender0,nonce0-1)* is not in **Pending**. -## -## Considering the respective boundary condition on the union of buckets -## **T**, this condition here implies that a *staged* per sender nonce has a -## predecessor in the *staged* or *packed* bucket or is a nonce as registered -## with the *sender account*. -## -## The packed bucket -## ^^^^^^^^^^^^^^^^^ -## All items from this bucket have been selected from the *staged* bucket, the -## transactions of which (i.e. unwrapped items) can go right away into a new -## ethernet block. How these items are selected was described at the beginning -## of this chapter. The following boundary conditions holds: -## -## * *Consecutive nonces:* -## For any *(sender0,nonce0)* pair from **Packed**, the pair -## *(sender0,nonce0-1)* is neither in **Pending**, nor in **Staged**. -## -## Considering the respective boundary condition on the union of buckets -## **T**, this condition here implies that a *packed* per-sender nonce has a -## predecessor in the very *packed* bucket or is a nonce as registered with the -## *sender account*. -## -## -## Terminal state -## -------------- -## After use, items are disposed into a waste basket *FIFO* queue which has a -## maximal length. If the length is exceeded, the oldest items are deleted. -## The waste basket is used as a cache for discarded transactions that need to -## re-enter the system. Recovering from the waste basket saves the effort of -## recovering the sender account from the signature. An item is identified -## *waste* if -## -## * the `reject` field is explicitely set and has a value different -## from a zero-equivalent. -## -## So a *waste* item is clearly distinguishable from any active one as a -## member of one of the *status buckets*. -## -## -## -## Pool coding -## =========== -## A piece of code using this pool architecture could look like as follows: -## :: -## # see also unit test examples, e.g. "Block packer tests" -## var chain: ForkedChainRef # to be initialised -## var txs: seq[Transaction] # to be initialised -## -## -## var xq = TxPoolRef.new(chain) # initialise tx-pool -## .. -## -## xq.add(txs) # add transactions .. -## .. # .. into the buckets -## -## let newBlock = xq.assembleBlock # fetch current mining block -## -## xp.smartHead(newBlock.header) # update pool, new insertion point -## -## -## Discussion of example -## --------------------- -## In the example, transactions are processed into buckets via `add()`. -## -## The `assembleBlock()` directive assembles and retrieves a new block for mining -## derived from the current pool state. It invokes the block packer which -## accumulates txs from the `pending` buscket into the `packed` bucket which -## then go into the block. -## -## Then mining and signing takes place ... -## -## After mining and signing, the view of the block chain as seen by the pool -## must be updated to be ready for a new mining process. In the best case, the -## canonical head is just moved to the currently mined block which would imply -## just to discard the contents of the *packed* bucket with some additional -## transactions from the *staged* bucket. A more general block chain state -## head update would be more complex, though. -## -## In the most complex case, the newly mined block was added to some block -## chain branch which has become an uncle to the new canonical head retrieved -## by `latestHeader()`. In order to update the pool to the very state -## one would have arrived if worked on the retrieved canonical head branch -## in the first place, the directive `smartHead()` calculates the actions of -## what is needed to get just there from the locally cached head state of the -## pool. These actions are applied by `smartHead()` after the internal head -## position was moved. -## -## The *setter* behind the internal head position adjustment also caches -## updated internal parameters as base fee, state, fork, etc. -## -## -## Adjustable Parameters -## --------------------- -## -## flags -## The `flags` parameter holds a set of strategy symbols for how to process -## items and buckets. -## -## *autoUpdateBucketsDB* -## Automatically update the state buckets after running batch jobs if the -## `dirtyBuckets` flag is also set. -## -## *autoZombifyUnpacked* -## Automatically dispose *pending* or *staged* tx items that were added to -## the state buckets database at least `lifeTime` ago. -## -## lifeTime -## Txs that stay longer in one of the buckets will be moved to a waste -## basket. From there they will be eventually deleted oldest first when -## the maximum size would be exceeded. -## -## priceBump -## There can be only one transaction in the database for the same `sender` -## account and `nonce` value. When adding a transaction with the same -## (`sender`, `nonce`) pair, the new transaction will replace the current one -## if it has a gas price which is at least `priceBump` per cent higher. -## -## -## Read-Only Parameters -## -------------------- -## head -## Cached block chain insertion point, not necessarily the same header as -## retrieved by the `latestHeader()`. This insertion point can be -## adjusted with the `smartHead()` function. - +{.push raises: [].} import - std/[sequtils, tables], - ./tx_pool/[tx_packer, tx_desc, tx_info, tx_item], + eth/common/blocks, ./tx_pool/tx_tabs, - ./tx_pool/tx_tasks/[ - tx_add, - tx_bucket, - tx_head, - tx_dispose], - chronicles, - stew/keyed_queue, - results, - ../common/common, + ./tx_pool/tx_item, + ./tx_pool/tx_desc, + ./tx_pool/tx_packer, ./chain/forked_chain, ./casper + +from eth/common/eth_types_rlp import rlpHash + +# ------------------------------------------------------------------------------ +# TxPoolRef public types +# ------------------------------------------------------------------------------ export - TxItemRef, - TxItemStatus, - TxPoolFlags, TxPoolRef, - TxTabsItemsCount, - results, - tx_desc.startDate, - tx_info, - tx_item.effectiveGasTip, - tx_item.info, - tx_item.itemID, - tx_item.sender, - tx_item.status, - tx_item.timeStamp, - tx_item.tx, - tx_desc.head - -{.push raises: [].} - -logScope: - topics = "tx-pool" + TxError # ------------------------------------------------------------------------------ -# Private functions: tasks processor +# TxItemRef public getters # ------------------------------------------------------------------------------ -proc maintenanceProcessing(xp: TxPoolRef) - {.gcsafe,raises: [CatchableError].} = - ## Tasks to be done after add/del txs processing - - # Purge expired items - if autoZombifyUnpacked in xp.pFlags: - # Move transactions older than `xp.lifeTime` to the waste basket. - xp.disposeExpiredItems - - # Update buckets - if autoUpdateBucketsDB in xp.pFlags: - if xp.pDirtyBuckets: - # For all items, re-calculate item status values (aka bucket labels). - # If the `force` flag is set, re-calculation is done even though the - # change flag has remained unset. - discard xp.bucketUpdateAll - xp.pDirtyBuckets = false - -proc setHead(xp: TxPoolRef; val: Header) - {.gcsafe,raises: [CatchableError].} = - ## Update cached block chain insertion point. This will also update the - ## internally cached `baseFee` (depends on the block chain state.) - if xp.head != val: - xp.head = val # calculates the new baseFee - xp.txDB.baseFee = xp.baseFee - xp.pDirtyBuckets = true - xp.bucketFlushPacked +export + tx, # : Transaction + pooledTx, # : PooledTransaction + id, # : Hash32 + sender # : Address # ------------------------------------------------------------------------------ -# Public constructor/destructor +# TxPoolRef constructor # ------------------------------------------------------------------------------ -proc new*(T: type TxPoolRef; chain: ForkedChainRef): T - {.gcsafe,raises: [].} = +proc new*(T: type TxPoolRef; chain: ForkedChainRef): T = ## Constructor, returns a new tx-pool descriptor. new result result.init(chain) # ------------------------------------------------------------------------------ -# Public functions, task manager, pool actions serialiser +# TxPoolRef public getters # ------------------------------------------------------------------------------ -# core/tx_pool.go(848): func (pool *TxPool) AddLocals(txs [].. -# core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs [].. -proc add*(xp: TxPoolRef; txs: openArray[PooledTransaction]; info = "") - {.gcsafe,raises: [CatchableError].} = - ## Add a list of transactions to be processed and added to the buckets - ## database. It is OK pass an empty list in which case some maintenance - ## check can be forced. - ## - ## The argument Transactions `txs` may come in any order, they will be - ## sorted by `` before adding to the database with the - ## least nonce first. For this reason, it is suggested to pass transactions - ## in larger groups. Calling single transaction jobs, they must strictly be - ## passed *smaller nonce* before *larger nonce*. - xp.pDoubleCheckAdd xp.addTxs(txs, info).topItems - xp.maintenanceProcessing - -# core/tx_pool.go(854): func (pool *TxPool) AddLocals(txs [].. -# core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs [].. -proc add*(xp: TxPoolRef; tx: PooledTransaction; info = "") - {.gcsafe,raises: [CatchableError].} = - ## Variant of `add()` for a single transaction. - xp.add(@[tx], info) - -proc smartHead*(xp: TxPoolRef; pos: Header): bool - {.gcsafe,raises: [CatchableError].} = - ## This function moves the internal head cache (i.e. tx insertion point, - ## vmState) and ponts it to a now block on the chain. - ## - ## it calculates the - ## txs that need to be added or deleted after moving the insertion point - ## head so that the tx-pool will not fail to re-insert quered txs that are - ## on the chain, already. Neither will it loose any txs. After updating the - ## the internal head cache, the previously calculated actions will be - ## applied. - ## - let rcDiff = xp.headDiff(pos, xp.chain) - if rcDiff.isOk: - let changes = rcDiff.value - - # Need to move head before adding txs which may rightly be rejected in - # `addTxs()` otherwise. - xp.setHead(pos) - - # Delete already *mined* transactions - if 0 < changes.remTxs.len: - debug "queuing delta txs", - mode = "remove", - num = changes.remTxs.len - xp.disposeById(toSeq(changes.remTxs.keys), txInfoChainHeadUpdate) +export + chain, + com, + len - xp.maintenanceProcessing - return true +# chain(xp: TxPoolRef): ForkedChainRef +# com(xp: TxPoolRef): CommonRef +# len(xp: TxPoolRef): int # ------------------------------------------------------------------------------ -# Public functions, getters +# TxPoolRef public functions # ------------------------------------------------------------------------------ -func com*(xp: TxPoolRef): CommonRef = - ## Getter - xp.vmState.com +export + addTx, + getItem, + removeTx, + removeExpiredTxs + +# addTx(xp: TxPoolRef, ptx: PooledTransaction): Result[void, TxError] +# addTx(xp: TxPoolRef, tx: Transaction): Result[void, TxError] +# getItem(xp: TxPoolRef, id: Hash32): Result[TxItemRef, TxError] +# removeTx(xp: TxPoolRef, id: Hash32) +# removeExpiredTxs(xp: TxPoolRef, lifeTime: Duration) + +proc removeNewBlockTxs*(xp: TxPoolRef, blk: Block) = + for tx in blk.transactions: + let txHash = rlpHash(tx) + xp.removeTx(txHash) type AssembledBlock* = object blk*: EthBlock @@ -462,37 +91,26 @@ type AssembledBlock* = object proc assembleBlock*( xp: TxPoolRef, someBaseFee: bool = false -): Result[AssembledBlock, string] {.gcsafe,raises: [CatchableError].} = - ## Getter, retrieves a packed block ready for mining and signing depending - ## on the internally cached block chain head, the txs in the pool and some - ## tuning parameters. The following block header fields are left - ## uninitialised: - ## - ## * *mixHash*: Hash32 - ## * *nonce*: BlockNonce - ## - ## Note that this getter runs *ad hoc* all the txs through the VM in - ## order to build the block. +): Result[AssembledBlock, string] = + xp.updateVmState(xp.chain.latestHeader) - var pst = xp.packerVmExec().valueOr: # updates vmState + var pst = xp.setupTxPacker().valueOr: return err(error) var blk = EthBlock( - header: pst.assembleHeader # uses updated vmState + header: pst.assembleHeader ) var blobsBundle: BlobsBundle - - for _, nonceList in xp.txDB.packingOrderAccounts(txItemPacked): - for item in nonceList.incNonce: - let tx = item.pooledTx - blk.txs.add tx.tx - if tx.networkPayload != nil: - for k in tx.networkPayload.commitments: - blobsBundle.commitments.add k - for p in tx.networkPayload.proofs: - blobsBundle.proofs.add p - for blob in tx.networkPayload.blobs: - blobsBundle.blobs.add blob + for item in pst.packedTxs: + let tx = item.pooledTx + blk.txs.add tx.tx + if tx.networkPayload != nil: + for k in tx.networkPayload.commitments: + blobsBundle.commitments.add k + for p in tx.networkPayload.proofs: + blobsBundle.proofs.add p + for blob in tx.networkPayload.blobs: + blobsBundle.blobs.add blob blk.header.transactionsRoot = calcTxRoot(blk.txs) let com = xp.vmState.com @@ -524,80 +142,3 @@ proc assembleBlock*( blobsBundle: blobsBundleOpt, blockValue: pst.blockValue, executionRequests: executionRequestsOpt) - -# core/tx_pool.go(474): func (pool SetGasPrice,*TxPool) Stats() (int, int) { -# core/tx_pool.go(1728): func (t *txLookup) Count() int { -# core/tx_pool.go(1737): func (t *txLookup) LocalCount() int { -# core/tx_pool.go(1745): func (t *txLookup) RemoteCount() int { -func nItems*(xp: TxPoolRef): TxTabsItemsCount = - ## Getter, retrieves the current number of items per bucket and - ## some totals. - xp.txDB.nItems - -# ------------------------------------------------------------------------------ -# Public functions, per-tx-item operations -# ------------------------------------------------------------------------------ - -# core/tx_pool.go(979): func (pool *TxPool) Get(hash common.Hash) .. -# core/tx_pool.go(985): func (pool *TxPool) Has(hash common.Hash) bool { -func getItem*(xp: TxPoolRef; hash: Hash32): Result[TxItemRef,void] = - ## Returns a transaction if it is contained in the pool. - xp.txDB.byItemID.eq(hash) - -func disposeItems*(xp: TxPoolRef; item: TxItemRef; - reason = txInfoExplicitDisposal; - otherReason = txInfoImpliedDisposal): int - {.discardable,gcsafe,raises: [CatchableError].} = - ## Move item to wastebasket. All items for the same sender with nonces - ## greater than the current one are deleted, as well. The function returns - ## the number of items eventally removed. - xp.disposeItemAndHigherNonces(item, reason, otherReason) - -iterator txHashes*(xp: TxPoolRef): Hash32 = - for txHash in nextKeys(xp.txDB.byItemID): - yield txHash - -iterator okPairs*(xp: TxPoolRef): (Hash32, TxItemRef) = - for x in nextPairs(xp.txDB.byItemID): - if x.data.reject == txInfoOk: - yield (x.key, x.data) - -func numTxs*(xp: TxPoolRef): int = - xp.txDB.byItemID.len - -func disposeAll*(xp: TxPoolRef) {.raises: [CatchableError].} = - let numTx = xp.numTxs - var list = newSeqOfCap[TxItemRef](numTx) - for x in nextPairs(xp.txDB.byItemID): - list.add x.data - for x in list: - xp.disposeItems(x) - -# ------------------------------------------------------------------------------ -# Public functions, local/remote accounts -# ------------------------------------------------------------------------------ - -func inPoolAndOk*(xp: TxPoolRef; txHash: Hash32): bool = - let res = xp.getItem(txHash) - if res.isErr: return false - res.get().reject == txInfoOk - -func inPoolAndReason*(xp: TxPoolRef; txHash: Hash32): Result[void, string] = - let res = xp.getItem(txHash) - if res.isErr: - # try to look in rejecteds - let r = xp.txDB.byRejects.eq(txHash) - if r.isErr: - return err("cannot find tx in txpool") - else: - return err(r.get().rejectInfo) - - let item = res.get() - if item.reject == txInfoOk: - return ok() - else: - return err(item.rejectInfo) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_desc.nim b/nimbus/core/tx_pool/tx_desc.nim index 880bae9be4..7f69ba7492 100644 --- a/nimbus/core/tx_pool/tx_desc.nim +++ b/nimbus/core/tx_pool/tx_desc.nim @@ -8,132 +8,125 @@ # at your option. This file may not be copied, modified, or distributed except # according to those terms. -## Transaction Pool Descriptor -## =========================== -## +{.push raises: [].} import - std/[times], + std/times, eth/eip1559, + eth/common/transaction_utils, + stew/sorted_set, ../../common/common, ../../evm/state, ../../evm/types, ../../db/ledger, ../../constants, - ../../core/chain/forked_chain, + ../../transaction, + ../chain/forked_chain, ../pow/header, ../eip4844, ../casper, - ./tx_item, + ../validate, ./tx_tabs, - ./tx_tabs/tx_sender + ./tx_item -{.push raises: [].} +from eth/common/eth_types_rlp import rlpHash type - TxPoolFlags* = enum ##\ - ## Processing strategy selector symbols - - autoUpdateBucketsDB ##\ - ## Automatically update the state buckets after running batch jobs if - ## the `dirtyBuckets` flag is also set. - - autoZombifyUnpacked ##\ - ## Automatically dispose *pending* or *staged* txs that were queued - ## at least `lifeTime` ago. - - TxPoolParam* = tuple ## Getter/setter accessible parameters - dirtyBuckets: bool ## Buckets need to be updated - doubleCheck: seq[TxItemRef] ## Check items after moving block chain head - flags: set[TxPoolFlags] ## Processing strategy symbols - - TxPoolRef* = ref object of RootObj ##\ - ## Transaction pool descriptor - startDate: Time ## Start date (read-only) - param: TxPoolParam ## Getter/Setter parameters - - vmState: BaseVMState - txDB: TxTabsRef ## Transaction lists & tables - - lifeTime*: times.Duration ## Maximum life time of a tx in the system - priceBump*: uint ## Min precentage price when superseding - chain*: ForkedChainRef + TxPoolRef* = ref object + vmState : BaseVMState + chain : ForkedChainRef + senderTab: TxSenderTab + idTab : TxIdTab const - txItemLifeTime = ##\ - ## Maximum amount of time transactions can be held in the database\ - ## unless they are packed already for a block. This default is chosen\ - ## as found in core/tx_pool.go(184) of the geth implementation. - initDuration(hours = 3) - - txPriceBump = ##\ - ## Minimum price bump percentage to replace an already existing\ - ## transaction (nonce). This default is chosen as found in\ - ## core/tx_pool.go(177) of the geth implementation. - 10u - - txPoolFlags = {autoUpdateBucketsDB, - autoZombifyUnpacked} + MAX_POOL_SIZE = 5000 + MAX_TXS_PER_ACCOUNT = 100 + TX_ITEM_LIFETIME = initDuration(minutes = 60) # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc baseFeeGet(com: CommonRef; parent: Header): Opt[UInt256] = +proc getBaseFee(com: CommonRef; parent: Header): Opt[UInt256] = ## Calculates the `baseFee` of the head assuming this is the parent of a ## new block header to generate. - - # Note that the baseFee is calculated for the next header - if not com.isLondonOrLater(parent.number+1): - return Opt.none(UInt256) - - # If the new block is the first EIP-1559 block, return initial base fee. - if not com.isLondonOrLater(parent.number): - return Opt.some(EIP1559_INITIAL_BASE_FEE) - + ## Post Merge rule Opt.some calcEip1599BaseFee( parent.gasLimit, parent.gasUsed, parent.baseFeePerGas.get(0.u256)) -proc gasLimitsGet(com: CommonRef; parent: Header): GasInt = - if com.isLondonOrLater(parent.number+1): - var parentGasLimit = parent.gasLimit - if not com.isLondonOrLater(parent.number): - # Bump by 2x - parentGasLimit = parent.gasLimit * EIP1559_ELASTICITY_MULTIPLIER - calcGasLimit1559(parentGasLimit, desiredLimit = com.gasLimit) - else: - computeGasLimit( - parent.gasUsed, - parent.gasLimit, - gasFloor = com.gasLimit, - gasCeil = com.gasLimit) +func getGasLimit(com: CommonRef; parent: Header): GasInt = + ## Post Merge rule + calcGasLimit1559(parent.gasLimit, desiredLimit = com.gasLimit) proc setupVMState(com: CommonRef; parent: Header): BaseVMState = - # do hardfork transition before - # BaseVMState querying any hardfork/consensus from CommonRef - - let pos = com.pos - - let blockCtx = BlockContext( - timestamp : pos.timestamp, - gasLimit : gasLimitsGet(com, parent), - baseFeePerGas: baseFeeGet(com, parent), - prevRandao : pos.prevRandao, - difficulty : UInt256.zero(), - coinbase : pos.feeRecipient, - excessBlobGas: calcExcessBlobGas(parent, com.isPragueOrLater(pos.timestamp)), - parentHash : parent.blockHash, - ) + let + pos = com.pos + electra = com.isPragueOrLater(pos.timestamp) + blockCtx = BlockContext( + timestamp : pos.timestamp, + gasLimit : getGasLimit(com, parent), + baseFeePerGas: getBaseFee(com, parent), + prevRandao : pos.prevRandao, + difficulty : UInt256.zero(), + coinbase : pos.feeRecipient, + excessBlobGas: calcExcessBlobGas(parent, electra), + parentHash : parent.blockHash, + ) BaseVMState.new( parent = parent, blockCtx = blockCtx, com = com) -proc update(xp: TxPoolRef; parent: Header) = - xp.vmState = setupVMState(xp.vmState.com, parent) +template append(tab: var TxSenderTab, sn: TxSenderNonceRef) = + tab[item.sender] = sn + +proc getCurrentFromSenderTab(xp: TxPoolRef; item: TxItemRef): Opt[TxItemRef] = + let sn = xp.senderTab.getOrDefault(item.sender) + if sn.isNil: + return Opt.none(TxItemRef) + let current = sn.list.eq(item.nonce).valueOr: + return Opt.none(TxItemRef) + Opt.some(current.data) + +proc removeFromSenderTab(xp: TxPoolRef; item: TxItemRef) = + let sn = xp.senderTab.getOrDefault(item.sender) + if sn.isNil: + return + discard sn.list.delete(item.nonce) + +func alreadyKnown(xp: TxPoolRef, id: Hash32): bool = + xp.idTab.getOrDefault(id).isNil.not + +proc insertToSenderTab(xp: TxPoolRef; item: TxItemRef): Result[void, TxError] = + ## Add transaction `item` to the list. The function has no effect if the + ## transaction exists, already. + var sn = xp.senderTab.getOrDefault(item.sender) + if sn.isNil: + # First insertion + sn = TxSenderNonceRef.init() + sn.insertOrReplace(item) + xp.senderTab.append(sn) + return ok() + + if sn.len >= MAX_TXS_PER_ACCOUNT: + return err(txErrorSenderMaxTxs) + + let current = xp.getCurrentFromSenderTab(item).valueOr: + # no equal sender/nonce, + # insert into txpool + sn.insertOrReplace(item) + return ok() + + ?current.validateTxGasBump(item) + + # Replace current item, + # insertion to idTab will be handled by addTx. + xp.idTab.del(current.id) + sn.insertOrReplace(item) + ok() # ------------------------------------------------------------------------------ # Public functions, constructor @@ -141,52 +134,14 @@ proc update(xp: TxPoolRef; parent: Header) = proc init*(xp: TxPoolRef; chain: ForkedChainRef) = ## Constructor, returns new tx-pool descriptor. - xp.startDate = getTime().utc.toTime - let head = chain.latestHeader xp.vmState = setupVMState(chain.com, head) - xp.txDB = TxTabsRef.new - - xp.lifeTime = txItemLifeTime - xp.priceBump = txPriceBump - - xp.param.reset - xp.param.flags = txPoolFlags xp.chain = chain -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc clearAccounts*(xp: TxPoolRef) = - ## Reset transaction environment, e.g. before packing a new block - xp.update(xp.vmState.parent) - # ------------------------------------------------------------------------------ # Public functions, getters # ------------------------------------------------------------------------------ -func pFlags*(xp: TxPoolRef): set[TxPoolFlags] = - ## Returns the set of algorithm strategy symbols for labelling items - ## as`packed` - xp.param.flags - -func pDirtyBuckets*(xp: TxPoolRef): bool = - ## Getter, buckets need update - xp.param.dirtyBuckets - -func pDoubleCheck*(xp: TxPoolRef): seq[TxItemRef] = - ## Getter, cached block chain head was moved back - xp.param.doubleCheck - -func startDate*(xp: TxPoolRef): Time = - ## Getter - xp.startDate - -func txDB*(xp: TxPoolRef): TxTabsRef = - ## Getter, pool database - xp.txDB - func baseFee*(xp: TxPoolRef): GasInt = ## Getter, baseFee for the next bock header. This value is auto-generated ## when a new insertion point is set via `head=`. @@ -208,52 +163,143 @@ func excessBlobGas*(xp: TxPoolRef): GasInt = xp.vmState.blockCtx.excessBlobGas proc getBalance*(xp: TxPoolRef; account: Address): UInt256 = - ## Wrapper around `vmState.readOnlyLedger.getBalance()` for a `vmState` - ## descriptor positioned at the `dh.head`. This might differ from the - ## `dh.vmState.readOnlyLedger.getBalance()` which returnes the current - ## balance relative to what has been accumulated by the current packing - ## procedure. xp.vmState.ledger.getBalance(account) proc getNonce*(xp: TxPoolRef; account: Address): AccountNonce = - ## Wrapper around `vmState.readOnlyLedger.getNonce()` for a `vmState` - ## descriptor positioned at the `dh.head`. This might differ from the - ## `dh.vmState.readOnlyLedger.getNonce()` which returnes the current balance - ## relative to what has been accumulated by the current packing procedure. xp.vmState.ledger.getNonce(account) -func head*(xp: TxPoolRef): Header = - ## Getter, cached block chain insertion point. Typocally, this should be the - ## the same header as retrieved by the `ForkedChainRef.latestHeader` (unless in the - ## middle of a mining update.) - xp.vmState.parent - -# ------------------------------------------------------------------------------ -# Public functions, setters -# ------------------------------------------------------------------------------ - -func `pDirtyBuckets=`*(xp: TxPoolRef; val: bool) = - ## Setter - xp.param.dirtyBuckets = val - -func pDoubleCheckAdd*(xp: TxPoolRef; val: seq[TxItemRef]) = - ## Pseudo setter - xp.param.doubleCheck.add val - -func pDoubleCheckFlush*(xp: TxPoolRef) = - ## Pseudo setter - xp.param.doubleCheck.setLen(0) +template chain*(xp: TxPoolRef): ForkedChainRef = + xp.chain -func `pFlags=`*(xp: TxPoolRef; val: set[TxPoolFlags]) = - ## Install a set of algorithm strategy symbols for labelling items as`packed` - xp.param.flags = val +template com*(xp: TxPoolRef): CommonRef = + xp.chain.com -proc `head=`*(xp: TxPoolRef; val: Header) - {.gcsafe,raises: [].} = - ## Setter, updates descriptor. This setter re-positions the `vmState` and - ## account caches to a new insertion point on the block chain database. - xp.update(val) +func len*(xp: TxPoolRef): int = + xp.idTab.len # ------------------------------------------------------------------------------ -# End +# Public functions # ------------------------------------------------------------------------------ + +proc updateVmState*(xp: TxPoolRef; parent: Header) = + ## Reset transaction environment, e.g. before packing a new block + xp.vmState = setupVMState(xp.vmState.com, parent) + +proc classifyValid(xp: TxPoolRef; tx: Transaction, sender: Address): bool = + if tx.tip(xp.baseFee) <= 0.GasInt: + return false + + if tx.gasLimit > xp.gasLimit: + return false + + # Ensure that the user was willing to at least pay the base fee + # And to at least pay the current data gasprice + if tx.txType >= TxEip1559: + if tx.maxFeePerGas < xp.baseFee: + return false + + if tx.txType == TxEip4844: + let + excessBlobGas = xp.excessBlobGas + blobGasPrice = getBlobBaseFee(excessBlobGas, xp.nextFork >= FkPrague) + if tx.maxFeePerBlobGas < blobGasPrice: + return false + + # Check whether the worst case expense is covered by the price budget, + let + balance = xp.getBalance(sender) + gasCost = tx.gasCost + if balance < gasCost: + return false + let balanceOffGasCost = balance - gasCost + if balanceOffGasCost < tx.value: + return false + + # For legacy transactions check whether minimum gas price and tip are + # high enough. These checks are optional. + if tx.txType < TxEip1559: + if tx.gasPrice < 0: + return false + + # Fall back transaction selector scheme + if tx.tip(xp.baseFee) < 1.GasInt: + return false + + if tx.txType >= TxEip1559: + if tx.tip(xp.baseFee) < 1.GasInt: + return false + + if tx.maxFeePerGas < 1.GasInt: + return false + + true + +proc addTx*(xp: TxPoolRef, ptx: PooledTransaction): Result[void, TxError] = + if not ptx.tx.validateChainId(xp.chain.com.chainId): + return err(txErrorChainIdMismatch) + + if ptx.tx.txType == TxEip4844: + ptx.validateBlobTransactionWrapper().isOkOr: + return err(txErrorInvalidBlob) + + let id = ptx.rlpHash + if xp.alreadyKnown(id): + return err(txErrorAlreadyKnown) + + validateTxBasic( + ptx.tx, + xp.nextFork, + # A new transaction of the next fork may be + # coming before the fork activated + validateFork = false).isOkOr: + return err(txErrorBasicValidation) + + let + sender = ptx.tx.recoverSender().valueOr: + return err(txErrorInvalidSignature) + nonce = xp.getNonce(sender) + + if ptx.tx.nonce < nonce: + return err(txErrorNonceTooSmall) + + if not xp.classifyValid(ptx.tx, sender): + return err(txErrorTxInvalid) + + if xp.idTab.len >= MAX_POOL_SIZE: + return err(txErrorPoolIsFull) + + let item = TxItemRef.new(ptx, id, sender) + ?xp.insertToSenderTab(item) + xp.idTab[item.id] = item + ok() + +proc addTx*(xp: TxPoolRef, tx: Transaction): Result[void, TxError] = + xp.addTx(PooledTransaction(tx: tx)) + +proc getItem*(xp: TxPoolRef, id: Hash32): Result[TxItemRef, TxError] = + let item = xp.idTab.getOrDefault(id) + if item.isNil: + return err(txErrorItemNotFound) + ok(item) + +proc removeTx*(xp: TxPoolRef, id: Hash32) = + let item = xp.getItem(id).valueOr: + return + xp.removeFromSenderTab(item) + xp.idTab.del(id) + +proc removeExpiredTxs*(xp: TxPoolRef, lifeTime: Duration = TX_ITEM_LIFETIME) = + var expired = newSeqOfCap[Hash32](xp.idTab.len div 4) + let now = utcNow() + + for txHash, item in xp.idTab: + if now - item.time > lifeTime: + expired.add txHash + + for txHash in expired: + xp.removeTx(txHash) + +iterator byPriceAndNonce*(xp: TxPoolRef): TxItemRef = + for item in byPriceAndNonce(xp.senderTab, + xp.vmState.ledger, xp.baseFee): + yield item diff --git a/nimbus/core/tx_pool/tx_info.nim b/nimbus/core/tx_pool/tx_info.nim deleted file mode 100644 index 3a6c8237dc..0000000000 --- a/nimbus/core/tx_pool/tx_info.nim +++ /dev/null @@ -1,110 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Info Symbols & Error Codes -## =========================================== - -{.push raises: [].} - -type - TxInfo* = enum - txInfoOk = - (0, "no error") - - txInfoSenderNonceSuperseded = ##\ - ## Tx superseded by another one with same index - "Sender/nonce index superseded" - - txInfoErrNonceGap = ##\ - ## Non consecutive nonces detected after moving back the block chain - ## head. This should not happen and indicates an inconsistency between - ## cached transactions and the ones on the block chain. - "nonce gap" - - txInfoErrImpliedNonceGap = ##\ - ## Implied disposal, applies to transactions with higher nonces after - ## a `txInfoErrNonceGap` error. - "implied nonce gap" - - txInfoExplicitDisposal = ##\ - ## Unspecified disposal reason (fallback value) - "on-demand disposal" - - txInfoImpliedDisposal = ##\ - ## Implied disposal, typically implied by greater nonces (fallback value) - "implied disposal" - - txInfoChainIdMismatch = ##\ - ## Tx chainId does not match with network chainId - "chainId mismatch" - # ------ Miscellaneous errors ---------------------------------------------- - - txInfoErrUnspecified = ##\ - ## Some unspecified error occured - "generic error" - - txInfoErrVoidDisposal = ##\ - ## Cannot dispose non-existing item - "void disposal" - - txInfoErrAlreadyKnown = ##\ - ## The transactions is already contained within the pool - "already known" - - txInfoErrSenderNonceIndex = ##\ - ## index for transaction exists, already. - "Sender/nonce index error" - - # ------ Transaction format/parsing problems ------------------------------- - - txInfoErrBasicValidatorFailed = ##\ - ## Running basic validator failed on current transaction - "Tx rejected by basic validator" - - txInfoErrInvalidBlob = ##\ - ## Invalid EIP-4844 kzg validation on blob wrapper - "Invalid EIP-4844 blob validation" - - # ------ Signature problems ------------------------------------------------ - - txInfoErrInvalidSender = ##\ - ## The transaction contains an invalid signature. - "invalid sender" - - # ------ Gas fee and selection problems ------------------------------------ - - - txInfoErrReplaceUnderpriced = ##\ - ## A transaction is attempted to be replaced with a different one - ## without the required price bump. - "Replacement tx underpriced" - - # ------- operational events related to transactions ----------------------- - - txInfoErrTxExpired = ##\ - ## A transaction has been on the system for too long so it was removed. - "Tx expired" - - txInfoErrTxExpiredImplied = ##\ - ## Implied disposal for greater nonces for the same sender when the base - ## tx was removed. - "Tx expired implied" - - # ------- update/move block chain head ------------------------------------- - - txInfoErrForwardHeadMissing = ##\ - ## Cannot move forward current head to non-existing target position - "Non-existing forward header" - - txInfoChainHeadUpdate = ##\ - ## Tx becomes obsolete as it is in a mined block, already - "Tx obsoleted" - -# End diff --git a/nimbus/core/tx_pool/tx_item.nim b/nimbus/core/tx_pool/tx_item.nim index 9abf72279c..8c9282a01a 100644 --- a/nimbus/core/tx_pool/tx_item.nim +++ b/nimbus/core/tx_pool/tx_item.nim @@ -12,182 +12,145 @@ ## ========================================= ## +{.push raises: [].} + import std/[hashes, times], + results, ../../utils/utils, - ../../transaction, - ./tx_info, - eth/common/transaction_utils, - results + ../../transaction -from eth/common/eth_types_rlp import rlpHash - -{.push raises: [].} +from ../eip4844 import getTotalBlobGas +from eth/common/hashes import hash type - TxItemStatus* = enum ##\ - ## Current status of a transaction as seen by the pool. - txItemPending = 0 - txItemStaged - txItemPacked - - TxItemRef* = ref object of RootObj ##\ - ## Data container with transaction and meta data. Entries are *read-only*\ - ## by default, for some there is a setter available. - tx: PooledTransaction ## Transaction data - itemID: Hash32 ## Transaction hash - timeStamp: Time ## Time when added - sender: Address ## Sender account address - info: string ## Whatever - status: TxItemStatus ## Transaction status (setter available) - reject: TxInfo ## Reason for moving to waste basket - -# ------------------------------------------------------------------------------ -# Private, helpers for debugging and pretty printing -# ------------------------------------------------------------------------------ - -proc utcTime: Time = - getTime().utc.toTime + TxError* = enum + txErrorInvalidSignature + txErrorItemNotFound + txErrorAlreadyKnown + txErrorNonceTooSmall + txErrorNonceGap + txErrorBasicValidation + txErrorInvalidBlob + txErrorReplacementGasTooLow + txErrorReplacementBlobGasTooLow + txErrorPoolIsFull + txErrorSenderMaxTxs + txErrorTxInvalid + txErrorChainIdMismatch + + TxItemRef* = ref object + ptx : PooledTransaction ## Transaction data + id : Hash32 ## Transaction hash + time : Time ## Time when added + sender: Address ## Sender account address + price : GasInt + + TxGasPrice = object + maxFee: GasInt + tip: GasInt # ------------------------------------------------------------------------------ # Public functions, Constructor # ------------------------------------------------------------------------------ -proc init*(item: TxItemRef; status: TxItemStatus; info: string) = - ## Update item descriptor. - item.info = info - item.status = status - item.timeStamp = utcTime() - item.reject = txInfoOk +proc utcNow*(): Time = + getTime().utc.toTime -proc new*(T: type TxItemRef; tx: PooledTransaction; itemID: Hash32; - status: TxItemStatus; info: string): Result[T,void] {.gcsafe,raises: [].} = +proc new*(T: type TxItemRef; + ptx: PooledTransaction, + id: Hash32, + sender: Address): T = ## Create item descriptor. - let rc = tx.tx.recoverSender() - if rc.isErr: - return err() - ok(T(itemID: itemID, - tx: tx, - sender: rc.value, - timeStamp: utcTime(), - info: info, - status: status)) - -proc new*(T: type TxItemRef; tx: PooledTransaction; - reject: TxInfo; status: TxItemStatus; info: string): T {.gcsafe,raises: [].} = - ## Create incomplete item descriptor, so meta-data can be stored (e.g. - ## for holding in the waste basket to be investigated later.) - T(tx: tx, - timeStamp: utcTime(), - info: info, - status: status) - -# ------------------------------------------------------------------------------ -# Public functions, Table ID helper -# ------------------------------------------------------------------------------ - -proc hash*(item: TxItemRef): Hash = - ## Needed if `TxItemRef` is used as hash-`Table` index. - cast[pointer](item).hash + T( + ptx : ptx, + id : id, + time : utcNow(), + sender: sender, + ) # ------------------------------------------------------------------------------ -# Public functions, transaction getters -# ------------------------------------------------------------------------------ - -proc itemID*(tx: Transaction): Hash32 = - ## Getter, transaction ID - tx.rlpHash - -proc itemID*(tx: PooledTransaction): Hash32 = - ## Getter, transaction ID - tx.rlpHash - -# core/types/transaction.go(297): func (tx *Transaction) Cost() *big.Int { -proc cost*(tx: Transaction): UInt256 = - ## Getter (go/ref compat): gas * gasPrice + value. - (tx.gasPrice * tx.gasLimit).u256 + tx.value +# Public functions +# ------------------------------------------------------------------------------- -func effectiveGasTip*(tx: Transaction; baseFee: GasInt): GasInt = +func tip*(tx: Transaction; baseFee: GasInt): GasInt = + ## Tip calculator effectiveGasTip(tx, Opt.some(baseFee.u256)) -# ------------------------------------------------------------------------------ -# Public functions, item getters -# ------------------------------------------------------------------------------ - -proc dup*(item: TxItemRef): TxItemRef = - ## Getter, provide contents copy - TxItemRef( - tx: item.tx, - itemID: item.itemID, - timeStamp: item.timeStamp, - sender: item.sender, - info: item.info, - status: item.status, - reject: item.reject - ) +func txGasPrice*(tx: Transaction): TxGasPrice = + case tx.txType + of TxLegacy, TxEip2930: + TxGasPrice( + maxFee: tx.gasPrice, + tip: tx.gasPrice, + ) + else: + TxGasPrice( + maxFee: tx.maxFeePerGas, + tip: tx.maxPriorityFeePerGas, + ) + +func hash*(item: TxItemRef): Hash = + ## Needed if `TxItemRef` is used as hash-`Table` index. + hash(item.id) -proc info*(item: TxItemRef): string = +template pooledTx*(item: TxItemRef): PooledTransaction = ## Getter - item.info + item.ptx -proc itemID*(item: TxItemRef): Hash32 = +template tx*(item: TxItemRef): Transaction = ## Getter - item.itemID + item.ptx.tx -proc reject*(item: TxItemRef): TxInfo = +template id*(item: TxItemRef): Hash32 = ## Getter - item.reject + item.id -proc sender*(item: TxItemRef): Address = +template sender*(item: TxItemRef): Address = ## Getter item.sender -proc status*(item: TxItemRef): TxItemStatus = - ## Getter - item.status - -proc timeStamp*(item: TxItemRef): Time = +template time*(item: TxItemRef): Time = ## Getter - item.timeStamp + item.time -proc pooledTx*(item: TxItemRef): PooledTransaction = +template nonce*(item: TxItemRef): AccountNonce = ## Getter - item.tx + item.tx.nonce -proc tx*(item: TxItemRef): Transaction = +template price*(item: TxItemRef): GasInt = ## Getter - item.tx.tx + item.price -func rejectInfo*(item: TxItemRef): string = - ## Getter - result = $item.reject - if item.info.len > 0: - result.add ": " - result.add item.info +func calculatePrice*(item: TxItemRef; baseFee: GasInt) = + ## Profit calculator + item.price = item.tx.gasLimit * item.tx.tip(baseFee) + item.tx.getTotalBlobGas -# ------------------------------------------------------------------------------ -# Public functions, setters -# ------------------------------------------------------------------------------ +func validateTxGasBump*(current: TxItemRef, added: TxItemRef): Result[void, TxError] = + func txGasPrice(item: TxItemRef): TxGasPrice = + txGasPrice(item.tx) -proc `status=`*(item: TxItemRef; val: TxItemStatus) = - ## Setter - item.status = val + const + MIN_GAS_PRICE_BUMP_PERCENT = 10 -proc `reject=`*(item: TxItemRef; val: TxInfo) = - ## Setter - item.reject = val + let + currentGasPrice = current.txGasPrice + newGasPrice = added.txGasPrice + minTipCap = currentGasPrice.tip + + (currentGasPrice.tip * MIN_GAS_PRICE_BUMP_PERCENT) div 100.GasInt + minFeeCap = currentGasPrice.maxFee + + (currentGasPrice.maxFee * MIN_GAS_PRICE_BUMP_PERCENT) div 100.GasInt -proc `info=`*(item: TxItemRef; val: string) = - ## Setter - item.info = val + if newGasPrice.tip < minTipCap or newGasPrice.maxFee < minFeeCap: + return err(txErrorReplacementGasTooLow) -# ------------------------------------------------------------------------------ -# Public functions, pretty printing and debugging -# ------------------------------------------------------------------------------ + if added.tx.txType == TxEip4844 and current.tx.txType == TxEip4844: + let minblobGasFee = current.tx.maxFeePerBlobGas + + (current.tx.maxFeePerBlobGas * MIN_GAS_PRICE_BUMP_PERCENT.u256) div 100.u256 + if added.tx.maxFeePerBlobGas < minblobGasFee: + return err(txErrorReplacementBlobGasTooLow) -proc `$`*(w: TxItemRef): string = - ## Visualise item ID (use for debugging) - "<" & w.itemID.short & ">" + ok() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/core/tx_pool/tx_packer.nim b/nimbus/core/tx_pool/tx_packer.nim index dafa7339a1..7ff59a77e5 100644 --- a/nimbus/core/tx_pool/tx_packer.nim +++ b/nimbus/core/tx_pool/tx_packer.nim @@ -20,25 +20,28 @@ import ../../db/ledger, ../../common/common, ../../utils/utils, - ../../constants, - ".."/[executor, validate, casper], + ../../constants, ../../transaction/call_evm, ../../transaction, ../../evm/state, ../../evm/types, + ../executor, + ../validate, + ../casper, ../eip4844, ../eip6110, ../eip7691, - "."/[tx_desc, tx_item, tx_tabs, tx_tabs/tx_status, tx_info], - tx_tasks/[tx_bucket] + ./tx_desc, + ./tx_item, + ./tx_tabs type TxPacker = object # Packer state vmState: BaseVMState - txDB: TxTabsRef cleanState: bool numBlobPerBlock: int + packedTxs: seq[TxItemRef] # Packer results blockValue: UInt256 @@ -49,16 +52,14 @@ type consolidationReqs: seq[byte] depositReqs: seq[byte] - GrabResult = enum - FetchNextItem - ContinueWithNextAccount - StopCollecting - const receiptsExtensionSize = ##\ ## Number of slots to extend the `receipts[]` at the same time. 20 + ContinueWithNextAccount = true + StopCollecting = false + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -129,14 +130,13 @@ proc runTx(pst: var TxPacker; item: TxItemRef): GasInt = doAssert 0 <= gasUsed gasUsed -proc runTxCommit(pst: var TxPacker; item: TxItemRef; gasBurned: GasInt) - {.gcsafe,raises: [CatchableError].} = +proc runTxCommit(pst: var TxPacker; item: TxItemRef; gasBurned: GasInt) = ## Book keeping after executing argument `item` transaction in the VM. The ## function returns the next number of items `nItems+1`. let vmState = pst.vmState - inx = pst.txDB.byStatus.eq(txItemPacked).nItems - gasTip = item.tx.effectiveGasTip(pst.baseFee) + inx = pst.packedTxs.len + gasTip = item.tx.tip(pst.baseFee) # The gas tip cannot get negative as all items in the `staged` bucket # are vetted for profitability before entering that bucket. @@ -164,24 +164,15 @@ proc runTxCommit(pst: var TxPacker; item: TxItemRef; gasBurned: GasInt) # gasUsed accounting vmState.cumulativeGasUsed += gasBurned vmState.receipts[inx] = vmState.makeReceipt(item.tx.txType) - - # Add the item to the `packed` bucket. This implicitely increases the - # receipts index `inx` at the next visit of this function. - discard pst.txDB.reassign(item,txItemPacked) + pst.packedTxs.add item # ------------------------------------------------------------------------------ # Private functions: packer packerVmExec() helpers # ------------------------------------------------------------------------------ -proc vmExecInit(xp: TxPoolRef): Result[TxPacker, string] - {.gcsafe,raises: [CatchableError].} = - - # Flush `packed` bucket - xp.bucketFlushPacked - +proc vmExecInit(xp: TxPoolRef): Result[TxPacker, string] = let packer = TxPacker( vmState: xp.vmState, - txDB: xp.txDB, numBlobPerBlock: 0, blockValue: 0.u256, stateRoot: xp.vmState.parent.stateRoot, @@ -200,16 +191,11 @@ proc vmExecInit(xp: TxPoolRef): Result[TxPacker, string] ok(packer) -proc vmExecGrabItem(pst: var TxPacker; item: TxItemRef): GrabResult - {.gcsafe,raises: [CatchableError].} = +proc vmExecGrabItem(pst: var TxPacker; item: TxItemRef): bool = ## Greedily collect & compact items as long as the accumulated `gasLimit` ## values are below the maximum block size. let vmState = pst.vmState - if not item.tx.validateChainId(vmState.com.chainId): - discard pst.txDB.dispose(item, txInfoChainIdMismatch) - return ContinueWithNextAccount - # EIP-4844 let maxBlobsPerBlob = getMaxBlobsPerBlock(vmState.fork >= FkPrague) if (pst.numBlobPerBlock + item.tx.versionedHashes.len).uint64 > maxBlobsPerBlob: @@ -257,7 +243,7 @@ proc vmExecGrabItem(pst: var TxPacker; item: TxItemRef): GrabResult # Finish book-keeping and move item to `packed` bucket pst.runTxCommit(item, gasUsed) - FetchNextItem + ContinueWithNextAccount proc vmExecCommit(pst: var TxPacker): Result[void, string] = let @@ -279,8 +265,7 @@ proc vmExecCommit(pst: var TxPacker): Result[void, string] = ledger.persist(clearEmptyAccount = vmState.fork >= FkSpurious) # Update flexi-array, set proper length - let nItems = pst.txDB.byStatus.eq(txItemPacked).nItems - vmState.receipts.setLen(nItems) + vmState.receipts.setLen(pst.packedTxs.len) pst.receiptsRoot = vmState.receipts.calcReceiptsRoot pst.logsBloom = vmState.receipts.createBloom @@ -291,8 +276,7 @@ proc vmExecCommit(pst: var TxPacker): Result[void, string] = # Public functions # ------------------------------------------------------------------------------ -proc packerVmExec*(xp: TxPoolRef): Result[TxPacker, string] - {.gcsafe,raises: [CatchableError].} = +proc setupTxPacker*(xp: TxPoolRef): Result[TxPacker, string] = ## Rebuild `packed` bucket by selection items from the `staged` bucket ## after executing them in the VM. let db = xp.vmState.com.db @@ -302,20 +286,13 @@ proc packerVmExec*(xp: TxPoolRef): Result[TxPacker, string] var pst = xp.vmExecInit.valueOr: return err(error) - block loop: - for (_,nonceList) in xp.txDB.packingOrderAccounts(txItemStaged): - - block account: - for item in nonceList.incNonce: - let rc = pst.vmExecGrabItem(item) - if rc == StopCollecting: - break loop # stop - if rc == ContinueWithNextAccount: - break account # continue with next account + for item in xp.byPriceAndNonce: + let rc = pst.vmExecGrabItem(item) + if rc == StopCollecting: + break ?pst.vmExecCommit() ok(pst) - # Block chain will roll back automatically func getExtraData(com: CommonRef): seq[byte] = if com.extraData.len > 32: @@ -377,6 +354,10 @@ func executionRequests*(pst: var TxPacker): seq[seq[byte]] = result.append(WITHDRAWAL_REQUEST_TYPE, pst.withdrawalReqs) result.append(CONSOLIDATION_REQUEST_TYPE, pst.consolidationReqs) +iterator packedTxs*(pst: TxPacker): TxItemRef = + for item in pst.packedTxs: + yield item + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tabs.nim b/nimbus/core/tx_pool/tx_tabs.nim index 8fb656b6d9..f5319ea622 100644 --- a/nimbus/core/tx_pool/tx_tabs.nim +++ b/nimbus/core/tx_pool/tx_tabs.nim @@ -8,359 +8,88 @@ # at your option. This file may not be copied, modified, or distributed except # according to those terms. -## Transaction Pool Database For Buckets And Waste Basket -## ====================================================== -## - {.push raises: [].} import - std/[tables], - ./tx_info, - ./tx_item, - ./tx_tabs/[tx_sender, tx_rank, tx_status], - eth/common/[transactions, addresses], - stew/[keyed_queue, sorted_set], - results - -export - # bySender/byStatus index operations - sub, eq, ge, gt, le, len, lt, nItems + std/[tables, heapqueue], + eth/common/base, + eth/common/addresses, + eth/common/hashes, + stew/sorted_set, + ../../db/ledger, + ./tx_item type - TxTabsItemsCount* = tuple - pending, staged, packed: int ## sum => total - total: int ## excluding rejects - disposed: int ## waste basket - - TxTabsRef* = ref object ##\ - ## Base descriptor - maxRejects: int ##\ - ## Maximal number of items in waste basket - - # ----- primary tables ------ - - byRejects*: KeyedQueue[Hash32,TxItemRef] ##\ - ## Rejects queue and waste basket, queued by disposal event - - byItemID*: KeyedQueue[Hash32,TxItemRef] ##\ - ## Primary table containing all tx items, queued by arrival event - - # ----- index tables for byItemID ------ - - bySender*: TxSenderTab ##\ - ## Index for byItemID: `sender` > `status` > `nonce` > item - - byStatus*: TxStatusTab ##\ - ## Index for byItemID: `status` > `nonce` > item - - byRank*: TxRankTab ##\ - ## Ranked address table, used for sender address traversal - -const - txTabMaxRejects = ##\ - ## Default size of rejects queue (aka waste basket.) Older waste items will - ## be automatically removed so that there are no more than this many items - ## in the rejects queue. - 500 - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc deleteImpl(xp: TxTabsRef; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - ## Delete transaction (and wrapping container) from the database. If - ## successful, the function returns the wrapping container that was just - ## removed. - if xp.byItemID.delete(item.itemID).isOk: - discard xp.bySender.delete(item) - discard xp.byStatus.delete(item) - - # Update address rank - let rc = xp.bySender.rank(item.sender) - if rc.isOk: - discard xp.byRank.insert(rc.value.TxRank, item.sender) # update - else: - discard xp.byRank.delete(item.sender) - - return true - -proc insertImpl(xp: TxTabsRef; item: TxItemRef): Result[void,TxInfo] - {.gcsafe,raises: [CatchableError].} = - if not xp.bySender.insert(item): - return err(txInfoErrSenderNonceIndex) - - # Insert item - discard xp.byItemID.append(item.itemID,item) - discard xp.byStatus.insert(item) - - # Update address rank - let rank = xp.bySender.rank(item.sender).value.TxRank - discard xp.byRank.insert(rank, item.sender) - - return ok() - -# ------------------------------------------------------------------------------ -# Public functions, constructor -# ------------------------------------------------------------------------------ - -proc new*(T: type TxTabsRef): T {.gcsafe,raises: [].} = - ## Constructor, returns new tx-pool descriptor. - new result - result.maxRejects = txTabMaxRejects - - # result.byItemID -- KeyedQueue, no need to init - # result.byRejects -- KeyedQueue, no need to init - - # index tables - result.bySender.init - result.byStatus.init - result.byRank.init - -# ------------------------------------------------------------------------------ -# Public functions, add/remove entry -# ------------------------------------------------------------------------------ - -proc insert*( - xp: TxTabsRef; - tx: var PooledTransaction; - status = txItemPending; - info = ""): Result[void,TxInfo] - {.gcsafe,raises: [CatchableError].} = - ## Add new transaction argument `tx` to the database. If accepted and added - ## to the database, a `key` value is returned which can be used to retrieve - ## this transaction direcly via `tx[key].tx`. The following holds for the - ## returned `key` value (see `[]` below for details): - ## :: - ## xp[key].id == key # id: transaction key stored in the wrapping container - ## tx.toKey == key # holds as long as tx is not modified - ## - ## Adding the transaction will be rejected if the transaction key `tx.toKey` - ## exists in the database already. - ## - ## CAVEAT: - ## The returned transaction key `key` for the transaction `tx` is - ## recoverable as `tx.toKey` only while the trasaction remains unmodified. - ## - let itemID = tx.itemID - if xp.byItemID.hasKey(itemID): - return err(txInfoErrAlreadyKnown) - var item: TxItemRef - block: - let rc = TxItemRef.new(tx, itemID, status, info) - if rc.isErr: - return err(txInfoErrInvalidSender) - item = rc.value - block: - let rc = xp.insertImpl(item) - if rc.isErr: - return rc - ok() - -proc insert*(xp: TxTabsRef; item: TxItemRef): Result[void,TxInfo] - {.gcsafe,raises: [CatchableError].} = - ## Variant of `insert()` with fully qualified `item` argument. - if xp.byItemID.hasKey(item.itemID): - return err(txInfoErrAlreadyKnown) - return xp.insertImpl(item.dup) - - -proc reassign*(xp: TxTabsRef; item: TxItemRef; status: TxItemStatus): bool - {.gcsafe,raises: [CatchableError].} = - ## Variant of `reassign()` for the `TxItemStatus` flag. - # make sure that the argument `item` is not some copy - let rc = xp.byItemID.eq(item.itemID) - if rc.isOk: - var realItem = rc.value - if realItem.status != status: - discard xp.bySender.delete(realItem) # delete original - discard xp.byStatus.delete(realItem) - realItem.status = status - discard xp.bySender.insert(realItem) # re-insert changed - discard xp.byStatus.insert(realItem) - return true - - -proc flushRejects*(xp: TxTabsRef; maxItems = int.high): (int,int) = - ## Flush/delete at most `maxItems` oldest items from the waste basket and - ## return the numbers of deleted and remaining items (a waste basket item - ## is considered older if it was moved there earlier.) - if xp.byRejects.len <= maxItems: - result[0] = xp.byRejects.len - xp.byRejects.clear - return # result - while result[0] < maxItems: - if xp.byRejects.shift.isErr: - break - result[0].inc - result[1] = xp.byRejects.len - - -proc dispose*(xp: TxTabsRef; item: TxItemRef; reason: TxInfo): bool - {.gcsafe,raises: [KeyError].} = - ## Move argument `item` to rejects queue (aka waste basket.) - if xp.deleteImpl(item): - if xp.maxRejects <= xp.byRejects.len: - discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects) - item.reject = reason - xp.byRejects[item.itemID] = item - return true - -proc reject*(xp: TxTabsRef; tx: var PooledTransaction; - reason: TxInfo; status = txItemPending; info = "") = - ## Similar to dispose but for a tx without the item wrapper, the function - ## imports the tx into the waste basket (e.g. after it could not - ## be inserted.) - if xp.maxRejects <= xp.byRejects.len: - discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects) - let item = TxItemRef.new(tx, reason, status, info) - xp.byRejects[item.itemID] = item - -proc reject*(xp: TxTabsRef; item: TxItemRef; reason: TxInfo) = - ## Variant of `reject()` with `item` rather than `tx` (assuming - ## `item` is not in the database.) - if xp.maxRejects <= xp.byRejects.len: - discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects) - item.reject = reason - xp.byRejects[item.itemID] = item - -proc reject*(xp: TxTabsRef; tx: PooledTransaction; - reason: TxInfo; status = txItemPending; info = "") = - ## Variant of `reject()` - var ty = tx - xp.reject(ty, reason, status) - -# ------------------------------------------------------------------------------ -# Public getters -# ------------------------------------------------------------------------------ - -proc baseFee*(xp: TxTabsRef): GasInt = - ## Getter - xp.bySender.baseFee - -proc maxRejects*(xp: TxTabsRef): int = - ## Getter - xp.maxRejects - -# ------------------------------------------------------------------------------ -# Public functions, setters -# ------------------------------------------------------------------------------ - -proc `baseFee=`*(xp: TxTabsRef; val: GasInt) - {.gcsafe,raises: [KeyError].} = - ## Setter, update may cause database re-org - if xp.bySender.baseFee != val: - xp.bySender.baseFee = val - # Build new rank table - xp.byRank.clear - for (address,rank) in xp.bySender.accounts: - discard xp.byRank.insert(rank.TxRank, address) - - -proc `maxRejects=`*(xp: TxTabsRef; val: int) = - ## Setter, applicable with next `reject()` invocation. - xp.maxRejects = val - -# ------------------------------------------------------------------------------ -# Public functions, miscellaneous -# ------------------------------------------------------------------------------ - -proc hasTx*(xp: TxTabsRef; tx: Transaction): bool = - ## Returns `true` if the argument pair `(key,local)` exists in the - ## database. - ## - ## If this function returns `true`, then it is save to use the `xp[key]` - ## paradigm for accessing a transaction container. - xp.byItemID.hasKey(tx.itemID) - -proc nItems*(xp: TxTabsRef): TxTabsItemsCount = - result.pending = xp.byStatus.eq(txItemPending).nItems - result.staged = xp.byStatus.eq(txItemStaged).nItems - result.packed = xp.byStatus.eq(txItemPacked).nItems - result.total = xp.byItemID.len - result.disposed = xp.byRejects.len - -# ------------------------------------------------------------------------------ -# Public iterators, `TxRank` > `(Address,TxStatusNonceRef)` -# ------------------------------------------------------------------------------ - -iterator incAccount*(xp: TxTabsRef; bucket: TxItemStatus; - fromRank = TxRank.low): (Address,TxStatusNonceRef) - {.gcsafe,raises: [KeyError].} = - ## Walk accounts with increasing ranks and return a nonce-ordered item list. - let rcBucket = xp.byStatus.eq(bucket) - if rcBucket.isOk: - let bucketList = xp.byStatus.eq(bucket).value.data - - var rcRank = xp.byRank.ge(fromRank) - while rcRank.isOk: - let (rank, addrList) = (rcRank.value.key, rcRank.value.data) - - # Use adresses for this rank which are also found in the bucket - for account in addrList.keys: - let rcAccount = bucketList.eq(account) - if rcAccount.isOk: - yield (account, rcAccount.value.data) - - # Get next ranked address list (top down index walk) - rcRank = xp.byRank.gt(rank) # potenially modified database - - -iterator decAccount*(xp: TxTabsRef; bucket: TxItemStatus; - fromRank = TxRank.high): (Address,TxStatusNonceRef) - {.gcsafe,raises: [KeyError].} = - ## Walk accounts with decreasing ranks and return the nonce-ordered item list. - let rcBucket = xp.byStatus.eq(bucket) - if rcBucket.isOk: - let bucketList = xp.byStatus.eq(bucket).value.data - - var rcRank = xp.byRank.le(fromRank) - while rcRank.isOk: - let (rank, addrList) = (rcRank.value.key, rcRank.value.data) - - # Use adresses for this rank which are also found in the bucket - for account in addrList.keys: - let rcAccount = bucketList.eq(account) - if rcAccount.isOk: - yield (account, rcAccount.value.data) - - # Get next ranked address list (top down index walk) - rcRank = xp.byRank.lt(rank) # potenially modified database - -iterator packingOrderAccounts*(xp: TxTabsRef; bucket: TxItemStatus): - (Address,TxStatusNonceRef) - {.gcsafe,raises: [KeyError].} = - ## Loop over accounts from a particular bucket ordered by - ## For the `txItemStaged` bucket, this iterator defines the packing order - ## for transactions (important when calculationg the *txRoot*.) - for (account,nonceList) in xp.decAccount(bucket): - yield (account,nonceList) - -# ----------------------------------------------------------------------------- -# Public second stage iterators: nonce-ordered item lists. -# ----------------------------------------------------------------------------- - -iterator incNonce*(nonceList: TxSenderNonceRef; - nonceFrom = AccountNonce.low): TxItemRef = - ## Second stage iterator inside `incAccount()` or `decAccount()`. The - ## items visited are always sorted by least-nonce first. - var rc = nonceList.ge(nonceFrom) - while rc.isOk: - let (nonce, item) = (rc.value.key, rc.value.data) - yield item - rc = nonceList.gt(nonce) # potenially modified database - - -iterator incNonce*(nonceList: TxStatusNonceRef; - nonceFrom = AccountNonce.low): TxItemRef = - ## Variant of `incNonce()` for the `TxStatusNonceRef` list. - var rc = nonceList.ge(nonceFrom) - while rc.isOk: - let (nonce, item) = (rc.value.key, rc.value.data) - yield item - rc = nonceList.gt(nonce) # potenially modified database - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ + SenderNonceList* = SortedSet[AccountNonce, TxItemRef] + + TxSenderNonceRef* = ref object + ## Sub-list ordered by `AccountNonce` values containing transaction + ## item lists. + list*: SenderNonceList + + TxSenderTab* = Table[Address, TxSenderNonceRef] + + TxIdTab* = Table[Hash32, TxItemRef] + +func init*(_ : type TxSenderNonceRef): TxSenderNonceRef = + TxSenderNonceRef(list: SenderNonceList.init()) + +template insertOrReplace*(sn: TxSenderNonceRef, item: TxItemRef) = + sn.list.findOrInsert(item.nonce). + expect("insert txitem ok").data = item + +func last*(sn: TxSenderNonceRef): auto = + sn.list.le(AccountNonce.high) + +func len*(sn: TxSenderNonceRef): auto = + sn.list.len + +iterator byPriceAndNonce*(senderTab: TxSenderTab, + ledger: LedgerRef, + baseFee: GasInt): TxItemRef = + template removeFirstAndPushTo(sn, byPrice) = + let rc = sn.list.ge(AccountNonce.low).valueOr: + continue + discard sn.list.delete(rc.data.nonce) + byPrice.push(rc.data) + + var byNonce: TxSenderTab + for address, sn in senderTab: + # Check if the account nonce matches the lowest known tx nonce + var + nonce = ledger.getNonce(address) + rc = sn.list.eq(nonce) + sortedByNonce: TxSenderNonceRef + + while rc.isOk: + let item = rc.get.data + item.calculatePrice(baseFee) + if item.nonce != nonce: + # a gap in nonce, stop collecting + break + + if sortedByNonce.isNil: + sortedByNonce = TxSenderNonceRef.init() + byNonce[address] = sortedByNonce + + sortedByNonce.insertOrReplace(item) + nonce = item.nonce + 1 + rc = sn.list.eq(nonce) + + # HeapQueue needs `<` to be overloaded for custom object + # and in this case, we want to pop highest price first + func `<`(a, b: TxItemRef): bool {.used.} = a.price > b.price + var byPrice = initHeapQueue[TxItemRef]() + for _, sn in byNonce: + sn.removeFirstAndPushTo(byPrice) + + while byPrice.len > 0: + # Retrieve the next best transaction by price + let best = byPrice.pop() + + # Push in its place the next transaction from the same account + let accTxs = byNonce.getOrDefault(best.sender) + if accTxs.isNil.not and accTxs.len > 0: + accTxs.removeFirstAndPushTo(byPrice) + + yield best diff --git a/nimbus/core/tx_pool/tx_tabs/tx_rank.nim b/nimbus/core/tx_pool/tx_tabs/tx_rank.nim deleted file mode 100644 index cec5899349..0000000000 --- a/nimbus/core/tx_pool/tx_tabs/tx_rank.nim +++ /dev/null @@ -1,158 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Table: `rank` ~ `sender` -## ========================================= -## - -{.push raises: [].} - -import - std/[tables], - eth/common, - stew/[sorted_set], - results - - -type - TxRank* = ##\ - ## Order relation, determins how the `Addresses` are ranked - distinct int64 - - TxRankAddrRef* = ##\ - ## Set of adresses having the same rank. - TableRef[Address,TxRank] - - TxRankTab* = object ##\ - ## Descriptor for `TxRank` <-> `Address` mapping. - rankList: SortedSet[TxRank,TxRankAddrRef] - addrTab: Table[Address,TxRank] - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc cmp(a,b: TxRank): int {.borrow.} - ## mixin for SortedSet - -proc `==`(a,b: TxRank): bool {.borrow.} - -# ------------------------------------------------------------------------------ -# Public constructor -# ------------------------------------------------------------------------------ - -proc init*(rt: var TxRankTab) = - ## Constructor - rt.rankList.init - -proc clear*(rt: var TxRankTab) = - ## Flush tables - rt.rankList.clear - rt.addrTab.clear - -# ------------------------------------------------------------------------------ -# Public functions, base management operations -# ------------------------------------------------------------------------------ - -proc insert*(rt: var TxRankTab; rank: TxRank; sender: Address): bool - {.gcsafe,raises: [KeyError].} = - ## Add or update a new ranked address. This function returns `true` it the - ## address exists already with the current rank. - - # Does this address exists already? - if rt.addrTab.hasKey(sender): - let oldRank = rt.addrTab[sender] - if oldRank == rank: - return false - - # Delete address from oldRank address set - let oldRankSet = rt.rankList.eq(oldRank).value.data - if 1 < oldRankSet.len: - oldRankSet.del(sender) - else: - discard rt.rankList.delete(oldRank) - - # Add new ranked address - var newRankSet: TxRankAddrRef - let rc = rt.rankList.insert(rank) - if rc.isOk: - newRankSet = newTable[Address,TxRank](1) - rc.value.data = newRankSet - else: - newRankSet = rt.rankList.eq(rank).value.data - - newRankSet[sender] = rank - rt.addrTab[sender] = rank - true - - -proc delete*(rt: var TxRankTab; sender: Address): bool - {.gcsafe,raises: [KeyError].} = - ## Delete argument address `sender` from rank table. - if rt.addrTab.hasKey(sender): - let - rankNum = rt.addrTab[sender] - rankSet = rt.rankList.eq(rankNum).value.data - - # Delete address from oldRank address set - if 1 < rankSet.len: - rankSet.del(sender) - else: - discard rt.rankList.delete(rankNum) - - rt.addrTab.del(sender) - return true - -# ------------------------------------------------------------------------------ -# Public functions: `TxRank` > `Address` -# ------------------------------------------------------------------------------ - -proc len*(rt: var TxRankTab): int = - ## Number of ranks available - rt.rankList.len - -proc eq*(rt: var TxRankTab; rank: TxRank): - SortedSetResult[TxRank,TxRankAddrRef] = - rt.rankList.eq(rank) - -proc ge*(rt: var TxRankTab; rank: TxRank): - SortedSetResult[TxRank,TxRankAddrRef] = - rt.rankList.ge(rank) - -proc gt*(rt: var TxRankTab; rank: TxRank): - SortedSetResult[TxRank,TxRankAddrRef] = - rt.rankList.gt(rank) - -proc le*(rt: var TxRankTab; rank: TxRank): - SortedSetResult[TxRank,TxRankAddrRef] = - rt.rankList.le(rank) - -proc lt*(rt: var TxRankTab; rank: TxRank): - SortedSetResult[TxRank,TxRankAddrRef] = - rt.rankList.lt(rank) - -# ------------------------------------------------------------------------------ -# Public functions: `Address` > `TxRank` -# ------------------------------------------------------------------------------ - -proc nItems*(rt: var TxRankTab): int = - ## Total number of address items registered - rt.addrTab.len - -proc eq*(rt: var TxRankTab; sender: Address): - SortedSetResult[Address,TxRank] - {.gcsafe,raises: [KeyError].} = - if rt.addrTab.hasKey(sender): - return toSortedSetResult(key = sender, data = rt.addrTab[sender]) - err(rbNotFound) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tabs/tx_sender.nim b/nimbus/core/tx_pool/tx_tabs/tx_sender.nim deleted file mode 100644 index abbdf0fcb0..0000000000 --- a/nimbus/core/tx_pool/tx_tabs/tx_sender.nim +++ /dev/null @@ -1,432 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -{.push raises: [].} - -## Transaction Pool Table: `Sender` > `status` | all > `nonce` -## =========================================================== -## - -import - ../tx_item, - eth/common/transaction_utils, - stew/[keyed_queue, sorted_set], - results, - ../../eip4844 - -type - TxSenderNonceRef* = ref object ##\ - ## Sub-list ordered by `AccountNonce` values containing transaction\ - ## item lists. - gasLimits: GasInt ## Accumulated gas limits - profit: GasInt ## Aggregated `effectiveGasTip*gasLimit` values - nonceList: SortedSet[AccountNonce,TxItemRef] - - TxSenderSchedRef* = ref object ##\ - ## For a sender, items can be accessed by *nonce*, or *status,nonce*. - size: int ## Total number of items - statusList: array[TxItemStatus,TxSenderNonceRef] - allList: TxSenderNonceRef - - TxSenderTab* = object ##\ - ## Per address table This is table provided as a keyed queue so deletion\ - ## while traversing is supported and predictable. - size: int ## Total number of items - baseFee: GasInt ## For aggregating `effectiveGasTip` => `gasTipSum` - addrList: KeyedQueue[Address,TxSenderSchedRef] - - TxSenderSchedule* = enum ##\ - ## Generalised key for sub-list to be used in `TxSenderNoncePair` - txSenderAny = 0 ## All entries status (aka bucket name) ... - txSenderPending - txSenderStaged - txSenderPacked - - TxSenderInx = object ##\ - ## Internal access data - schedData: TxSenderSchedRef - statusNonce: TxSenderNonceRef ## status items sub-list - allNonce: TxSenderNonceRef ## all items sub-list - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc `$`(rq: TxSenderSchedRef): string = - ## Needed by `rq.verify()` for printing error messages - var n = 0 - for status in TxItemStatus: - if not rq.statusList[status].isNil: - n.inc - $n - -proc nActive(rq: TxSenderSchedRef): int = - ## Number of non-nil items - for status in TxItemStatus: - if not rq.statusList[status].isNil: - result.inc - -func toSenderSchedule(status: TxItemStatus): TxSenderSchedule = - case status - of txItemPending: - return txSenderPending - of txItemStaged: - return txSenderStaged - of txItemPacked: - return txSenderPacked - -proc getRank(schedData: TxSenderSchedRef): int64 = - ## Rank calculator - let pendingData = schedData.statusList[txItemPending] - - var - maxProfit = schedData.allList.profit - gasLimits = schedData.allList.gasLimits - if not pendingData.isNil: - maxProfit -= pendingData.profit - gasLimits -= pendingData.gasLimits - - if gasLimits <= 0: - return int64.low - let profit = maxProfit div gasLimits - - # Beware of under/overflow - if int64.high.GasInt < profit: - return int64.high - - profit.int64 - -proc maxProfit(item: TxItemRef; baseFee: GasInt): GasInt = - ## Profit calculator - item.tx.gasLimit * item.tx.effectiveGasTip(baseFee) + item.tx.getTotalBlobGas - -proc recalcProfit(nonceData: TxSenderNonceRef; baseFee: GasInt) = - ## Re-calculate profit value depending on `baseFee` - nonceData.profit = 0 - var rc = nonceData.nonceList.ge(AccountNonce.low) - while rc.isOk: - let item = rc.value.data - nonceData.profit += item.maxProfit(baseFee) - rc = nonceData.nonceList.gt(item.tx.nonce) - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc mkInxImpl(gt: var TxSenderTab; item: TxItemRef): Result[TxSenderInx,void] - {.gcsafe,raises: [KeyError].} = - var inxData: TxSenderInx - - if gt.addrList.hasKey(item.sender): - inxData.schedData = gt.addrList[item.sender] - else: - new inxData.schedData - gt.addrList[item.sender] = inxData.schedData - - # all items sub-list - if inxData.schedData.allList.isNil: - new inxData.allNonce - inxData.allNonce.nonceList.init - inxData.schedData.allList = inxData.allNonce - else: - inxData.allNonce = inxData.schedData.allList - let rc = inxData.allNonce.nonceList.insert(item.tx.nonce) - if rc.isErr: - return err() - rc.value.data = item - - # by status items sub-list - if inxData.schedData.statusList[item.status].isNil: - new inxData.statusNonce - inxData.statusNonce.nonceList.init - inxData.schedData.statusList[item.status] = inxData.statusNonce - else: - inxData.statusNonce = inxData.schedData.statusList[item.status] - # this is a new item, checked at `all items sub-list` above - inxData.statusNonce.nonceList.insert(item.tx.nonce).value.data = item - - return ok(inxData) - - -proc getInxImpl(gt: var TxSenderTab; item: TxItemRef): Result[TxSenderInx,void] - {.gcsafe,raises: [KeyError].} = - - var inxData: TxSenderInx - if not gt.addrList.hasKey(item.sender): - return err() - - # Sub-lists are non-nil as `TxSenderSchedRef` cannot be empty - inxData.schedData = gt.addrList[item.sender] - - # by status items sub-list - inxData.statusNonce = inxData.schedData.statusList[item.status] - - # all items sub-list - inxData.allNonce = inxData.schedData.allList - - ok(inxData) - -# ------------------------------------------------------------------------------ -# Public constructor -# ------------------------------------------------------------------------------ - -proc init*(gt: var TxSenderTab) = - ## Constructor - gt.size = 0 - gt.addrList.init - -# ------------------------------------------------------------------------------ -# Public functions, base management operations -# ------------------------------------------------------------------------------ - -proc insert*(gt: var TxSenderTab; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - ## Add transaction `item` to the list. The function has no effect if the - ## transaction exists, already. - let rc = gt.mkInxImpl(item) - if rc.isOk: - let - inx = rc.value - tip = item.maxProfit(gt.baseFee) - gt.size.inc - - inx.schedData.size.inc - - inx.statusNonce.gasLimits += item.tx.gasLimit - inx.statusNonce.profit += tip - - inx.allNonce.gasLimits += item.tx.gasLimit - inx.allNonce.profit += tip - return true - - -proc delete*(gt: var TxSenderTab; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - let rc = gt.getInxImpl(item) - if rc.isOk: - let - inx = rc.value - tip = item.maxProfit(gt.baseFee) - gt.size.dec - - inx.schedData.size.dec - - discard inx.allNonce.nonceList.delete(item.tx.nonce) - if inx.allNonce.nonceList.len == 0: - # this was the last nonce for that sender account - discard gt.addrList.delete(item.sender) - return true - - inx.allNonce.gasLimits -= item.tx.gasLimit - inx.allNonce.profit -= tip - - discard inx.statusNonce.nonceList.delete(item.tx.nonce) - if inx.statusNonce.nonceList.len == 0: - inx.schedData.statusList[item.status] = nil - return true - - inx.statusNonce.gasLimits -= item.tx.gasLimit - inx.statusNonce.profit -= tip - return true - -# ------------------------------------------------------------------------------ -# Public getters -# ------------------------------------------------------------------------------ - -proc baseFee*(gt: var TxSenderTab): GasInt = - ## Getter - gt.baseFee - -# ------------------------------------------------------------------------------ -# Public functions, setters -# ------------------------------------------------------------------------------ - -proc `baseFee=`*(gt: var TxSenderTab; val: GasInt) = - ## Setter. When invoked, there is *always* a re-calculation of the profit - ## values stored with the sender address. - gt.baseFee = val - - for p in gt.addrList.nextPairs: - let schedData = p.data - - # statusList[] - for status in TxItemStatus: - let statusData = schedData.statusList[status] - if not statusData.isNil: - statusData.recalcProfit(val) - - # allList - schedData.allList.recalcProfit(val) - -# ------------------------------------------------------------------------------ -# Public SortedSet ops -- `Address` (level 0) -# ------------------------------------------------------------------------------ - -proc len*(gt: var TxSenderTab): int = - gt.addrList.len - -proc nItems*(gt: var TxSenderTab): int = - ## Getter, total number of items in the list - gt.size - - -proc rank*(gt: var TxSenderTab; sender: Address): Result[int64,void] - {.gcsafe,raises: [KeyError].} = - ## The *rank* of the `sender` argument address is the - ## :: - ## maxProfit() / gasLimits() - ## - ## calculated over all items of the `staged` and `packed` buckets. - ## - if gt.addrList.hasKey(sender): - return ok(gt.addrList[sender].getRank) - err() - - -proc eq*(gt: var TxSenderTab; sender: Address): - SortedSetResult[Address,TxSenderSchedRef] - {.gcsafe,raises: [KeyError].} = - if gt.addrList.hasKey(sender): - return toSortedSetResult(key = sender, data = gt.addrList[sender]) - err(rbNotFound) - -# ------------------------------------------------------------------------------ -# Public array ops -- `TxSenderSchedule` (level 1) -# ------------------------------------------------------------------------------ - -proc len*(schedData: TxSenderSchedRef): int = - schedData.nActive - - -proc nItems*(schedData: TxSenderSchedRef): int = - ## Getter, total number of items in the sub-list - schedData.size - -proc nItems*(rc: SortedSetResult[Address,TxSenderSchedRef]): int = - if rc.isOk: - return rc.value.data.nItems - 0 - - -proc eq*(schedData: TxSenderSchedRef; status: TxItemStatus): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - ## Return by status sub-list - let nonceData = schedData.statusList[status] - if nonceData.isNil: - return err(rbNotFound) - toSortedSetResult(key = status.toSenderSchedule, data = nonceData) - -proc eq*(rc: SortedSetResult[Address,TxSenderSchedRef]; - status: TxItemStatus): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - ## Return by status sub-list - if rc.isOk: - return rc.value.data.eq(status) - err(rc.error) - - -proc sub*(schedData: TxSenderSchedRef): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - ## Return all-entries sub-list - let nonceData = schedData.allList - if nonceData.isNil: - return err(rbNotFound) - toSortedSetResult(key = txSenderAny, data = nonceData) - -proc sub*(rc: SortedSetResult[Address,TxSenderSchedRef]): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - ## Return all-entries sub-list - if rc.isOk: - return rc.value.data.sub - err(rc.error) - - -proc eq*(schedData: TxSenderSchedRef; - key: TxSenderSchedule): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - ## Variant of `eq()` using unified key schedule - case key - of txSenderAny: - return schedData.sub - of txSenderPending: - return schedData.eq(txItemPending) - of txSenderStaged: - return schedData.eq(txItemStaged) - of txSenderPacked: - return schedData.eq(txItemPacked) - -proc eq*(rc: SortedSetResult[Address,TxSenderSchedRef]; - key: TxSenderSchedule): - SortedSetResult[TxSenderSchedule,TxSenderNonceRef] = - if rc.isOk: - return rc.value.data.eq(key) - err(rc.error) - -# ------------------------------------------------------------------------------ -# Public SortedSet ops -- `AccountNonce` (level 2) -# ------------------------------------------------------------------------------ - -proc nItems*(nonceData: TxSenderNonceRef): int = - ## Getter, total number of items in the sub-list - nonceData.nonceList.len - -proc nItems*(rc: SortedSetResult[TxSenderSchedule,TxSenderNonceRef]): int = - if rc.isOk: - return rc.value.data.nItems - 0 - -proc eq*(nonceData: TxSenderNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.eq(nonce) - -proc eq*(rc: SortedSetResult[TxSenderSchedule,TxSenderNonceRef]; - nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.eq(nonce) - err(rc.error) - - -proc ge*(nonceData: TxSenderNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.ge(nonce) - -proc ge*(rc: SortedSetResult[TxSenderSchedule,TxSenderNonceRef]; - nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.ge(nonce) - err(rc.error) - - -proc gt*(nonceData: TxSenderNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.gt(nonce) - -proc gt*(rc: SortedSetResult[TxSenderSchedule,TxSenderNonceRef]; - nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.gt(nonce) - err(rc.error) - -# ------------------------------------------------------------------------------ -# Public iterators -# ------------------------------------------------------------------------------ - -iterator accounts*(gt: var TxSenderTab): (Address,int64) = - ## Sender account traversal, returns the account address and the rank - ## for that account. - for p in gt.addrList.nextPairs: - yield (p.key, p.data.getRank) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tabs/tx_status.nim b/nimbus/core/tx_pool/tx_tabs/tx_status.nim deleted file mode 100644 index e197cf2b86..0000000000 --- a/nimbus/core/tx_pool/tx_tabs/tx_status.nim +++ /dev/null @@ -1,240 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Table: `status` > `nonce` -## ========================================== -## - -import - ../tx_item, - eth/common, - stew/[keyed_queue, sorted_set], - results - -{.push raises: [].} - -type - TxStatusNonceRef* = ref object ##\ - ## Sub-list ordered by `AccountNonce` or `TxItemRef` insertion order. - nonceList: SortedSet[AccountNonce,TxItemRef] - - TxStatusSenderRef* = ref object ##\ - ## Per address table. This table is provided as a keyed queue so deletion\ - ## while traversing is supported and predictable. - size: int ## Total number of items - addrList: KeyedQueue[Address,TxStatusNonceRef] - - TxStatusTab* = object ##\ - ## Per status table - size: int ## Total number of items - statusList: array[TxItemStatus,TxStatusSenderRef] - - TxStatusInx = object ##\ - ## Internal access data - addrData: TxStatusSenderRef - nonceData: TxStatusNonceRef - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc `$`(rq: TxStatusNonceRef): string {.gcsafe, raises: [].} = - ## Needed by `rq.verify()` for printing error messages - $rq.nonceList.len - -proc nActive(sq: TxStatusTab): int {.gcsafe, raises: [].} = - ## Number of non-nil items - for status in TxItemStatus: - if not sq.statusList[status].isNil: - result.inc - -proc mkInxImpl(sq: var TxStatusTab; item: TxItemRef): Result[TxStatusInx,void] - {.gcsafe,raises: [KeyError].} = - ## Fails if item exists, already - var inx: TxStatusInx - - # array of buckets (aka status) => senders - inx.addrData = sq.statusList[item.status] - if inx.addrData.isNil: - new inx.addrData - inx.addrData.addrList.init - sq.statusList[item.status] = inx.addrData - - # sender address sub-list => nonces - if inx.addrData.addrList.hasKey(item.sender): - inx.nonceData = inx.addrData.addrList[item.sender] - else: - new inx.nonceData - inx.nonceData.nonceList.init - inx.addrData.addrList[item.sender] = inx.nonceData - - # nonce sublist - let rc = inx.nonceData.nonceList.insert(item.tx.nonce) - if rc.isErr: - return err() - rc.value.data = item - - return ok(inx) - - -proc getInxImpl(sq: var TxStatusTab; item: TxItemRef): Result[TxStatusInx,void] - {.gcsafe,raises: [KeyError].} = - var inx: TxStatusInx - - # array of buckets (aka status) => senders - inx.addrData = sq.statusList[item.status] - if inx.addrData.isNil: - return err() - - # sender address sub-list => nonces - if not inx.addrData.addrList.hasKey(item.sender): - return err() - inx.nonceData = inx.addrData.addrList[item.sender] - - ok(inx) - -# ------------------------------------------------------------------------------ -# Public all-queue helpers -# ------------------------------------------------------------------------------ - -proc init*(sq: var TxStatusTab; size = 10) {.gcsafe, raises: [].} = - ## Optional constructor - sq.size = 0 - sq.statusList.reset - - -proc insert*(sq: var TxStatusTab; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - ## Add transaction `item` to the list. The function has no effect if the - ## transaction exists, already (apart from returning `false`.) - let rc = sq.mkInxImpl(item) - if rc.isOk: - let inx = rc.value - sq.size.inc - inx.addrData.size.inc - return true - - -proc delete*(sq: var TxStatusTab; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - let rc = sq.getInxImpl(item) - if rc.isOk: - let inx = rc.value - - sq.size.dec - inx.addrData.size.dec - - discard inx.nonceData.nonceList.delete(item.tx.nonce) - if inx.nonceData.nonceList.len == 0: - discard inx.addrData.addrList.delete(item.sender) - - if inx.addrData.addrList.len == 0: - sq.statusList[item.status] = nil - - return true - -# ------------------------------------------------------------------------------ -# Public array ops -- `TxItemStatus` (level 0) -# ------------------------------------------------------------------------------ - -proc len*(sq: var TxStatusTab): int = - sq.nActive - -proc nItems*(sq: var TxStatusTab): int = - ## Getter, total number of items in the list - sq.size - -proc eq*(sq: var TxStatusTab; status: TxItemStatus): - SortedSetResult[TxItemStatus,TxStatusSenderRef] = - let addrData = sq.statusList[status] - if addrData.isNil: - return err(rbNotFound) - toSortedSetResult(key = status, data = addrData) - -# ------------------------------------------------------------------------------ -# Public array ops -- `Address` (level 1) -# ------------------------------------------------------------------------------ - -proc nItems*(addrData: TxStatusSenderRef): int = - ## Getter, total number of items in the sub-list - addrData.size - -proc nItems*(rc: SortedSetResult[TxItemStatus,TxStatusSenderRef]): int = - if rc.isOk: - return rc.value.data.nItems - 0 - -proc eq*(addrData: TxStatusSenderRef; sender: Address): - SortedSetResult[Address,TxStatusNonceRef] - {.gcsafe,raises: [KeyError].} = - if addrData.addrList.hasKey(sender): - return toSortedSetResult(key = sender, data = addrData.addrList[sender]) - err(rbNotFound) - -proc eq*(rc: SortedSetResult[TxItemStatus,TxStatusSenderRef]; - sender: Address): SortedSetResult[Address,TxStatusNonceRef] - {.gcsafe,raises: [KeyError].} = - if rc.isOk: - return rc.value.data.eq(sender) - err(rc.error) - -# ------------------------------------------------------------------------------ -# Public array ops -- `AccountNonce` (level 2) -# ------------------------------------------------------------------------------ - -proc len*(nonceData: TxStatusNonceRef): int = - ## Getter, same as `nItems` (for last level list) - nonceData.nonceList.len - -proc nItems*(nonceData: TxStatusNonceRef): int = - ## Getter, total number of items in the sub-list - nonceData.nonceList.len - -proc nItems*(rc: SortedSetResult[Address,TxStatusNonceRef]): int = - if rc.isOk: - return rc.value.data.nItems - 0 - - -proc eq*(nonceData: TxStatusNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.eq(nonce) - -proc eq*(rc: SortedSetResult[Address,TxStatusNonceRef]; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.eq(nonce) - err(rc.error) - - -proc ge*(nonceData: TxStatusNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.ge(nonce) - -proc ge*(rc: SortedSetResult[Address,TxStatusNonceRef]; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.ge(nonce) - err(rc.error) - - -proc gt*(nonceData: TxStatusNonceRef; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - nonceData.nonceList.gt(nonce) - -proc gt*(rc: SortedSetResult[Address,TxStatusNonceRef]; nonce: AccountNonce): - SortedSetResult[AccountNonce,TxItemRef] = - if rc.isOk: - return rc.value.data.gt(nonce) - err(rc.error) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_add.nim b/nimbus/core/tx_pool/tx_tasks/tx_add.nim deleted file mode 100644 index 0275eea590..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_add.nim +++ /dev/null @@ -1,223 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklet: Add Transaction -## ========================================= -## - -import - std/[tables], - ../tx_desc, - ../tx_info, - ../tx_item, - ../tx_tabs, - ./tx_classify, - ./tx_recover, - chronicles, - eth/common/[transactions, addresses, keys], - stew/[keyed_queue, sorted_set], - ../../eip4844 - -{.push raises: [].} - -type - TxAddStats* = tuple ##\ - ## Status code returned from the `addTxs()` function - - stagedIndicator: bool ##\ - ## If `true`, this value indicates that at least one item was added to\ - ## the `staged` bucket (which suggest a re-run of the packer.) - - topItems: seq[TxItemRef] ##\ - ## For each sender where txs were added to the bucket database or waste\ - ## basket, this list keeps the items with the highest nonce (handy for\ - ## chasing nonce gaps after a back-move of the block chain head.) - - NonceList = ##\ - ## Temporary sorter list - SortedSet[AccountNonce,TxItemRef] - - AccountNonceTab = ##\ - ## Temporary sorter table - Table[Address,NonceList] - -logScope: - topics = "tx-pool add transaction" - -# ------------------------------------------------------------------------------ -# Private helper -# ------------------------------------------------------------------------------ - -proc getItemList(tab: var AccountNonceTab; key: Address): var NonceList - {.gcsafe,raises: [KeyError].} = - if not tab.hasKey(key): - tab[key] = NonceList.init - tab[key] - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc supersede(xp: TxPoolRef; item: TxItemRef): Result[void,TxInfo] - {.gcsafe,raises: [CatchableError].} = - - var current: TxItemRef - - block: - let rc = xp.txDB.bySender.eq(item.sender).sub.eq(item.tx.nonce) - if rc.isErr: - return err(txInfoErrUnspecified) - current = rc.value.data - - # TODO: To unblock `ethpandaops/ethereum-package` based testing, - # we have to accept superseding transactions temporarily until `rpc_utils.nim` - # supports the 'pending' tag by incorporating pending transactions from the - # mempool when returning the current account nonce. Until that is fixed, - # we keep telling the transaction spammer that their nonce has not changed, - # and it keeps spamming transactions with the same nonce repeatedly. - # Note: When this is fixed, update `tests/test_txpool.nim` and - # re-enable the "Superseding txs with sender and nonce variants" test case. - if false: - let bumpPrice = (current.tx.gasPrice * xp.priceBump.GasInt + 99) div 100 - if item.tx.gasPrice < current.tx.gasPrice + bumpPrice: - return err(txInfoErrReplaceUnderpriced) - - # make space, delete item - if not xp.txDB.dispose(current, txInfoSenderNonceSuperseded): - return err(txInfoErrVoidDisposal) - - # try again - block: - let rc = xp.txDB.insert(item) - if rc.isErr: - return err(rc.error) - - return ok() - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc addTx*(xp: TxPoolRef; item: TxItemRef): bool - {.discardable,gcsafe,raises: [CatchableError].} = - ## Add a transaction item. It is tested and stored in either of the `pending` - ## or `staged` buckets, or disposed into the waste basket. The function - ## returns `true` if the item was added to the `staged` bucket. - - var - # stagedItemAdded = false -- notused - vetted = txInfoOk - - # Leave this frame with `return`, or proceeed with error - block txErrorFrame: - # Create tx ID and check for dups - if xp.txDB.byItemID.hasKey(item.itemID): - vetted = txInfoErrAlreadyKnown - break txErrorFrame - - # Verify transaction - if not xp.classifyValid(item): - vetted = txInfoErrBasicValidatorFailed - break txErrorFrame - - # Update initial state bucket - item.status = - if xp.classifyActive(item): txItemStaged - else: txItemPending - - # Insert into database - block: - let rc = xp.txDB.insert(item) - if rc.isOk: - return item.status == txItemStaged - vetted = rc.error - - # need to replace tx with same as the new item - if vetted == txInfoErrSenderNonceIndex: - let rc = xp.supersede(item) - if rc.isOk: - return - vetted = rc.error - - # Error processing => store in waste basket - xp.txDB.reject(item, vetted) - -# core/tx_pool.go(889): func (pool *TxPool) addTxs(txs []*types.Transaction, .. -proc addTxs*(xp: TxPoolRef; - txs: openArray[PooledTransaction]; info = ""): TxAddStats - {.discardable,gcsafe,raises: [CatchableError].} = - ## Add a list of transactions. The list is sorted after nonces and txs are - ## tested and stored into either of the `pending` or `staged` buckets, or - ## disposed o the waste basket. The function returns the tuple - ## `(staged-indicator,top-items)` as explained below. - ## - ## *stagedIndicator* - ## If `true`, this value indicates that at least one item was added to - ## the `staged` bucket (which suggest a re-run of the packer.) - ## - ## *topItems* - ## For each sender where txs were added to the bucket database or waste - ## basket, this list keeps the items with the highest nonce (handy for - ## chasing nonce gaps after a back-move of the block chain head.) - ## - var accTab: AccountNonceTab - - for tx in txs.items: - var reason: TxInfo - - if tx.tx.txType == TxEip4844: - let res = tx.validateBlobTransactionWrapper() - if res.isErr: - # move item to waste basket - reason = txInfoErrInvalidBlob - xp.txDB.reject(tx, reason, txItemPending, res.error) - continue - - # Create tx item wrapper, preferably recovered from waste basket - let rcTx = xp.recoverItem(tx, txItemPending, info) - if rcTx.isErr: - reason = rcTx.error - else: - let - item = rcTx.value - rcInsert = accTab.getItemList(item.sender).insert(item.tx.nonce) - if rcInsert.isErr: - reason = txInfoErrSenderNonceIndex - else: - rcInsert.value.data = item # link that item - continue - - # move item to waste basket - xp.txDB.reject(tx, reason, txItemPending, info) - - # Add sorted transaction items - for itemList in accTab.mvalues: - var - rc = itemList.ge(AccountNonce.low) - lastItem: TxItemRef # => nil - - while rc.isOk: - let (nonce,item) = (rc.value.key,rc.value.data) - if xp.addTx(item): - result.stagedIndicator = true - - # Make sure that there is at least one item per sender, prefereably - # a non-error item. - if item.reject == txInfoOk or lastItem.isNil: - lastItem = item - rc = itemList.gt(nonce) - - # return the last one in the series - if not lastItem.isNil: - result.topItems.add lastItem - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_bucket.nim b/nimbus/core/tx_pool/tx_tasks/tx_bucket.nim deleted file mode 100644 index 31e2260a33..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_bucket.nim +++ /dev/null @@ -1,172 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklets: Update by Bucket -## =========================================== -## - -import - std/[tables], - ../../../constants, - ../tx_desc, - ../tx_info, - ../tx_item, - ../tx_tabs, - ../tx_tabs/tx_status, - ./tx_classify, - ./tx_dispose, - chronicles, - eth/common/[transactions, keys], - stew/[sorted_set] - -{.push raises: [].} - -const - minNonce = AccountNonce.low - -logScope: - topics = "tx-pool buckets" - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc bucketItemsReassignPending*(xp: TxPoolRef; labelFrom: TxItemStatus; - account: Address; nonceFrom = minNonce) - {.gcsafe,raises: [CatchableError].} = - ## Move all items in bucket `lblFrom` with nonces not less than `nonceFrom` - ## to the `pending` bucket - let rc = xp.txDB.byStatus.eq(labelFrom).eq(account) - if rc.isOk: - for item in rc.value.data.incNonce(nonceFrom): - discard xp.txDB.reassign(item, txItemPending) - - -proc bucketItemsReassignPending*(xp: TxPoolRef; item: TxItemRef) - {.gcsafe,raises: [CatchableError].} = - ## Variant of `bucketItemsReassignPending()` - xp.bucketItemsReassignPending(item.status, item.sender, item.tx.nonce) - - -proc bucketUpdateAll*(xp: TxPoolRef): bool - {.discardable,gcsafe,raises: [CatchableError].} = - ## Update all buckets. The function returns `true` if some items were added - ## to the `staged` bucket. - - # Sort order: `Address` > `AccountNonce` > item. - var - stagedItemsAdded = false - stashed: Table[Address,seq[TxItemRef]] - - # Prepare - if 0 < xp.pDoubleCheck.len: - for item in xp.pDoubleCheck: - if item.reject == txInfoOk: - # Check whether there was a gap when the head was moved backwards. - let rc = xp.txDB.bySender.eq(item.sender).sub.gt(item.tx.nonce) - if rc.isOk: - let nextItem = rc.value.data - if item.tx.nonce + 1 < nextItem.tx.nonce: - discard xp.disposeItemAndHigherNonces( - item, txInfoErrNonceGap, txInfoErrImpliedNonceGap) - else: - # For failed txs, make sure that the account state has not - # changed. Assuming that this list is complete, then there are - # no other account affected. - let rc = xp.txDB.bySender.eq(item.sender).sub.ge(minNonce) - if rc.isOk: - let firstItem = rc.value.data - if not xp.classifyValid(firstItem): - discard xp.disposeItemAndHigherNonces( - firstItem, txInfoErrNonceGap, txInfoErrImpliedNonceGap) - - # Clean up that queue - xp.pDoubleCheckFlush - - - # PENDING - # - # Stash the items from the `pending` bucket The nonces in this list are - # greater than the ones from other lists. When processing the `staged` - # list, all that can happen is that loer nonces (than the stashed ones) - # are added. - for (sender,nonceList) in xp.txDB.incAccount(txItemPending): - # New per-sender-account sub-sequence - stashed[sender] = newSeq[TxItemRef]() - for item in nonceList.incNonce: - # Add to sub-sequence - stashed[sender].add item - - # STAGED - # - # Update/edit `staged` bucket. - for (_,nonceList) in xp.txDB.incAccount(txItemStaged): - for item in nonceList.incNonce: - - if not xp.classifyActive(item): - # Larger nonces cannot be held in the `staged` bucket anymore for this - # sender account. So they are moved back to the `pending` bucket. - xp.bucketItemsReassignPending(item) - - # The nonces in the `staged` bucket are always smaller than the one in - # the `pending` bucket. So, if the lower nonce items must go to the - # `pending` bucket, then the stashed `pending` bucket items can only - # stay there. - stashed.del(item.sender) - break # inner `incItemList()` loop - - # PACKED - # - # Update `packed` bucket. The items are a subset of all possibly staged - # (aka active) items. So they follow a similar logic as for the `staged` - # items above. - for (_,nonceList) in xp.txDB.incAccount(txItemPacked): - for item in nonceList.incNonce: - - if not xp.classifyActive(item): - xp.bucketItemsReassignPending(item) - - # For the `sender` all staged items have smaller nonces, so they have - # to go to the `pending` bucket, as well. - xp.bucketItemsReassignPending(txItemStaged, item.sender) - stagedItemsAdded = true - - stashed.del(item.sender) - break # inner `incItemList()` loop - - # PENDING re-visted - # - # Post-process `pending` and `staged` buckets. Re-insert the - # list of stashed `pending` items. - for itemList in stashed.values: - for item in itemList: - if not xp.classifyActive(item): - # Ignore higher nonces - break # inner loop for `itemList` sequence - # Move to staged bucket - discard xp.txDB.reassign(item, txItemStaged) - - stagedItemsAdded - -# --------------------------- - -proc bucketFlushPacked*(xp: TxPoolRef) - {.gcsafe,raises: [CatchableError].} = - ## Move all items from the `packed` bucket to the `pending` bucket - for (_,nonceList) in xp.txDB.decAccount(txItemPacked): - for item in nonceList.incNonce: - discard xp.txDB.reassign(item,txItemStaged) - - # Reset bucket status info - xp.clearAccounts - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_classify.nim b/nimbus/core/tx_pool/tx_tasks/tx_classify.nim deleted file mode 100644 index 578886974e..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_classify.nim +++ /dev/null @@ -1,214 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklet: Classify Transactions -## =============================================== -## - -import - ../../validate, - ../../eip4844, - ../tx_desc, - ../tx_item, - ../tx_tabs, - chronicles, - eth/common/[transactions, keys] - -import - ../../../transaction, - ../../../common/evmforks - -{.push raises: [].} - -logScope: - topics = "tx-pool classify" - -# ------------------------------------------------------------------------------ -# Private function: tx validity check helpers -# ------------------------------------------------------------------------------ - -proc checkTxBasic(xp: TxPoolRef; item: TxItemRef): bool = - let res = validateTxBasic( - item.tx, - xp.nextFork, - # A new transaction of the next fork may be - # coming before the fork activated - validateFork = false - ) - if res.isOk: - return true - item.info = res.error - return false - -proc checkTxNonce(xp: TxPoolRef; item: TxItemRef): bool - {.gcsafe,raises: [CatchableError].} = - ## Make sure that there is only one contiuous sequence of nonces (per - ## sender) starting at the account nonce. - - # get the next applicable nonce as registered on the account database - let accountNonce = xp.getNonce(item.sender) - - if item.tx.nonce < accountNonce: - debug "invalid tx: account nonce too small", - txNonce = item.tx.nonce, - accountNonce - return false - - elif accountNonce < item.tx.nonce: - # for an existing account, nonces must come in increasing consecutive order - let rc = xp.txDB.bySender.eq(item.sender) - if rc.isOk: - if rc.value.data.sub.eq(item.tx.nonce - 1).isErr: - debug "invalid tx: account nonces gap", - txNonce = item.tx.nonce, - accountNonce - return false - - true - -# ------------------------------------------------------------------------------ -# Private function: active tx classifier check helpers -# ------------------------------------------------------------------------------ - -proc txNonceActive(xp: TxPoolRef; item: TxItemRef): bool - {.gcsafe,raises: [KeyError].} = - ## Make sure that nonces appear as a contiuous sequence in `staged` bucket - ## probably preceeded in `packed` bucket. - let rc = xp.txDB.bySender.eq(item.sender) - if rc.isErr: - return true - # Must not be in the `pending` bucket. - if rc.value.data.eq(txItemPending).eq(item.tx.nonce - 1).isOk: - return false - true - - -proc txGasCovered(xp: TxPoolRef; item: TxItemRef): bool = - ## Check whether the max gas consumption is within the gas limit (aka block - ## size). - let trgLimit = xp.gasLimit - if trgLimit < item.tx.gasLimit: - debug "invalid tx: gasLimit exceeded", - maxLimit = trgLimit, - gasLimit = item.tx.gasLimit - return false - true - -proc txFeesCovered(xp: TxPoolRef; item: TxItemRef): bool = - ## Ensure that the user was willing to at least pay the base fee - ## And to at least pay the current data gasprice - if item.tx.txType >= TxEip1559: - if item.tx.maxFeePerGas < xp.baseFee: - debug "invalid tx: maxFee is smaller than baseFee", - maxFee = item.tx.maxFeePerGas, - baseFee = xp.baseFee - return false - - if item.tx.txType == TxEip4844: - let - excessBlobGas = xp.excessBlobGas - blobGasPrice = getBlobBaseFee(excessBlobGas, xp.nextFork >= FkPrague) - if item.tx.maxFeePerBlobGas < blobGasPrice: - debug "invalid tx: maxFeePerBlobGas smaller than blobGasPrice", - maxFeePerBlobGas=item.tx.maxFeePerBlobGas, - blobGasPrice=blobGasPrice - return false - true - -proc txCostInBudget(xp: TxPoolRef; item: TxItemRef): bool = - ## Check whether the worst case expense is covered by the price budget, - let - balance = xp.getBalance(item.sender) - gasCost = item.tx.gasCost - if balance < gasCost: - debug "invalid tx: not enough cash for gas", - available = balance, - require = gasCost - return false - let balanceOffGasCost = balance - gasCost - if balanceOffGasCost < item.tx.value: - debug "invalid tx: not enough cash to send", - available = balance, - availableMinusGas = balanceOffGasCost, - require = item.tx.value - return false - true - - -proc txPreLondonAcceptableGasPrice(xp: TxPoolRef; item: TxItemRef): bool = - ## For legacy transactions check whether minimum gas price and tip are - ## high enough. These checks are optional. - if item.tx.txType < TxEip1559: - if item.tx.gasPrice < 0: - return false - - # Fall back transaction selector scheme - if item.tx.effectiveGasTip(xp.baseFee) < 1.GasInt: - return false - true - -proc txPostLondonAcceptableTipAndFees(xp: TxPoolRef; item: TxItemRef): bool = - ## Helper for `classifyTxPacked()` - if item.tx.txType >= TxEip1559: - if item.tx.effectiveGasTip(xp.baseFee) < 1.GasInt: - return false - - if item.tx.maxFeePerGas < 1.GasInt: - return false - true - -# ------------------------------------------------------------------------------ -# Public functionss -# ------------------------------------------------------------------------------ - -proc classifyValid*(xp: TxPoolRef; item: TxItemRef): bool - {.gcsafe,raises: [CatchableError].} = - ## Check a (typically new) transaction whether it should be accepted at all - ## or re-jected right away. - - if not xp.checkTxNonce(item): - return false - - if not xp.checkTxBasic(item): - return false - - true - -proc classifyActive*(xp: TxPoolRef; item: TxItemRef): bool - {.gcsafe,raises: [CatchableError].} = - ## Check whether a valid transaction is ready to be held in the - ## `staged` bucket in which case the function returns `true`. - - if not xp.txNonceActive(item): - return false - - if item.tx.effectiveGasTip(xp.baseFee) <= 0.GasInt: - return false - - if not xp.txGasCovered(item): - return false - - if not xp.txFeesCovered(item): - return false - - if not xp.txCostInBudget(item): - return false - - if not xp.txPreLondonAcceptableGasPrice(item): - return false - - if not xp.txPostLondonAcceptableTipAndFees(item): - return false - - true - -# ------------------------------------------------------------------------------ -# Public functionss -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_dispose.nim b/nimbus/core/tx_pool/tx_tasks/tx_dispose.nim deleted file mode 100644 index 3c8295cfb8..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_dispose.nim +++ /dev/null @@ -1,117 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklet: Dispose expired items -## =============================================== -## - -import - std/[times], - ../tx_desc, - ../tx_info, - ../tx_item, - ../tx_tabs, - chronicles, - eth/common/[transactions, keys], - stew/keyed_queue - -{.push raises: [].} - -logScope: - topics = "tx-pool dispose expired" - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc utcNow: Time = - getTime().utc.toTime - -#proc pp(t: Time): string = -# t.format("yyyy-MM-dd'T'HH:mm:ss'.'fff", utc()) - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc deleteOtherNonces(xp: TxPoolRef; item: TxItemRef; newerThan: Time): bool - {.gcsafe,raises: [KeyError].} = - let rc = xp.txDB.bySender.eq(item.sender).sub - if rc.isOk: - for other in rc.value.data.incNonce(item.tx.nonce): - # only delete non-expired items - if newerThan < other.timeStamp: - discard xp.txDB.dispose(other, txInfoErrTxExpiredImplied) - result = true - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -# core/tx_pool.go(384): for addr := range pool.queue { -proc disposeExpiredItems*(xp: TxPoolRef) {.gcsafe,raises: [KeyError].} = - ## Any non-local transaction old enough will be removed. This will not - ## apply to items in the packed queue. - let - deadLine = utcNow() - xp.lifeTime - dspUnpacked = autoZombifyUnpacked in xp.pFlags - - var rc = xp.txDB.byItemID.first - while rc.isOk: - let (key, item) = (rc.value.key, rc.value.data) - if deadLine < item.timeStamp: - break - rc = xp.txDB.byItemID.next(key) - - if item.status != txItemPacked: - if not dspUnpacked: - continue - - # Note: it is ok to delete the current item - discard xp.txDB.dispose(item, txInfoErrTxExpired) - - # Also delete all non-expired items with higher nonces. - if xp.deleteOtherNonces(item, deadLine): - if rc.isOk: - # If one of the "other" just deleted items was the "next(key)", the - # loop would have stooped anyway at the "if deadLine < item.timeStamp:" - # clause at the while() loop header. - if not xp.txDB.byItemID.hasKey(rc.value.key): - break - - -proc disposeItemAndHigherNonces*(xp: TxPoolRef; item: TxItemRef; - reason, otherReason: TxInfo): int - {.gcsafe,raises: [CatchableError].} = - ## Move item and higher nonces per sender to wastebasket. - if xp.txDB.dispose(item, reason): - result = 1 - # For the current sender, delete all items with higher nonces - let rc = xp.txDB.bySender.eq(item.sender).sub - if rc.isOk: - let nonceList = rc.value.data - - for otherItem in nonceList.incNonce(item.tx.nonce): - if xp.txDB.dispose(otherItem, otherReason): - result.inc - - -proc disposeById*(xp: TxPoolRef; itemIDs: openArray[Hash32]; reason: TxInfo) - {.gcsafe,raises: [KeyError].}= - ## Dispose items by item ID wihtout checking whether this makes other items - ## unusable (e.g. with higher nonces for the same sender.) - for itemID in itemIDs: - let rcItem = xp.txDB.byItemID.eq(itemID) - if rcItem.isOk: - discard xp.txDB.dispose(rcItem.value, reason) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_head.nim b/nimbus/core/tx_pool/tx_tasks/tx_head.nim deleted file mode 100644 index ba80e3dba1..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_head.nim +++ /dev/null @@ -1,59 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklet: Move Head of Block Chain -## ================================================== -## - - -import - std/[tables], - ../tx_desc, - ../tx_info, - ../tx_item, - ../../chain/forked_chain - -{.push raises: [].} - -type - TxHeadDiffRef* = ref object ##\ - ## Diff data, txs changes that apply after changing the head\ - ## insertion point of the block chain - - remTxs*: Table[Hash32,bool] ##\ - ## txs to remove - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -# core/tx_pool.go(218): func (pool *TxPool) reset(oldHead, newHead ... -proc headDiff*(xp: TxPoolRef; - newHead: Header, chain: ForkedChainRef): Result[TxHeadDiffRef,TxInfo] = - let - txDiffs = TxHeadDiffRef() - - if newHead.number <= chain.baseNumber: - # return empty diff - return ok(txDiffs) - - let - newHash = newHead.blockHash - blk = chain.blockByHash(newHash).valueOr: - return err(txInfoErrForwardHeadMissing) - - for tx in blk.transactions: - txDiffs.remTxs[tx.itemID] = true - - ok(txDiffs) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/core/tx_pool/tx_tasks/tx_recover.nim b/nimbus/core/tx_pool/tx_tasks/tx_recover.nim deleted file mode 100644 index 9be20b7ee3..0000000000 --- a/nimbus/core/tx_pool/tx_tasks/tx_recover.nim +++ /dev/null @@ -1,75 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -## Transaction Pool Tasklet: Recover From Waste Basket or Create -## ============================================================= -## - -import - ../tx_desc, - ../tx_info, - ../tx_item, - ../tx_tabs, - chronicles, - eth/common/[transactions, addresses, keys], - stew/keyed_queue - -{.push raises: [].} - -logScope: - topics = "tx-pool recover item" - -let - nullSender = block: - var rc: Address - rc - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc recoverItem*(xp: TxPoolRef; tx: PooledTransaction; status = txItemPending; - info = ""; acceptExisting = false): Result[TxItemRef,TxInfo] = - ## Recover item from waste basket or create new. It is an error if the item - ## is in the buckets database, already. - ## - ## If thy argument `acceptExisting` is set `true` and the tx item is in the - ## bucket database already for any bucket, the fuction successds ok. - let itemID = tx.itemID - - # Test whether the item is in the database, already - if xp.txDB.byItemID.hasKey(itemID): - if acceptExisting: - return ok(xp.txDB.byItemID.eq(itemID).value) - else: - return err(txInfoErrAlreadyKnown) - - # Check whether the tx can be re-cycled from waste basket - block: - let rc = xp.txDB.byRejects.delete(itemID) - if rc.isOk: - let item = rc.value.data - # must not be a waste tx without meta-data - if item.sender != nullSender: - let itemInfo = if info != "": info else: item.info - item.init(status, itemInfo) - return ok(item) - - # New item generated from scratch, e.g. with `nullSender` - block: - let rc = TxItemRef.new(tx, itemID, status, info) - if rc.isOk: - return ok(rc.value) - - err(txInfoErrInvalidSender) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/graphql/ethapi.nim b/nimbus/graphql/ethapi.nim index 0c9e08e33c..9a337a327e 100644 --- a/nimbus/graphql/ethapi.nim +++ b/nimbus/graphql/ethapi.nim @@ -1342,14 +1342,9 @@ proc sendRawTransaction(ud: RootRef, params: Args, parent: Node): RespResult {.a let tx = decodePooledTx(data) # we want to know if it is a valid tx blob let txHash = rlpHash(tx) - ctx.txPool.add(tx) - - let res = ctx.txPool.inPoolAndReason(txHash) - if res.isOk: - return resp(txHash) - else: - return err(res.error) - + ctx.txPool.addTx(tx).isOkOr: + return err($error) + return resp(txHash) except CatchableError as em: return err("failed to process raw transaction: " & em.msg) diff --git a/nimbus/nimbus_execution_client.nim b/nimbus/nimbus_execution_client.nim index 8b1efd55c0..5a9ab94430 100644 --- a/nimbus/nimbus_execution_client.nim +++ b/nimbus/nimbus_execution_client.nim @@ -49,8 +49,6 @@ proc basicServices(nimbus: NimbusNode, # so it can know the latest account state # e.g. sender nonce, etc nimbus.txPool = TxPoolRef.new(nimbus.chainRef) - doAssert nimbus.txPool.smartHead(nimbus.chainRef.latestHeader) - nimbus.beaconEngine = BeaconEngineRef.new(nimbus.txPool) proc manageAccounts(nimbus: NimbusNode, conf: NimbusConf) = diff --git a/nimbus/rpc/server_api.nim b/nimbus/rpc/server_api.nim index 57bf115b6e..1dadce54c9 100644 --- a/nimbus/rpc/server_api.nim +++ b/nimbus/rpc/server_api.nim @@ -307,10 +307,8 @@ proc setupServerAPI*(api: ServerAPIRef, server: RpcServer, ctx: EthContext) = pooledTx = decodePooledTx(txBytes) txHash = rlpHash(pooledTx) - api.txPool.add(pooledTx) - let res = api.txPool.inPoolAndReason(txHash) - if res.isErr: - raise newException(ValueError, res.error) + api.txPool.addTx(pooledTx).isOkOr: + raise newException(ValueError, $error) txHash server.rpc("eth_call") do(args: TransactionArgs, blockTag: BlockTag) -> seq[byte]: @@ -524,7 +522,8 @@ proc setupServerAPI*(api: ServerAPIRef, server: RpcServer, ctx: EthContext) = nil pooledTx = PooledTransaction(tx: signedTx, networkPayload: networkPayload) - api.txPool.add(pooledTx) + api.txPool.addTx(pooledTx).isOkOr: + raise newException(ValueError, $error) rlpHash(signedTx) server.rpc("eth_getTransactionByHash") do(data: Hash32) -> TransactionObject: diff --git a/tests/test_engine_api.nim b/tests/test_engine_api.nim index 37ab9941f0..387f20ebb2 100644 --- a/tests/test_engine_api.nim +++ b/tests/test_engine_api.nim @@ -92,10 +92,6 @@ proc setupEnv(envFork: HardFork = MergeFork, chain = ForkedChainRef.init(com) txPool = TxPoolRef.new(chain) - # txPool must be informed of active head - # so it can know the latest account state - doAssert txPool.smartHead(chain.latestHeader) - let server = newRpcHttpServerWithParams("127.0.0.1:0").valueOr: echo "Failed to create rpc server: ", error diff --git a/tests/test_ledger.nim b/tests/test_ledger.nim index bfaac17d1b..343520aceb 100644 --- a/tests/test_ledger.nim +++ b/tests/test_ledger.nim @@ -24,6 +24,8 @@ import ../nimbus/db/ledger {.all.}, # import all private symbols unittest2 +import results + const genesisFile = "tests/customgenesis/cancun123.json" hexPrivKey = "af1a9be9f1a54421cac82943820a0fe0f601bb5f4f6d0bccc81c613f0ce6ae22" @@ -310,11 +312,10 @@ proc runLedgerTransactionTests(noisy = true) = for _ in 0..