Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modify recon storage tables, trait and sqlite config to improve throughput #243

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 67 additions & 64 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait Recon: Clone + Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

async fn insert(&self, key: Self::Key) -> Result<()>;
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()>;
async fn range(
&self,
start: Self::Key,
Expand All @@ -47,7 +47,6 @@ pub trait Recon: Clone + Send + Sync {
) -> Result<Vec<Self::Key>>;

async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>>;
async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()>;
}

#[async_trait]
Expand All @@ -59,8 +58,8 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, key: Self::Key) -> Result<()> {
let _ = recon::Client::insert(self, key).await?;
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
let _ = recon::Client::insert(self, key, value).await?;
Ok(())
}

Expand All @@ -78,9 +77,6 @@ where
async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
recon::Client::value_for_key(self, key).await
}
async fn store_value_for_key(&self, key: Self::Key, value: &[u8]) -> Result<()> {
recon::Client::store_value_for_key(self, key, value).await
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -146,13 +142,10 @@ where
let event_id = decode_event_id(&event.event_id)?;
let event_data = decode_event_data(&event.event_data)?;
self.model
.insert(event_id.clone())
.await
.map_err(|err| ApiError(format!("failed to insert key: {err}")))?;
self.model
.store_value_for_key(event_id, &event_data)
.insert(event_id.clone(), Some(event_data))
.await
.map_err(|err| ApiError(format!("failed to insert key: {err}")))?;

Ok(EventsPostResponse::Success)
}

Expand Down Expand Up @@ -240,7 +233,7 @@ where
.with_not_after(0)
.build();
self.interest
.insert(interest)
.insert(interest, None)
.await
.map_err(|err| ApiError(format!("failed to update interest: {err}")))?;

Expand Down Expand Up @@ -297,7 +290,7 @@ mod tests {
struct Context;
mock! {
pub ReconInterestTest {
fn insert(&self, key: Interest) -> Result<()>;
fn insert(&self, key: Interest, value: Option<Vec<u8>>) -> Result<()>;
fn range(
&self,
start: Interest,
Expand All @@ -316,8 +309,8 @@ mod tests {
impl Recon for MockReconInterestTest {
type Key = Interest;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key) -> Result<()> {
self.insert(key)
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
&self,
Expand All @@ -331,14 +324,11 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}
}

mock! {
pub ReconModelTest {
fn insert(&self, key: EventId) -> Result<()>;
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<()>;
fn range(
&self,
start: EventId,
Expand All @@ -356,8 +346,8 @@ mod tests {
impl Recon for MockReconModelTest {
type Key = EventId;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key) -> Result<()> {
self.insert(key)
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
&self,
Expand All @@ -371,9 +361,6 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}
}

#[tokio::test]
Expand All @@ -391,19 +378,23 @@ mod tests {
&Cid::from_str("baejbeicqtpe5si4qvbffs2s7vtbk5ccbsfg6owmpidfj3zeluqz4hlnz6m").unwrap(), // cspell:disable-line
);
let event_id_str = multibase::encode(Base::Base16Lower, event_id.to_bytes());
let event_data = "f".to_string();
let mock_interest = MockReconInterestTest::new();
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_insert()
.with(predicate::eq(event_id))
.with(
predicate::eq(event_id),
predicate::eq(Some(decode_event_data(event_data.as_str()).unwrap())),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let resp = server
.events_post(
models::Event {
event_id: event_id_str,
event_data: "f".to_string(),
event_data,
},
&Context,
)
Expand Down Expand Up @@ -461,16 +452,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -524,16 +518,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -587,16 +584,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down Expand Up @@ -650,16 +650,19 @@ mod tests {
let mut mock_interest = MockReconInterestTest::new();
mock_interest
.expect_insert()
.with(predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
))
.with(
predicate::eq(
Interest::builder()
.with_sort_key("model")
.with_peer_id(&peer_id)
.with_range((start.as_slice(), end.as_slice()))
.with_not_after(0)
.build(),
),
predicate::eq(None),
)
.times(1)
.returning(|_| Ok(()));
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use jws::{Jws, JwsSignature};
pub use network::Network;
pub use range::RangeOpen;
pub use signer::{JwkSigner, Signer};
pub use sql::SqlitePool;
pub use sql::{DbTx, SqlitePool};
pub use stream_id::{StreamId, StreamIdType};

pub use cid::Cid;
Expand Down
19 changes: 18 additions & 1 deletion core/src/sql.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::{path::Path, str::FromStr};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
Sqlite, Transaction,
};

/// A trivial wrapper around a sqlx Sqlite database transaction
pub type DbTx<'a> = Transaction<'a, Sqlite>;

#[derive(Clone, Debug)]
/// The sqlite pool is split into a writer and a reader pool.
Expand All @@ -19,16 +25,20 @@ impl SqlitePool {
// A few ideas: number of RO connections, synchronize = NORMAL, mmap_size, temp_store = memory
let conn_opts = SqliteConnectOptions::from_str(&db_path)?
.journal_mode(SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.create_if_missing(true)
.optimize_on_close(true, None);

let ro_opts = conn_opts.clone().read_only(true);

let writer = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(1)
.acquire_timeout(std::time::Duration::from_secs(1))
.connect_with(conn_opts)
.await?;
let reader = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(8)
.connect_with(ro_opts)
.await?;
Expand All @@ -37,10 +47,17 @@ impl SqlitePool {
}

/// Get a reference to the writer database pool. The writer pool has only one connection.
/// If you are going to do multiple writes in a row, instead use `tx` and `commit`.
pub fn writer(&self) -> &sqlx::SqlitePool {
&self.writer
}

/// Get a writer tranaction. The writer pool has only one connection so this is an exclusive lock.
/// Use this method to perform simultaneous writes to the database, calling `commit` when you are done.
pub async fn tx(&self) -> anyhow::Result<DbTx> {
Ok(self.writer.begin().await?)
}

/// Get a reference to the reader database pool. The reader pool has many connections.
pub fn reader(&self) -> &sqlx::SqlitePool {
&self.reader
Expand Down
6 changes: 2 additions & 4 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _key: Self::Key) -> Result<()> {
async fn insert(&self, _key: Self::Key, _value: Option<Vec<u8>>) -> Result<()> {
unreachable!()
}

Expand All @@ -1268,9 +1268,7 @@ mod tests {
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
}
async fn store_value_for_key(&self, _key: Self::Key, _value: &[u8]) -> Result<()> {
Ok(())
}

async fn interests(&self) -> Result<Vec<RangeOpen<Self::Key>>> {
unreachable!()
}
Expand Down
Loading
Loading