Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow procedure to acquire share lock #3061

Merged
merged 14 commits into from
Jan 3, 2024
Merged
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl Procedure for AlterTableProcedure {
fn lock_key(&self) -> LockKey {
let key = self.lock_key_inner();

LockKey::new(key)
LockKey::new_exclusive(key)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl Procedure for CreateTableProcedure {
table_ref.table,
);

LockKey::single(key)
LockKey::single_exclusive(key)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Procedure for DropTableProcedure {
table_ref.table,
);

LockKey::single(key)
LockKey::single_exclusive(key)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/truncate_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Procedure for TruncateTableProcedure {
table_ref.table,
);

LockKey::single(key)
LockKey::single_exclusive(key)
}
}

Expand Down
25 changes: 11 additions & 14 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod lock;
mod runner;
mod rwlock;

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -29,11 +29,11 @@ use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};

use self::rwlock::KeyRwLock;
use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef};
Expand All @@ -57,8 +57,6 @@ const META_TTL: Duration = Duration::from_secs(60 * 10);
pub(crate) struct ProcedureMeta {
/// Id of this procedure.
id: ProcedureId,
/// Notify to wait for a lock.
lock_notify: Notify,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
/// Notify to wait for subprocedures.
Expand All @@ -78,7 +76,6 @@ impl ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(ProcedureState::Running);
ProcedureMeta {
id,
lock_notify: Notify::new(),
parent_id,
child_notify: Notify::new(),
lock_key,
Expand Down Expand Up @@ -131,7 +128,7 @@ struct LoadedProcedure {
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
lock_map: LockMap,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
Expand All @@ -152,8 +149,8 @@ impl ManagerContext {
/// Returns a new [ManagerContext].
fn new() -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
lock_map: LockMap::new(),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
finished_procedures: Mutex::new(VecDeque::new()),
Expand Down Expand Up @@ -850,7 +847,7 @@ mod tests {
assert!(manager.procedure_watcher(procedure_id).is_none());

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
assert!(manager
.submit(ProcedureWithId {
id: procedure_id,
Expand Down Expand Up @@ -918,7 +915,7 @@ mod tests {
}

fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
LockKey::single_exclusive("test.submit")
}
}

Expand Down Expand Up @@ -955,7 +952,7 @@ mod tests {
let manager = LocalManager::new(config, state_store);

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert_matches!(
manager
Expand Down Expand Up @@ -986,7 +983,7 @@ mod tests {
manager.start().await.unwrap();

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {
Expand Down Expand Up @@ -1018,7 +1015,7 @@ mod tests {
manager.manager_ctx.set_running();

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {
Expand All @@ -1041,7 +1038,7 @@ mod tests {
// The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
manager.stop().await.unwrap();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();

manager.manager_ctx.set_running();
Expand All @@ -1063,7 +1060,7 @@ mod tests {

// After restart
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {
Expand Down
214 changes: 0 additions & 214 deletions src/common/procedure/src/local/lock.rs

This file was deleted.

Loading
Loading