From a5503afe2abc2b563d9b1faf60e8bcc514f14c1c Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 22 Feb 2024 16:20:10 +0900 Subject: [PATCH 01/31] Introduce SchedulingStateMachine --- Cargo.lock | 3 + programs/sbf/Cargo.lock | 3 + unified-scheduler-logic/Cargo.toml | 2 + unified-scheduler-logic/src/lib.rs | 1327 +++++++++++++++++++++++++++- unified-scheduler-pool/Cargo.toml | 1 + unified-scheduler-pool/src/lib.rs | 171 ++-- 6 files changed, 1434 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 528c7d00f33d04..2adb05f629804b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7436,7 +7436,9 @@ dependencies = [ name = "solana-unified-scheduler-logic" version = "1.19.0" dependencies = [ + "assert_matches", "solana-sdk", + "static_assertions", ] [[package]] @@ -7445,6 +7447,7 @@ version = "1.19.0" dependencies = [ "assert_matches", "crossbeam-channel", + "dashmap", "derivative", "log", "solana-ledger", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9e8a39c79ce1c8..e052a2479441f8 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6443,7 +6443,9 @@ dependencies = [ name = "solana-unified-scheduler-logic" version = "1.19.0" dependencies = [ + "assert_matches", "solana-sdk", + "static_assertions", ] [[package]] @@ -6452,6 +6454,7 @@ version = "1.19.0" dependencies = [ "assert_matches", "crossbeam-channel", + "dashmap", "derivative", "log", "solana-ledger", diff --git a/unified-scheduler-logic/Cargo.toml b/unified-scheduler-logic/Cargo.toml index b2e80c79c7a08f..b05cec41a7c862 100644 --- a/unified-scheduler-logic/Cargo.toml +++ b/unified-scheduler-logic/Cargo.toml @@ -10,4 +10,6 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +assert_matches = { workspace = true } solana-sdk = { workspace = true } +static_assertions = { workspace = true } diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 997c6c1745a7c9..c84dddfa6cf1a7 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -1,15 +1,308 @@ -use solana_sdk::transaction::SanitizedTransaction; +#![allow(rustdoc::private_intra_doc_links)] +//! The task (transaction) scheduling code for the unified scheduler +//! +//! ### High-level API and design +//! +//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactons) and +//! may return back them if runnable via +//! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account +//! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to +//! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion +//! of the exeuciton via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that +//! conflicting tasks can be returned from +//! [`::schedule_unblocked_task()`](SchedulingStateMachine::schedule_unblocked_task) as +//! newly-unblocked runnable ones. +//! +//! The design principle of this crate (`solana-unified-scheduler-logic`) is simplicity for the +//! separation of concern. It is interacted only with a few of its public API by +//! `solana-unified-scheduler-pool`. This crate doesn't know about banks, slots, solana-runtime, +//! threads, crossbeam-channel at all. Becasue of this, it's deterministic, easy-to-unit-test, and +//! its perf footprint is well understood. It really focuses on its single job: sorting +//! transactions in executable order. +//! +//! ### Algorithm +//! +//! The algorithm can be said it's based on per-address FIFO queues, which are updated every time +//! both new task is coming (= called _scheduling_) and runnable (= _post-scheduling_) task is +//! finished (= called _descheduling_). +//! +//! For the _non-conflicting scheduling_ case, the story is very simple; it just remembers that all +//! of accessed addresses are write-locked or read-locked with the number of active (= +//! _currently-scheduled-and-not-descheduled-yet_) tasks. Correspondingly, descheduling does the +//! opposite book-keeping process, regardless whether a finished task has been conflicted or not. +//! +//! For the _conflicting scheduling_ case, it remembers that each of **non-conflicting addresses** +//! like the non-conflicting case above. As for **conflicting addresses**, each task is recorded to +//! respective FIFO queues attached to the (conflicting) addresses. Importantly, the number of +//! conflicting addresses of the conflicting task is also remembered. +//! +//! The last missing piece is that the scheduler actually tries to reschedule previously blocked +//! tasks while deschduling, in addition to the above-mentioned book-keeping processing. Namely, +//! when given address is ready for new fresh locking resulted from descheduling a task (i.e. write +//! lock is released or read lock count is reached to zero), it pops out the first element of the +//! FIFO blocked-task queue of the address. Then, it immediately marks the address as relocked. It +//! also decrements the number of conflicting addresses of the popped-out task. As the final step, +//! if the number reaches to the zero, it means the task has fully finished locking all of its +//! addresses and is directly routed to be runnable. +//! +//! Put differently, this algorigthm tries to gradually lock all of addresses of tasks at different +//! timings while not deviating the execution order from the original task ingestion order. This +//! implies there's no locking retries in general, which is the primary source of non-linear perf. +//! degration. +//! +//! As a ballpark number from a synthesized micro benchmark on usual CPU for `mainnet-beta` +//! validators, it takes roughly 100ns to schedule and deschedule a transaction with 10 accounts. +//! And 1us for a transaction with 100 accounts. Note that this excludes crossbeam communication +//! overhead at all. That's said, it's not unrealistic to say the whole unified scheduler can +//! attain 100k-1m tps overall, assuming those transaction executions aren't bottlenecked. +//! +//! ### Runtime performance characteristics and data structure arrangement +//! +//! Its algorithm is very fast for high throughput, real-time for low latency. The whole +//! unified-scheduler architecture is designed from grounds up to support the fastest execution of +//! this scheduling code. For that end, unified scheduler pre-loads address-specific locking state +//! data structures (called [`Page`]) for all of transaction's accounts, in order to offload the +//! job to other threads from the scheduler thread. This preloading is done inside +//! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling +//! computational complexity is basically reduced to several word-sized loads and stores in the +//! schduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the +//! number of addresses in a given transaction. Note that this statement is held true, regardless +//! of conflicts. This is because the preloading also pre-allocates some scratch-pad area +//! ([`blocked_tasks`](PageInner::blocked_tasks)) to stash blocked ones. So, a conflict only incurs +//! some additional fixed number of mem stores, within error magin of the constant complexity. And +//! additional memory allocation for the scratchpad could said to be amortized, if such unsual +//! event should occur. +//! +//! [`Arc`] is used to implement this preloading mechanism, because `Page`s are shared across tasks +//! accessing the same account, and among threads due to the preloading. Also, interior mutability +//! is needed. However, `SchedulingStateMachine` doesn't use conventional locks like RwLock. +//! Leveraving the fact it's the only state-mutating exclusive thread, it instead uses +//! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell` +//! improses an overly restrictive aliasing rule via rust type system to maintain the memory +//! safety. By localizing any synchronization to the message passing, the scheduling code itself +//! attains maximally possible single-threaed execution without stalling cpu pipelines at all, only +//! constrained to mem access latency, while efficiently utilzing L1-L3 cpu cache with full of +//! `Page`s. +//! +//! ### Buffer bloat insignificance +//! +//! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in +//! unified scheduler, where a run of heavily linearized and blocked tasks could severely hampered +//! by very large number of interleaved runnable tasks along side. The reason is again for +//! separation of concerns. This is acceptable because the scheduling code itself isn't susceptible +//! to the buffer bloat problem by itself as explained by the description and validated by the +//! mentioned benchmark above. Thus, this should be solved elsewhere, specifically at the scheduler +//! pool. +use { + crate::utils::{ShortCounter, Token, TokenCell}, + solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction}, + static_assertions::const_assert_eq, + std::{collections::VecDeque, mem, sync::Arc}, +}; -pub struct Task { +/// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`]. +mod utils { + use std::{ + any::{self, TypeId}, + cell::{RefCell, UnsafeCell}, + collections::BTreeSet, + marker::PhantomData, + thread, + }; + + /// A really tiny counter to hide `.checked_{add,sub}` all over the place. + /// + /// It's caller's reponsibility to ensure this (backed by [`u32`]) never overflow. + #[derive(Debug, Clone, Copy)] + pub(super) struct ShortCounter(u32); + + impl ShortCounter { + pub(super) fn zero() -> Self { + Self(0) + } + + pub(super) fn one() -> Self { + Self(1) + } + + pub(super) fn is_one(&self) -> bool { + self.0 == 1 + } + + pub(super) fn is_zero(&self) -> bool { + self.0 == 0 + } + + pub(super) fn current(&self) -> u32 { + self.0 + } + + #[must_use] + pub(super) fn increment(self) -> Self { + Self(self.0.checked_add(1).unwrap()) + } + + #[must_use] + pub(super) fn decrement(self) -> Self { + Self(self.0.checked_sub(1).unwrap()) + } + + pub(super) fn increment_self(&mut self) -> &mut Self { + *self = self.increment(); + self + } + + pub(super) fn decrement_self(&mut self) -> &mut Self { + *self = self.decrement(); + self + } + + pub(super) fn reset_to_zero(&mut self) -> &mut Self { + self.0 = 0; + self + } + } + + /// A conditionally [`Send`]-able and [`Sync`]-able cell leveraging scheduler's one-by-one data + /// access pattern with zero runtime synchronization cost. + /// + /// To comply with Rust's aliasing rules, these cells require a carefully-created [`Token`] to + /// be passed around to access the inner values. The token is a special-purpose phantom object + /// to get rid of its inherent `unsafe`-ness in [`UnsafeCell`], which is internally used for + /// the interior mutability. + /// + /// The final objective of [`Token`] is to ensure there's only one mutable reference to the + /// [`TokenCell`] at most _at any given moment_. To that end, it's `unsafe` to create it, + /// shifting the responsibility of binding the only singleton instance to a particular thread + /// and not creating more than one, onto the API consumers. And its constructor is non-`const`, + /// and the type is `!Clone` (and `!Copy` as well), `!Default`, `!Send` and `!Sync` to make it + /// relatively hard to cross thread boundaries accidentally. + /// + /// In other words, the token semantically _owns_ all of its associated instances of + /// [`TokenCell`]s. And `&mut Token` is needed to access one of them as if the one is of + /// [`Token`]'s `*_mut()` getters. Thus, the Rust aliasing rule for `UnsafeCell` can + /// transitively be proven to be satisfied simply based on the usual borrow checking of the + /// `&mut` reference of [`Token`] itself via [`::borrow_mut()`](TokenCell::borrow_mut). + /// + /// By extension, it's allowed to create _multiple_ tokens in a _single_ process as long as no + /// instance of [`TokenCell`] is shared by multiple instances of [`Token`]. + /// + /// Note that this is overly restrictive in that it's forbidden, yet, technically possible + /// to _have multiple mutable references to the inner values at the same time, if and only + /// if the respective cells aren't aliased to each other (i.e. different instances)_. This + /// artificial restriction is acceptable for its intended use by the unified scheduler's code + /// because its algorithm only needs to access each instance of [`TokenCell`]-ed data once at a + /// time. Finally, this restriction is traded off for restoration of Rust aliasing rule at zero + /// runtime cost. Without this token mechanism, there's no way to realize this. + #[derive(Debug, Default)] + pub(super) struct TokenCell(UnsafeCell); + + impl TokenCell { + /// Creates a new `TokenCell` with the `value` typed as `V`. + /// + /// Note that this isn't parametric over the its accompanied `Token`'s lifetime to avoid + /// complex handling of non-`'static` heaped data in general. Instead, it's manually + /// required to ensure this instance is accessed only via its associated Token for the + /// entire lifetime. + // non-const to forbid unprotected sharing via static variables among threads. + pub(super) fn new(value: V) -> Self { + Self(UnsafeCell::new(value)) + } + + /// Returns a mutable reference with its lifetime bound to the mutable reference of the + /// given token. + /// + /// In this way, any additional reborrow can never happen at the same time across all + /// instances of [`TokenCell`] conceptually owned by the instance of [`Token`] (a + /// particular thread), unless previous borrow is released. After the release, the used + /// singleton token should be free to be reused for reborrows. + pub(super) fn borrow_mut<'t>(&self, _token: &'t mut Token) -> &'t mut V { + unsafe { &mut *self.0.get() } + } + } + + // Safety: Access to TokenCell is assumed to be only from a single thread by proper use of + // Token once after TokenCell is sent to the thread from other threads; So, both implementing + // Send and Sync can be thought as safe. + // + // In other words, TokenCell is technicall still !Send and !Sync. But there should be no legal + // use happening which requires !Send or !Sync to avoid undefined behavior. + unsafe impl Send for TokenCell {} + unsafe impl Sync for TokenCell {} + + /// A auxiliary zero-sized type to enforce aliasing rule to [`TokenCell`] via rust type system + /// + /// Token semantically owns a collection of `TokenCell` objects and governs the _unique_ + /// existence of mutable access over them by requiring the token itself to be mutably borrowed + /// to get a mutable reference to the internal value of `TokenCell`. + // *mut is used to make this type !Send and !Sync + pub(super) struct Token(PhantomData<*mut V>); + + impl Token { + // Returns the token to acquire a mutable reference to the inner value of [TokenCell]. + // + // Safety: + // This method should be called exactly once for each thread at most. + #[must_use] + pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self { + thread_local! { + static TOKENS: RefCell> = const { RefCell::new(BTreeSet::new()) }; + } + assert!( + TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::())), + "{:?} is wrongly initialized twice on {:?}", + any::type_name::(), + thread::current() + ); + + Self(PhantomData) + } + } + + #[cfg(test)] + mod tests { + use super::Token; + + #[test] + #[should_panic( + expected = "\"solana_unified_scheduler_logic::utils::Token\" is wrongly \ + initialized twice on Thread" + )] + fn test_second_creation_of_tokens_in_a_thread() { + unsafe { + let _ = Token::::assume_exclusive_mutating_thread(); + let _ = Token::::assume_exclusive_mutating_thread(); + } + } + } +} + +/// [`Result`] for locking a [page](Page) with particular [usage](RequestedUsage). +type LockResult = Result; +const_assert_eq!(mem::size_of::(), 8); + +/// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`]. +pub type Task = Arc; +const_assert_eq!(mem::size_of::(), 8); + +/// [`Token`] for [`Page`]. +type PageToken = Token; +const_assert_eq!(mem::size_of::(), 0); + +/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_page_count`). +type BlockedPageCountToken = Token; +const_assert_eq!(mem::size_of::(), 0); + +/// Internal scheduling data about a particular task. +#[derive(Debug)] +pub struct TaskInner { transaction: SanitizedTransaction, index: usize, + lock_attempts: Vec, + blocked_page_count: TokenCell, } -impl Task { - pub fn create_task(transaction: SanitizedTransaction, index: usize) -> Self { - Task { transaction, index } - } - +impl TaskInner { pub fn task_index(&self) -> usize { self.index } @@ -17,4 +310,1024 @@ impl Task { pub fn transaction(&self) -> &SanitizedTransaction { &self.transaction } + + fn lock_attempts(&self) -> &Vec { + &self.lock_attempts + } + + fn blocked_page_count_mut<'t>( + &self, + token: &'t mut BlockedPageCountToken, + ) -> &'t mut ShortCounter { + self.blocked_page_count.borrow_mut(token) + } + + fn set_blocked_page_count(&self, token: &mut BlockedPageCountToken, count: ShortCounter) { + *self.blocked_page_count_mut(token) = count; + } + + #[must_use] + fn try_unblock(self: &Task, token: &mut BlockedPageCountToken) -> Option { + self.blocked_page_count_mut(token) + .decrement_self() + .is_zero() + .then(|| self.clone()) + } +} + +/// [`Task`]'s per-address attempt to use a [page](Page) with [certain kind of +/// request](RequestedUsage). +#[derive(Debug)] +struct LockAttempt { + page: Page, + requested_usage: RequestedUsage, +} +const_assert_eq!(mem::size_of::(), 16); + +impl LockAttempt { + fn new(page: Page, requested_usage: RequestedUsage) -> Self { + Self { + page, + requested_usage, + } + } + + fn page_mut<'t>(&self, page_token: &'t mut PageToken) -> &'t mut PageInner { + self.page.0.borrow_mut(page_token) + } +} + +/// Status about how the [`Page`] is used currently. Unlike [`RequestedUsage`], it has additional +/// variant of [`Unused`](`PageUsage::Unused`). +#[derive(Copy, Clone, Debug, Default)] +enum PageUsage { + #[default] + Unused, + Readonly(ShortCounter), + Writable, +} +const_assert_eq!(mem::size_of::(), 8); + +impl PageUsage { + fn from_requested_usage(requested_usage: RequestedUsage) -> Self { + match requested_usage { + RequestedUsage::Readonly => PageUsage::Readonly(ShortCounter::one()), + RequestedUsage::Writable => PageUsage::Writable, + } + } +} + +/// Status about how a task is requesting to use a particular [`Page`]. Unlike [`PageUsage`], +/// it has only two unit variants. +#[derive(Clone, Copy, Debug)] +enum RequestedUsage { + Readonly, + Writable, +} + +/// Internal scheduling data about a particular address. +/// +/// Specifially, it holds the current [`PageUsage`] (or no usage with [`PageUsage::Unused`]) and +/// which [`Task`]s are blocked to be executed after the current task is notified to be finished +/// via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`) +#[derive(Debug)] +struct PageInner { + usage: PageUsage, + blocked_tasks: VecDeque<(Task, RequestedUsage)>, +} + +impl Default for PageInner { + fn default() -> Self { + Self { + usage: PageUsage::default(), + blocked_tasks: VecDeque::with_capacity(1024), + } + } +} + +impl PageInner { + fn push_blocked_task(&mut self, task: Task, requested_usage: RequestedUsage) { + self.blocked_tasks.push_back((task, requested_usage)); + } + + fn has_no_blocked_task(&self) -> bool { + self.blocked_tasks.is_empty() + } + + #[must_use] + fn pop_unblocked_next_task(&mut self) -> Option<(Task, RequestedUsage)> { + self.blocked_tasks.pop_front() + } + + #[must_use] + fn blocked_next_task(&self) -> Option<(&Task, RequestedUsage)> { + self.blocked_tasks + .front() + .map(|(task, requested_usage)| (task, *requested_usage)) + } + + #[must_use] + fn pop_blocked_next_readonly_task(&mut self) -> Option<(Task, RequestedUsage)> { + if matches!( + self.blocked_next_task(), + Some((_, RequestedUsage::Readonly)) + ) { + self.pop_unblocked_next_task() + } else { + None + } + } +} + +const_assert_eq!(mem::size_of::>(), 40); + +/// Scheduler's internal data for each address ([`Pubkey`](`solana_sdk::pubkey::Pubkey`)). Very +/// opaque wrapper type; no methods just with [`::clone()`](Clone::clone) and +/// [`::default()`](Default::default). +#[derive(Debug, Clone, Default)] +pub struct Page(Arc>); +const_assert_eq!(mem::size_of::(), 8); + +/// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by +/// `solana-unified-scheduler-pool`. +pub struct SchedulingStateMachine { + last_task_index: Option, + unblocked_task_queue: VecDeque, + active_task_count: ShortCounter, + handled_task_count: ShortCounter, + unblocked_task_count: ShortCounter, + total_task_count: ShortCounter, + count_token: BlockedPageCountToken, + page_token: PageToken, +} +const_assert_eq!(mem::size_of::(), 64); + +impl SchedulingStateMachine { + pub fn has_no_active_task(&self) -> bool { + self.active_task_count.is_zero() + } + + pub fn unblocked_task_queue_count(&self) -> usize { + self.unblocked_task_queue.len() + } + + pub fn active_task_count(&self) -> u32 { + self.active_task_count.current() + } + + pub fn handled_task_count(&self) -> u32 { + self.handled_task_count.current() + } + + pub fn unblocked_task_count(&self) -> u32 { + self.unblocked_task_count.current() + } + + pub fn total_task_count(&self) -> u32 { + self.total_task_count.current() + } + + #[must_use] + pub fn schedule_task(&mut self, task: Task) -> Option { + let new_task_index = task.task_index(); + if let Some(old_task_index) = self.last_task_index.replace(new_task_index) { + assert!( + new_task_index > old_task_index, + "bad new task index: {new_task_index} > {old_task_index}" + ); + } + self.total_task_count.increment_self(); + self.active_task_count.increment_self(); + self.attempt_lock_for_task(task) + } + + pub fn has_unblocked_task(&self) -> bool { + !self.unblocked_task_queue.is_empty() + } + + #[must_use] + pub fn schedule_unblocked_task(&mut self) -> Option { + self.unblocked_task_queue.pop_front().map(|task| { + self.unblocked_task_count.increment_self(); + task + }) + } + + pub fn deschedule_task(&mut self, task: &Task) { + let descheduled_task_index = task.task_index(); + let largest_task_index = self + .last_task_index + .expect("task should have been scheduled"); + assert!( + descheduled_task_index <= largest_task_index, + "bad descheduled task index: {descheduled_task_index} <= {largest_task_index}" + ); + self.active_task_count.decrement_self(); + self.handled_task_count.increment_self(); + self.unlock_for_task(task); + } + + #[must_use] + fn attempt_lock_pages(&mut self, task: &Task) -> ShortCounter { + let mut blocked_page_count = ShortCounter::zero(); + + for attempt in task.lock_attempts() { + let page = attempt.page_mut(&mut self.page_token); + let lock_status = if page.has_no_blocked_task() { + Self::attempt_lock_page(page, attempt.requested_usage) + } else { + LockResult::Err(()) + }; + match lock_status { + LockResult::Ok(PageUsage::Unused) => unreachable!(), + LockResult::Ok(new_usage) => { + page.usage = new_usage; + } + LockResult::Err(()) => { + blocked_page_count.increment_self(); + page.push_blocked_task(task.clone(), attempt.requested_usage); + } + } + } + + blocked_page_count + } + + fn attempt_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult { + match page.usage { + PageUsage::Unused => LockResult::Ok(PageUsage::from_requested_usage(requested_usage)), + PageUsage::Readonly(count) => match requested_usage { + RequestedUsage::Readonly => LockResult::Ok(PageUsage::Readonly(count.increment())), + RequestedUsage::Writable => LockResult::Err(()), + }, + PageUsage::Writable => LockResult::Err(()), + } + } + + #[must_use] + fn unlock_page(page: &mut PageInner, attempt: &LockAttempt) -> Option<(Task, RequestedUsage)> { + let mut is_unused_now = false; + match &mut page.usage { + PageUsage::Readonly(ref mut count) => match attempt.requested_usage { + RequestedUsage::Readonly => { + if count.is_one() { + is_unused_now = true; + } else { + count.decrement_self(); + } + } + RequestedUsage::Writable => unreachable!(), + }, + PageUsage::Writable => match attempt.requested_usage { + RequestedUsage::Writable => { + is_unused_now = true; + } + RequestedUsage::Readonly => unreachable!(), + }, + PageUsage::Unused => unreachable!(), + } + + if is_unused_now { + page.usage = PageUsage::Unused; + page.pop_unblocked_next_task() + } else { + None + } + } + + #[must_use] + fn attempt_lock_for_task(&mut self, task: Task) -> Option { + let blocked_page_count = self.attempt_lock_pages(&task); + + if blocked_page_count.is_zero() { + // succeeded + Some(task) + } else { + // failed + task.set_blocked_page_count(&mut self.count_token, blocked_page_count); + None + } + } + + fn unlock_for_task(&mut self, task: &Task) { + for unlock_attempt in task.lock_attempts() { + let page = unlock_attempt.page_mut(&mut self.page_token); + let mut unblocked_task_from_page = Self::unlock_page(page, unlock_attempt); + + while let Some((task_with_unblocked_page, requested_usage)) = unblocked_task_from_page { + if let Some(task) = task_with_unblocked_page.try_unblock(&mut self.count_token) { + self.unblocked_task_queue.push_back(task); + } + + match Self::attempt_lock_page(page, requested_usage) { + LockResult::Ok(PageUsage::Unused) => unreachable!(), + LockResult::Ok(new_usage) => { + page.usage = new_usage; + // Try to further schedule blocked task for parallelism in the case of + // readonly usages + unblocked_task_from_page = if matches!(new_usage, PageUsage::Readonly(_)) { + page.pop_blocked_next_readonly_task() + } else { + None + }; + } + LockResult::Err(_) => panic!("should never fail in this context"), + } + } + } + } + + /// Creates a new task with [`SanitizedTransaction`] with all of its corresponding [`Page`]s + /// preloaded. + /// + /// Closure (`page_loader`) is used to delegate the (possibly multi-threaded) + /// implementation of [`Page`] look-up by [`pubkey`](Pubkey) to callers. It's the caller's + /// responsibility to ensure the same instance is returned from the closure, given a particular + /// pubkey. + pub fn create_task( + transaction: SanitizedTransaction, + index: usize, + page_loader: &mut impl FnMut(Pubkey) -> Page, + ) -> Task { + // this is safe bla bla + let locks = transaction.get_account_locks_unchecked(); + + let writable_locks = locks + .writable + .iter() + .map(|address| (address, RequestedUsage::Writable)); + let readonly_locks = locks + .readonly + .iter() + .map(|address| (address, RequestedUsage::Readonly)); + + let lock_attempts = writable_locks + .chain(readonly_locks) + .map(|(address, requested_usage)| { + LockAttempt::new(page_loader(**address), requested_usage) + }) + .collect(); + + Task::new(TaskInner { + transaction, + index, + lock_attempts, + blocked_page_count: TokenCell::new(ShortCounter::zero()), + }) + } + + /// Rewind the inactive state machine to be initialized + /// + /// This isn't called _reset_ to indicate this isn't safe to call this at any given moment. + /// This panics if the state machine hasn't properly been finished (i.e. there should be no + /// active task) to uphold invariants of [`Page`]s. + /// + /// This method is intended to reuse SchedulingStateMachine instance (to avoid its `unsafe` + /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling) + /// as much as possible) and its (possbily cached) associated [`Page`]s for processing other + /// slots. + pub fn reinitialize(&mut self) { + assert!(self.has_no_active_task()); + assert_eq!(self.unblocked_task_queue.len(), 0); + self.last_task_index = None; + self.active_task_count.reset_to_zero(); + self.handled_task_count.reset_to_zero(); + self.unblocked_task_count.reset_to_zero(); + self.total_task_count.reset_to_zero(); + } + + /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as + /// well, thus carrying over `unsafe`. + /// + /// # Safety + /// Call this exactly once for each thread. See [`TokenCell`] for details. + #[must_use] + pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self { + Self { + last_task_index: None, + unblocked_task_queue: VecDeque::with_capacity(1024), + active_task_count: ShortCounter::zero(), + handled_task_count: ShortCounter::zero(), + unblocked_task_count: ShortCounter::zero(), + total_task_count: ShortCounter::zero(), + count_token: unsafe { BlockedPageCountToken::assume_exclusive_mutating_thread() }, + page_token: unsafe { PageToken::assume_exclusive_mutating_thread() }, + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + assert_matches::assert_matches, + solana_sdk::{ + instruction::{AccountMeta, Instruction}, + message::Message, + pubkey::Pubkey, + signature::Signer, + signer::keypair::Keypair, + transaction::{SanitizedTransaction, Transaction}, + }, + std::{cell::RefCell, collections::HashMap, rc::Rc}, + }; + + fn simplest_transaction() -> SanitizedTransaction { + let payer = Keypair::new(); + let message = Message::new(&[], Some(&payer.pubkey())); + let unsigned = Transaction::new_unsigned(message); + SanitizedTransaction::from_transaction_for_tests(unsigned) + } + + fn transaction_with_readonly_address(address: Pubkey) -> SanitizedTransaction { + let instruction = Instruction { + program_id: Pubkey::default(), + accounts: vec![AccountMeta::new_readonly(address, false)], + data: vec![], + }; + let message = Message::new(&[instruction], Some(&Pubkey::new_unique())); + let unsigned = Transaction::new_unsigned(message); + SanitizedTransaction::from_transaction_for_tests(unsigned) + } + + fn transaction_with_writable_address(address: Pubkey) -> SanitizedTransaction { + let instruction = Instruction { + program_id: Pubkey::default(), + accounts: vec![AccountMeta::new(address, false)], + data: vec![], + }; + let message = Message::new(&[instruction], Some(&Pubkey::new_unique())); + let unsigned = Transaction::new_unsigned(message); + SanitizedTransaction::from_transaction_for_tests(unsigned) + } + + fn create_address_loader( + pages: Option>>>, + ) -> impl FnMut(Pubkey) -> Page { + let pages = pages.unwrap_or_default(); + move |address| pages.borrow_mut().entry(address).or_default().clone() + } + + #[test] + fn test_debug() { + // these are almost meaningless just to see eye-pleasing coverage report.... + assert_eq!( + format!( + "{:?}", + LockResult::Ok(PageUsage::Readonly(ShortCounter::one())) + ), + "Ok(Readonly(ShortCounter(1)))" + ); + let sanitized = simplest_transaction(); + let task = SchedulingStateMachine::create_task(sanitized, 0, &mut |_| Page::default()); + assert!(format!("{:?}", task).contains("TaskInner")); + + assert_eq!( + format!("{:?}", PageInner::default()), + "PageInner { usage: Unused, blocked_tasks: [] }" + ) + } + + #[test] + fn test_scheduling_state_machine_creation() { + let state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_eq!(state_machine.active_task_count(), 0); + assert_eq!(state_machine.total_task_count(), 0); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_scheduling_state_machine_reinitialization() { + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + state_machine.total_task_count.increment_self(); + assert_eq!(state_machine.total_task_count(), 1); + state_machine.last_task_index = Some(1); + state_machine.reinitialize(); + assert_eq!(state_machine.total_task_count(), 0); + assert_eq!(state_machine.last_task_index, None); + } + + #[test] + fn test_create_task() { + let sanitized = simplest_transaction(); + let task = + SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| Page::default()); + assert_eq!(task.task_index(), 3); + assert_eq!(task.transaction(), &sanitized); + } + + #[test] + fn test_non_conflicting_task_related_counts() { + let sanitized = simplest_transaction(); + let address_loader = &mut create_address_loader(None); + let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let task = state_machine.schedule_task(task).unwrap(); + assert_eq!(state_machine.active_task_count(), 1); + assert_eq!(state_machine.total_task_count(), 1); + state_machine.deschedule_task(&task); + assert_eq!(state_machine.active_task_count(), 0); + assert_eq!(state_machine.total_task_count(), 1); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_conflicting_task_related_counts() { + let sanitized = simplest_transaction(); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); + let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + + state_machine.deschedule_task(&task1); + assert!(state_machine.has_unblocked_task()); + assert_eq!(state_machine.unblocked_task_queue_count(), 1); + assert_eq!( + state_machine + .schedule_unblocked_task() + .unwrap() + .task_index(), + task2.task_index() + ); + assert!(!state_machine.has_unblocked_task()); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + state_machine.deschedule_task(&task2); + + assert_matches!( + state_machine + .schedule_task(task3.clone()) + .map(|task| task.task_index()), + Some(103) + ); + state_machine.deschedule_task(&task3); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_unblocked_task_related_counts() { + let sanitized = simplest_transaction(); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + + state_machine.deschedule_task(&task1); + + assert_eq!(state_machine.unblocked_task_count(), 0); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + assert_eq!(state_machine.unblocked_task_count(), 1); + // there's no blocked task anymore; calling schedule_unblocked_task should be noop and + // shouldn't increment the unblocked_task_count(). + assert_matches!(state_machine.schedule_unblocked_task(), None); + assert_eq!(state_machine.unblocked_task_count(), 1); + + state_machine.deschedule_task(&task2); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_existing_blocking_task_then_newly_scheduled_task() { + let sanitized = simplest_transaction(); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); + let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + state_machine.deschedule_task(&task1); + assert_eq!(state_machine.unblocked_task_queue_count(), 1); + + // new task is arriving after task1 is already descheduled and task2 got unblocked + assert_matches!(state_machine.schedule_task(task3.clone()), None); + + assert_eq!(state_machine.unblocked_task_count(), 0); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + assert_eq!(state_machine.unblocked_task_count(), 1); + + state_machine.deschedule_task(&task2); + + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(103) + ); + assert_eq!(state_machine.unblocked_task_count(), 2); + + state_machine.deschedule_task(&task3); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_multiple_readonly_task_and_counts() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_readonly_address(conflicting_address); + let sanitized2 = transaction_with_readonly_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + // both of read-only tasks should be immediately runnable + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!( + state_machine + .schedule_task(task2.clone()) + .map(|t| t.task_index()), + Some(102) + ); + + assert_eq!(state_machine.active_task_count(), 2); + assert_eq!(state_machine.handled_task_count(), 0); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + state_machine.deschedule_task(&task1); + assert_eq!(state_machine.active_task_count(), 1); + assert_eq!(state_machine.handled_task_count(), 1); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + state_machine.deschedule_task(&task2); + assert_eq!(state_machine.active_task_count(), 0); + assert_eq!(state_machine.handled_task_count(), 2); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_all_blocking_redable_tasks_block_writable_task() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_readonly_address(conflicting_address); + let sanitized2 = transaction_with_readonly_address(conflicting_address); + let sanitized3 = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!( + state_machine + .schedule_task(task2.clone()) + .map(|t| t.task_index()), + Some(102) + ); + assert_matches!(state_machine.schedule_task(task3.clone()), None); + + assert_eq!(state_machine.active_task_count(), 3); + assert_eq!(state_machine.handled_task_count(), 0); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + state_machine.deschedule_task(&task1); + assert_eq!(state_machine.active_task_count(), 2); + assert_eq!(state_machine.handled_task_count(), 1); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); + assert_matches!(state_machine.schedule_unblocked_task(), None); + state_machine.deschedule_task(&task2); + assert_eq!(state_machine.active_task_count(), 1); + assert_eq!(state_machine.handled_task_count(), 2); + assert_eq!(state_machine.unblocked_task_queue_count(), 1); + // task3 is finally unblocked after all of readble tasks (task1 and task2) is finished. + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(103) + ); + state_machine.deschedule_task(&task3); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_readonly_then_writable_then_readonly_linearized() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_readonly_address(conflicting_address); + let sanitized2 = transaction_with_writable_address(conflicting_address); + let sanitized3 = transaction_with_readonly_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + assert_matches!(state_machine.schedule_task(task3.clone()), None); + + assert_matches!(state_machine.schedule_unblocked_task(), None); + state_machine.deschedule_task(&task1); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + assert_matches!(state_machine.schedule_unblocked_task(), None); + state_machine.deschedule_task(&task2); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(103) + ); + assert_matches!(state_machine.schedule_unblocked_task(), None); + state_machine.deschedule_task(&task3); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_readonly_then_writable() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_readonly_address(conflicting_address); + let sanitized2 = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + + // descheduling read-locking task1 should equate to unblocking write-locking task2 + state_machine.deschedule_task(&task1); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + state_machine.deschedule_task(&task2); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_blocked_tasks_writable_2_readonly_then_writable() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_writable_address(conflicting_address); + let sanitized2 = transaction_with_readonly_address(conflicting_address); + let sanitized3 = transaction_with_readonly_address(conflicting_address); + let sanitized4 = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader); + let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + assert_matches!(state_machine.schedule_task(task3.clone()), None); + assert_matches!(state_machine.schedule_task(task4.clone()), None); + + state_machine.deschedule_task(&task1); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(103) + ); + // the above deschedule_task(task1) call should only unblock task2 and task3 because these + // are read-locking. And shouldn't unblock task4 because it's write-locking + assert_matches!(state_machine.schedule_unblocked_task(), None); + + state_machine.deschedule_task(&task2); + // still task4 is blocked... + assert_matches!(state_machine.schedule_unblocked_task(), None); + + state_machine.deschedule_task(&task3); + // finally task4 should be unblocked + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(104) + ); + state_machine.deschedule_task(&task4); + assert!(state_machine.has_no_active_task()); + } + + #[test] + fn test_gradual_locking() { + let conflicting_address = Pubkey::new_unique(); + let sanitized1 = transaction_with_writable_address(conflicting_address); + let sanitized2 = transaction_with_writable_address(conflicting_address); + let pages = Rc::new(RefCell::new(HashMap::new())); + let address_loader = &mut create_address_loader(Some(pages.clone())); + let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + assert_matches!( + state_machine + .schedule_task(task1.clone()) + .map(|t| t.task_index()), + Some(101) + ); + assert_matches!(state_machine.schedule_task(task2.clone()), None); + let pages = pages.borrow_mut(); + let page = pages.get(&conflicting_address).unwrap(); + assert_matches!( + page.0.borrow_mut(&mut state_machine.page_token).usage, + PageUsage::Writable + ); + // task2's fee payer should have been locked already even if task2 is blocked still via the + // above the schedule_task(task2) call + let fee_payer = task2.transaction().message().fee_payer(); + let page = pages.get(fee_payer).unwrap(); + assert_matches!( + page.0.borrow_mut(&mut state_machine.page_token).usage, + PageUsage::Writable + ); + } + + #[test] + #[should_panic(expected = "internal error: entered unreachable code")] + fn test_unreachable_unlock_conditions1() { + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let page = Page::default(); + let _ = SchedulingStateMachine::unlock_page( + page.0.borrow_mut(&mut state_machine.page_token), + &LockAttempt::new(page, RequestedUsage::Writable), + ); + } + + #[test] + #[should_panic(expected = "internal error: entered unreachable code")] + fn test_unreachable_unlock_conditions2() { + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let page = Page::default(); + page.0.borrow_mut(&mut state_machine.page_token).usage = PageUsage::Writable; + let _ = SchedulingStateMachine::unlock_page( + page.0.borrow_mut(&mut state_machine.page_token), + &LockAttempt::new(page, RequestedUsage::Readonly), + ); + } + + #[test] + #[should_panic(expected = "internal error: entered unreachable code")] + fn test_unreachable_unlock_conditions3() { + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let page = Page::default(); + page.0.borrow_mut(&mut state_machine.page_token).usage = + PageUsage::Readonly(ShortCounter::one()); + let _ = SchedulingStateMachine::unlock_page( + page.0.borrow_mut(&mut state_machine.page_token), + &LockAttempt::new(page, RequestedUsage::Writable), + ); + } + + #[test] + #[should_panic(expected = "bad new task index: 101 > 101")] + fn test_schedule_same_task() { + let conflicting_address = Pubkey::new_unique(); + let sanitized = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task = SchedulingStateMachine::create_task(sanitized, 101, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let _ = state_machine.schedule_task(task.clone()); + let _ = state_machine.schedule_task(task.clone()); + } + + #[test] + #[should_panic(expected = "bad new task index: 101 > 102")] + fn test_schedule_task_out_of_order() { + let conflicting_address = Pubkey::new_unique(); + let sanitized = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let _ = state_machine.schedule_task(task2.clone()); + let _ = state_machine.schedule_task(task1.clone()); + } + + #[test] + #[should_panic(expected = "task should have been scheduled")] + fn test_deschedule_new_task_wihout_scheduling() { + let conflicting_address = Pubkey::new_unique(); + let sanitized = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + state_machine.deschedule_task(&task); + } + + #[test] + #[should_panic(expected = "bad descheduled task index: 102 <= 101")] + fn test_deschedule_new_task_out_of_order() { + let conflicting_address = Pubkey::new_unique(); + let sanitized = transaction_with_writable_address(conflicting_address); + let address_loader = &mut create_address_loader(None); + let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); + let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); + + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; + let _ = state_machine.schedule_task(task1.clone()); + state_machine.deschedule_task(&task2); + } } diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 7626215b1e1126..9bd668f2799ab0 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } [dependencies] assert_matches = { workspace = true } crossbeam-channel = { workspace = true } +dashmap = { workspace = true } derivative = { workspace = true } log = { workspace = true } solana-ledger = { workspace = true } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 09ded82ee88e7d..60cbcd626550fd 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -1,3 +1,8 @@ +//! NOTE: While the unified scheduler is fully functional and moderately performant even with +//! mainnet-beta, it has known resource-exhaustion related security issues for replaying +//! specially-crafted blocks produced by malicious leaders. Thus, this functionality is exempt from +//! the bug bounty program for now. +//! //! Transaction scheduling code. //! //! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and @@ -10,7 +15,8 @@ use { assert_matches::assert_matches, - crossbeam_channel::{select, unbounded, Receiver, SendError, Sender}, + crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender}, + dashmap::DashMap, derivative::Derivative, log::*, solana_ledger::blockstore_processor::{ @@ -26,8 +32,11 @@ use { }, prioritization_fee_cache::PrioritizationFeeCache, }, - solana_sdk::transaction::{Result, SanitizedTransaction}, - solana_unified_scheduler_logic::Task, + solana_sdk::{ + pubkey::Pubkey, + transaction::{Result, SanitizedTransaction}, + }, + solana_unified_scheduler_logic::{Page, SchedulingStateMachine, Task}, solana_vote::vote_sender_types::ReplayVoteSender, std::{ fmt::Debug, @@ -90,10 +99,8 @@ where replay_vote_sender: Option, prioritization_fee_cache: Arc, ) -> Arc { - let handler_count = handler_count.unwrap_or(1); - // we're hard-coding the number of handler thread to 1, meaning this impl is currently - // single-threaded still. - assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later + let handler_count = handler_count.unwrap_or(Self::default_handler_count()); + assert!(handler_count >= 1); Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), @@ -386,13 +393,25 @@ mod chained_channel { } } +#[derive(Default, Debug)] +pub struct AddressBook { + book: DashMap, +} + +impl AddressBook { + pub fn load(&self, address: Pubkey) -> Page { + self.book.entry(address).or_default().clone() + } +} + +fn disconnected() -> Receiver { + unbounded().1 +} + fn initialized_result_with_timings() -> ResultWithTimings { (Ok(()), ExecuteTimings::default()) } -// Currently, simplest possible implementation (i.e. single-threaded) -// this will be replaced with more proper implementation... -// not usable at all, especially for mainnet-beta #[derive(Debug)] pub struct PooledScheduler { inner: PooledSchedulerInner, @@ -402,6 +421,7 @@ pub struct PooledScheduler { #[derive(Debug)] pub struct PooledSchedulerInner, TH: TaskHandler> { thread_manager: ThreadManager, + address_book: AddressBook, } // This type manages the OS threads for scheduling and executing transactions. The term @@ -427,6 +447,7 @@ impl PooledScheduler { Self::from_inner( PooledSchedulerInner:: { thread_manager: ThreadManager::new(pool), + address_book: AddressBook::default(), }, initial_context, ) @@ -518,7 +539,6 @@ impl, TH: TaskHandler> ThreadManager { let new_task_receiver = self.new_task_receiver.clone(); let mut session_ending = false; - let mut active_task_count: usize = 0; // Now, this is the main loop for the scheduler thread, which is a special beast. // @@ -558,61 +578,78 @@ impl, TH: TaskHandler> ThreadManager { // cycles out of the scheduler thread. Thus, any kinds of unessential overhead sources // like syscalls, VDSO, and even memory (de)allocation should be avoided at all costs // by design or by means of offloading at the last resort. - move || loop { - let mut is_finished = false; - while !is_finished { - select! { - recv(finished_task_receiver) -> executed_task => { - let executed_task = executed_task.unwrap(); - - active_task_count = active_task_count.checked_sub(1).unwrap(); - let result_with_timings = result_with_timings.as_mut().unwrap(); - Self::accumulate_result_with_timings(result_with_timings, executed_task); - }, - recv(new_task_receiver) -> message => { - assert!(!session_ending); - - match message.unwrap() { - NewTaskPayload::Payload(task) => { - // so, we're NOT scheduling at all here; rather, just execute - // tx straight off. the inter-tx locking deps aren't needed to - // be resolved in the case of single-threaded FIFO like this. - runnable_task_sender - .send_payload(task) - .unwrap(); - active_task_count = active_task_count.checked_add(1).unwrap(); - } - NewTaskPayload::OpenSubchannel(context) => { - // signal about new SchedulingContext to handler threads - runnable_task_sender - .send_chained_channel(context, handler_count) - .unwrap(); - assert_matches!( - result_with_timings.replace(initialized_result_with_timings()), - None - ); - } - NewTaskPayload::CloseSubchannel => { - session_ending = true; - } - } - }, - }; + move || { + let (do_now, dont_now) = (&disconnected::<()>(), &never::<()>()); + let dummy_receiver = |trigger| { + if trigger { + do_now + } else { + dont_now + } + }; - // a really simplistic termination condition, which only works under the - // assumption of single handler thread... - is_finished = session_ending && active_task_count == 0; - } + let mut state_machine = unsafe { + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + }; - if session_ending { - session_result_sender - .send(Some( - result_with_timings - .take() - .unwrap_or_else(initialized_result_with_timings), - )) - .unwrap(); - session_ending = false; + loop { + let mut is_finished = false; + while !is_finished { + select! { + recv(finished_task_receiver) -> executed_task => { + let executed_task = executed_task.unwrap(); + + state_machine.deschedule_task(&executed_task.task); + let result_with_timings = result_with_timings.as_mut().unwrap(); + Self::accumulate_result_with_timings(result_with_timings, executed_task); + }, + recv(dummy_receiver(state_machine.has_unblocked_task())) -> dummy => { + assert_matches!(dummy, Err(RecvError)); + + if let Some(task) = state_machine.schedule_unblocked_task() { + runnable_task_sender.send_payload(task).unwrap(); + } + }, + recv(new_task_receiver) -> message => { + assert!(!session_ending); + + match message.unwrap() { + NewTaskPayload::Payload(task) => { + if let Some(task) = state_machine.schedule_task(task) { + runnable_task_sender.send_payload(task).unwrap(); + } + } + NewTaskPayload::OpenSubchannel(context) => { + // signal about new SchedulingContext to handler threads + runnable_task_sender + .send_chained_channel(context, handler_count) + .unwrap(); + assert_matches!( + result_with_timings.replace(initialized_result_with_timings()), + None + ); + } + NewTaskPayload::CloseSubchannel => { + session_ending = true; + } + } + }, + }; + + is_finished = session_ending && state_machine.has_no_active_task(); + } + + if session_ending { + state_machine.reinitialize(); + session_result_sender + .send(Some( + result_with_timings + .take() + .unwrap_or_else(initialized_result_with_timings), + )) + .unwrap(); + session_ending = false; + } } } }; @@ -741,7 +778,9 @@ impl InstalledScheduler for PooledScheduler { } fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { - let task = Task::create_task(transaction.clone(), index); + let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| { + self.inner.address_book.load(pubkey) + }); self.inner.thread_manager.send_task(task); } @@ -1023,7 +1062,7 @@ mod tests { .result, Ok(_) ); - scheduler.schedule_execution(&(good_tx_after_bad_tx, 0)); + scheduler.schedule_execution(&(good_tx_after_bad_tx, 1)); scheduler.pause_for_recent_blockhash(); // transaction_count should remain same as scheduler should be bailing out. // That's because we're testing the serialized failing execution case in this test. From 324b2d0eba275880989574fe933d0fa0ac5b4803 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 24 Feb 2024 16:22:29 +0900 Subject: [PATCH 02/31] Apply all typo fixes from code review Co-authored-by: Andrew Fitzgerald --- unified-scheduler-logic/src/lib.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index c84dddfa6cf1a7..622c1aef37b331 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -3,12 +3,12 @@ //! //! ### High-level API and design //! -//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactons) and +//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactions) and //! may return back them if runnable via //! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account //! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to //! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion -//! of the exeuciton via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that +//! of the exeuction via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that //! conflicting tasks can be returned from //! [`::schedule_unblocked_task()`](SchedulingStateMachine::schedule_unblocked_task) as //! newly-unblocked runnable ones. @@ -39,13 +39,13 @@ //! The last missing piece is that the scheduler actually tries to reschedule previously blocked //! tasks while deschduling, in addition to the above-mentioned book-keeping processing. Namely, //! when given address is ready for new fresh locking resulted from descheduling a task (i.e. write -//! lock is released or read lock count is reached to zero), it pops out the first element of the +//! lock is released or read lock count has reached zero), it pops out the first element of the //! FIFO blocked-task queue of the address. Then, it immediately marks the address as relocked. It //! also decrements the number of conflicting addresses of the popped-out task. As the final step, //! if the number reaches to the zero, it means the task has fully finished locking all of its //! addresses and is directly routed to be runnable. //! -//! Put differently, this algorigthm tries to gradually lock all of addresses of tasks at different +//! Put differently, this algorithm tries to gradually lock all of addresses of tasks at different //! timings while not deviating the execution order from the original task ingestion order. This //! implies there's no locking retries in general, which is the primary source of non-linear perf. //! degration. @@ -69,25 +69,25 @@ //! number of addresses in a given transaction. Note that this statement is held true, regardless //! of conflicts. This is because the preloading also pre-allocates some scratch-pad area //! ([`blocked_tasks`](PageInner::blocked_tasks)) to stash blocked ones. So, a conflict only incurs -//! some additional fixed number of mem stores, within error magin of the constant complexity. And -//! additional memory allocation for the scratchpad could said to be amortized, if such unsual +//! some additional fixed number of mem stores, within error margin of the constant complexity. And +//! additional memory allocation for the scratchpad could said to be amortized, if such an unusual //! event should occur. //! //! [`Arc`] is used to implement this preloading mechanism, because `Page`s are shared across tasks //! accessing the same account, and among threads due to the preloading. Also, interior mutability //! is needed. However, `SchedulingStateMachine` doesn't use conventional locks like RwLock. -//! Leveraving the fact it's the only state-mutating exclusive thread, it instead uses +//! Leveraging the fact it's the only state-mutating exclusive thread, it instead uses //! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell` -//! improses an overly restrictive aliasing rule via rust type system to maintain the memory +//! imposes an overly restrictive aliasing rule via rust type system to maintain the memory //! safety. By localizing any synchronization to the message passing, the scheduling code itself //! attains maximally possible single-threaed execution without stalling cpu pipelines at all, only -//! constrained to mem access latency, while efficiently utilzing L1-L3 cpu cache with full of +//! constrained to mem access latency, while efficiently utilizing L1-L3 cpu cache with full of //! `Page`s. //! //! ### Buffer bloat insignificance //! //! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in -//! unified scheduler, where a run of heavily linearized and blocked tasks could severely hampered +//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely hampered //! by very large number of interleaved runnable tasks along side. The reason is again for //! separation of concerns. This is acceptable because the scheduling code itself isn't susceptible //! to the buffer bloat problem by itself as explained by the description and validated by the @@ -387,7 +387,7 @@ enum RequestedUsage { /// Internal scheduling data about a particular address. /// -/// Specifially, it holds the current [`PageUsage`] (or no usage with [`PageUsage::Unused`]) and +/// Specifically, it holds the current [`PageUsage`] (or no usage with [`PageUsage::Unused`]) and /// which [`Task`]s are blocked to be executed after the current task is notified to be finished /// via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`) #[derive(Debug)] From 8d8eb3df6fc98b34db2e3b0c2d6463ef302c4985 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 24 Feb 2024 16:31:32 +0900 Subject: [PATCH 03/31] Update word wrapping --- unified-scheduler-logic/src/lib.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 622c1aef37b331..696493d1822345 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -78,21 +78,21 @@ //! is needed. However, `SchedulingStateMachine` doesn't use conventional locks like RwLock. //! Leveraging the fact it's the only state-mutating exclusive thread, it instead uses //! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell` -//! imposes an overly restrictive aliasing rule via rust type system to maintain the memory -//! safety. By localizing any synchronization to the message passing, the scheduling code itself -//! attains maximally possible single-threaed execution without stalling cpu pipelines at all, only +//! imposes an overly restrictive aliasing rule via rust type system to maintain the memory safety. +//! By localizing any synchronization to the message passing, the scheduling code itself attains +//! maximally possible single-threaed execution without stalling cpu pipelines at all, only //! constrained to mem access latency, while efficiently utilizing L1-L3 cpu cache with full of //! `Page`s. //! //! ### Buffer bloat insignificance //! //! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in -//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely hampered -//! by very large number of interleaved runnable tasks along side. The reason is again for -//! separation of concerns. This is acceptable because the scheduling code itself isn't susceptible -//! to the buffer bloat problem by itself as explained by the description and validated by the -//! mentioned benchmark above. Thus, this should be solved elsewhere, specifically at the scheduler -//! pool. +//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely +//! hampered by very large number of interleaved runnable tasks along side. The reason is again +//! for separation of concerns. This is acceptable because the scheduling code itself isn't +//! susceptible to the buffer bloat problem by itself as explained by the description and validated +//! by the mentioned benchmark above. Thus, this should be solved elsewhere, specifically at the +//! scheduler pool. use { crate::utils::{ShortCounter, Token, TokenCell}, solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction}, From a6022cc1c070548b9e814d53b03d03a7a1901dfd Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 24 Feb 2024 16:35:41 +0900 Subject: [PATCH 04/31] Clarify Token::assume_exclusive_mutating_thread() --- unified-scheduler-logic/src/lib.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 696493d1822345..e9c6e537343eb4 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -239,15 +239,24 @@ mod utils { pub(super) struct Token(PhantomData<*mut V>); impl Token { - // Returns the token to acquire a mutable reference to the inner value of [TokenCell]. - // - // Safety: - // This method should be called exactly once for each thread at most. + /// Returns the token to acquire a mutable reference to the inner value of [TokenCell]. + /// + /// # Panics + /// + /// This function will `panic!()` if called multiple times with same type `V` from the same + /// thread to detect potential misuses. + /// + /// # Safety + /// + /// This method should be called exactly once for each thread at most to avoid undefined + /// behavior when used with [`Token`]. #[must_use] pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self { thread_local! { static TOKENS: RefCell> = const { RefCell::new(BTreeSet::new()) }; } + // TOKEN.with_borrow_mut can't panic because it's the only non-overlapping + // bound-to-local-variable borrow of the _thread local_ variable. assert!( TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::())), "{:?} is wrongly initialized twice on {:?}", From a54bc7f3668c5523836c555dc8ba7834c91a1050 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 24 Feb 2024 16:50:33 +0900 Subject: [PATCH 05/31] Use slice instead of &Vec<_> --- unified-scheduler-logic/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index e9c6e537343eb4..5f735ff76aa4dc 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -320,7 +320,7 @@ impl TaskInner { &self.transaction } - fn lock_attempts(&self) -> &Vec { + fn lock_attempts(&self) -> &[LockAttempt] { &self.lock_attempts } From e0415eab925680c42539c918a996ae45d982360d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sun, 25 Feb 2024 13:27:57 +0900 Subject: [PATCH 06/31] Improve non-const explanation --- unified-scheduler-logic/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 5f735ff76aa4dc..1e9c9daf973fdf 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -204,7 +204,9 @@ mod utils { /// complex handling of non-`'static` heaped data in general. Instead, it's manually /// required to ensure this instance is accessed only via its associated Token for the /// entire lifetime. - // non-const to forbid unprotected sharing via static variables among threads. + /// + /// This is intentionally left to be non-`const` to forbid unprotected sharing via static + /// variables among threads. pub(super) fn new(value: V) -> Self { Self(UnsafeCell::new(value)) } @@ -241,6 +243,9 @@ mod utils { impl Token { /// Returns the token to acquire a mutable reference to the inner value of [TokenCell]. /// + /// This is intentionally left to be non-`const` to forbid unprotected sharing via static + /// variables among threads. + /// /// # Panics /// /// This function will `panic!()` if called multiple times with same type `V` from the same From 446a2ff8221691ddf0bd6067dec942a5efb5e1cf Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sun, 25 Feb 2024 13:36:32 +0900 Subject: [PATCH 07/31] Document consecutive readonly rescheduling opt. --- unified-scheduler-logic/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 1e9c9daf973fdf..c2d40a6b32de6c 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -43,7 +43,9 @@ //! FIFO blocked-task queue of the address. Then, it immediately marks the address as relocked. It //! also decrements the number of conflicting addresses of the popped-out task. As the final step, //! if the number reaches to the zero, it means the task has fully finished locking all of its -//! addresses and is directly routed to be runnable. +//! addresses and is directly routed to be runnable. Lastly, if the next first element of the +//! blocked-task queue is trying to read-lock the address like the popped-out one, this +//! rescheduling is repeated as an optimization to increase parallelism of task execution. //! //! Put differently, this algorithm tries to gradually lock all of addresses of tasks at different //! timings while not deviating the execution order from the original task ingestion order. This From 7a72de8c2ee1d75a9a4e0b54597fa7135a08638c Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sun, 25 Feb 2024 14:31:19 +0900 Subject: [PATCH 08/31] Make test_gradual_locking terminate for miri --- unified-scheduler-logic/src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index c2d40a6b32de6c..b199398b0c009c 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -1242,6 +1242,15 @@ mod tests { page.0.borrow_mut(&mut state_machine.page_token).usage, PageUsage::Writable ); + state_machine.deschedule_task(&task1); + assert_matches!( + state_machine + .schedule_unblocked_task() + .map(|t| t.task_index()), + Some(102) + ); + state_machine.deschedule_task(&task2); + assert!(state_machine.has_no_active_task()); } #[test] From 92a9ba4e932a8d5dcd8ef1f950a8ad48d2a0c6bf Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 26 Feb 2024 13:37:53 +0900 Subject: [PATCH 09/31] Avoid unnecessary Task::clone() --- unified-scheduler-logic/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index b199398b0c009c..0ebbe5ea69eaa2 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -343,11 +343,11 @@ impl TaskInner { } #[must_use] - fn try_unblock(self: &Task, token: &mut BlockedPageCountToken) -> Option { + fn try_unblock(self: Task, token: &mut BlockedPageCountToken) -> Option { self.blocked_page_count_mut(token) .decrement_self() .is_zero() - .then(|| self.clone()) + .then_some(self) } } From 62e5a3802a3aed72bc9be7a5e0653f79c1e42309 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 26 Feb 2024 15:36:01 +0900 Subject: [PATCH 10/31] Rename: lock_{status,result} and no attempt_...() --- unified-scheduler-logic/src/lib.rs | 58 +++++++++++++----------------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 0ebbe5ea69eaa2..41865c1ada2005 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -514,7 +514,7 @@ impl SchedulingStateMachine { } self.total_task_count.increment_self(); self.active_task_count.increment_self(); - self.attempt_lock_for_task(task) + self.try_lock_for_task(task) } pub fn has_unblocked_task(&self) -> bool { @@ -543,33 +543,7 @@ impl SchedulingStateMachine { self.unlock_for_task(task); } - #[must_use] - fn attempt_lock_pages(&mut self, task: &Task) -> ShortCounter { - let mut blocked_page_count = ShortCounter::zero(); - - for attempt in task.lock_attempts() { - let page = attempt.page_mut(&mut self.page_token); - let lock_status = if page.has_no_blocked_task() { - Self::attempt_lock_page(page, attempt.requested_usage) - } else { - LockResult::Err(()) - }; - match lock_status { - LockResult::Ok(PageUsage::Unused) => unreachable!(), - LockResult::Ok(new_usage) => { - page.usage = new_usage; - } - LockResult::Err(()) => { - blocked_page_count.increment_self(); - page.push_blocked_task(task.clone(), attempt.requested_usage); - } - } - } - - blocked_page_count - } - - fn attempt_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult { + fn try_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult { match page.usage { PageUsage::Unused => LockResult::Ok(PageUsage::from_requested_usage(requested_usage)), PageUsage::Readonly(count) => match requested_usage { @@ -612,14 +586,32 @@ impl SchedulingStateMachine { } #[must_use] - fn attempt_lock_for_task(&mut self, task: Task) -> Option { - let blocked_page_count = self.attempt_lock_pages(&task); + fn try_lock_for_task(&mut self, task: Task) -> Option { + let mut blocked_page_count = ShortCounter::zero(); + + for attempt in task.lock_attempts() { + let page = attempt.page_mut(&mut self.page_token); + let lock_result = if page.has_no_blocked_task() { + Self::try_lock_page(page, attempt.requested_usage) + } else { + LockResult::Err(()) + }; + match lock_result { + LockResult::Ok(PageUsage::Unused) => unreachable!(), + LockResult::Ok(new_usage) => { + page.usage = new_usage; + } + LockResult::Err(()) => { + blocked_page_count.increment_self(); + page.push_blocked_task(task.clone(), attempt.requested_usage); + } + } + } + // no blocked page means success if blocked_page_count.is_zero() { - // succeeded Some(task) } else { - // failed task.set_blocked_page_count(&mut self.count_token, blocked_page_count); None } @@ -635,7 +627,7 @@ impl SchedulingStateMachine { self.unblocked_task_queue.push_back(task); } - match Self::attempt_lock_page(page, requested_usage) { + match Self::try_lock_page(page, requested_usage) { LockResult::Ok(PageUsage::Unused) => unreachable!(), LockResult::Ok(new_usage) => { page.usage = new_usage; From 192f0f58607c4acd2a8efd248b84aa193ef2bed7 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 28 Feb 2024 23:38:17 +0900 Subject: [PATCH 11/31] Add safety comment for get_account_locks_unchecked --- runtime/src/bank.rs | 3 +++ unified-scheduler-logic/src/lib.rs | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 3ea316f857a2bc..b41e22ac7ca589 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4248,6 +4248,9 @@ impl Bank { transaction: &'a SanitizedTransaction, ) -> TransactionBatch<'_, '_> { let tx_account_lock_limit = self.get_transaction_account_lock_limit(); + // Note that switching this to .get_account_locks_unchecked() is unacceptable currently. + // The unified scheduler relies on the checks enforced here. + // See a comment in SchedulingStateMachine::create_task(). let lock_result = transaction .get_account_locks(tx_account_lock_limit) .map(|_| ()); diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 41865c1ada2005..bba213961b76a1 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -657,7 +657,20 @@ impl SchedulingStateMachine { index: usize, page_loader: &mut impl FnMut(Pubkey) -> Page, ) -> Task { - // this is safe bla bla + // Calling the _unchecked() version here is safe for faster operation, because + // `get_account_locks()` (the safe variant) is ensured to be called in + // DefaultTransactionHandler::handle() via Bank::prepare_unlocked_batch_from_single_tx(). + // + // The safe variant has additional account-locking related verifications, which is crutial. + // + // Currently the replaying stage is redundantly calling `get_accont_locks()` when unified + // scheduler is enabled on the given transaction at the blockstore. This will be relaxed + // for optimization in the future. As for banking stage with unified scheduler, it will + // need to run .get_account_locks() at least once somewhere in the code path. In the + // distant future, this function `create_task()` should be adjusted so that both stages do + // the checks before calling this (say, with some ad-hoc type like + // `SanitizedTransactionWithCheckedAccountLocks`) or do the chccks here, resulting in + // eliminating the redudant one in the replaying stage and in the handler. let locks = transaction.get_account_locks_unchecked(); let writable_locks = locks From efee73cc82c507fd54acfbe8501973f85c3f7223 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 28 Feb 2024 23:50:01 +0900 Subject: [PATCH 12/31] Reduce and comment about Page::blocked_tasks cap. --- unified-scheduler-logic/src/lib.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index bba213961b76a1..15386b8a09d508 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -416,7 +416,16 @@ impl Default for PageInner { fn default() -> Self { Self { usage: PageUsage::default(), - blocked_tasks: VecDeque::with_capacity(1024), + // Capacity should be configurable to create with large capacity like 1024 inside the + // (multi-threaded) closures passed to create_task(). In this way, reallocs can be + // avoided happening in the scheduler thread. Also, this configurability is desired for + // unified-scheduler-logic's motto: separation of concerns (the pure logic should be + // sufficiently distanced from any some random knob's constants needed for messy + // reality for author's personal preference...). + // + // Note that large cap should be accompanied with proper scheduler cleaning after use, + // which should be handled by higher layers (i.e. scheduler pool). + blocked_tasks: VecDeque::with_capacity(128), } } } @@ -726,6 +735,8 @@ impl SchedulingStateMachine { pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self { Self { last_task_index: None, + // It's very unlikely this is desired to be configurable, like + // `PageInner::blocked_tasks`'s cap. unblocked_task_queue: VecDeque::with_capacity(1024), active_task_count: ShortCounter::zero(), handled_task_count: ShortCounter::zero(), From 05d9e402641003bd6b13668848e9f275197836ca Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 00:14:22 +0900 Subject: [PATCH 13/31] Document SchedulingStateMachine::schedule_task() --- unified-scheduler-logic/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 15386b8a09d508..b401880042bc1c 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -512,6 +512,13 @@ impl SchedulingStateMachine { self.total_task_count.current() } + /// Schedules given `task`, returning it if successful. + /// + /// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`, + /// indicating the task is blocked currently. + /// + /// Note that this function's type signature is intentionally redundant to take the ownership + /// of given task _conditionally_ for future optimization. #[must_use] pub fn schedule_task(&mut self, task: Task) -> Option { let new_task_index = task.task_index(); From b72fda0091a959f668d054d0a8f193c6fe6b63cd Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 00:23:08 +0900 Subject: [PATCH 14/31] Add justification of closure in create_task --- unified-scheduler-logic/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index b401880042bc1c..9055438eed63e0 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -668,6 +668,12 @@ impl SchedulingStateMachine { /// implementation of [`Page`] look-up by [`pubkey`](Pubkey) to callers. It's the caller's /// responsibility to ensure the same instance is returned from the closure, given a particular /// pubkey. + /// + /// Closure is used here to delegate the responsibility of general ownership of `Page` (and + /// caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all of + /// shared owndership of `Page`s are released and Page state is identical to just after + /// created, if `has_no_active_task()` is `true`. Also note that this is desired for separation + /// of concern. pub fn create_task( transaction: SanitizedTransaction, index: usize, From 745f4d927941afe70d4a2b65bd3353c20415f306 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 00:24:49 +0900 Subject: [PATCH 15/31] Use the From trait for PageUsage --- unified-scheduler-logic/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 9055438eed63e0..77165708922395 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -384,8 +384,8 @@ enum PageUsage { } const_assert_eq!(mem::size_of::(), 8); -impl PageUsage { - fn from_requested_usage(requested_usage: RequestedUsage) -> Self { +impl From for PageUsage { + fn from(requested_usage: RequestedUsage) -> Self { match requested_usage { RequestedUsage::Readonly => PageUsage::Readonly(ShortCounter::one()), RequestedUsage::Writable => PageUsage::Writable, @@ -561,7 +561,7 @@ impl SchedulingStateMachine { fn try_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult { match page.usage { - PageUsage::Unused => LockResult::Ok(PageUsage::from_requested_usage(requested_usage)), + PageUsage::Unused => LockResult::Ok(PageUsage::from(requested_usage)), PageUsage::Readonly(count) => match requested_usage { RequestedUsage::Readonly => LockResult::Ok(PageUsage::Readonly(count.increment())), RequestedUsage::Writable => LockResult::Err(()), From d99d06b4dccffcbcd20b1ab55722f6c9e03f8a06 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 12:41:16 +0900 Subject: [PATCH 16/31] Replace unneeded if-let with .expect() --- unified-scheduler-pool/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 60cbcd626550fd..26c40e5518bc25 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -606,9 +606,8 @@ impl, TH: TaskHandler> ThreadManager { recv(dummy_receiver(state_machine.has_unblocked_task())) -> dummy => { assert_matches!(dummy, Err(RecvError)); - if let Some(task) = state_machine.schedule_unblocked_task() { - runnable_task_sender.send_payload(task).unwrap(); - } + let task = state_machine.schedule_unblocked_task().expect("unblocked task"); + runnable_task_sender.send_payload(task).unwrap(); }, recv(new_task_receiver) -> message => { assert!(!session_ending); From 8ceb1b5a841d11b19b94026f8a81ae2c75218a0d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 14:27:38 +0900 Subject: [PATCH 17/31] Add helpful comments for peculiar crossbeam usage --- unified-scheduler-pool/src/lib.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 26c40e5518bc25..3dda9858acb643 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -404,7 +404,11 @@ impl AddressBook { } } +// (this is slow needing atomic mem reads. However, this can be turned into a lot faster +// optimizer-friendly version as shown in this crossbeam pr: +// https://github.com/crossbeam-rs/crossbeam/pull/1047) fn disconnected() -> Receiver { + // drop the sender residing at .0, returning an always-disconnected receiver. unbounded().1 } @@ -595,6 +599,26 @@ impl, TH: TaskHandler> ThreadManager { loop { let mut is_finished = false; while !is_finished { + // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl, + // which isn't great and is inconsistent with `if`s in the Rust's match + // arm. So, eagerly binding the result to a variable unconditionally here + // makes no perf. difference... + let dummy_unblocked_task_receiver = + dummy_receiver(state_machine.has_unblocked_task()); + + // (Assume this is biased; i.e. select_biased! in this crossbeam pr: + // https://github.com/rust-lang/futures-rs/pull/1976) + // + // There's something special called dummy_unblocked_task_receiver here. + // This odd pattern was needed to react to newly unblocked tasks from + // _not-crossbeam-channel_ event sources, precisely at the specified + // precedence among other selectors, while delegating the conrol flow to + // select_biased!. + // + // In this way, hot looping is avoided and overall control flow is much + // consistent. Note that unified scheduler will go + // into busy looping to seek lowest latency eventually. However, not now, + // to measure _actual_ cpu usage easily with the select approach. select! { recv(finished_task_receiver) -> executed_task => { let executed_task = executed_task.unwrap(); @@ -603,7 +627,7 @@ impl, TH: TaskHandler> ThreadManager { let result_with_timings = result_with_timings.as_mut().unwrap(); Self::accumulate_result_with_timings(result_with_timings, executed_task); }, - recv(dummy_receiver(state_machine.has_unblocked_task())) -> dummy => { + recv(dummy_unblocked_task_receiver) -> dummy => { assert_matches!(dummy, Err(RecvError)); let task = state_machine.schedule_unblocked_task().expect("unblocked task"); From c62e8351f00a6851cdc2bc2770a1a589011452aa Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 14:55:35 +0900 Subject: [PATCH 18/31] Fix typo --- unified-scheduler-logic/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 77165708922395..dd156dbfffd300 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -683,7 +683,7 @@ impl SchedulingStateMachine { // `get_account_locks()` (the safe variant) is ensured to be called in // DefaultTransactionHandler::handle() via Bank::prepare_unlocked_batch_from_single_tx(). // - // The safe variant has additional account-locking related verifications, which is crutial. + // The safe variant has additional account-locking related verifications, which is crucial. // // Currently the replaying stage is redundantly calling `get_accont_locks()` when unified // scheduler is enabled on the given transaction at the blockstore. This will be relaxed From 4c703a96cf8e8d624a37ff152d2680170db42ff1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 14:57:20 +0900 Subject: [PATCH 19/31] Make bug-bounty-exempt statement more clear --- unified-scheduler-pool/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 3dda9858acb643..74482c19704505 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -1,7 +1,7 @@ //! NOTE: While the unified scheduler is fully functional and moderately performant even with //! mainnet-beta, it has known resource-exhaustion related security issues for replaying -//! specially-crafted blocks produced by malicious leaders. Thus, this functionality is exempt from -//! the bug bounty program for now. +//! specially-crafted blocks produced by malicious leaders. Thus, this experimental and +//! nondefault functionality is exempt from the bug bounty program for now. //! //! Transaction scheduling code. //! From 1d4b08705935f7558056ed6f178e7d968893c3f1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 14:59:57 +0900 Subject: [PATCH 20/31] Add test_enfoced_get_account_locks_verification --- unified-scheduler-pool/src/lib.rs | 38 +++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 74482c19704505..be1918807892b2 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -1309,4 +1309,42 @@ mod tests { 4 ); } + + // See comment in SchedulingStateMachine::create_task() for the justification of this test + #[test] + fn test_enfoced_get_account_locks_validation() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + ref mint_keypair, + .. + } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank = &setup_dummy_fork_graph(bank); + + let mut tx = system_transaction::transfer( + mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + ); + // mangle the transfer tx to try to lock fee_payer (= mint_keypair) address twice! + tx.message.account_keys.push(tx.message.account_keys[0]); + let tx = &SanitizedTransaction::from_transaction_for_tests(tx); + + // this internally should call SanitizedTransaction::get_account_locks(). + let result = &mut Ok(()); + let timings = &mut ExecuteTimings::default(); + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let handler_context = &HandlerContext { + log_messages_bytes_limit: None, + transaction_status_sender: None, + replay_vote_sender: None, + prioritization_fee_cache, + }; + + DefaultTaskHandler::handle(result, timings, bank, tx, 0, handler_context); + assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); + } } From eb87f1dd2bc3fd6621d050f2d0aa439e8b49e481 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 16:40:11 +0900 Subject: [PATCH 21/31] Fix typos... --- unified-scheduler-logic/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index dd156dbfffd300..0e8f63c599b924 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -685,14 +685,14 @@ impl SchedulingStateMachine { // // The safe variant has additional account-locking related verifications, which is crucial. // - // Currently the replaying stage is redundantly calling `get_accont_locks()` when unified + // Currently the replaying stage is redundantly calling `get_account_locks()` when unified // scheduler is enabled on the given transaction at the blockstore. This will be relaxed // for optimization in the future. As for banking stage with unified scheduler, it will // need to run .get_account_locks() at least once somewhere in the code path. In the // distant future, this function `create_task()` should be adjusted so that both stages do // the checks before calling this (say, with some ad-hoc type like - // `SanitizedTransactionWithCheckedAccountLocks`) or do the chccks here, resulting in - // eliminating the redudant one in the replaying stage and in the handler. + // `SanitizedTransactionWithCheckedAccountLocks`) or do the checks here, resulting in + // eliminating the redundant one in the replaying stage and in the handler. let locks = transaction.get_account_locks_unchecked(); let writable_locks = locks From 03a0bd1a1a5d3007bd29f701fe43f37cf125653b Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 16:10:52 +0900 Subject: [PATCH 22/31] Big rename: Page => UsageQueue --- unified-scheduler-logic/src/lib.rs | 349 ++++++++++++++++------------- unified-scheduler-pool/src/lib.rs | 18 +- 2 files changed, 201 insertions(+), 166 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 0e8f63c599b924..a7a0b872d94e41 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -63,28 +63,28 @@ //! Its algorithm is very fast for high throughput, real-time for low latency. The whole //! unified-scheduler architecture is designed from grounds up to support the fastest execution of //! this scheduling code. For that end, unified scheduler pre-loads address-specific locking state -//! data structures (called [`Page`]) for all of transaction's accounts, in order to offload the -//! job to other threads from the scheduler thread. This preloading is done inside +//! data structures (called [`UsageQueue`]) for all of transaction's accounts, in order to offload +//! the job to other threads from the scheduler thread. This preloading is done inside //! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling //! computational complexity is basically reduced to several word-sized loads and stores in the //! schduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the //! number of addresses in a given transaction. Note that this statement is held true, regardless //! of conflicts. This is because the preloading also pre-allocates some scratch-pad area -//! ([`blocked_tasks`](PageInner::blocked_tasks)) to stash blocked ones. So, a conflict only incurs -//! some additional fixed number of mem stores, within error margin of the constant complexity. And -//! additional memory allocation for the scratchpad could said to be amortized, if such an unusual -//! event should occur. +//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked +//! ones. So, a conflict only incurs some additional fixed number of mem stores, within error +//! margin of the constant complexity. And additional memory allocation for the scratchpad could +//! said to be amortized, if such an unusual event should occur. //! -//! [`Arc`] is used to implement this preloading mechanism, because `Page`s are shared across tasks -//! accessing the same account, and among threads due to the preloading. Also, interior mutability -//! is needed. However, `SchedulingStateMachine` doesn't use conventional locks like RwLock. -//! Leveraging the fact it's the only state-mutating exclusive thread, it instead uses +//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared across +//! tasks accessing the same account, and among threads due to the preloading. Also, interior +//! mutability is needed. However, `SchedulingStateMachine` doesn't use conventional locks like +//! RwLock. Leveraging the fact it's the only state-mutating exclusive thread, it instead uses //! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell` //! imposes an overly restrictive aliasing rule via rust type system to maintain the memory safety. //! By localizing any synchronization to the message passing, the scheduling code itself attains //! maximally possible single-threaed execution without stalling cpu pipelines at all, only //! constrained to mem access latency, while efficiently utilizing L1-L3 cpu cache with full of -//! `Page`s. +//! `UsageQueue`s. //! //! ### Buffer bloat insignificance //! @@ -293,21 +293,22 @@ mod utils { } } -/// [`Result`] for locking a [page](Page) with particular [usage](RequestedUsage). -type LockResult = Result; +/// [`Result`] for locking a [usage_queue](UsageQueue) with particular +/// [current_usage](RequestedUsage). +type LockResult = Result; const_assert_eq!(mem::size_of::(), 8); /// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`]. pub type Task = Arc; const_assert_eq!(mem::size_of::(), 8); -/// [`Token`] for [`Page`]. -type PageToken = Token; -const_assert_eq!(mem::size_of::(), 0); +/// [`Token`] for [`UsageQueue`]. +type UsageQueueToken = Token; +const_assert_eq!(mem::size_of::(), 0); -/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_page_count`). -type BlockedPageCountToken = Token; -const_assert_eq!(mem::size_of::(), 0); +/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`). +type BlockedUsageCountToken = Token; +const_assert_eq!(mem::size_of::(), 0); /// Internal scheduling data about a particular task. #[derive(Debug)] @@ -315,7 +316,7 @@ pub struct TaskInner { transaction: SanitizedTransaction, index: usize, lock_attempts: Vec, - blocked_page_count: TokenCell, + blocked_usage_count: TokenCell, } impl TaskInner { @@ -331,70 +332,73 @@ impl TaskInner { &self.lock_attempts } - fn blocked_page_count_mut<'t>( + fn blocked_usage_count_mut<'t>( &self, - token: &'t mut BlockedPageCountToken, + token: &'t mut BlockedUsageCountToken, ) -> &'t mut ShortCounter { - self.blocked_page_count.borrow_mut(token) + self.blocked_usage_count.borrow_mut(token) } - fn set_blocked_page_count(&self, token: &mut BlockedPageCountToken, count: ShortCounter) { - *self.blocked_page_count_mut(token) = count; + fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) { + *self.blocked_usage_count_mut(token) = count; } #[must_use] - fn try_unblock(self: Task, token: &mut BlockedPageCountToken) -> Option { - self.blocked_page_count_mut(token) + fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option { + self.blocked_usage_count_mut(token) .decrement_self() .is_zero() .then_some(self) } } -/// [`Task`]'s per-address attempt to use a [page](Page) with [certain kind of +/// [`Task`]'s per-address attempt to use a [usage_queue](UsageQueue) with [certain kind of /// request](RequestedUsage). #[derive(Debug)] struct LockAttempt { - page: Page, + usage_queue: UsageQueue, requested_usage: RequestedUsage, } const_assert_eq!(mem::size_of::(), 16); impl LockAttempt { - fn new(page: Page, requested_usage: RequestedUsage) -> Self { + fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self { Self { - page, + usage_queue, requested_usage, } } - fn page_mut<'t>(&self, page_token: &'t mut PageToken) -> &'t mut PageInner { - self.page.0.borrow_mut(page_token) + fn usage_queue_mut<'t>( + &self, + usage_queue_token: &'t mut UsageQueueToken, + ) -> &'t mut UsageQueueInner { + self.usage_queue.0.borrow_mut(usage_queue_token) } } -/// Status about how the [`Page`] is used currently. Unlike [`RequestedUsage`], it has additional -/// variant of [`Unused`](`PageUsage::Unused`). +/// Status about how the [`UsageQueue`] is used currently. Unlike [`RequestedUsage`], it has +/// additional variant of [`Unused`](`Usage::Unused`). #[derive(Copy, Clone, Debug, Default)] -enum PageUsage { +enum Usage { #[default] Unused, Readonly(ShortCounter), Writable, } -const_assert_eq!(mem::size_of::(), 8); +const_assert_eq!(mem::size_of::(), 8); -impl From for PageUsage { +impl From for Usage { fn from(requested_usage: RequestedUsage) -> Self { match requested_usage { - RequestedUsage::Readonly => PageUsage::Readonly(ShortCounter::one()), - RequestedUsage::Writable => PageUsage::Writable, + RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()), + RequestedUsage::Writable => Usage::Writable, } } } -/// Status about how a task is requesting to use a particular [`Page`]. Unlike [`PageUsage`], -/// it has only two unit variants. +/// Status about how a task is requesting to use a particular [`UsageQueue`]. Unlike [`Usage`], it +/// has only two unit variants. #[derive(Clone, Copy, Debug)] enum RequestedUsage { Readonly, @@ -403,19 +407,19 @@ enum RequestedUsage { /// Internal scheduling data about a particular address. /// -/// Specifically, it holds the current [`PageUsage`] (or no usage with [`PageUsage::Unused`]) and -/// which [`Task`]s are blocked to be executed after the current task is notified to be finished -/// via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`) +/// Specifically, it holds the current [`Usage`] (or no usage with [`Usage::Unused`]) and which +/// [`Task`]s are blocked to be executed after the current task is notified to be finished via +/// [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`) #[derive(Debug)] -struct PageInner { - usage: PageUsage, - blocked_tasks: VecDeque<(Task, RequestedUsage)>, +struct UsageQueueInner { + current_usage: Usage, + blocked_usages_from_tasks: VecDeque<(RequestedUsage, Task)>, } -impl Default for PageInner { +impl Default for UsageQueueInner { fn default() -> Self { Self { - usage: PageUsage::default(), + current_usage: Usage::default(), // Capacity should be configurable to create with large capacity like 1024 inside the // (multi-threaded) closures passed to create_task(). In this way, reallocs can be // avoided happening in the scheduler thread. Also, this configurability is desired for @@ -425,37 +429,38 @@ impl Default for PageInner { // // Note that large cap should be accompanied with proper scheduler cleaning after use, // which should be handled by higher layers (i.e. scheduler pool). - blocked_tasks: VecDeque::with_capacity(128), + blocked_usages_from_tasks: VecDeque::with_capacity(128), } } } -impl PageInner { - fn push_blocked_task(&mut self, task: Task, requested_usage: RequestedUsage) { - self.blocked_tasks.push_back((task, requested_usage)); +impl UsageQueueInner { + fn push_blocked_task(&mut self, requested_usage: RequestedUsage, task: Task) { + self.blocked_usages_from_tasks + .push_back((requested_usage, task)); } fn has_no_blocked_task(&self) -> bool { - self.blocked_tasks.is_empty() + self.blocked_usages_from_tasks.is_empty() } #[must_use] - fn pop_unblocked_next_task(&mut self) -> Option<(Task, RequestedUsage)> { - self.blocked_tasks.pop_front() + fn pop_unblocked_next_task(&mut self) -> Option<(RequestedUsage, Task)> { + self.blocked_usages_from_tasks.pop_front() } #[must_use] - fn blocked_next_task(&self) -> Option<(&Task, RequestedUsage)> { - self.blocked_tasks + fn blocked_next_task(&self) -> Option<(RequestedUsage, &Task)> { + self.blocked_usages_from_tasks .front() - .map(|(task, requested_usage)| (task, *requested_usage)) + .map(|(requested_usage, task)| (*requested_usage, task)) } #[must_use] - fn pop_blocked_next_readonly_task(&mut self) -> Option<(Task, RequestedUsage)> { + fn pop_blocked_next_readonly_task(&mut self) -> Option<(RequestedUsage, Task)> { if matches!( self.blocked_next_task(), - Some((_, RequestedUsage::Readonly)) + Some((RequestedUsage::Readonly, _)) ) { self.pop_unblocked_next_task() } else { @@ -464,14 +469,14 @@ impl PageInner { } } -const_assert_eq!(mem::size_of::>(), 40); +const_assert_eq!(mem::size_of::>(), 40); /// Scheduler's internal data for each address ([`Pubkey`](`solana_sdk::pubkey::Pubkey`)). Very /// opaque wrapper type; no methods just with [`::clone()`](Clone::clone) and /// [`::default()`](Default::default). #[derive(Debug, Clone, Default)] -pub struct Page(Arc>); -const_assert_eq!(mem::size_of::(), 8); +pub struct UsageQueue(Arc>); +const_assert_eq!(mem::size_of::(), 8); /// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by /// `solana-unified-scheduler-pool`. @@ -482,8 +487,8 @@ pub struct SchedulingStateMachine { handled_task_count: ShortCounter, unblocked_task_count: ShortCounter, total_task_count: ShortCounter, - count_token: BlockedPageCountToken, - page_token: PageToken, + count_token: BlockedUsageCountToken, + usage_queue_token: UsageQueueToken, } const_assert_eq!(mem::size_of::(), 64); @@ -559,22 +564,28 @@ impl SchedulingStateMachine { self.unlock_for_task(task); } - fn try_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult { - match page.usage { - PageUsage::Unused => LockResult::Ok(PageUsage::from(requested_usage)), - PageUsage::Readonly(count) => match requested_usage { - RequestedUsage::Readonly => LockResult::Ok(PageUsage::Readonly(count.increment())), + fn try_lock_usage_queue( + usage_queue: &UsageQueueInner, + requested_usage: RequestedUsage, + ) -> LockResult { + match usage_queue.current_usage { + Usage::Unused => LockResult::Ok(Usage::from(requested_usage)), + Usage::Readonly(count) => match requested_usage { + RequestedUsage::Readonly => LockResult::Ok(Usage::Readonly(count.increment())), RequestedUsage::Writable => LockResult::Err(()), }, - PageUsage::Writable => LockResult::Err(()), + Usage::Writable => LockResult::Err(()), } } #[must_use] - fn unlock_page(page: &mut PageInner, attempt: &LockAttempt) -> Option<(Task, RequestedUsage)> { + fn unlock_usage_queue( + usage_queue: &mut UsageQueueInner, + attempt: &LockAttempt, + ) -> Option<(RequestedUsage, Task)> { let mut is_unused_now = false; - match &mut page.usage { - PageUsage::Readonly(ref mut count) => match attempt.requested_usage { + match &mut usage_queue.current_usage { + Usage::Readonly(ref mut count) => match attempt.requested_usage { RequestedUsage::Readonly => { if count.is_one() { is_unused_now = true; @@ -584,18 +595,18 @@ impl SchedulingStateMachine { } RequestedUsage::Writable => unreachable!(), }, - PageUsage::Writable => match attempt.requested_usage { + Usage::Writable => match attempt.requested_usage { RequestedUsage::Writable => { is_unused_now = true; } RequestedUsage::Readonly => unreachable!(), }, - PageUsage::Unused => unreachable!(), + Usage::Unused => unreachable!(), } if is_unused_now { - page.usage = PageUsage::Unused; - page.pop_unblocked_next_task() + usage_queue.current_usage = Usage::Unused; + usage_queue.pop_unblocked_next_task() } else { None } @@ -603,54 +614,56 @@ impl SchedulingStateMachine { #[must_use] fn try_lock_for_task(&mut self, task: Task) -> Option { - let mut blocked_page_count = ShortCounter::zero(); + let mut blocked_usage_count = ShortCounter::zero(); for attempt in task.lock_attempts() { - let page = attempt.page_mut(&mut self.page_token); - let lock_result = if page.has_no_blocked_task() { - Self::try_lock_page(page, attempt.requested_usage) + let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token); + let lock_result = if usage_queue.has_no_blocked_task() { + Self::try_lock_usage_queue(usage_queue, attempt.requested_usage) } else { LockResult::Err(()) }; match lock_result { - LockResult::Ok(PageUsage::Unused) => unreachable!(), + LockResult::Ok(Usage::Unused) => unreachable!(), LockResult::Ok(new_usage) => { - page.usage = new_usage; + usage_queue.current_usage = new_usage; } LockResult::Err(()) => { - blocked_page_count.increment_self(); - page.push_blocked_task(task.clone(), attempt.requested_usage); + blocked_usage_count.increment_self(); + usage_queue.push_blocked_task(attempt.requested_usage, task.clone()); } } } - // no blocked page means success - if blocked_page_count.is_zero() { + // no blocked usage_queue means success + if blocked_usage_count.is_zero() { Some(task) } else { - task.set_blocked_page_count(&mut self.count_token, blocked_page_count); + task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count); None } } fn unlock_for_task(&mut self, task: &Task) { for unlock_attempt in task.lock_attempts() { - let page = unlock_attempt.page_mut(&mut self.page_token); - let mut unblocked_task_from_page = Self::unlock_page(page, unlock_attempt); + let usage_queue = unlock_attempt.usage_queue_mut(&mut self.usage_queue_token); + let mut unblocked_task_from_queue = + Self::unlock_usage_queue(usage_queue, unlock_attempt); - while let Some((task_with_unblocked_page, requested_usage)) = unblocked_task_from_page { - if let Some(task) = task_with_unblocked_page.try_unblock(&mut self.count_token) { + while let Some((requested_usage, task_with_unblocked_queue)) = unblocked_task_from_queue + { + if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token) { self.unblocked_task_queue.push_back(task); } - match Self::try_lock_page(page, requested_usage) { - LockResult::Ok(PageUsage::Unused) => unreachable!(), + match Self::try_lock_usage_queue(usage_queue, requested_usage) { + LockResult::Ok(Usage::Unused) => unreachable!(), LockResult::Ok(new_usage) => { - page.usage = new_usage; + usage_queue.current_usage = new_usage; // Try to further schedule blocked task for parallelism in the case of // readonly usages - unblocked_task_from_page = if matches!(new_usage, PageUsage::Readonly(_)) { - page.pop_blocked_next_readonly_task() + unblocked_task_from_queue = if matches!(new_usage, Usage::Readonly(_)) { + usage_queue.pop_blocked_next_readonly_task() } else { None }; @@ -661,23 +674,23 @@ impl SchedulingStateMachine { } } - /// Creates a new task with [`SanitizedTransaction`] with all of its corresponding [`Page`]s - /// preloaded. + /// Creates a new task with [`SanitizedTransaction`] with all of its corresponding + /// [`UsageQueue`]s preloaded. /// - /// Closure (`page_loader`) is used to delegate the (possibly multi-threaded) - /// implementation of [`Page`] look-up by [`pubkey`](Pubkey) to callers. It's the caller's - /// responsibility to ensure the same instance is returned from the closure, given a particular - /// pubkey. + /// Closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded) + /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the + /// caller's responsibility to ensure the same instance is returned from the closure, given a + /// particular pubkey. /// - /// Closure is used here to delegate the responsibility of general ownership of `Page` (and - /// caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all of - /// shared owndership of `Page`s are released and Page state is identical to just after - /// created, if `has_no_active_task()` is `true`. Also note that this is desired for separation - /// of concern. + /// Closure is used here to delegate the responsibility of general ownership of `UsageQueue` + /// (and caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all of + /// shared owndership of `UsageQueue`s are released and UsageQueue state is identical to just + /// after created, if `has_no_active_task()` is `true`. Also note that this is desired for + /// separation of concern. pub fn create_task( transaction: SanitizedTransaction, index: usize, - page_loader: &mut impl FnMut(Pubkey) -> Page, + usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue, ) -> Task { // Calling the _unchecked() version here is safe for faster operation, because // `get_account_locks()` (the safe variant) is ensured to be called in @@ -707,7 +720,7 @@ impl SchedulingStateMachine { let lock_attempts = writable_locks .chain(readonly_locks) .map(|(address, requested_usage)| { - LockAttempt::new(page_loader(**address), requested_usage) + LockAttempt::new(usage_queue_loader(**address), requested_usage) }) .collect(); @@ -715,7 +728,7 @@ impl SchedulingStateMachine { transaction, index, lock_attempts, - blocked_page_count: TokenCell::new(ShortCounter::zero()), + blocked_usage_count: TokenCell::new(ShortCounter::zero()), }) } @@ -723,12 +736,12 @@ impl SchedulingStateMachine { /// /// This isn't called _reset_ to indicate this isn't safe to call this at any given moment. /// This panics if the state machine hasn't properly been finished (i.e. there should be no - /// active task) to uphold invariants of [`Page`]s. + /// active task) to uphold invariants of [`UsageQueue`]s. /// /// This method is intended to reuse SchedulingStateMachine instance (to avoid its `unsafe` /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling) - /// as much as possible) and its (possbily cached) associated [`Page`]s for processing other - /// slots. + /// as much as possible) and its (possbily cached) associated [`UsageQueue`]s for processing + /// other slots. pub fn reinitialize(&mut self) { assert!(self.has_no_active_task()); assert_eq!(self.unblocked_task_queue.len(), 0); @@ -749,14 +762,14 @@ impl SchedulingStateMachine { Self { last_task_index: None, // It's very unlikely this is desired to be configurable, like - // `PageInner::blocked_tasks`'s cap. + // `UsageQueueInner::blocked_usages_from_tasks`'s cap. unblocked_task_queue: VecDeque::with_capacity(1024), active_task_count: ShortCounter::zero(), handled_task_count: ShortCounter::zero(), unblocked_task_count: ShortCounter::zero(), total_task_count: ShortCounter::zero(), - count_token: unsafe { BlockedPageCountToken::assume_exclusive_mutating_thread() }, - page_token: unsafe { PageToken::assume_exclusive_mutating_thread() }, + count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() }, + usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() }, } } } @@ -807,29 +820,33 @@ mod tests { } fn create_address_loader( - pages: Option>>>, - ) -> impl FnMut(Pubkey) -> Page { - let pages = pages.unwrap_or_default(); - move |address| pages.borrow_mut().entry(address).or_default().clone() + usage_queues: Option>>>, + ) -> impl FnMut(Pubkey) -> UsageQueue { + let usage_queues = usage_queues.unwrap_or_default(); + move |address| { + usage_queues + .borrow_mut() + .entry(address) + .or_default() + .clone() + } } #[test] fn test_debug() { // these are almost meaningless just to see eye-pleasing coverage report.... assert_eq!( - format!( - "{:?}", - LockResult::Ok(PageUsage::Readonly(ShortCounter::one())) - ), + format!("{:?}", LockResult::Ok(Usage::Readonly(ShortCounter::one()))), "Ok(Readonly(ShortCounter(1)))" ); let sanitized = simplest_transaction(); - let task = SchedulingStateMachine::create_task(sanitized, 0, &mut |_| Page::default()); + let task = + SchedulingStateMachine::create_task(sanitized, 0, &mut |_| UsageQueue::default()); assert!(format!("{:?}", task).contains("TaskInner")); assert_eq!( - format!("{:?}", PageInner::default()), - "PageInner { usage: Unused, blocked_tasks: [] }" + format!("{:?}", UsageQueueInner::default()), + "UsageQueueInner { current_usage: Unused, blocked_usages_from_tasks: [] }" ) } @@ -859,8 +876,9 @@ mod tests { #[test] fn test_create_task() { let sanitized = simplest_transaction(); - let task = - SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| Page::default()); + let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| { + UsageQueue::default() + }); assert_eq!(task.task_index(), 3); assert_eq!(task.transaction(), &sanitized); } @@ -1242,8 +1260,8 @@ mod tests { let conflicting_address = Pubkey::new_unique(); let sanitized1 = transaction_with_writable_address(conflicting_address); let sanitized2 = transaction_with_writable_address(conflicting_address); - let pages = Rc::new(RefCell::new(HashMap::new())); - let address_loader = &mut create_address_loader(Some(pages.clone())); + let usage_queues = Rc::new(RefCell::new(HashMap::new())); + let address_loader = &mut create_address_loader(Some(usage_queues.clone())); let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader); let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); @@ -1257,19 +1275,25 @@ mod tests { Some(101) ); assert_matches!(state_machine.schedule_task(task2.clone()), None); - let pages = pages.borrow_mut(); - let page = pages.get(&conflicting_address).unwrap(); + let usage_queues = usage_queues.borrow_mut(); + let usage_queue = usage_queues.get(&conflicting_address).unwrap(); assert_matches!( - page.0.borrow_mut(&mut state_machine.page_token).usage, - PageUsage::Writable + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token) + .current_usage, + Usage::Writable ); // task2's fee payer should have been locked already even if task2 is blocked still via the // above the schedule_task(task2) call let fee_payer = task2.transaction().message().fee_payer(); - let page = pages.get(fee_payer).unwrap(); + let usage_queue = usage_queues.get(fee_payer).unwrap(); assert_matches!( - page.0.borrow_mut(&mut state_machine.page_token).usage, - PageUsage::Writable + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token) + .current_usage, + Usage::Writable ); state_machine.deschedule_task(&task1); assert_matches!( @@ -1288,10 +1312,12 @@ mod tests { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; - let page = Page::default(); - let _ = SchedulingStateMachine::unlock_page( - page.0.borrow_mut(&mut state_machine.page_token), - &LockAttempt::new(page, RequestedUsage::Writable), + let usage_queue = UsageQueue::default(); + let _ = SchedulingStateMachine::unlock_usage_queue( + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token), + &LockAttempt::new(usage_queue, RequestedUsage::Writable), ); } @@ -1301,11 +1327,16 @@ mod tests { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; - let page = Page::default(); - page.0.borrow_mut(&mut state_machine.page_token).usage = PageUsage::Writable; - let _ = SchedulingStateMachine::unlock_page( - page.0.borrow_mut(&mut state_machine.page_token), - &LockAttempt::new(page, RequestedUsage::Readonly), + let usage_queue = UsageQueue::default(); + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token) + .current_usage = Usage::Writable; + let _ = SchedulingStateMachine::unlock_usage_queue( + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token), + &LockAttempt::new(usage_queue, RequestedUsage::Readonly), ); } @@ -1315,12 +1346,16 @@ mod tests { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; - let page = Page::default(); - page.0.borrow_mut(&mut state_machine.page_token).usage = - PageUsage::Readonly(ShortCounter::one()); - let _ = SchedulingStateMachine::unlock_page( - page.0.borrow_mut(&mut state_machine.page_token), - &LockAttempt::new(page, RequestedUsage::Writable), + let usage_queue = UsageQueue::default(); + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token) + .current_usage = Usage::Readonly(ShortCounter::one()); + let _ = SchedulingStateMachine::unlock_usage_queue( + usage_queue + .0 + .borrow_mut(&mut state_machine.usage_queue_token), + &LockAttempt::new(usage_queue, RequestedUsage::Writable), ); } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index be1918807892b2..ed4b354fbd8658 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -36,7 +36,7 @@ use { pubkey::Pubkey, transaction::{Result, SanitizedTransaction}, }, - solana_unified_scheduler_logic::{Page, SchedulingStateMachine, Task}, + solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, solana_vote::vote_sender_types::ReplayVoteSender, std::{ fmt::Debug, @@ -394,13 +394,13 @@ mod chained_channel { } #[derive(Default, Debug)] -pub struct AddressBook { - book: DashMap, +pub struct UsageQueueLoader { + usage_queues: DashMap, } -impl AddressBook { - pub fn load(&self, address: Pubkey) -> Page { - self.book.entry(address).or_default().clone() +impl UsageQueueLoader { + pub fn load(&self, address: Pubkey) -> UsageQueue { + self.usage_queues.entry(address).or_default().clone() } } @@ -425,7 +425,7 @@ pub struct PooledScheduler { #[derive(Debug)] pub struct PooledSchedulerInner, TH: TaskHandler> { thread_manager: ThreadManager, - address_book: AddressBook, + usage_queue_loader: UsageQueueLoader, } // This type manages the OS threads for scheduling and executing transactions. The term @@ -451,7 +451,7 @@ impl PooledScheduler { Self::from_inner( PooledSchedulerInner:: { thread_manager: ThreadManager::new(pool), - address_book: AddressBook::default(), + usage_queue_loader: UsageQueueLoader::default(), }, initial_context, ) @@ -802,7 +802,7 @@ impl InstalledScheduler for PooledScheduler { fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| { - self.inner.address_book.load(pubkey) + self.inner.usage_queue_loader.load(pubkey) }); self.inner.thread_manager.send_task(task); } From fe2efa2e41c6d1162086e4703dedc3591282e54a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 16:44:04 +0900 Subject: [PATCH 23/31] Document UsageQueueLoader --- unified-scheduler-logic/src/lib.rs | 2 +- unified-scheduler-pool/src/lib.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index a7a0b872d94e41..f5e6bf86ce9e15 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -682,7 +682,7 @@ impl SchedulingStateMachine { /// caller's responsibility to ensure the same instance is returned from the closure, given a /// particular pubkey. /// - /// Closure is used here to delegate the responsibility of general ownership of `UsageQueue` + /// Closure is used here to delegate the responsibility of primary ownership of `UsageQueue` /// (and caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all of /// shared owndership of `UsageQueue`s are released and UsageQueue state is identical to just /// after created, if `has_no_active_task()` is `true`. Also note that this is desired for diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index ed4b354fbd8658..4eace6d7a93792 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -393,6 +393,12 @@ mod chained_channel { } } +/// The primary owner of all [`UsageQueue`]s used for particular [`PooledScheduler`]. +/// +/// Currently, the simplest implementation. This grows memory usage in unbounded way. Cleaning will +/// be added later. This struct is here to be put outside `solana-unified-scheduler-logic` for the +/// crate's original intent (separation of logics from this crate). Some practical and mundane +/// pruning will be implemented in this type. #[derive(Default, Debug)] pub struct UsageQueueLoader { usage_queues: DashMap, From 16ac5b3dc47f0db17b7eb9a3ca7c2f979a426fd5 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 29 Feb 2024 22:59:36 +0900 Subject: [PATCH 24/31] Various minor cleanings for beautifier diff --- unified-scheduler-logic/src/lib.rs | 69 ++++++++++++++---------------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index f5e6bf86ce9e15..82b7bbfe524dda 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -225,12 +225,12 @@ mod utils { } } - // Safety: Access to TokenCell is assumed to be only from a single thread by proper use of - // Token once after TokenCell is sent to the thread from other threads; So, both implementing + // Safety: Access to `TokenCell` is assumed to be only from a single thread by proper use of + // Token once after `TokenCell` is sent to the thread from other threads; So, both implementing // Send and Sync can be thought as safe. // - // In other words, TokenCell is technicall still !Send and !Sync. But there should be no legal - // use happening which requires !Send or !Sync to avoid undefined behavior. + // In other words, TokenCell is technically still `!Send` and `!Sync`. But there should be no + // legalized usage which depends on real `Send` and `Sync` to avoid undefined behaviors. unsafe impl Send for TokenCell {} unsafe impl Sync for TokenCell {} @@ -413,9 +413,11 @@ enum RequestedUsage { #[derive(Debug)] struct UsageQueueInner { current_usage: Usage, - blocked_usages_from_tasks: VecDeque<(RequestedUsage, Task)>, + blocked_usages_from_tasks: VecDeque, } +type UsageFromTask = (RequestedUsage, Task); + impl Default for UsageQueueInner { fn default() -> Self { Self { @@ -435,38 +437,30 @@ impl Default for UsageQueueInner { } impl UsageQueueInner { - fn push_blocked_task(&mut self, requested_usage: RequestedUsage, task: Task) { - self.blocked_usages_from_tasks - .push_back((requested_usage, task)); - } - - fn has_no_blocked_task(&self) -> bool { - self.blocked_usages_from_tasks.is_empty() + fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) { + self.blocked_usages_from_tasks.push_back(usage_from_task); } #[must_use] - fn pop_unblocked_next_task(&mut self) -> Option<(RequestedUsage, Task)> { + fn pop_unblocked_usage_from_task(&mut self) -> Option { self.blocked_usages_from_tasks.pop_front() } #[must_use] - fn blocked_next_task(&self) -> Option<(RequestedUsage, &Task)> { - self.blocked_usages_from_tasks - .front() - .map(|(requested_usage, task)| (*requested_usage, task)) - } - - #[must_use] - fn pop_blocked_next_readonly_task(&mut self) -> Option<(RequestedUsage, Task)> { + fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option { if matches!( - self.blocked_next_task(), + self.blocked_usages_from_tasks.front(), Some((RequestedUsage::Readonly, _)) ) { - self.pop_unblocked_next_task() + self.pop_unblocked_usage_from_task() } else { None } } + + fn has_no_blocked_usage(&self) -> bool { + self.blocked_usages_from_tasks.is_empty() + } } const_assert_eq!(mem::size_of::>(), 40); @@ -497,6 +491,10 @@ impl SchedulingStateMachine { self.active_task_count.is_zero() } + pub fn has_unblocked_task(&self) -> bool { + !self.unblocked_task_queue.is_empty() + } + pub fn unblocked_task_queue_count(&self) -> usize { self.unblocked_task_queue.len() } @@ -538,15 +536,10 @@ impl SchedulingStateMachine { self.try_lock_for_task(task) } - pub fn has_unblocked_task(&self) -> bool { - !self.unblocked_task_queue.is_empty() - } - #[must_use] pub fn schedule_unblocked_task(&mut self) -> Option { - self.unblocked_task_queue.pop_front().map(|task| { + self.unblocked_task_queue.pop_front().inspect(|_| { self.unblocked_task_count.increment_self(); - task }) } @@ -606,7 +599,7 @@ impl SchedulingStateMachine { if is_unused_now { usage_queue.current_usage = Usage::Unused; - usage_queue.pop_unblocked_next_task() + usage_queue.pop_unblocked_usage_from_task() } else { None } @@ -618,7 +611,7 @@ impl SchedulingStateMachine { for attempt in task.lock_attempts() { let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token); - let lock_result = if usage_queue.has_no_blocked_task() { + let lock_result = if usage_queue.has_no_blocked_usage() { Self::try_lock_usage_queue(usage_queue, attempt.requested_usage) } else { LockResult::Err(()) @@ -630,12 +623,13 @@ impl SchedulingStateMachine { } LockResult::Err(()) => { blocked_usage_count.increment_self(); - usage_queue.push_blocked_task(attempt.requested_usage, task.clone()); + let usage_from_task = (attempt.requested_usage, task.clone()); + usage_queue.push_blocked_usage_from_task(usage_from_task); } } } - // no blocked usage_queue means success + // no blocked usage count means success if blocked_usage_count.is_zero() { Some(task) } else { @@ -645,10 +639,9 @@ impl SchedulingStateMachine { } fn unlock_for_task(&mut self, task: &Task) { - for unlock_attempt in task.lock_attempts() { - let usage_queue = unlock_attempt.usage_queue_mut(&mut self.usage_queue_token); - let mut unblocked_task_from_queue = - Self::unlock_usage_queue(usage_queue, unlock_attempt); + for attempt in task.lock_attempts() { + let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token); + let mut unblocked_task_from_queue = Self::unlock_usage_queue(usage_queue, attempt); while let Some((requested_usage, task_with_unblocked_queue)) = unblocked_task_from_queue { @@ -663,7 +656,7 @@ impl SchedulingStateMachine { // Try to further schedule blocked task for parallelism in the case of // readonly usages unblocked_task_from_queue = if matches!(new_usage, Usage::Readonly(_)) { - usage_queue.pop_blocked_next_readonly_task() + usage_queue.pop_unblocked_readonly_usage_from_task() } else { None }; From 737e473a795544261b2be2ec1bdb34d47d334ee6 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 1 Mar 2024 15:01:18 +0900 Subject: [PATCH 25/31] Ensure reinitialize() is maintained for new fields --- unified-scheduler-logic/src/lib.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 82b7bbfe524dda..9010410f47c347 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -738,11 +738,23 @@ impl SchedulingStateMachine { pub fn reinitialize(&mut self) { assert!(self.has_no_active_task()); assert_eq!(self.unblocked_task_queue.len(), 0); - self.last_task_index = None; - self.active_task_count.reset_to_zero(); - self.handled_task_count.reset_to_zero(); - self.unblocked_task_count.reset_to_zero(); - self.total_task_count.reset_to_zero(); + // nice trick to ensure all fields are handled here if new one is added. + let Self { + last_task_index, + unblocked_task_queue: _, + active_task_count, + handled_task_count, + unblocked_task_count, + total_task_count, + count_token: _, + usage_queue_token: _, + // don't add ".." here + } = self; + *last_task_index = None; + active_task_count.reset_to_zero(); + handled_task_count.reset_to_zero(); + unblocked_task_count.reset_to_zero(); + total_task_count.reset_to_zero(); } /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as From 4fcb36096c9f5e11f9e0ecc6638e7c85c2ba3b7a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 7 Mar 2024 13:53:46 +0900 Subject: [PATCH 26/31] Remove uneeded impl Send for TokenCell & doc upd. --- unified-scheduler-logic/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 9010410f47c347..2c155cc8534434 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -225,13 +225,13 @@ mod utils { } } - // Safety: Access to `TokenCell` is assumed to be only from a single thread by proper use of - // Token once after `TokenCell` is sent to the thread from other threads; So, both implementing - // Send and Sync can be thought as safe. + // Safety: Once after a (`Send`-able) `TokenCell` is transferred to a thread from other + // threads, access to `TokenCell` is assumed to be only from the single thread by proper use of + // Token. Thereby, implementing `Sync` can be thought as safe and doing so is needed for the + // particular implementation pattern in the unified scheduler (multi-threaded off-loading). // - // In other words, TokenCell is technically still `!Send` and `!Sync`. But there should be no - // legalized usage which depends on real `Send` and `Sync` to avoid undefined behaviors. - unsafe impl Send for TokenCell {} + // In other words, TokenCell is technically still `!Sync`. But there should be no + // legalized usage which depends on real `Sync` to avoid undefined behaviors. unsafe impl Sync for TokenCell {} /// A auxiliary zero-sized type to enforce aliasing rule to [`TokenCell`] via rust type system From a0baa102b0514d54aad80c39efa03807aea08e42 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 7 Mar 2024 15:34:46 +0900 Subject: [PATCH 27/31] Apply typo fixes from code review Co-authored-by: Andrew Fitzgerald --- unified-scheduler-logic/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 2c155cc8534434..018a50b3d42544 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -1075,7 +1075,7 @@ mod tests { } #[test] - fn test_all_blocking_redable_tasks_block_writable_task() { + fn test_all_blocking_readable_tasks_block_writable_task() { let conflicting_address = Pubkey::new_unique(); let sanitized1 = transaction_with_readonly_address(conflicting_address); let sanitized2 = transaction_with_readonly_address(conflicting_address); @@ -1114,7 +1114,7 @@ mod tests { assert_eq!(state_machine.active_task_count(), 1); assert_eq!(state_machine.handled_task_count(), 2); assert_eq!(state_machine.unblocked_task_queue_count(), 1); - // task3 is finally unblocked after all of readble tasks (task1 and task2) is finished. + // task3 is finally unblocked after all of readable tasks (task1 and task2) is finished. assert_matches!( state_machine .schedule_unblocked_task() From 2cbf64a8cf4160ed05799133076798a7f0314cb4 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 7 Mar 2024 15:49:17 +0900 Subject: [PATCH 28/31] Merge similar tests into one --- unified-scheduler-logic/src/lib.rs | 52 +++++++----------------------- 1 file changed, 12 insertions(+), 40 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 018a50b3d42544..ef8be294d92d61 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -928,14 +928,23 @@ mod tests { state_machine.deschedule_task(&task1); assert!(state_machine.has_unblocked_task()); assert_eq!(state_machine.unblocked_task_queue_count(), 1); + + // unblocked_task_count() should be incremented + assert_eq!(state_machine.unblocked_task_count(), 0); assert_eq!( state_machine .schedule_unblocked_task() - .unwrap() - .task_index(), - task2.task_index() + .map(|t| t.task_index()), + Some(102) ); + assert_eq!(state_machine.unblocked_task_count(), 1); + + // there's no blocked task anymore; calling schedule_unblocked_task should be noop and + // shouldn't increment the unblocked_task_count(). assert!(!state_machine.has_unblocked_task()); + assert_matches!(state_machine.schedule_unblocked_task(), None); + assert_eq!(state_machine.unblocked_task_count(), 1); + assert_eq!(state_machine.unblocked_task_queue_count(), 0); state_machine.deschedule_task(&task2); @@ -949,43 +958,6 @@ mod tests { assert!(state_machine.has_no_active_task()); } - #[test] - fn test_unblocked_task_related_counts() { - let sanitized = simplest_transaction(); - let address_loader = &mut create_address_loader(None); - let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); - let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); - - let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() - }; - assert_matches!( - state_machine - .schedule_task(task1.clone()) - .map(|t| t.task_index()), - Some(101) - ); - assert_matches!(state_machine.schedule_task(task2.clone()), None); - - state_machine.deschedule_task(&task1); - - assert_eq!(state_machine.unblocked_task_count(), 0); - assert_matches!( - state_machine - .schedule_unblocked_task() - .map(|t| t.task_index()), - Some(102) - ); - assert_eq!(state_machine.unblocked_task_count(), 1); - // there's no blocked task anymore; calling schedule_unblocked_task should be noop and - // shouldn't increment the unblocked_task_count(). - assert_matches!(state_machine.schedule_unblocked_task(), None); - assert_eq!(state_machine.unblocked_task_count(), 1); - - state_machine.deschedule_task(&task2); - assert!(state_machine.has_no_active_task()); - } - #[test] fn test_existing_blocking_task_then_newly_scheduled_task() { let sanitized = simplest_transaction(); From 3b9fb2cc5c65ce9aa8db459b51280c950c737e09 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 7 Mar 2024 15:56:21 +0900 Subject: [PATCH 29/31] Remove test_debug --- unified-scheduler-logic/src/lib.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index ef8be294d92d61..bd43746923b9ad 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -837,24 +837,6 @@ mod tests { } } - #[test] - fn test_debug() { - // these are almost meaningless just to see eye-pleasing coverage report.... - assert_eq!( - format!("{:?}", LockResult::Ok(Usage::Readonly(ShortCounter::one()))), - "Ok(Readonly(ShortCounter(1)))" - ); - let sanitized = simplest_transaction(); - let task = - SchedulingStateMachine::create_task(sanitized, 0, &mut |_| UsageQueue::default()); - assert!(format!("{:?}", task).contains("TaskInner")); - - assert_eq!( - format!("{:?}", UsageQueueInner::default()), - "UsageQueueInner { current_usage: Unused, blocked_usages_from_tasks: [] }" - ) - } - #[test] fn test_scheduling_state_machine_creation() { let state_machine = unsafe { From d072efdd4d721613bd6ea89f96e3af060aa92888 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 7 Mar 2024 16:08:39 +0900 Subject: [PATCH 30/31] Remove assertions of task_index() --- unified-scheduler-logic/src/lib.rs | 84 +----------------------------- 1 file changed, 1 insertion(+), 83 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index bd43746923b9ad..647f5e4ca3a0f6 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -475,7 +475,6 @@ const_assert_eq!(mem::size_of::(), 8); /// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by /// `solana-unified-scheduler-pool`. pub struct SchedulingStateMachine { - last_task_index: Option, unblocked_task_queue: VecDeque, active_task_count: ShortCounter, handled_task_count: ShortCounter, @@ -484,7 +483,7 @@ pub struct SchedulingStateMachine { count_token: BlockedUsageCountToken, usage_queue_token: UsageQueueToken, } -const_assert_eq!(mem::size_of::(), 64); +const_assert_eq!(mem::size_of::(), 48); impl SchedulingStateMachine { pub fn has_no_active_task(&self) -> bool { @@ -524,13 +523,6 @@ impl SchedulingStateMachine { /// of given task _conditionally_ for future optimization. #[must_use] pub fn schedule_task(&mut self, task: Task) -> Option { - let new_task_index = task.task_index(); - if let Some(old_task_index) = self.last_task_index.replace(new_task_index) { - assert!( - new_task_index > old_task_index, - "bad new task index: {new_task_index} > {old_task_index}" - ); - } self.total_task_count.increment_self(); self.active_task_count.increment_self(); self.try_lock_for_task(task) @@ -544,14 +536,6 @@ impl SchedulingStateMachine { } pub fn deschedule_task(&mut self, task: &Task) { - let descheduled_task_index = task.task_index(); - let largest_task_index = self - .last_task_index - .expect("task should have been scheduled"); - assert!( - descheduled_task_index <= largest_task_index, - "bad descheduled task index: {descheduled_task_index} <= {largest_task_index}" - ); self.active_task_count.decrement_self(); self.handled_task_count.increment_self(); self.unlock_for_task(task); @@ -740,7 +724,6 @@ impl SchedulingStateMachine { assert_eq!(self.unblocked_task_queue.len(), 0); // nice trick to ensure all fields are handled here if new one is added. let Self { - last_task_index, unblocked_task_queue: _, active_task_count, handled_task_count, @@ -750,7 +733,6 @@ impl SchedulingStateMachine { usage_queue_token: _, // don't add ".." here } = self; - *last_task_index = None; active_task_count.reset_to_zero(); handled_task_count.reset_to_zero(); unblocked_task_count.reset_to_zero(); @@ -765,7 +747,6 @@ impl SchedulingStateMachine { #[must_use] pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self { Self { - last_task_index: None, // It's very unlikely this is desired to be configurable, like // `UsageQueueInner::blocked_usages_from_tasks`'s cap. unblocked_task_queue: VecDeque::with_capacity(1024), @@ -854,10 +835,8 @@ mod tests { }; state_machine.total_task_count.increment_self(); assert_eq!(state_machine.total_task_count(), 1); - state_machine.last_task_index = Some(1); state_machine.reinitialize(); assert_eq!(state_machine.total_task_count(), 0); - assert_eq!(state_machine.last_task_index, None); } #[test] @@ -1317,65 +1296,4 @@ mod tests { &LockAttempt::new(usage_queue, RequestedUsage::Writable), ); } - - #[test] - #[should_panic(expected = "bad new task index: 101 > 101")] - fn test_schedule_same_task() { - let conflicting_address = Pubkey::new_unique(); - let sanitized = transaction_with_writable_address(conflicting_address); - let address_loader = &mut create_address_loader(None); - let task = SchedulingStateMachine::create_task(sanitized, 101, address_loader); - - let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() - }; - let _ = state_machine.schedule_task(task.clone()); - let _ = state_machine.schedule_task(task.clone()); - } - - #[test] - #[should_panic(expected = "bad new task index: 101 > 102")] - fn test_schedule_task_out_of_order() { - let conflicting_address = Pubkey::new_unique(); - let sanitized = transaction_with_writable_address(conflicting_address); - let address_loader = &mut create_address_loader(None); - let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); - let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); - - let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() - }; - let _ = state_machine.schedule_task(task2.clone()); - let _ = state_machine.schedule_task(task1.clone()); - } - - #[test] - #[should_panic(expected = "task should have been scheduled")] - fn test_deschedule_new_task_wihout_scheduling() { - let conflicting_address = Pubkey::new_unique(); - let sanitized = transaction_with_writable_address(conflicting_address); - let address_loader = &mut create_address_loader(None); - let task = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); - - let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() - }; - state_machine.deschedule_task(&task); - } - - #[test] - #[should_panic(expected = "bad descheduled task index: 102 <= 101")] - fn test_deschedule_new_task_out_of_order() { - let conflicting_address = Pubkey::new_unique(); - let sanitized = transaction_with_writable_address(conflicting_address); - let address_loader = &mut create_address_loader(None); - let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader); - let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader); - - let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() - }; - let _ = state_machine.schedule_task(task1.clone()); - state_machine.deschedule_task(&task2); - } } From 001b10e17d6444ba296c27d25191408f614bdf85 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 18 Mar 2024 15:31:07 +0900 Subject: [PATCH 31/31] Fix UB in TokenCell --- unified-scheduler-logic/src/lib.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 647f5e4ca3a0f6..dcf045b90bbe86 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -220,7 +220,12 @@ mod utils { /// instances of [`TokenCell`] conceptually owned by the instance of [`Token`] (a /// particular thread), unless previous borrow is released. After the release, the used /// singleton token should be free to be reused for reborrows. - pub(super) fn borrow_mut<'t>(&self, _token: &'t mut Token) -> &'t mut V { + /// + /// Note that the returned reference's lifetime is restricted to 'self, not 'token to avoid + /// use-after-free undefined behaviors. + // As it's protected by token, it's okay to suppress this clippy lint + #[allow(clippy::mut_from_ref)] + pub(super) fn borrow_mut(&self, _token: &mut Token) -> &mut V { unsafe { &mut *self.0.get() } } } @@ -332,10 +337,7 @@ impl TaskInner { &self.lock_attempts } - fn blocked_usage_count_mut<'t>( - &self, - token: &'t mut BlockedUsageCountToken, - ) -> &'t mut ShortCounter { + fn blocked_usage_count_mut(&self, token: &mut BlockedUsageCountToken) -> &mut ShortCounter { self.blocked_usage_count.borrow_mut(token) } @@ -369,10 +371,7 @@ impl LockAttempt { } } - fn usage_queue_mut<'t>( - &self, - usage_queue_token: &'t mut UsageQueueToken, - ) -> &'t mut UsageQueueInner { + fn usage_queue_mut(&self, usage_queue_token: &mut UsageQueueToken) -> &mut UsageQueueInner { self.usage_queue.0.borrow_mut(usage_queue_token) } } @@ -1251,11 +1250,12 @@ mod tests { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; let usage_queue = UsageQueue::default(); + let usage_queue_for_lock_attempt = UsageQueue::default(); let _ = SchedulingStateMachine::unlock_usage_queue( usage_queue .0 .borrow_mut(&mut state_machine.usage_queue_token), - &LockAttempt::new(usage_queue, RequestedUsage::Writable), + &LockAttempt::new(usage_queue_for_lock_attempt, RequestedUsage::Writable), ); } @@ -1270,11 +1270,12 @@ mod tests { .0 .borrow_mut(&mut state_machine.usage_queue_token) .current_usage = Usage::Writable; + let usage_queue_for_lock_attempt = UsageQueue::default(); let _ = SchedulingStateMachine::unlock_usage_queue( usage_queue .0 .borrow_mut(&mut state_machine.usage_queue_token), - &LockAttempt::new(usage_queue, RequestedUsage::Readonly), + &LockAttempt::new(usage_queue_for_lock_attempt, RequestedUsage::Readonly), ); } @@ -1289,11 +1290,12 @@ mod tests { .0 .borrow_mut(&mut state_machine.usage_queue_token) .current_usage = Usage::Readonly(ShortCounter::one()); + let usage_queue_for_lock_attempt = UsageQueue::default(); let _ = SchedulingStateMachine::unlock_usage_queue( usage_queue .0 .borrow_mut(&mut state_machine.usage_queue_token), - &LockAttempt::new(usage_queue, RequestedUsage::Writable), + &LockAttempt::new(usage_queue_for_lock_attempt, RequestedUsage::Writable), ); } }