From 52a63e9e3541edcbedd7538d1156cb8650d395d4 Mon Sep 17 00:00:00 2001 From: G8XSU <3442979+G8XSU@users.noreply.github.com> Date: Mon, 2 Dec 2024 14:01:09 -0800 Subject: [PATCH 1/2] Fix PaginatedKVStore interface to account for time ordering in pagination. --- ldk-server/src/io/paginated_kv_store.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ldk-server/src/io/paginated_kv_store.rs b/ldk-server/src/io/paginated_kv_store.rs index f0792d6..28c4ad1 100644 --- a/ldk-server/src/io/paginated_kv_store.rs +++ b/ldk-server/src/io/paginated_kv_store.rs @@ -72,7 +72,7 @@ pub trait PaginatedKVStore { /// `primary_namespace`, ordered in descending order of `time`. /// /// The `list` method returns the latest records first, based on the `time` associated with each key. - /// Pagination is controlled by the `next_page_token`, which is an `Option` + /// Pagination is controlled by the `next_page_token`, which is an `Option<(String, i64)>` /// used to determine the starting point for the next page of results. If `next_page_token` is `None`, /// the listing starts from the most recent entry. The `next_page_token` in the returned /// [`ListResponse`] can be used to fetch the next page of results. @@ -85,7 +85,8 @@ pub trait PaginatedKVStore { /// /// [`ListResponse`]: struct.ListResponse.html fn list( - &self, primary_namespace: &str, secondary_namespace: &str, next_page_token: Option, + &self, primary_namespace: &str, secondary_namespace: &str, + next_page_token: Option<(String, i64)>, ) -> Result; } @@ -98,5 +99,5 @@ pub struct ListResponse { pub keys: Vec, /// A token that can be used to retrieve the next set of keys. - pub next_page_token: Option, + pub next_page_token: Option<(String, i64)>, } From 7eab8bf01bc7f6b4225fb70752fbd92b773b2f79 Mon Sep 17 00:00:00 2001 From: G8XSU <3442979+G8XSU@users.noreply.github.com> Date: Tue, 26 Nov 2024 09:52:10 -0800 Subject: [PATCH 2/2] Add Sqlite based PaginatedKVStore impl. --- Cargo.lock | 2 + ldk-server/Cargo.toml | 4 + ldk-server/src/io/mod.rs | 2 + ldk-server/src/io/sqlite_store/mod.rs | 440 ++++++++++++++++++++++++++ ldk-server/src/io/utils.rs | 99 ++++++ 5 files changed, 547 insertions(+) create mode 100644 ldk-server/src/io/sqlite_store/mod.rs create mode 100644 ldk-server/src/io/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 33c2133..507668c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -946,6 +946,8 @@ dependencies = [ "ldk-node", "ldk-server-protos", "prost", + "rand", + "rusqlite", "serde", "serde_json", "tokio", diff --git a/ldk-server/Cargo.toml b/ldk-server/Cargo.toml index 984e207..ceebf1c 100644 --- a/ldk-server/Cargo.toml +++ b/ldk-server/Cargo.toml @@ -15,3 +15,7 @@ prost = { version = "0.11.6", default-features = false, features = ["std"] } ldk-server-protos = { path = "../ldk-server-protos" } bytes = "1.4.0" hex = { package = "hex-conservative", version = "0.2.1", default-features = false } +rusqlite = { version = "0.28.0", features = ["bundled"] } + +[dev-dependencies] +rand = "0.8.5" diff --git a/ldk-server/src/io/mod.rs b/ldk-server/src/io/mod.rs index ff65caf..6cda7ce 100644 --- a/ldk-server/src/io/mod.rs +++ b/ldk-server/src/io/mod.rs @@ -1 +1,3 @@ pub(crate) mod paginated_kv_store; +pub(crate) mod sqlite_store; +pub(crate) mod utils; diff --git a/ldk-server/src/io/sqlite_store/mod.rs b/ldk-server/src/io/sqlite_store/mod.rs new file mode 100644 index 0000000..cee2bae --- /dev/null +++ b/ldk-server/src/io/sqlite_store/mod.rs @@ -0,0 +1,440 @@ +use crate::io::paginated_kv_store::{ListResponse, PaginatedKVStore}; +use crate::io::utils::check_namespace_key_validity; +use ldk_node::lightning::types::string::PrintableString; +use rusqlite::{named_params, Connection}; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::{fs, io}; + +/// The default database file name. +pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_server_data.sqlite"; + +/// The default table in which we store all the paginated data. +pub const DEFAULT_PAGINATED_KV_TABLE_NAME: &str = "ldk_paginated_data"; + +// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration. +const SCHEMA_USER_VERSION: u16 = 1; + +// The maximum number of keys retrieved per page in paginated list operation. +const LIST_KEYS_MAX_PAGE_SIZE: i32 = 100; + +pub struct SqliteStore { + connection: Arc>, + data_dir: PathBuf, + paginated_kv_table_name: String, +} + +impl SqliteStore { + /// Constructs a new [`SqliteStore`]. + /// + /// If not already existing, a new SQLite database will be created in the given `data_dir` under the + /// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`). + /// + /// Similarly, the given `paginated_kv_table_name` will be used or default to [`DEFAULT_PAGINATED_KV_TABLE_NAME`]. + pub fn new( + data_dir: PathBuf, db_file_name: Option, paginated_kv_table_name: Option, + ) -> io::Result { + let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string()); + let paginated_kv_table_name = + paginated_kv_table_name.unwrap_or(DEFAULT_PAGINATED_KV_TABLE_NAME.to_string()); + + fs::create_dir_all(data_dir.clone()).map_err(|e| { + let msg = format!( + "Failed to create database destination directory {}: {}", + data_dir.display(), + e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + let mut db_file_path = data_dir.clone(); + db_file_path.push(db_file_name); + + let connection = Connection::open(db_file_path.clone()).map_err(|e| { + let msg = + format!("Failed to open/create database file {}: {}", db_file_path.display(), e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let sql = format!("SELECT user_version FROM pragma_user_version"); + let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); + + if version_res == 0 { + // New database, set our SCHEMA_USER_VERSION and continue + connection + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + SCHEMA_USER_VERSION, + |_| Ok(()), + ) + .map_err(|e| { + let msg = format!("Failed to set PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + } else if version_res > SCHEMA_USER_VERSION { + let msg = format!( + "Failed to open database: incompatible schema version {}. Expected: {}", + version_res, SCHEMA_USER_VERSION + ); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let create_paginated_kv_table_sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + primary_namespace TEXT NOT NULL, + secondary_namespace TEXT DEFAULT \"\" NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + creation_time INTEGER NOT NULL, + value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key ) + );", + paginated_kv_table_name + ); + + connection.execute(&create_paginated_kv_table_sql, []).map_err(|e| { + let msg = format!("Failed to create table {}: {}", paginated_kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let index_creation_time_sql = format!( + "CREATE INDEX idx_creation_time ON {} (creation_time);", + paginated_kv_table_name + ); + + connection.execute(&index_creation_time_sql, []).map_err(|e| { + let msg = format!( + "Failed to create index on creation_time, table {}: {}", + paginated_kv_table_name, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let connection = Arc::new(Mutex::new(connection)); + Ok(Self { connection, data_dir, paginated_kv_table_name }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.data_dir.clone() + } + + fn read_internal( + &self, kv_table_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + + let locked_conn = self.connection.lock().unwrap(); + let sql = + format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", + kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let res = stmt + .query_row( + named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + }, + |row| row.get(0), + ) + .map_err(|e| match e { + rusqlite::Error::QueryReturnedNoRows => { + let msg = format!( + "Failed to read as key could not be found: {}/{}/{}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + io::Error::new(io::ErrorKind::NotFound, msg) + }, + e => { + let msg = format!( + "Failed to read from key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + io::Error::new(io::ErrorKind::Other, msg) + }, + })?; + Ok(res) + } + + fn remove_internal( + &self, kv_table_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + }) + .map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + Ok(()) + } +} + +impl PaginatedKVStore for SqliteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.read_internal( + &self.paginated_kv_table_name, + primary_namespace, + secondary_namespace, + key, + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, time: i64, buf: &[u8], + ) -> io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!( + "INSERT INTO {} (primary_namespace, secondary_namespace, key, creation_time, value) + VALUES (:primary_namespace, :secondary_namespace, :key, :creation_time, :value) + ON CONFLICT(primary_namespace, secondary_namespace, key) + DO UPDATE SET value = excluded.value;", + self.paginated_kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + ":creation_time": time, + ":value": buf, + }) + .map(|_| ()) + .map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + io::Error::new(io::ErrorKind::Other, msg) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> io::Result<()> { + self.remove_internal( + &self.paginated_kv_table_name, + primary_namespace, + secondary_namespace, + key, + ) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + page_token: Option<(String, i64)>, + ) -> io::Result { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!( + "SELECT key, creation_time FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace \ + AND ( creation_time < :creation_time_token OR (creation_time = :creation_time_token AND key > :key_token) ) \ + ORDER BY creation_time DESC, key ASC LIMIT :page_size", + self.paginated_kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut keys: Vec = Vec::new(); + let page_token = page_token.unwrap_or(("".to_string(), i64::MAX)); + + let rows_iter = stmt + .query_map( + named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key_token": page_token.0, + ":creation_time_token": page_token.1, + ":page_size": LIST_KEYS_MAX_PAGE_SIZE, + }, + |row| { + let key: String = row.get(0)?; + let creation_time: i64 = row.get(1)?; + Ok((key, creation_time)) + }, + ) + .map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut last_creation_time: Option = None; + for r in rows_iter { + let (k, ct) = r.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + keys.push(k); + last_creation_time = Some(ct); + } + + let last_key = keys.last().cloned(); + let next_page_token = if let (Some(k), Some(ct)) = (last_key, last_creation_time) { + Some((k, ct)) + } else { + None + }; + + Ok(ListResponse { keys, next_page_token }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ldk_node::lightning::util::persist::KVSTORE_NAMESPACE_KEY_MAX_LEN; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; + use std::panic::RefUnwindSafe; + + impl Drop for SqliteStore { + fn drop(&mut self) { + match fs::remove_dir_all(&self.data_dir) { + Err(e) => println!("Failed to remove test store directory: {}", e), + _ => {}, + } + } + } + + #[test] + fn read_write_remove_list_persist() { + let mut temp_path = random_storage_path(); + temp_path.push("read_write_remove_list_persist"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + do_read_write_remove_list_persist(&store); + } + + pub(crate) fn random_storage_path() -> PathBuf { + let mut temp_path = std::env::temp_dir(); + let mut rng = thread_rng(); + let rand_dir: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + temp_path.push(rand_dir); + temp_path + } + + pub(crate) fn do_read_write_remove_list_persist( + kv_store: &K, + ) { + let data = [42u8; 32]; + + let primary_namespace = "testspace"; + let secondary_namespace = "testsubspace"; + let testkey = "testkey_0"; + + let list_all_keys = |primary_namespace: &str, secondary_namespace: &str| -> Vec { + let mut all_keys = Vec::new(); + let mut page_token = None; + loop { + let list_response = + kv_store.list(primary_namespace, secondary_namespace, page_token).unwrap(); + assert!(list_response.keys.len() <= LIST_KEYS_MAX_PAGE_SIZE as usize); + all_keys.extend(list_response.keys); + if list_response.next_page_token.is_none() { + break; + } + page_token = list_response.next_page_token; + } + all_keys + }; + + // Test the basic KVStore operations. + for i in 0..110 { + kv_store + .write(primary_namespace, secondary_namespace, &format!("testkey_{}", i), 0, &data) + .unwrap(); + } + + // Test empty primary/secondary namespaces are allowed, but not empty primary namespace and non-empty + // secondary primary_namespace, and not empty key. + kv_store.write("", "", testkey, 0, &data).unwrap(); + let res = + std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, testkey, 0, &data)); + assert!(res.is_err()); + let res = std::panic::catch_unwind(|| { + kv_store.write(primary_namespace, secondary_namespace, "", 0, &data) + }); + assert!(res.is_err()); + + let listed_keys = list_all_keys(primary_namespace, secondary_namespace); + assert_eq!(listed_keys.len(), 110); + assert_eq!(listed_keys[0], testkey); + + let read_data = kv_store.read(primary_namespace, secondary_namespace, testkey).unwrap(); + assert_eq!(data, &*read_data); + + kv_store.remove(primary_namespace, secondary_namespace, testkey, false).unwrap(); + + let listed_keys = list_all_keys(primary_namespace, secondary_namespace); + assert_eq!(listed_keys.len(), 109); + + // Ensure we have no issue operating with primary_namespace/secondary_namespace/key being KVSTORE_NAMESPACE_KEY_MAX_LEN + let max_chars: String = + std::iter::repeat('A').take(KVSTORE_NAMESPACE_KEY_MAX_LEN).collect(); + kv_store.write(&max_chars, &max_chars, &max_chars, 0, &data).unwrap(); + + println!("{:?}", listed_keys); + + let listed_keys = list_all_keys(&max_chars, &max_chars); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], max_chars); + + let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap(); + assert_eq!(data, &*read_data); + + kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap(); + + let listed_keys = list_all_keys(&max_chars, &max_chars); + assert_eq!(listed_keys.len(), 0); + } +} diff --git a/ldk-server/src/io/utils.rs b/ldk-server/src/io/utils.rs new file mode 100644 index 0000000..b9bfe59 --- /dev/null +++ b/ldk-server/src/io/utils.rs @@ -0,0 +1,99 @@ +use ldk_node::lightning::types::string::PrintableString; +use ldk_node::lightning::util::persist::{ + KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, +}; + +pub(crate) fn check_namespace_key_validity( + primary_namespace: &str, secondary_namespace: &str, key: Option<&str>, operation: &str, +) -> Result<(), std::io::Error> { + if let Some(key) = key { + if key.is_empty() { + debug_assert!( + false, + "Failed to {} {}/{}/{}: key may not be empty.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + let msg = format!( + "Failed to {} {}/{}/{}: key may not be empty.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + if primary_namespace.is_empty() && !secondary_namespace.is_empty() { + debug_assert!(false, + "Failed to {} {}/{}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", + operation, + PrintableString(primary_namespace), PrintableString(secondary_namespace), PrintableString(key) + ); + let msg = format!( + "Failed to {} {}/{}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", operation, + PrintableString(primary_namespace), PrintableString(secondary_namespace), PrintableString(key) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + if !is_valid_kvstore_str(primary_namespace) + || !is_valid_kvstore_str(secondary_namespace) + || !is_valid_kvstore_str(key) + { + debug_assert!( + false, + "Failed to {} {}/{}/{}: primary namespace, secondary namespace, and key must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + let msg = format!( + "Failed to {} {}/{}/{}: primary namespace, secondary namespace, and key must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } else { + if primary_namespace.is_empty() && !secondary_namespace.is_empty() { + debug_assert!(false, + "Failed to {} {}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", + operation, PrintableString(primary_namespace), PrintableString(secondary_namespace) + ); + let msg = format!( + "Failed to {} {}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", + operation, PrintableString(primary_namespace), PrintableString(secondary_namespace) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + if !is_valid_kvstore_str(primary_namespace) || !is_valid_kvstore_str(secondary_namespace) { + debug_assert!( + false, + "Failed to {} {}/{}: primary namespace and secondary namespace must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace) + ); + let msg = format!( + "Failed to {} {}/{}: primary namespace and secondary namespace must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } + + Ok(()) +} + +pub(crate) fn is_valid_kvstore_str(key: &str) -> bool { + key.len() <= KVSTORE_NAMESPACE_KEY_MAX_LEN + && key.chars().all(|c| KVSTORE_NAMESPACE_KEY_ALPHABET.contains(c)) +}