Skip to content

Commit

Permalink
pr feedback: refactor into separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Feb 25, 2024
1 parent e01ca71 commit 6b496a5
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 140 deletions.
4 changes: 2 additions & 2 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ impl Blockstore {
}

/// Range-delete all entries which prefix matches the specified `slot`,
/// remove `slot` its' parents SlotMeta next_slots list, and
/// remove `slot`'s parent's SlotMeta next_slots list, and
/// clear `slot`'s SlotMeta (except for next_slots).
///
/// This function currently requires `insert_shreds_lock`, as both
Expand All @@ -1171,7 +1171,7 @@ impl Blockstore {
let _lock = self.insert_shreds_lock.lock().unwrap();
// Clear all slot-related information, and clean up the slot meta by removing
// `slot` from its parent's `next_slots`, while retaining `slot`'s own `next_slots`.
match self.run_purge(slot, slot, PurgeType::ExactAndCleanupChaining) {
match self.purge_slot_cleanup_chaining(slot) {
Ok(_) => {}
Err(BlockstoreError::SlotUnavailable) => error!(
"clear_unconfirmed_slot() called on slot {} with no SlotMeta",
Expand Down
262 changes: 126 additions & 136 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct PurgeStats {
delete_files_in_range: u64,
}

#[derive(Clone, Copy, PartialEq, Eq)]
#[derive(Clone, Copy)]
/// Controls how `blockstore::purge_slots` purges the data.
pub enum PurgeType {
/// A slower but more accurate way to purge slots by also ensuring higher
Expand All @@ -21,10 +21,6 @@ pub enum PurgeType {
/// The fastest purge mode that relies on the slot-id based TTL
/// compaction filter to do the cleanup.
CompactionFilter,
/// In this case along with an `Exact` purge we orphan the specified slot by
/// removing it from `parent_slot_meta.next_slots` and reinsert an empty `slot_meta`
/// for the orphan that only retains the `next_slots` value.
ExactAndCleanupChaining,
}

impl Blockstore {
Expand Down Expand Up @@ -139,6 +135,7 @@ impl Blockstore {
}
}

#[cfg(test)]
pub(crate) fn run_purge(
&self,
from_slot: Slot,
Expand All @@ -148,107 +145,79 @@ impl Blockstore {
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
}

/// Purges all columns relating to `slot` and repairs the chaining metadata
/// around it, all within a single write batch.
///
/// In addition to the range/exact purge of `slot`, this:
/// * removes `slot` from its parent's `next_slots` (when the parent's
///   `SlotMeta` is present), and
/// * reinserts a `SlotMeta` for `slot` after calling
///   `clear_unconfirmed_slot()` on it, so that the parts worth retaining
///   (like `next_slots`) survive the purge.
///
/// This ensures that `slot`'s fork is replayable upon repair of `slot`.
///
/// Returns the `bool` produced by `purge_range` for `slot`, i.e. whether all
/// of the per-column range deletes were queued successfully.
///
/// # Errors
///
/// * `BlockstoreError::SlotUnavailable` if there is no `SlotMeta` for `slot`.
/// * Any error surfaced while reading the slot metas, purging the special
///   columns, staging the updated metas, or submitting the write batch. The
///   write is attempted exactly once; a failure is logged and returned to
///   the caller.
///
/// # Panics
///
/// Panics if a write batch cannot be obtained from the database.
pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
    // Without an existing meta there is nothing to orphan; report it so the
    // caller can distinguish this from a database failure.
    let Some(mut slot_meta) = self.meta(slot)? else {
        return Err(BlockstoreError::SlotUnavailable);
    };
    let mut write_batch = self
        .db
        .batch()
        .expect("Database Error: Failed to get write batch");

    let columns_purged = self.purge_range(&mut write_batch, slot, slot);
    self.purge_special_columns_exact(&mut write_batch, slot, slot)?;

    if let Some(parent_slot) = slot_meta.parent_slot {
        if let Some(mut parent_slot_meta) = self.meta(parent_slot)? {
            // .retain() is a linear scan; however, next_slots should
            // only contain several elements so this isn't so bad
            parent_slot_meta
                .next_slots
                .retain(|&next_slot| next_slot != slot);
            write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
        } else {
            // A missing parent meta is logged but not treated as fatal.
            // NOTE: the continuation lines below are part of the string
            // literal; any leading whitespace on them is emitted verbatim.
            error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this
likely means a cluster restart was performed and your node contains invalid shreds generated
with the wrong shred version, whose ancestors have been cleaned up.
Falling back to duplicate block handling to remedy the situation", parent_slot, slot);
        }
    }

    // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field.
    // This `put` is staged after the range delete of `slot` in the same batch.
    slot_meta.clear_unconfirmed_slot();
    write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;

    if let Err(e) = self.db.write(write_batch) {
        // No retry happens here: the error is handed straight back to the
        // caller, so the log message must not claim otherwise.
        error!(
            "Error: {:?} while submitting write batch for slot {:?}",
            e, slot
        );
        return Err(e);
    }
    Ok(columns_purged)
}

/// A helper function to `purge_slots` that executes the ledger clean up.
/// The cleanup applies to \[`from_slot`, `to_slot`\].
///
/// When `from_slot` is 0, any sst-file with a key-range completely older
/// than `to_slot` will also be deleted.
///
/// If the `purge_type` is `PurgeType::ExactAndCleanupChaining` we require
/// that `from_slot == to_slot` - we are purging only one slot.
/// Note: slots > `to_slot` that chained to a purged slot are not properly
/// cleaned up. This function is not intended to be used if such slots need
/// to be replayed.
pub(crate) fn run_purge_with_stats(
&self,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
purge_stats: &mut PurgeStats,
) -> Result<bool> {
let slot_meta = if purge_type == PurgeType::ExactAndCleanupChaining {
if from_slot != to_slot {
error!(
"Chaining cleanup was requested but a range was specified {} {}",
from_slot, to_slot
);
return Err(BlockstoreError::InvalidRangeForSlotMetaCleanup);
}
let Some(slot_meta) = self.meta(from_slot)? else {
return Err(BlockstoreError::SlotUnavailable);
};
Some(slot_meta)
} else {
None
};
let mut write_batch = self
.db
.batch()
.expect("Database Error: Failed to get write batch");
let mut delete_range_timer = Measure::start("delete_range");
let columns_purged = self
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::BlockHeight>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
.is_ok();
let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot);
match purge_type {
PurgeType::Exact | PurgeType::ExactAndCleanupChaining => {
PurgeType::Exact => {
self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
}
PurgeType::CompactionFilter => {
Expand All @@ -261,31 +230,6 @@ impl Blockstore {
}
delete_range_timer.stop();

if purge_type == PurgeType::ExactAndCleanupChaining {
let mut slot_meta = slot_meta.expect("Slot meta should be present by this point");
let slot = from_slot; // also equal to to_slot
if let Some(parent_slot) = slot_meta.parent_slot {
let parent_slot_meta = self.meta(parent_slot)?;
if let Some(mut parent_slot_meta) = parent_slot_meta {
// .retain() is a linear scan; however, next_slots should
// only contain several elements so this isn't so bad
parent_slot_meta
.next_slots
.retain(|&next_slot| next_slot != slot);
write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
} else {
error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this
likely means a cluster restart was performed and your node contains invalid shreds generated
with the wrong shred version, whose ancestors have been cleaned up.
Falling back to duplicate block handling to remedy the situation", parent_slot, slot);
}
}

// Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field.
slot_meta.clear_unconfirmed_slot();
write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
}

let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) {
error!(
Expand Down Expand Up @@ -320,6 +264,72 @@ impl Blockstore {
Ok(columns_purged)
}

/// Queues a range delete over `[from_slot, to_slot]` into `write_batch` for
/// each slot-keyed column family listed below.
///
/// Every column is attempted regardless of whether an earlier one failed;
/// the return value is `true` only if all of the deletes reported `Ok`.
fn purge_range(&self, write_batch: &mut WriteBatch, from_slot: Slot, to_slot: Slot) -> bool {
    let db = &self.db;
    // Array elements are evaluated eagerly and in order, so each delete is
    // issued exactly once even when a previous one has already failed.
    let outcomes = [
        db.delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
            .is_ok(),
        db.delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
            .is_ok(),
    ];
    outcomes.iter().all(|&succeeded| succeeded)
}

fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
self.db
.delete_file_in_range_cf::<cf::SlotMeta>(from_slot, to_slot)
Expand Down Expand Up @@ -1152,39 +1162,21 @@ pub mod tests {
}

#[test]
fn test_purge_exact_and_cleanup_chaining_range() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let (shreds, _) = make_many_slot_entries(0, 10, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();

assert!(matches!(
blockstore
.run_purge(4, 6, PurgeType::ExactAndCleanupChaining)
.unwrap_err(),
BlockstoreError::InvalidRangeForSlotMetaCleanup
));
}

#[test]
fn test_purge_exact_and_cleanup_chaining_missing_slot_meta() {
fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let (shreds, _) = make_many_slot_entries(0, 10, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();

assert!(matches!(
blockstore
.run_purge(11, 11, PurgeType::ExactAndCleanupChaining)
.unwrap_err(),
blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
BlockstoreError::SlotUnavailable
));
}

#[test]
fn test_purge_exact_and_cleanup_chaining() {
fn test_purge_slot_cleanup_chaining() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

Expand All @@ -1195,9 +1187,7 @@ pub mod tests {
let (slot_12, _) = make_slot_entries(12, 5, 5, true);
blockstore.insert_shreds(slot_12, None, false).unwrap();

blockstore
.run_purge(5, 5, PurgeType::ExactAndCleanupChaining)
.unwrap();
blockstore.purge_slot_cleanup_chaining(5).unwrap();

let slot_meta = blockstore.meta(5).unwrap().unwrap();
let expected_slot_meta = SlotMeta {
Expand Down
2 changes: 0 additions & 2 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ pub enum BlockstoreError {
MissingTransactionMetadata,
#[error("transaction-index overflow")]
TransactionIndexOverflow,
#[error("invalid purge range for slot meta cleanup")]
InvalidRangeForSlotMetaCleanup,
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

Expand Down

0 comments on commit 6b496a5

Please sign in to comment.