交易池是以太坊节点用于存储和管理待处理交易的内存数据结构。交易可以分为两种类型:
-
Local Transaction (本地交易)
- 通过节点的 RPC 接口提交的交易
- 享有特权地位,不会被轻易驱逐
- 会被持久化保存到本地磁盘
-
Remote Transaction (远程交易)
- 通过 P2P 网络接收的交易
- 在资源受限时可能被驱逐
- 不会被持久化
注:本文主要讨论 legacypool,适用于 Legacy、AccessList 和 Dynamic 类型的交易。
Transaction Pool 主要由两个核心组件构成:
-
Pending Pool
- 存储当前可执行的交易
- 交易的 nonce 值连续且正确
- 账户余额足够支付交易费用
-
Queue Pool
- 存储暂时无法执行的交易
- 可能是因为 nonce 过高
- 或账户余额不足等原因
type LegacyPool struct {
// 基础配置
config Config // 交易池配置参数(如容量限制、价格限制等)
chainconfig *params.ChainConfig // 区块链配置参数(如网络ID、分叉规则等)
chain BlockChain // 区块链接口,用于访问链状态
gasTip atomic.Pointer[uint256.Int] // 最低gas小费要求,原子操作保证线程安全
txFeed event.Feed // 交易事件发送器,用于通知新交易
signer types.Signer // 交易签名器,用于验证交易签名
mu sync.RWMutex // 读写锁,保护并发访问
// 当前状态
currentHead atomic.Pointer[types.Header] // 当前区块头,原子操作保证线程安全
currentState *state.StateDB // 当前状态数据库
pendingNonces *noncer // 待处理的nonce追踪器,用于nonce计数
// 本地交易管理
locals *accountSet // 本地账户集合,这些账户的交易免于驱逐规则
journal *journal // 本地交易日志,用于持久化存储本地交易
// 交易存储和管理
reserve txpool.AddressReserver // 地址预留器,确保跨子池的地址互斥性
pending map[common.Address]*list // 当前可执行的交易映射(按地址索引)
queue map[common.Address]*list // 未来待执行的交易映射(按地址索引)
beats map[common.Address]time.Time // 每个账户的最后活动时间
all *lookup // 所有交易的查找表,支持快速查找
priced *pricedList // 按价格排序的交易列表
// 通道和事件处理
reqResetCh chan *txpoolResetRequest // 请求重置交易池的通道
reqPromoteCh chan *accountSet // 请求提升交易的通道
queueTxEventCh chan *types.Transaction // 交易队列事件通道
reorgDoneCh chan chan struct{} // 重组完成通知通道
reorgShutdownCh chan struct{} // 请求关闭重组循环的通道
wg sync.WaitGroup // 等待组,用于追踪goroutine
initDoneCh chan struct{} // 初始化完成通道(用于测试)
// 状态追踪
changesSinceReorg int // 在重组之间执行的删除操作计数器
}
验证交易:
- nonce检查
- gas价格检查
- 余额检查
- gas限制检查
func validateTx(tx *types.Transaction, local bool) error {
// 1. 大小验证
if tx.Size() > txMaxSize { // txMaxSize = 128KB
return ErrOversizedData
}
// 2. 签名验证
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
// 3. Nonce检查
nonce := pool.currentState.GetNonce(from)
if tx.Nonce() < nonce {
// nonce太低,交易已过期
return ErrNonceTooLow
}
// 4. Gas价格验证
if !local { // 本地交易豁免
gasTipCap, _ := tx.EffectiveGasTip(baseFee)
if gasTipCap.Cmp(pool.gasTip.Load()) < 0 {
return ErrUnderpriced
}
}
// 5. 余额验证
balance := pool.currentState.GetBalance(from)
if balance.Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
// 6. Gas限制验证
if tx.Gas() > pool.currentHead.Load().GasLimit {
return ErrGasLimit
}
return nil
}
交易池使用两级排序机制:
-
账户内排序
- 使用
TransactionsByNonce
按 nonce 值排序 - 确保交易按序执行
- 使用
-
账户间排序
- 使用
pricedList
按 gas 价格排序 - 高价格交易优先处理
- 使用
type pricedList struct {
all *lookup // 指向所有交易的指针
items *prque.Prque[*types.Transaction] // 按价格排序的优先队列
stales int64 // 作废交易的数量
}
// Put 添加一个交易到价格排序列表
func (l *pricedList) Put(tx *types.Transaction) {
// 计算交易的有效gas价格
gasFeeCap := tx.GasFeeCap()
l.items.Push(tx, -gasFeeCap.Int64()) // 负值使得高价格排在前面
}
// list 实现按nonce排序的交易列表
type list struct {
txs *types.TransactionsByNonce // 按nonce排序的交易
nonces map[uint64]*types.Transaction // nonce到交易的映射
}
// Add 添加一个交易到nonce排序列表
func (l *list) Add(tx *types.Transaction) {
nonce := tx.Nonce()
if l.nonces[nonce] == nil {
l.txs.Insert(tx)
l.nonces[nonce] = tx
}
}
从pending池中选择交易打包时:
- 首先按账户分组
- 每个账户内部按nonce排序
- 不同账户间按gas价格排序
// 当收到相同nonce的新交易时:
if newTx.GasPrice().Cmp(oldTx.GasPrice()) > pool.config.PriceBump {
// 如果新交易的gas价格比旧交易高出足够多(默认10%)
// 则替换旧交易
}
当池满时移除交易:
- 优先移除gas价格低的交易
- 保护本地交易不被移除
- 确保每个账户的连续性(nonce不能断开)
交易池设置了一些的参数来限制单个交易的 Size ,以及整个 Transaction Pool 中所保存的交易的总数量。当交易池的中维护的交易超过某个阈值的时候,交易池会丢弃/驱逐(Discard/Evict)一部分的交易。这里注意,被清除的交易通常都是 Remote Transaction,而 Local Transaction 通常都会被保留下来。
负责判断哪些交易会被丢弃的函数是 txPricedList.Discard()
。
Transaction Pool 引入了一个名为 txSlotSize
的 Metrics 作为计量一个交易在交易池中所占的空间。目前,txSlotSize
的值是 32 * 1024
。每个交易至少占据一个 txSlot
,最大能占用四个 txSlotSize
,txMaxSize = 4 * txSlotSize = 128 KB
。换句话说,如果一个交易的物理数据大小不足 32KB,那么它会占据一个 txSlot
。同时,一个合法的交易最大是 128KB 的大小
按照默认的设置,交易池的最多保存 4096+1024
个交易,其中 Pending 区保存 4096 个 txSlot
规模的交易,Queue 区最多保存 1024 个 txSlot
规模的交易。
// add 验证并添加交易到池中
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// 获取交易发送者
from, err := types.Sender(pool.signer, tx)
if err != nil {
return false, err
}
// 加锁保护并发访问
pool.mu.Lock()
defer pool.mu.Unlock()
// 1. 基础验证
if err := pool.validateTx(tx, local); err != nil {
return false, err
}
// 2. 检查是否替换现有交易
if old := pool.all.Get(tx.Hash()); old != nil {
// 如果新交易gas价格不够高,拒绝替换
if !pool.shouldReplace(old, tx) {
return false, ErrReplaceUnderpriced
}
// 标记为替换操作
replaced = true
}
// 3. 将交易添加到合适的队列
if pool.pending[from] == nil {
pool.pending[from] = newTxList(true)
}
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
// 4. 根据nonce决定放入pending还是queue
if tx.Nonce() > pool.currentState.GetNonce(from) {
pool.queue[from].Add(tx)
} else {
pool.pending[from].Add(tx)
}
// 5. 更新价格排序
pool.priced.Put(tx)
return replaced, nil
}
// reset 处理链状态更新
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// 1. 初始化新状态
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// 发生链重组,需要重新注入交易
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
// 收集需要重新注入的交易
for i := oldNum + 1; i <= newNum; i++ {
block := pool.chain.GetBlock(newHead.Hash(), i)
for _, tx := range block.Transactions() {
reinject = append(reinject, tx)
}
}
}
// 2. 更新当前状态
pool.currentState, _ = pool.chain.StateAt(newHead.Root)
pool.pendingNonces = newNoncer(pool.currentState)
// 3. 重新验证所有pending交易
pool.demoteUnexecutables()
// 4. 重新注入交易
if len(reinject) > 0 {
pool.addTxsLocked(reinject, false)
}
}
// promoteTx 尝试将交易从queue提升到pending
func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
// 1. 检查nonce
if pool.currentState.GetNonce(addr) != tx.Nonce() {
return false
}
// 2. 检查余额
if pool.currentState.GetBalance(addr).Cmp(tx.Cost()) < 0 {
return false
}
// 3. 从queue移除
pool.queue[addr].Remove(hash)
// 4. 添加到pending
pool.pending[addr].Add(tx)
// 5. 更新状态
pool.beats[addr] = time.Now()
return true
}
-
单个交易限制
- 最小占用: 1个 txSlot (32KB)
- 最大占用: 4个 txSlot (128KB)
-
交易池容量
- Pending池: 4096个 txSlot
- Queue池: 1024个 txSlot
当池满时,按以下策略清理交易:
- 优先清理远程交易,保护本地交易
- 按gas价格排序,优先清理低价交易
- 保持每个账户的nonce连续性
- 清理长期未被打包的过期交易
交易池运行以下定期维护任务:
func loop() {
// 1. 状态报告 (每分钟)
case <-report.C:
logPoolStats()
// 2. 交易清理 (每小时)
case <-evict.C:
removeExpiredTxs()
// 3. 本地交易持久化 (每小时)
case <-journal.C:
persistLocalTxs()
}
当区块链状态发生变化时:
- 检测链重组,重新注入相关交易
- 更新状态数据库引用
- 重新验证所有pending交易
- 尝试将queue中的交易提升到pending
这些机制共同确保了交易池的正确性和效率。