From 19586566fec857f36b156c0b3195f9cff456b686 Mon Sep 17 00:00:00 2001 From: David Estes <5317198+dav1do@users.noreply.github.com> Date: Wed, 24 Jan 2024 15:44:42 -0700 Subject: [PATCH] feat: Modify recon storage tables, trait and sqlite config to improve throughput (#243) * feat: split recon values into their own table * feat: Remove recon add value operation and combine it with insert insert and insert_many have been modified to perform better in sqlite. We grab a transaction and do all our writes, quitting early if we've done anything before. This has appeared to about 2x throughput from the shared pool and multiple write operations we had before * feat: adjust sqlite config options synchronous = normal is typically sufficient in WAL journal mode, and demonstrates a 2x increase in writes in my benchmarks * fix: only return true from insert when key is updated * fix: update recon test expectations (and typo) we need to figure out how to make these deterministic. currently the exact order of exchange depends on the speed at which the other side takes certain actions. speeding up the inserts appears to have caused a resp before a second req in some cases. 
--------- Co-authored-by: David Estes --- api/src/server.rs | 131 ++++++++++---------- core/src/lib.rs | 2 +- core/src/sql.rs | 19 ++- p2p/src/node.rs | 6 +- recon/src/client.rs | 39 ++---- recon/src/metrics.rs | 16 ++- recon/src/protocol.rs | 19 ++- recon/src/recon.rs | 151 +++++++++++++++------- recon/src/recon/btreestore.rs | 35 ++++-- recon/src/recon/sqlitestore.rs | 206 ++++++++++++++++++++++--------- recon/src/recon/store_metrics.rs | 22 ++-- recon/src/recon/tests.rs | 32 ++--- 12 files changed, 429 insertions(+), 249 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index 6c7addf8c..2af1588a6 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -37,7 +37,7 @@ pub trait Recon: Clone + Send + Sync { type Key: Key; type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>; - async fn insert(&self, key: Self::Key) -> Result<()>; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()>; async fn range( &self, start: Self::Key, @@ -47,7 +47,6 @@ pub trait Recon: Clone + Send + Sync { ) -> Result>; async fn value_for_key(&self, key: Self::Key) -> Result>>; - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>; } #[async_trait] @@ -59,8 +58,8 @@ where type Key = K; type Hash = H; - async fn insert(&self, key: Self::Key) -> Result<()> { - let _ = recon::Client::insert(self, key).await?; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + let _ = recon::Client::insert(self, key, value).await?; Ok(()) } @@ -78,9 +77,6 @@ where async fn value_for_key(&self, key: Self::Key) -> Result>> { recon::Client::value_for_key(self, key).await } - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> { - recon::Client::store_value_for_key(self, key, value).await - } } #[derive(Clone)] @@ -146,13 +142,10 @@ where let event_id = decode_event_id(&event.event_id)?; let event_data = decode_event_data(&event.event_data)?; self.model - 
.insert(event_id.clone()) - .await - .map_err(|err| ApiError(format!("failed to insert key: {err}")))?; - self.model - .store_value_for_key(event_id, &event_data) + .insert(event_id.clone(), Some(event_data)) .await .map_err(|err| ApiError(format!("failed to insert key: {err}")))?; + Ok(EventsPostResponse::Success) } @@ -240,7 +233,7 @@ where .with_not_after(0) .build(); self.interest - .insert(interest) + .insert(interest, None) .await .map_err(|err| ApiError(format!("failed to update interest: {err}")))?; @@ -297,7 +290,7 @@ mod tests { struct Context; mock! { pub ReconInterestTest { - fn insert(&self, key: Interest) -> Result<()>; + fn insert(&self, key: Interest, value: Option>) -> Result<()>; fn range( &self, start: Interest, @@ -316,8 +309,8 @@ mod tests { impl Recon for MockReconInterestTest { type Key = Interest; type Hash = Sha256a; - async fn insert(&self, key: Self::Key) -> Result<()> { - self.insert(key) + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + self.insert(key, value) } async fn range( &self, @@ -331,14 +324,11 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } } mock! 
{ pub ReconModelTest { - fn insert(&self, key: EventId) -> Result<()>; + fn insert(&self, key: EventId, value: Option>) -> Result<()>; fn range( &self, start: EventId, @@ -356,8 +346,8 @@ mod tests { impl Recon for MockReconModelTest { type Key = EventId; type Hash = Sha256a; - async fn insert(&self, key: Self::Key) -> Result<()> { - self.insert(key) + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + self.insert(key, value) } async fn range( &self, @@ -371,9 +361,6 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } } #[tokio::test] @@ -391,19 +378,23 @@ mod tests { &Cid::from_str("baejbeicqtpe5si4qvbffs2s7vtbk5ccbsfg6owmpidfj3zeluqz4hlnz6m").unwrap(), // cspell:disable-line ); let event_id_str = multibase::encode(Base::Base16Lower, event_id.to_bytes()); + let event_data = "f".to_string(); let mock_interest = MockReconInterestTest::new(); let mut mock_model = MockReconModelTest::new(); mock_model .expect_insert() - .with(predicate::eq(event_id)) + .with( + predicate::eq(event_id), + predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let server = Server::new(peer_id, network, mock_interest, mock_model); let resp = server .events_post( models::Event { event_id: event_id_str, - event_data: "f".to_string(), + event_data, }, &Context, ) @@ -461,16 +452,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + 
predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -524,16 +518,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -587,16 +584,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + ) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() @@ -650,16 +650,19 @@ mod tests { let mut mock_interest = MockReconInterestTest::new(); mock_interest .expect_insert() - .with(predicate::eq( - Interest::builder() - .with_sort_key("model") - .with_peer_id(&peer_id) - .with_range((start.as_slice(), end.as_slice())) - .with_not_after(0) - .build(), - )) + .with( + predicate::eq( + Interest::builder() + .with_sort_key("model") + .with_peer_id(&peer_id) + .with_range((start.as_slice(), end.as_slice())) + .with_not_after(0) + .build(), + ), + predicate::eq(None), + 
) .times(1) - .returning(|_| Ok(())); + .returning(|_, _| Ok(())); let mut mock_model = MockReconModelTest::new(); mock_model .expect_range() diff --git a/core/src/lib.rs b/core/src/lib.rs index 7069a1342..1e5bd0ade 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -20,7 +20,7 @@ pub use jws::{Jws, JwsSignature}; pub use network::Network; pub use range::RangeOpen; pub use signer::{JwkSigner, Signer}; -pub use sql::SqlitePool; +pub use sql::{DbTx, SqlitePool}; pub use stream_id::{StreamId, StreamIdType}; pub use cid::Cid; diff --git a/core/src/sql.rs b/core/src/sql.rs index d81ef3de2..b97712d48 100644 --- a/core/src/sql.rs +++ b/core/src/sql.rs @@ -1,6 +1,12 @@ use std::{path::Path, str::FromStr}; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; +use sqlx::{ + sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, + Sqlite, Transaction, +}; + +/// A trivial wrapper around a sqlx Sqlite database transaction +pub type DbTx<'a> = Transaction<'a, Sqlite>; #[derive(Clone, Debug)] /// The sqlite pool is split into a writer and a reader pool. @@ -19,16 +25,20 @@ impl SqlitePool { // A few ideas: number of RO connections, synchronize = NORMAL, mmap_size, temp_store = memory let conn_opts = SqliteConnectOptions::from_str(&db_path)? .journal_mode(SqliteJournalMode::Wal) + .synchronous(sqlx::sqlite::SqliteSynchronous::Normal) .create_if_missing(true) .optimize_on_close(true, None); let ro_opts = conn_opts.clone().read_only(true); let writer = SqlitePoolOptions::new() + .min_connections(1) .max_connections(1) + .acquire_timeout(std::time::Duration::from_secs(1)) .connect_with(conn_opts) .await?; let reader = SqlitePoolOptions::new() + .min_connections(1) .max_connections(8) .connect_with(ro_opts) .await?; @@ -37,10 +47,17 @@ impl SqlitePool { } /// Get a reference to the writer database pool. The writer pool has only one connection. + /// If you are going to do multiple writes in a row, instead use `tx` and `commit`. 
pub fn writer(&self) -> &sqlx::SqlitePool { &self.writer } + /// Get a writer tranaction. The writer pool has only one connection so this is an exclusive lock. + /// Use this method to perform simultaneous writes to the database, calling `commit` when you are done. + pub async fn tx(&self) -> anyhow::Result { + Ok(self.writer.begin().await?) + } + /// Get a reference to the reader database pool. The reader pool has many connections. pub fn reader(&self) -> &sqlx::SqlitePool { &self.reader diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 8c685d776..06c08ebf3 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -1247,7 +1247,7 @@ mod tests { type Key = K; type Hash = Sha256a; - async fn insert(&self, _key: Self::Key) -> Result<()> { + async fn insert(&self, _key: Self::Key, _value: Option>) -> Result<()> { unreachable!() } @@ -1268,9 +1268,7 @@ mod tests { async fn value_for_key(&self, _key: Self::Key) -> Result>> { Ok(None) } - async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> { - Ok(()) - } + async fn interests(&self) -> Result>> { unreachable!() } diff --git a/recon/src/client.rs b/recon/src/client.rs index e7a17399f..d287203f0 100644 --- a/recon/src/client.rs +++ b/recon/src/client.rs @@ -7,7 +7,7 @@ use tokio::sync::{ use tracing::warn; use crate::{ - recon::{Range, SyncState}, + recon::{Range, ReconItem, SyncState}, AssociativeHash, InterestProvider, Key, Metrics, Recon, Store, }; @@ -24,9 +24,11 @@ where H: AssociativeHash, { /// Sends an insert request to the server and awaits the response. - pub async fn insert(&self, key: K) -> Result { + pub async fn insert(&self, key: K, value: Option>) -> Result { let (ret, rx) = oneshot::channel(); - self.sender.send(Request::Insert { key, ret }).await?; + self.sender + .send(Request::Insert { key, value, ret }) + .await?; rx.await? } @@ -77,19 +79,6 @@ where rx.await? } - /// Store the value associated with a key so we can sync it later. 
- pub async fn store_value_for_key(&self, key: K, value: &[u8]) -> Result<()> { - let (ret, rx) = oneshot::channel(); - self.sender - .send(Request::StoreValueForKey { - key, - value: value.to_vec(), - ret, - }) - .await?; - rx.await? - } - /// Report the local nodes interests. pub async fn interests(&self) -> Result>> { let (ret, rx) = oneshot::channel(); @@ -133,6 +122,7 @@ where enum Request { Insert { key: K, + value: Option>, ret: oneshot::Sender>, }, Len { @@ -152,11 +142,6 @@ enum Request { key: K, ret: oneshot::Sender>>>, }, - StoreValueForKey { - key: K, - value: Vec, - ret: oneshot::Sender>, - }, Interests { ret: oneshot::Sender>>>, }, @@ -227,8 +212,12 @@ where let request = self.requests.recv().await; if let Some(request) = request { match request { - Request::Insert { key, ret } => { - send(ret, self.recon.insert(&key).await); + Request::Insert { key, value, ret } => { + let val = self + .recon + .insert(ReconItem::new(&key, value.as_deref())) + .await; + send(ret, val); } Request::Len { ret } => { send(ret, self.recon.len().await); @@ -254,10 +243,6 @@ where let value = self.recon.value_for_key(key).await; send(ret, value); } - Request::StoreValueForKey { key, value, ret } => { - let ok = self.recon.store_value_for_key(key, value).await; - send(ret, ok); - } Request::Interests { ret } => { let value = self.recon.interests().await; send(ret, value); diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs index c110af591..5bfbfdc27 100644 --- a/recon/src/metrics.rs +++ b/recon/src/metrics.rs @@ -216,10 +216,12 @@ impl Metrics { } } -pub(crate) struct KeyInsertEvent; +pub(crate) struct KeyInsertEvent { + pub(crate) cnt: u64, +} impl Recorder for Metrics { - fn record(&self, _event: &KeyInsertEvent) { - self.key_insert_count.inc(); + fn record(&self, event: &KeyInsertEvent) { + self.key_insert_count.inc_by(event.cnt); } } @@ -237,10 +239,12 @@ impl Recorder for Metrics { } } -pub(crate) struct ValueInsertEvent; +pub(crate) struct ValueInsertEvent { + 
pub(crate) cnt: u64, +} impl Recorder for Metrics { - fn record(&self, _event: &ValueInsertEvent) { - self.value_insert_count.inc(); + fn record(&self, event: &ValueInsertEvent) { + self.value_insert_count.inc_by(event.cnt); } } diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index c4d43bdb5..f3eb58496 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -678,11 +678,11 @@ where Ok(()) } async fn process_value_response(&mut self, key: R::Key, value: Vec) -> Result<()> { - self.recon.insert(key.clone()).await.context("store key")?; self.recon - .store_value_for_key(key, &value) + .insert(key, Some(value)) .await - .context("store value for key") + .context("process value response")?; + Ok(()) } // The remote is missing all keys in the range send them over. async fn process_remote_missing_range(&mut self, range: &Range) -> Result<()> { @@ -807,7 +807,7 @@ pub trait Recon: Clone + Send + Sync + 'static { type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>; /// Insert a new key into the key space. 
- async fn insert(&self, key: Self::Key) -> Result<()>; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()>; /// Get all keys in the specified range async fn range( @@ -829,9 +829,6 @@ pub trait Recon: Clone + Send + Sync + 'static { /// retrieve a value associated with a recon key async fn value_for_key(&self, key: Self::Key) -> Result>>; - /// associate a value with a recon key - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>; - /// Reports the interests of this recon instance async fn interests(&self) -> Result>>; @@ -866,8 +863,8 @@ where type Key = K; type Hash = H; - async fn insert(&self, key: Self::Key) -> Result<()> { - let _ = Client::insert(self, key).await?; + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + let _ = Client::insert(self, key, value).await?; Ok(()) } @@ -892,9 +889,7 @@ where async fn value_for_key(&self, key: Self::Key) -> Result>> { Client::value_for_key(self, key).await } - async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> { - Client::store_value_for_key(self, key, value).await - } + async fn interests(&self) -> Result>> { Client::interests(self).await } diff --git a/recon/src/recon.rs b/recon/src/recon.rs index c3cb79359..c48e1b568 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -98,12 +98,31 @@ where /// Reports any new keys and what the range indicates about how the local and remote node are /// synchronized. pub async fn process_range(&mut self, range: Range) -> Result<(SyncState, Vec)> { + let mut should_add = Vec::with_capacity(2); let mut new_keys = Vec::with_capacity(2); - if !range.first.is_fencepost() && self.insert(&range.first).await? { - new_keys.push(range.first.clone()); + + if !range.first.is_fencepost() { + should_add.push(range.first.clone()); + } + + if !range.last.is_fencepost() { + should_add.push(range.last.clone()); } - if !range.last.is_fencepost() && self.insert(&range.last).await? 
{ - new_keys.push(range.last.clone()); + + if !should_add.is_empty() { + let new = self + .insert_many(should_add.iter().map(|key| ReconItem::new_key(key))) + .await?; + debug_assert_eq!( + new.len(), + should_add.len(), + "new and should_add must be same length" + ); + for (idx, key) in should_add.into_iter().enumerate() { + if new[idx] { + new_keys.push(key); + } + } } let calculated_hash = self.store.hash_range(&range.first, &range.last).await?; @@ -224,31 +243,40 @@ where self.store.value_for_key(&key).await } - /// Associate a value with a recon key - pub async fn store_value_for_key(&mut self, key: K, value: Vec) -> Result<()> { - if self.store.store_value_for_key(&key, &value).await? { - self.metrics.record(&ValueInsertEvent); - } - Ok(()) - } + /// Insert key into the key space. Includes an optional value. + /// Returns a boolean (true) indicating if the key was new. + pub async fn insert(&mut self, item: ReconItem<'_, K>) -> Result { + let new_val = item.value.is_some(); + let new = self.store.insert(item).await?; - /// Insert a new key into the key space. - /// Returns true if the key did not previously exist. - pub async fn insert(&mut self, key: &K) -> Result { - let new_key = self.store.insert(key).await?; - if new_key { - self.metrics.record(&KeyInsertEvent); + if new { + self.metrics.record(&KeyInsertEvent { cnt: 1 }); } - Ok(new_key) + if new_val { + self.metrics.record(&ValueInsertEvent { cnt: 1 }); + } + + Ok(new) } - /// Insert many keys into the key space. - pub async fn insert_many<'a, IT>(&mut self, keys: IT) -> Result + /// Insert many keys into the key space. Includes an optional value for each key. + /// Returns an array with a boolean for each key indicating if the key was new. + /// The order is the same as the order of the keys. True means new, false means not new. 
+ pub async fn insert_many<'a, IT>(&mut self, items: IT) -> Result> where - IT: Iterator + Send, + IT: ExactSizeIterator> + Send + Sync, { - let new_key = self.store.insert_many(keys).await?; - Ok(new_key) + let result = self.store.insert_many(items).await?; + let key_cnt = result.keys.iter().filter(|k| **k).count(); + + self.metrics.record(&KeyInsertEvent { + cnt: key_cnt as u64, + }); + self.metrics.record(&ValueInsertEvent { + cnt: result.value_count as u64, + }); + + Ok(result.keys) } /// Reports total number of keys @@ -329,6 +357,57 @@ impl From for HashCount { } } +#[derive(Clone, Debug)] +pub struct ReconItem<'a, K> +where + K: Key, +{ + pub key: &'a K, + pub value: Option<&'a [u8]>, +} + +impl<'a, K> ReconItem<'a, K> +where + K: Key, +{ + pub fn new(key: &'a K, value: Option<&'a [u8]>) -> Self { + Self { key, value } + } + + pub fn new_key(key: &'a K) -> Self { + Self { key, value: None } + } + + pub fn new_with_value(key: &'a K, value: &'a [u8]) -> Self { + Self { + key, + value: Some(value), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct InsertResult { + /// A true/false list indicating whether or not the key was new. + /// It is in the same order as the input list of keys. + pub keys: Vec, + pub value_count: usize, +} + +impl InsertResult { + pub fn new(new_keys: Vec, value_count: usize) -> Self { + Self { + keys: new_keys, + value_count, + } + } + + /// true if any key is new, false otherwise + pub fn included_new_key(&self) -> bool { + self.keys.iter().any(|new| *new) + } +} + /// Store defines the API needed to store the Recon set. #[async_trait] pub trait Store: std::fmt::Debug { @@ -337,22 +416,16 @@ pub trait Store: std::fmt::Debug { /// Type of the AssociativeHash to compute over keys. type Hash: AssociativeHash; - /// Insert a new key into the key space. - /// Returns true if the key did not previously exist. - async fn insert(&mut self, key: &Self::Key) -> Result; + /// Insert a new key into the key space. 
Returns true if the key did not exist. + /// The value will be updated if included + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result; /// Insert new keys into the key space. - /// Returns true if a key did not previously exist. - async fn insert_many<'a, I>(&mut self, keys: I) -> Result + /// Returns true for each key if it did not previously exist, in the + /// same order as the input iterator. + async fn insert_many<'a, I>(&mut self, items: I) -> Result where - I: Iterator + Send, - { - let mut new = false; - for key in keys { - new |= self.insert(key).await?; - } - Ok(new) - } + I: ExactSizeIterator> + Send + Sync; /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. @@ -471,12 +544,6 @@ pub trait Store: std::fmt::Debug { Ok(self.len().await? == 0) } - /// store_value_for_key returns - /// Ok(true) if stored, - /// Ok(false) if already present, and - /// Err(e) if store failed. - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result; - /// value_for_key returns /// Ok(Some(value)) if stored, /// Ok(None) if not stored, and diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index 68243a012..14430cf4e 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -2,9 +2,9 @@ use anyhow::Result; use async_trait::async_trait; use std::{collections::BTreeMap, ops::Bound}; -use crate::recon::{AssociativeHash, Key, MaybeHashedKey, Store}; +use crate::recon::{AssociativeHash, Key, MaybeHashedKey, ReconItem, Store}; -use super::HashCount; +use super::{HashCount, InsertResult}; /// An implementation of a Store that stores keys in an in-memory BTree #[derive(Clone, Debug)] @@ -115,8 +115,31 @@ where type Key = K; type Hash = H; - async fn insert(&mut self, key: &Self::Key) -> Result { - Ok(self.keys.insert(key.to_owned(), H::digest(key)).is_none()) + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> 
Result { + let new = self + .keys + .insert(item.key.clone(), H::digest(item.key)) + .is_none(); + + if let Some(val) = item.value { + self.values.insert(item.key.clone(), val.to_vec()); + } + Ok(new) + } + + async fn insert_many<'a, I>(&mut self, items: I) -> Result + where + I: ExactSizeIterator> + Send + Sync, + { + let mut new = vec![false; items.len()]; + let mut new_val_cnt = 0; + for (idx, item) in items.enumerate() { + if item.value.is_some() { + new_val_cnt += 1; + } + new[idx] = self.insert(item).await?; + } + Ok(InsertResult::new(new, new_val_cnt)) } async fn hash_range( @@ -179,10 +202,6 @@ where } } - /// store_value_for_key returns Some(true) is inserting, Some(false) if present, and Err if store failed. - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - Ok(self.values.insert(key.clone(), value.to_vec()).is_none()) - } /// value_for_key returns an Error is retrieving failed and None if the key is not stored. async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { Ok(self.values.get(key).cloned()) diff --git a/recon/src/recon/sqlitestore.rs b/recon/src/recon/sqlitestore.rs index 6994b8405..d3098366b 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/recon/src/recon/sqlitestore.rs @@ -1,10 +1,10 @@ #![warn(missing_docs, missing_debug_implementations, clippy::all)] -use super::HashCount; +use super::{HashCount, InsertResult, ReconItem}; use crate::{AssociativeHash, Key, Store}; use anyhow::Result; use async_trait::async_trait; -use ceramic_core::SqlitePool; +use ceramic_core::{DbTx, SqlitePool}; use sqlx::Row; use std::marker::PhantomData; use std::result::Result::Ok; @@ -49,6 +49,7 @@ where { /// Initialize the recon table. async fn create_table_if_not_exists(&mut self) -> Result<()> { + // Do we want to remove CID and block_retrieved from the table? const CREATE_RECON_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon ( sort_key TEXT, -- the field in the event header to sort by e.g. 
model key BLOB, -- network_id sort_value controller StreamID height event_cid @@ -61,47 +62,84 @@ where ahash_6 INTEGER, ahash_7 INTEGER, CID TEXT, - value BLOB, block_retrieved BOOL, -- indicates if we still want the block PRIMARY KEY(sort_key, key) )"; - sqlx::query(CREATE_RECON_TABLE) - .execute(self.pool.writer()) + const CREATE_RECON_VALUE_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon_value ( + sort_key TEXT, + key BLOB, + value BLOB, + PRIMARY KEY(sort_key, key) + )"; + + let mut tx = self.pool.tx().await?; + sqlx::query(CREATE_RECON_TABLE).execute(&mut *tx).await?; + sqlx::query(CREATE_RECON_VALUE_TABLE) + .execute(&mut *tx) .await?; + tx.commit().await?; Ok(()) } -} -#[async_trait] -impl Store for SQLiteStore -where - K: Key, - H: AssociativeHash, -{ - type Key = K; - type Hash = H; + /// returns (new_key, new_val) tuple + async fn insert_item_int( + &mut self, + item: &ReconItem<'_, K>, + conn: &mut DbTx<'_>, + ) -> Result<(bool, bool)> { + // we insert the value first as it's possible we already have the key and can skip that step + // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again + if let Some(val) = item.value { + if self.insert_value_int(item.key, val, conn).await? { + return Ok((false, true)); + } + } + let new_key = self.insert_key_int(item.key, conn).await?; + Ok((new_key, item.value.is_some())) + } - // Ok(true): inserted the key - // Ok(false): did not insert the key ConstraintViolation - // Err(e): sql error - #[instrument(skip(self))] - async fn insert(&mut self, key: &Self::Key) -> Result { - let query = sqlx::query( + /// returns true if the key already exists in the recon table + async fn insert_value_int(&mut self, key: &K, val: &[u8], conn: &mut DbTx<'_>) -> Result { + let value_insert = sqlx::query( + r#"INSERT INTO recon_value (value, sort_key, key) + VALUES (?, ?, ?) 
+ ON CONFLICT (sort_key, key) DO UPDATE + SET value=excluded.value + RETURNING + EXISTS(select 1 from recon where sort_key=? and key=?)"#, + ); + + let resp = value_insert + .bind(val) + .bind(&self.sort_key) + .bind(key.as_bytes()) + .bind(&self.sort_key) + .bind(key.as_bytes()) + .fetch_one(&mut **conn) + .await?; + + let v = resp.get::<'_, bool, _>(0); + Ok(v) + } + + async fn insert_key_int(&mut self, key: &K, conn: &mut DbTx<'_>) -> Result { + let key_insert = sqlx::query( "INSERT INTO recon ( - sort_key, key, - ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7, - block_retrieved - ) VALUES ( - ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ? - );", + sort_key, key, + ahash_0, ahash_1, ahash_2, ahash_3, + ahash_4, ahash_5, ahash_6, ahash_7, + block_retrieved + ) VALUES ( + ?, ?, + ?, ?, ?, ?, + ?, ?, ?, ?, + ? + );", ); + let hash = H::digest(key); - let resp = query + let resp = key_insert .bind(&self.sort_key) .bind(key.as_bytes()) .bind(hash.as_u32s()[0]) @@ -113,7 +151,7 @@ where .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) .bind(false) - .fetch_all(self.pool.writer()) + .execute(&mut **conn) .await; match resp { std::result::Result::Ok(_rows) => Ok(true), @@ -127,6 +165,50 @@ where Err(err) => Err(err.into()), } } +} + +#[async_trait] +impl Store for SQLiteStore +where + K: Key, + H: AssociativeHash, +{ + type Key = K; + type Hash = H; + + /// Returns true if the key was new. The value is always updated if included + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + let mut tx = self.pool.writer().begin().await?; + let (new_key, _new_val) = self.insert_item_int(&item, &mut tx).await?; + tx.commit().await?; + Ok(new_key) + } + + /// Insert new keys into the key space. + /// Returns true if a key did not previously exist. 
+ async fn insert_many<'a, I>(&mut self, items: I) -> Result + where + I: ExactSizeIterator> + Send + Sync, + { + match items.len() { + 0 => Ok(InsertResult::new(vec![], 0)), + _ => { + let mut results = vec![false; items.len()]; + let mut new_val_cnt = 0; + let mut tx = self.pool.writer().begin().await?; + + for (idx, item) in items.enumerate() { + let (new_key, new_val) = self.insert_item_int(&item, &mut tx).await?; + results[idx] = new_key; + if new_val { + new_val_cnt += 1; + } + } + tx.commit().await?; + Ok(InsertResult::new(results, new_val_cnt)) + } + } + } /// return the hash and count for a range #[instrument(skip(self))] @@ -359,21 +441,9 @@ where } } - #[instrument(skip(self))] - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - let query = sqlx::query("UPDATE recon SET value=? WHERE sort_key=? AND key=?;"); - query - .bind(value) - .bind(&self.sort_key) - .bind(key.as_bytes()) - .fetch_all(self.pool.writer()) - .await?; - Ok(true) - } - #[instrument(skip(self))] async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { - let query = sqlx::query("SELECT value FROM recon WHERE sort_key=? AND key=?;"); + let query = sqlx::query("SELECT value FROM recon_value WHERE sort_key=? 
AND key=?;"); let row = query .bind(&self.sort_key) .bind(key.as_bytes()) @@ -387,6 +457,7 @@ where mod tests { use super::*; + use crate::recon::ReconItem; use crate::tests::AlphaNumBytes; use crate::Sha256a; @@ -403,8 +474,14 @@ mod tests { #[test(tokio::test)] async fn test_hash_range_query() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); let hash: Sha256a = store .hash_range(&b"a".as_slice().into(), &b"z".as_slice().into()) .await @@ -417,8 +494,14 @@ mod tests { #[test(tokio::test)] async fn test_range_query() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); let ids = store .range( &b"a".as_slice().into(), @@ -453,7 +536,11 @@ mod tests { ) "# ] - .assert_debug_eq(&store.insert(&AlphaNumBytes::from("hello")).await); + .assert_debug_eq( + &store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await, + ); // reject the second insert of same key expect![ @@ -463,14 +550,24 @@ mod tests { ) "# ] - .assert_debug_eq(&store.insert(&AlphaNumBytes::from("hello")).await); + .assert_debug_eq( + &store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await, + ); } #[test(tokio::test)] async fn test_first_and_last() { let mut store = new_store().await; - store.insert(&AlphaNumBytes::from("hello")).await.unwrap(); - store.insert(&AlphaNumBytes::from("world")).await.unwrap(); + store + .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .await + .unwrap(); 
+ store + .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .await + .unwrap(); // Only one key in range let ret = store @@ -526,9 +623,8 @@ mod tests { let mut store = new_store().await; let key = AlphaNumBytes::from("hello"); let store_value = AlphaNumBytes::from("world"); - store.insert(&key).await.unwrap(); store - .store_value_for_key(&key, store_value.as_slice()) + .insert(ReconItem::new_with_value(&key, store_value.as_slice())) .await .unwrap(); let value = store.value_for_key(&key).await.unwrap().unwrap(); diff --git a/recon/src/recon/store_metrics.rs b/recon/src/recon/store_metrics.rs index caa6c7618..be96b2467 100644 --- a/recon/src/recon/store_metrics.rs +++ b/recon/src/recon/store_metrics.rs @@ -6,6 +6,8 @@ use tokio::time::Instant; use crate::{metrics::StoreQuery, recon::HashCount, AssociativeHash, Key, Metrics, Store}; +use super::{InsertResult, ReconItem}; + /// Implement the Store and record metrics #[derive(Debug)] pub struct StoreMetricsMiddleware { @@ -40,18 +42,19 @@ where type Key = K; type Hash = H; - async fn insert(&mut self, key: &Self::Key) -> Result { - StoreMetricsMiddleware::::record(self.metrics.clone(), "insert", self.store.insert(key)) + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + StoreMetricsMiddleware::::record(self.metrics.clone(), "insert", self.store.insert(item)) .await } - async fn insert_many<'a, I>(&mut self, keys: I) -> Result + + async fn insert_many<'a, I>(&mut self, items: I) -> Result where - I: Iterator + Send, + I: ExactSizeIterator> + Send + Sync, { StoreMetricsMiddleware::::record( self.metrics.clone(), "insert_many", - self.store.insert_many(keys), + self.store.insert_many(items), ) .await } @@ -165,15 +168,6 @@ where .await } - async fn store_value_for_key(&mut self, key: &Self::Key, value: &[u8]) -> Result { - StoreMetricsMiddleware::::record( - self.metrics.clone(), - "store_value_for_key", - self.store.store_value_for_key(key, value), - ) - .await - } - async fn 
value_for_key(&mut self, key: &Self::Key) -> Result>> { StoreMetricsMiddleware::::record( self.metrics.clone(), diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index 7c482e51c..db5ad52e0 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -40,7 +40,7 @@ use pretty::{Arena, DocAllocator, DocBuilder, Pretty}; use crate::{ protocol::{self, InitiatorMessage, ResponderMessage, ValueResponse}, - recon::{FullInterests, HashCount, InterestProvider, Range}, + recon::{FullInterests, HashCount, InterestProvider, Range, ReconItem}, tests::AlphaNumBytes, AssociativeHash, BTreeStore, Client, Key, Metrics, Recon, Server, Sha256a, Store, }; @@ -569,10 +569,12 @@ async fn word_lists() { ); for key in s.split([' ', '\n']).map(|s| s.to_string()) { if !s.is_empty() { - r.insert(&key.as_bytes().into()).await.unwrap(); - r.store_value_for_key(key.as_bytes().into(), key.to_uppercase().as_bytes().into()) - .await - .unwrap(); + r.insert(ReconItem::new( + &key.as_bytes().into(), + key.to_uppercase().as_bytes().into(), + )) + .await + .unwrap(); } } start_recon(r) @@ -1300,10 +1302,10 @@ async fn disjoint() { cat: [a: A, b: B, c: C, e: , f: , g: ] -> value_req(e) cat: [a: A, b: B, c: C, e: , f: , g: ] - <- value_resp(e: E) - dog: [a: A, b: B, c: C, e: E, f: F, g: G] -> value_req(f) cat: [a: A, b: B, c: C, e: , f: , g: ] + <- value_resp(e: E) + dog: [a: A, b: B, c: C, e: E, f: F, g: G] -> value_req(g) cat: [a: A, b: B, c: C, e: E, f: , g: ] <- value_resp(f: F) @@ -1480,10 +1482,10 @@ async fn paper() { cat: [ape: APE, bee: BEE, cot: COT, doe: , eel: EEL, fox: FOX, gnu: GNU] <- range_resp({gnu h(hog)#1 𝛀 }) dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] - -> listen_only - cat: [ape: APE, bee: BEE, cot: COT, doe: , eel: EEL, fox: FOX, gnu: GNU, hog: HOG] <- value_resp(doe: DOE) dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] + -> listen_only + cat: [ape: APE, bee: BEE, cot: COT, 
doe: , eel: EEL, fox: FOX, gnu: GNU, hog: HOG] <- listen_only dog: [ape: APE, bee: BEE, cot: COT, doe: DOE, eel: EEL, fox: FOX, gnu: GNU, hog: HOG] -> finished @@ -1727,18 +1729,18 @@ async fn alternating() { cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] -> value_req(u) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] + <- value_resp(u: U) + dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> value_req(w) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: , v: V, w: , x: X, y: , z: Z] - <- value_resp(u: U) + <- value_resp(w: W) dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> value_req(y) cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: , x: X, y: , z: Z] - <- value_resp(w: W) + <- value_resp(y: Y) dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] -> listen_only cat: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: , z: Z] - <- value_resp(y: Y) - dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: U, v: V, w: W, x: X, y: Y, z: Z] <- listen_only dog: [a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, o: O, p: P, q: Q, r: R, s: S, t: T, u: 
U, v: V, w: W, x: X, y: Y, z: Z] -> finished @@ -1957,10 +1959,10 @@ async fn subset_interest() { dog: [b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, r: ] -> value_req(i) cat: [b: , c: C, e: , f: F, g: G, i: , m: , n: N, r: R] - -> value_req(m) - cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] <- range_resp({c h(d)#1 e}) dog: [b: B, c: C, d: D, e: E, f: F, g: G, h: H, i: I, j: J, k: K, l: L, m: M, n: N, r: ] + -> value_req(m) + cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] -> value_resp(r: R) cat: [b: , c: C, d: D, e: , f: F, g: G, i: , m: , n: N, r: R] <- range_resp({e 0 f})