-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
program cache: reduce contention #1192
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -387,11 +387,10 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> { | |
.collect(); | ||
|
||
let mut loaded_programs_for_txs = None; | ||
let mut program_to_store = None; | ||
loop { | ||
let (program_to_load, task_cookie, task_waiter) = { | ||
let (program_to_store, task_cookie, task_waiter) = { | ||
// Lock the global cache. | ||
let mut program_cache = self.program_cache.write().unwrap(); | ||
let program_cache = self.program_cache.read().unwrap(); | ||
// Initialize our local cache. | ||
let is_first_round = loaded_programs_for_txs.is_none(); | ||
if is_first_round { | ||
|
@@ -401,49 +400,51 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> { | |
&program_cache, | ||
)); | ||
} | ||
// Submit our last completed loading task. | ||
if let Some((key, program)) = program_to_store.take() { | ||
loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alessandrod I think you accidentally deleted this line here while moving the code. It was added in #1037 which was open around the same time as this PR, so most likely a merge conflict that was not resolved properly. When I investigated the validator node of @buffalojoec (which went out of disk space as it was not configured to rotate log files) I discovered that the program cache was growing far beyond the capacity as it was not evicting anymore. Can you open a PR to add the line back in? I will add a metric for the number of loaded entries (complementary to the evictions metric we already have) recorded in |
||
if program_cache.finish_cooperative_loading_task(self.slot, key, program) | ||
&& limit_to_load_programs | ||
{ | ||
// This branch is taken when there is an error in assigning a program to a | ||
// cache slot. It is not possible to mock this error for SVM unit | ||
// tests purposes. | ||
let mut ret = ProgramCacheForTxBatch::new_from_cache( | ||
self.slot, | ||
self.epoch, | ||
&program_cache, | ||
); | ||
ret.hit_max_limit = true; | ||
return ret; | ||
} | ||
} | ||
// Figure out which program needs to be loaded next. | ||
let program_to_load = program_cache.extract( | ||
&mut missing_programs, | ||
loaded_programs_for_txs.as_mut().unwrap(), | ||
is_first_round, | ||
); | ||
|
||
let program_to_store = program_to_load.map(|(key, count)| { | ||
// Load, verify and compile one program. | ||
let program = load_program_with_pubkey( | ||
callback, | ||
&program_cache, | ||
&key, | ||
self.slot, | ||
self.epoch, | ||
&self.epoch_schedule, | ||
false, | ||
) | ||
.expect("called load_program_with_pubkey() with nonexistent account"); | ||
program.tx_usage_counter.store(count, Ordering::Relaxed); | ||
(key, program) | ||
}); | ||
|
||
let task_waiter = Arc::clone(&program_cache.loading_task_waiter); | ||
(program_to_load, task_waiter.cookie(), task_waiter) | ||
(program_to_store, task_waiter.cookie(), task_waiter) | ||
// Unlock the global cache again. | ||
}; | ||
|
||
if let Some((key, count)) = program_to_load { | ||
// Load, verify and compile one program. | ||
let program = load_program_with_pubkey( | ||
callback, | ||
&self.program_cache.read().unwrap(), | ||
&key, | ||
self.slot, | ||
self.epoch, | ||
&self.epoch_schedule, | ||
false, | ||
) | ||
.expect("called load_program_with_pubkey() with nonexistent account"); | ||
program.tx_usage_counter.store(count, Ordering::Relaxed); | ||
program_to_store = Some((key, program)); | ||
if let Some((key, program)) = program_to_store { | ||
let mut program_cache = self.program_cache.write().unwrap(); | ||
// Submit our last completed loading task. | ||
if program_cache.finish_cooperative_loading_task(self.slot, key, program) | ||
&& limit_to_load_programs | ||
{ | ||
// This branch is taken when there is an error in assigning a program to a | ||
// cache slot. It is not possible to mock this error for SVM unit | ||
// tests purposes. | ||
let mut ret = ProgramCacheForTxBatch::new_from_cache( | ||
self.slot, | ||
self.epoch, | ||
&program_cache, | ||
); | ||
ret.hit_max_limit = true; | ||
return ret; | ||
} | ||
} else if missing_programs.is_empty() { | ||
break; | ||
} else { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the replacement of the index structure I am working on I also experimented with pulling
cooperative_loading_lock
out of theSecondLevel
. We might be able to take this even further because when we use the account last-modified-slot we could also parallelize the loading of missing programs. Basically the "The deployment slot of a program is only known after load tho" part could be improved then.