Skip to content

Commit

Permalink
refactor: cleanup KeyRwLock staled locks before granting new lock
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 1, 2024
1 parent 44eed75 commit 8410e7c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 78 deletions.
2 changes: 1 addition & 1 deletion src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl ExecResult {
struct ProcedureGuard {
meta: ProcedureMetaRef,
manager_ctx: Arc<ManagerContext>,
key_guards: Vec<OwnedKeyRwLockGuard<String>>,
key_guards: Vec<OwnedKeyRwLockGuard>,
finish: bool,
}

Expand Down
113 changes: 36 additions & 77 deletions src/common/procedure/src/local/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,57 +19,23 @@ use std::sync::{Arc, Mutex};

use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, TryLockError};

pub enum OwnedKeyRwLockGuard<K: Eq + Hash> {
Read(OwnedKeyRwLockReadGuard<K>),
Write(OwnedKeyRwLockWriteGuard<K>),
pub enum OwnedKeyRwLockGuard {
Read(OwnedRwLockReadGuard<()>),
Write(OwnedRwLockWriteGuard<()>),
}

impl<K: Eq + Hash> From<OwnedKeyRwLockReadGuard<K>> for OwnedKeyRwLockGuard<K> {
fn from(guard: OwnedKeyRwLockReadGuard<K>) -> Self {
impl From<OwnedRwLockReadGuard<()>> for OwnedKeyRwLockGuard {
fn from(guard: OwnedRwLockReadGuard<()>) -> Self {
OwnedKeyRwLockGuard::Read(guard)
}
}

impl<K: Eq + Hash> From<OwnedKeyRwLockWriteGuard<K>> for OwnedKeyRwLockGuard<K> {
fn from(guard: OwnedKeyRwLockWriteGuard<K>) -> Self {
impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
fn from(guard: OwnedRwLockWriteGuard<()>) -> Self {
OwnedKeyRwLockGuard::Write(guard)
}
}

pub struct OwnedKeyRwLockReadGuard<K: Eq + Hash> {
key: K,
inner: Arc<Mutex<HashMap<K, Arc<RwLock<()>>>>>,
guard: Option<OwnedRwLockReadGuard<()>>,
}

pub struct OwnedKeyRwLockWriteGuard<K: Eq + Hash> {
key: K,
inner: Arc<Mutex<HashMap<K, Arc<RwLock<()>>>>>,
guard: Option<OwnedRwLockWriteGuard<()>>,
}

impl<K: Eq + Hash> Drop for OwnedKeyRwLockWriteGuard<K> {
fn drop(&mut self) {
// Always releases inner lock first.
{
self.guard.take().unwrap();
}
let mut locks = self.inner.lock().unwrap();
KeyRwLock::remove_key(&mut locks, &self.key);
}
}

impl<K: Eq + Hash> Drop for OwnedKeyRwLockReadGuard<K> {
fn drop(&mut self) {
// Always releases inner lock first.
{
self.guard.take().unwrap();
}
let mut locks = self.inner.lock().unwrap();
KeyRwLock::remove_key(&mut locks, &self.key);
}
}

/// Locks based on a key, allowing other keys to lock independently.
#[derive(Debug)]
pub struct KeyRwLock<K> {
Expand All @@ -79,14 +45,23 @@ pub struct KeyRwLock<K> {

impl<K> KeyRwLock<K>
where
K: Eq + Hash,
K: Eq + Hash + Clone,
{
/// Removes a key lock if it's exists and no one in use.
pub(crate) fn remove_key(locks: &mut HashMap<K, Arc<RwLock<()>>>, key: &K) {
if let Some(lock) = locks.get(key) {
if lock.try_write().is_ok() {
locks.remove(key);
}
/// Remove locks that are not locked currently.
fn clean_up(locks: &mut HashMap<K, Arc<RwLock<()>>>) {
let keys = locks
.iter()
.filter_map(|(key, lock)| {
if lock.try_write().is_ok() {
Some(key.clone())
} else {
None
}
})
.collect::<Vec<_>>();

for key in keys {
locks.remove(&key);
}
}
}
Expand All @@ -102,63 +77,47 @@ where
}

/// Locks the key with shared read access, returning a guard.
pub async fn read(&self, key: K) -> OwnedKeyRwLockReadGuard<K> {
pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

OwnedKeyRwLockReadGuard {
key,
inner: self.inner.clone(),
guard: Some(lock.read_owned().await),
}
lock.read_owned().await
}

/// Locks the key with exclusive write access, returning a guard.
pub async fn write(&self, key: K) -> OwnedKeyRwLockWriteGuard<K> {
pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

OwnedKeyRwLockWriteGuard {
key,
inner: self.inner.clone(),
guard: Some(lock.write_owned().await),
}
lock.write_owned().await
}

/// Tries to lock the key with shared read access, returning immediately.
pub fn try_read(&self, key: K) -> Result<OwnedKeyRwLockReadGuard<K>, TryLockError> {
pub fn try_read(&self, key: K) -> Result<OwnedRwLockReadGuard<()>, TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

let guard = lock.try_read_owned()?;

Ok(OwnedKeyRwLockReadGuard {
key,
inner: self.inner.clone(),
guard: Some(guard),
})
lock.try_read_owned()
}

/// Tries lock this key with exclusive write access, returning immediately.
pub fn try_write(&self, key: K) -> Result<OwnedKeyRwLockWriteGuard<K>, TryLockError> {
pub fn try_write(&self, key: K) -> Result<OwnedRwLockWriteGuard<()>, TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

let guard = lock.try_write_owned()?;

Ok(OwnedKeyRwLockWriteGuard {
key,
inner: self.inner.clone(),
guard: Some(guard),
})
lock.try_write_owned()
}

/// Returns number of keys.
Expand Down Expand Up @@ -195,6 +154,6 @@ mod tests {
assert!(lock_key.try_write("test1").is_err());
}

assert!(lock_key.is_empty());
assert_eq!(lock_key.len(), 2);
}
}

0 comments on commit 8410e7c

Please sign in to comment.