Skip to content

Commit

Permalink
fix(pool): fix pool max ops per sender during replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Jan 7, 2025
1 parent 83f0a63 commit f4d3a5e
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/compliance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
- run: curl -sSL https://raw.githubusercontent.com/pdm-project/pdm/main/install-pdm.py | python3 -
- run: pip install jq yq
- run: sudo apt install -y libxml2-utils
- uses: actions/setup-node@v4
with:
node-version: 18

- name: Checkout Rundler
uses: actions/checkout@v4
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/compliance_v06.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
- run: curl -sSL https://raw.githubusercontent.com/pdm-project/pdm/main/install-pdm.py | python3 -
- run: pip install jq yq
- run: sudo apt install -y libxml2-utils
- uses: actions/setup-node@v4
with:
node-version: 18

- name: Checkout Rundler
uses: actions/checkout@v4
Expand Down
31 changes: 31 additions & 0 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ where
})
}

pub(crate) fn all_operations(&self) -> impl Iterator<Item = Arc<PoolOperation>> + '_ {
self.by_hash.values().map(|o| o.po.clone())
}

/// Does maintenance on the pool.
///
/// 1) Removes all operations using the given entity, returning the hashes of the removed operations.
Expand Down Expand Up @@ -827,6 +831,33 @@ mod tests {
check_map_entry(pool.best.iter().nth(2), Some(&ops[0]));
}

#[test]
fn add_operations() {
let mut pool = pool();
let addr_a = Address::random();
let addr_b = Address::random();
let ops = vec![
create_op(addr_a, 0, 1),
create_op(addr_a, 1, 2),
create_op(addr_b, 0, 3),
];

let mut hashes = HashSet::new();
for op in ops.iter() {
hashes.insert(pool.add_operation(op.clone(), 0).unwrap());
}

let all = pool
.all_operations()
.map(|op| {
op.uo
.hash(pool.config.entry_point, pool.config.chain_spec.id)
})
.collect::<HashSet<_>>();

assert_eq!(all, hashes);
}

#[test]
fn best_ties() {
let mut pool = pool();
Expand Down
21 changes: 18 additions & 3 deletions crates/pool/src/mempool/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ impl AddressReputation {
self.state.write().add_seen(address);
}

pub(crate) fn dec_seen(&self, address: Address) {
self.state.write().dec_seen(address);
}

pub(crate) fn handle_urep_030_penalty(&self, address: Address) {
self.state.write().handle_urep_030_penalty(address);
}
Expand All @@ -136,8 +140,8 @@ impl AddressReputation {
self.state.write().add_included(address);
}

pub(crate) fn remove_included(&self, address: Address) {
self.state.write().remove_included(address);
pub(crate) fn dec_included(&self, address: Address) {
self.state.write().dec_included(address);
}

pub(crate) fn set_reputation(&self, address: Address, ops_seen: u64, ops_included: u64) {
Expand Down Expand Up @@ -218,6 +222,11 @@ impl AddressReputationInner {
count.ops_seen += 1;
}

fn dec_seen(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
count.ops_seen = count.ops_seen.saturating_sub(1);
}

fn handle_urep_030_penalty(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
count.ops_seen += self.params.bundle_invalidation_ops_seen_unstaked_penalty;
Expand Down Expand Up @@ -245,7 +254,7 @@ impl AddressReputationInner {
count.ops_included += 1;
}

fn remove_included(&mut self, address: Address) {
fn dec_included(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
count.ops_included = count.ops_included.saturating_sub(1)
}
Expand Down Expand Up @@ -315,6 +324,12 @@ mod tests {
let counts = reputation.counts.get(&addr).unwrap();
assert_eq!(counts.ops_seen, 1000);
assert_eq!(counts.ops_included, 1000);

reputation.dec_seen(addr);
reputation.dec_included(addr);
let counts = reputation.counts.get(&addr).unwrap();
assert_eq!(counts.ops_seen, 999);
assert_eq!(counts.ops_included, 999);
}

#[test]
Expand Down
124 changes: 102 additions & 22 deletions crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ where

if let Some(po) = pool_op {
for entity_addr in po.entities().map(|e| e.address).unique() {
self.reputation.remove_included(entity_addr);
self.reputation.dec_included(entity_addr);
}

unmined_op_count += 1;
Expand Down Expand Up @@ -468,8 +468,19 @@ where
origin: OperationOrigin,
op: UserOperationVariant,
) -> MempoolResult<B256> {
// TODO(danc) aggregator reputation is not implemented
// TODO(danc) catch ops with aggregators prior to simulation and reject
// Initial state checks
let to_replace = {
let state = self.state.read();

// Check if op violates the STO-040 spec rule
state.pool.check_multiple_roles_violation(&op)?;

// Check if op is already known or replacing another, and if so, ensure its fees are high enough
state
.pool
.check_replacement(&op)?
.and_then(|r| self.state.read().pool.get_operation_by_hash(r))
};

// Check reputation of entities in involved in the operation
// If throttled, entity can have THROTTLED_ENTITY_MEMPOOL_COUNT inflight operation at a time, else reject
Expand Down Expand Up @@ -518,12 +529,6 @@ where
.await
.map_err(anyhow::Error::from)?;

// Check if op is already known or replacing another, and if so, ensure its fees are high enough
// do this before simulation to save resources
let replacement = self.state.read().pool.check_replacement(&op)?;
// Check if op violates the STO-040 spec rule
self.state.read().pool.check_multiple_roles_violation(&op)?;

// check if paymaster is present and exists in pool
// this is optimistic and could potentially lead to
// multiple user operations call this before they are
Expand Down Expand Up @@ -587,6 +592,7 @@ where
{
let state = self.state.read();
if !pool_op.account_is_staked
&& to_replace.is_none()
&& state.pool.address_count(&pool_op.uo.sender())
>= self.config.same_sender_mempool_count
{
Expand All @@ -599,9 +605,16 @@ where
// Check unstaked non-sender entity counts in the mempool
for entity in pool_op
.unstaked_entities()
.unique()
.filter(|e| e.address != pool_op.entity_infos.sender.address())
{
let ops_allowed = self.reputation.get_ops_allowed(entity.address);
let mut ops_allowed = self.reputation.get_ops_allowed(entity.address);
if let Some(to_replace) = &to_replace {
if to_replace.entities().contains(&entity) {
ops_allowed += 1;
}
}

if state.pool.address_count(&entity.address) >= ops_allowed as usize {
return Err(MempoolError::MaxOperationsReached(
ops_allowed as usize,
Expand All @@ -628,17 +641,20 @@ where
// once the operation has been added to the pool
self.paymaster.add_or_update_balance(&pool_op).await?;

// Update reputation
if replacement.is_none() {
pool_op.entities().unique().for_each(|e| {
self.reputation.add_seen(e.address);
if self.reputation.status(e.address) == ReputationStatus::Throttled {
self.throttle_entity(e);
} else if self.reputation.status(e.address) == ReputationStatus::Banned {
self.remove_entity(e);
}
// Update reputation, handling replacement if needed
if let Some(to_replace) = to_replace {
to_replace.entities().unique().for_each(|e| {
self.reputation.dec_seen(e.address);
});
}
pool_op.entities().unique().for_each(|e| {
self.reputation.add_seen(e.address);
if self.reputation.status(e.address) == ReputationStatus::Throttled {
self.throttle_entity(e);
} else if self.reputation.status(e.address) == ReputationStatus::Banned {
self.remove_entity(e);
}
});

// Emit event
let op_hash = pool_op
Expand Down Expand Up @@ -734,6 +750,8 @@ where

if self.reputation.status(entity.address) == ReputationStatus::Banned {
self.remove_entity(entity);
} else if self.reputation.status(entity.address) == ReputationStatus::Throttled {
self.throttle_entity(entity);
}
}

Expand Down Expand Up @@ -775,7 +793,7 @@ where
}

fn all_operations(&self, max: usize) -> Vec<Arc<PoolOperation>> {
self.state.read().pool.best_operations().take(max).collect()
self.state.read().pool.all_operations().take(max).collect()
}

fn get_user_operation_by_hash(&self, hash: B256) -> Option<Arc<PoolOperation>> {
Expand Down Expand Up @@ -991,6 +1009,25 @@ mod tests {
assert_eq!(pool.best_operations(3, 0).unwrap(), vec![]);
}

#[tokio::test]
async fn all_operations() {
let ops = vec![
create_op(Address::random(), 0, 3, None),
create_op(Address::random(), 0, 2, None),
create_op(Address::random(), 0, 1, None),
];
let uos = ops.iter().map(|op| op.op.clone()).collect::<Vec<_>>();
let pool = create_pool(ops);

for op in &uos {
let _ = pool
.add_operation(OperationOrigin::Local, op.clone())
.await
.unwrap();
}
check_ops_unordered(&pool.all_operations(16), &uos, pool.config.entry_point);
}

#[tokio::test]
async fn chain_update_mine() {
let paymaster = Address::random();
Expand Down Expand Up @@ -1276,7 +1313,7 @@ mod tests {
}

check_ops(
pool.all_operations(4),
pool.best_operations(4, 0).unwrap(),
vec![
uos[0].clone(),
uos[1].clone(),
Expand Down Expand Up @@ -1326,7 +1363,7 @@ mod tests {
.await
.unwrap();
check_ops(
pool.all_operations(4),
pool.best_operations(4, 0).unwrap(),
vec![
uos[1].clone(),
uos[2].clone(),
Expand Down Expand Up @@ -1668,6 +1705,33 @@ mod tests {
.is_err());
}

#[tokio::test]
async fn test_replacement_max_ops_for_unstaked_sender() {
let mut ops = vec![];
let addr = Address::random();
for i in 0..4 {
ops.push(create_op(addr, i, 1, None))
}
// replacement op for first op
ops.push(create_op(addr, 0, 2, None));

let pool = create_pool(ops.clone());

for op in ops.iter().take(4) {
pool.add_operation(OperationOrigin::Local, op.op.clone())
.await
.unwrap();
}

pool.add_operation(OperationOrigin::Local, ops[4].op.clone())
.await
.unwrap();

let uos = ops.into_iter().skip(1).map(|op| op.op).collect::<Vec<_>>();

check_ops_unordered(&pool.all_operations(16), &uos, pool.config.entry_point);
}

#[tokio::test]
async fn test_best_staked() {
let address = Address::random();
Expand Down Expand Up @@ -2031,4 +2095,20 @@ mod tests {
assert_eq!(actual.uo, expected);
}
}

fn check_ops_unordered(
actual: &[Arc<PoolOperation>],
expected: &[UserOperationVariant],
entry_point: Address,
) {
let actual_hashes = actual
.iter()
.map(|op| op.uo.hash(entry_point, 0))
.collect::<HashSet<_>>();
let expected_hashes = expected
.iter()
.map(|op| op.hash(entry_point, 0))
.collect::<HashSet<_>>();
assert_eq!(actual_hashes, expected_hashes);
}
}
2 changes: 1 addition & 1 deletion test/spec-tests/v0_6/bundler-spec-tests
2 changes: 1 addition & 1 deletion test/spec-tests/v0_7/bundler-spec-tests

0 comments on commit f4d3a5e

Please sign in to comment.