diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 5f2a88061d162c..d1a98b9149bc44 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -21,7 +21,7 @@ use { saturating_add_assign, }, std::{ - collections::HashMap, + collections::{hash_map::Entry, HashMap}, fmt::{Debug, Formatter}, sync::{ atomic::{AtomicU64, Ordering}, @@ -227,7 +227,7 @@ pub struct ProgramCacheStats { pub prunes_orphan: AtomicU64, /// a program got pruned because it was not recompiled for the next epoch pub prunes_environment: AtomicU64, - /// the [SecondLevel] was empty because all slot versions got pruned + /// a program had no entries because all slot versions got pruned pub empty_entries: AtomicU64, } @@ -578,18 +578,6 @@ impl LoadingTaskWaiter { } } -/// Contains all the program versions at a specific address. -#[derive(Debug, Default)] -struct SecondLevel { - /// List of all versions (across all forks) of a program sorted by the slot in which they were modified - slot_versions: Vec>, - /// `Some` if there is currently a cooperative loading task for this program address - /// - /// It is possible that multiple TX batches from different slots need different versions of a program. - /// However, that can only be figured out once a program is loaded and its deployment slot is known. - cooperative_loading_lock: Option<(Slot, std::thread::ThreadId)>, -} - /// This structure is the global cache of loaded, verified and compiled programs. /// /// It ... @@ -608,8 +596,18 @@ struct SecondLevel { pub struct ProgramCache { /// A two level index: /// - /// The first level is for the address at which programs are deployed and the second level for the slot (and thus also fork). - entries: HashMap, + /// - the first level is for the address at which programs are deployed + /// - the second level for the slot (and thus also fork), sorted by slot number. + entries: HashMap>>, + /// The entries that are getting loaded and have not yet finished loading. + /// + /// The key is the program address, the value is a tuple of the slot in which the program is + /// being loaded and the thread ID doing the load. + /// + /// It is possible that multiple TX batches from different slots need different versions of a + /// program. The deployment slot of a program is only known after load tho, + /// so all loads for a given program key are serialized. + loading_entries: Mutex>, /// The slot of the last rerooting pub latest_root_slot: Slot, /// The epoch of the last rerooting @@ -776,6 +774,7 @@ impl ProgramCache { pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self { Self { entries: HashMap::new(), + loading_entries: Mutex::new(HashMap::new()), latest_root_slot: root_slot, latest_root_epoch: root_epoch, environments: ProgramRuntimeEnvironments::default(), @@ -819,7 +818,7 @@ impl ProgramCache { &entry.program, ProgramCacheEntryType::DelayVisibility )); - let slot_versions = &mut self.entries.entry(key).or_default().slot_versions; + let slot_versions = &mut self.entries.entry(key).or_default(); match slot_versions.binary_search_by(|at| { at.effective_slot .cmp(&entry.effective_slot) @@ -860,9 +859,7 @@ impl ProgramCache { pub fn prune_by_deployment_slot(&mut self, slot: Slot) { for second_level in self.entries.values_mut() { - second_level - .slot_versions - .retain(|entry| entry.deployment_slot != slot); + second_level.retain(|entry| entry.deployment_slot != slot); } self.remove_programs_with_no_entries(); } @@ -890,8 +887,7 @@ impl ProgramCache { // Remove entries un/re/deployed on orphan forks let mut first_ancestor_found = false; let mut first_ancestor_env = None; - second_level.slot_versions = second_level - .slot_versions + *second_level = second_level .iter() .rev() .filter(|entry| { @@ -940,7 +936,7 @@ impl ProgramCache { }) .cloned() .collect(); - second_level.slot_versions.reverse(); + second_level.reverse(); } self.remove_programs_with_no_entries(); debug_assert!(self.latest_root_slot <= new_root_slot); @@ -974,7 +970,7 @@ impl ProgramCache { /// Extracts a subset of the programs relevant to a transaction batch /// and returns which program accounts the accounts DB needs to load. pub fn extract( - &mut self, + &self, search_for: &mut Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))>, loaded_programs_for_tx_batch: &mut ProgramCacheForTxBatch, is_first_round: bool, @@ -983,8 +979,8 @@ impl ProgramCache { let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap(); let mut cooperative_loading_task = None; search_for.retain(|(key, (match_criteria, usage_count))| { - if let Some(second_level) = self.entries.get_mut(key) { - for entry in second_level.slot_versions.iter().rev() { + if let Some(second_level) = self.entries.get(key) { + for entry in second_level.iter().rev() { if entry.deployment_slot <= self.latest_root_slot || matches!( locked_fork_graph.relationship( @@ -1033,15 +1029,14 @@ impl ProgramCache { } } if cooperative_loading_task.is_none() { - // We have not selected a task so far - let second_level = self.entries.entry(*key).or_default(); - if second_level.cooperative_loading_lock.is_none() { - // Select this missing entry which is not selected by any other TX batch yet - cooperative_loading_task = Some((*key, *usage_count)); - second_level.cooperative_loading_lock = Some(( + let mut loading_entries = self.loading_entries.lock().unwrap(); + let entry = loading_entries.entry(*key); + if let Entry::Vacant(entry) = entry { + entry.insert(( loaded_programs_for_tx_batch.slot, std::thread::current().id(), )); + cooperative_loading_task = Some((*key, *usage_count)); } } true @@ -1066,12 +1061,8 @@ impl ProgramCache { key: Pubkey, loaded_program: Arc, ) -> bool { - let second_level = self.entries.entry(key).or_default(); - debug_assert_eq!( - second_level.cooperative_loading_lock, - Some((slot, std::thread::current().id())) - ); - second_level.cooperative_loading_lock = None; + let loading_thread = self.loading_entries.lock().unwrap().remove(&key); + debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); // Check that it will be visible to our own fork once inserted if loaded_program.deployment_slot > self.latest_root_slot && !matches!( @@ -1107,7 +1098,6 @@ impl ProgramCache { .iter() .flat_map(|(id, second_level)| { second_level - .slot_versions .iter() .filter_map(move |program| match program.program { ProgramCacheEntryType::Loaded(_) => { @@ -1132,19 +1122,16 @@ impl ProgramCache { self.entries .iter() .flat_map(|(id, second_level)| { - second_level - .slot_versions - .iter() - .map(|program| (*id, program.clone())) + second_level.iter().map(|program| (*id, program.clone())) }) .collect() } - /// Returns the `slot_versions` of the second level for the given program id. + /// Returns the slot versions for the given program id. pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc] { self.entries .get(key) - .map(|second_level| second_level.slot_versions.as_ref()) + .map(|second_level| second_level.as_ref()) .unwrap_or(&[]) } @@ -1205,7 +1192,6 @@ impl ProgramCache { fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc) { let second_level = self.entries.get_mut(program).expect("Cache lookup failed"); let candidate = second_level - .slot_versions .iter_mut() .find(|entry| entry == &remove_entry) .expect("Program entry not found"); @@ -1237,10 +1223,8 @@ impl ProgramCache { fn remove_programs_with_no_entries(&mut self) { let num_programs_before_removal = self.entries.len(); - self.entries.retain(|_, second_level| { - !second_level.slot_versions.is_empty() - || second_level.cooperative_loading_lock.is_some() - }); + self.entries + .retain(|_key, second_level| !second_level.is_empty()); if self.entries.len() < num_programs_before_removal { self.stats.empty_entries.fetch_add( num_programs_before_removal.saturating_sub(self.entries.len()) as u64, @@ -2072,7 +2056,7 @@ mod tests { keys.iter() .filter_map(|key| { let visible_entry = cache.entries.get(key).and_then(|second_level| { - second_level.slot_versions.iter().rev().find(|entry| { + second_level.iter().rev().find(|entry| { matches!( locked_fork_graph.relationship(entry.deployment_slot, loading_slot), BlockRelation::Equal | BlockRelation::Ancestor, diff --git a/svm/src/transaction_processor.rs b/svm/src/transaction_processor.rs index 2b75da5d97c250..d1f72e29e361ab 100644 --- a/svm/src/transaction_processor.rs +++ b/svm/src/transaction_processor.rs @@ -387,11 +387,10 @@ impl TransactionBatchProcessor { .collect(); let mut loaded_programs_for_txs = None; - let mut program_to_store = None; loop { - let (program_to_load, task_cookie, task_waiter) = { + let (program_to_store, task_cookie, task_waiter) = { // Lock the global cache. - let mut program_cache = self.program_cache.write().unwrap(); + let program_cache = self.program_cache.read().unwrap(); // Initialize our local cache. let is_first_round = loaded_programs_for_txs.is_none(); if is_first_round { @@ -401,49 +400,51 @@ impl TransactionBatchProcessor { &program_cache, )); } - // Submit our last completed loading task. - if let Some((key, program)) = program_to_store.take() { - loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true; - if program_cache.finish_cooperative_loading_task(self.slot, key, program) - && limit_to_load_programs - { - // This branch is taken when there is an error in assigning a program to a - // cache slot. It is not possible to mock this error for SVM unit - // tests purposes. - let mut ret = ProgramCacheForTxBatch::new_from_cache( - self.slot, - self.epoch, - &program_cache, - ); - ret.hit_max_limit = true; - return ret; - } - } // Figure out which program needs to be loaded next. let program_to_load = program_cache.extract( &mut missing_programs, loaded_programs_for_txs.as_mut().unwrap(), is_first_round, ); + + let program_to_store = program_to_load.map(|(key, count)| { + // Load, verify and compile one program. + let program = load_program_with_pubkey( + callback, + &program_cache, + &key, + self.slot, + self.epoch, + &self.epoch_schedule, + false, + ) + .expect("called load_program_with_pubkey() with nonexistent account"); + program.tx_usage_counter.store(count, Ordering::Relaxed); + (key, program) + }); + let task_waiter = Arc::clone(&program_cache.loading_task_waiter); - (program_to_load, task_waiter.cookie(), task_waiter) + (program_to_store, task_waiter.cookie(), task_waiter) // Unlock the global cache again. }; - if let Some((key, count)) = program_to_load { - // Load, verify and compile one program. - let program = load_program_with_pubkey( - callback, - &self.program_cache.read().unwrap(), - &key, - self.slot, - self.epoch, - &self.epoch_schedule, - false, - ) - .expect("called load_program_with_pubkey() with nonexistent account"); - program.tx_usage_counter.store(count, Ordering::Relaxed); - program_to_store = Some((key, program)); + if let Some((key, program)) = program_to_store { + let mut program_cache = self.program_cache.write().unwrap(); + // Submit our last completed loading task. + if program_cache.finish_cooperative_loading_task(self.slot, key, program) + && limit_to_load_programs + { + // This branch is taken when there is an error in assigning a program to a + // cache slot. It is not possible to mock this error for SVM unit + // tests purposes. + let mut ret = ProgramCacheForTxBatch::new_from_cache( + self.slot, + self.epoch, + &program_cache, + ); + ret.hit_max_limit = true; + return ret; + } } else if missing_programs.is_empty() { break; } else {