diff --git a/Cargo.lock b/Cargo.lock index 280a9fd5a..aca386582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1141,6 +1141,7 @@ dependencies = [ "serde_bytes", "serde_ipld_dagcbor", "serde_json", + "sqlx", "ssi", "test-log", "tokio", @@ -1311,7 +1312,6 @@ dependencies = [ "serde_repr", "signal-hook", "signal-hook-tokio", - "sqlx", "swagger", "tokio", "tokio-metrics", diff --git a/core/Cargo.toml b/core/Cargo.toml index 371f2034c..1949f78a6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -28,6 +28,7 @@ serde_bytes.workspace = true serde_ipld_dagcbor.workspace = true serde_json.workspace = true ssi.workspace = true +sqlx.workspace = true unsigned-varint.workspace = true [dev-dependencies] diff --git a/core/src/lib.rs b/core/src/lib.rs index 02177d404..7069a1342 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,6 +9,7 @@ mod jws; mod network; mod range; mod signer; +mod sql; mod stream_id; pub use bytes::Bytes; @@ -19,6 +20,7 @@ pub use jws::{Jws, JwsSignature}; pub use network::Network; pub use range::RangeOpen; pub use signer::{JwkSigner, Signer}; +pub use sql::SqlitePool; pub use stream_id::{StreamId, StreamIdType}; pub use cid::Cid; diff --git a/core/src/sql.rs b/core/src/sql.rs new file mode 100644 index 000000000..0b76ef91b --- /dev/null +++ b/core/src/sql.rs @@ -0,0 +1,47 @@ +use std::{path::Path, str::FromStr}; + +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; + +#[derive(Clone, Debug)] +/// The sqlite pool is split into a writer and a reader pool. +/// Wrapper around the sqlx::SqlitePool +pub struct SqlitePool { + writer: sqlx::SqlitePool, + reader: sqlx::SqlitePool, +} + +impl SqlitePool { + /// Connect to the sqlite database at the given path. Creates the database if it does not exist. + /// Uses WAL journal mode. + pub async fn connect(path: impl AsRef) -> anyhow::Result { + let db_path = format!("sqlite:{}", path.as_ref().display()); + let conn_opts = SqliteConnectOptions::from_str(&db_path)? + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true) + // .synchronous(sqlx::sqlite::SqliteSynchronous::Normal) // normally enough in WAL mode? + .optimize_on_close(true, None); + + let ro_opts = conn_opts.clone().read_only(true); + + let writer = SqlitePoolOptions::new() + .max_connections(1) + .connect_with(conn_opts) + .await?; + let reader = SqlitePoolOptions::new() + .max_connections(32) //TODO + .connect_with(ro_opts) + .await?; + + Ok(Self { writer, reader }) + } + + /// Get a reference to the writer database pool. The writer pool has only one connection. + pub fn writer(&self) -> &sqlx::SqlitePool { + &self.writer + } + + /// Get a reference to the reader database pool. The reader pool has many connections. + pub fn reader(&self) -> &sqlx::SqlitePool { + &self.reader + } +} diff --git a/one/Cargo.toml b/one/Cargo.toml index ae857e206..6665756e4 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -44,7 +44,6 @@ serde_json = "1" serde_repr = "0.1" signal-hook = "0.3.17" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } -sqlx.workspace = true swagger.workspace = true tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" diff --git a/one/src/events.rs b/one/src/events.rs index ced490293..ee4b66fe4 100644 --- a/one/src/events.rs +++ b/one/src/events.rs @@ -1,10 +1,10 @@ use anyhow::Result; +use ceramic_core::SqlitePool; use ceramic_p2p::SQLiteBlockStore; use chrono::{SecondsFormat, Utc}; use cid::{multibase, multihash, Cid}; use clap::{Args, Subcommand}; use glob::{glob, Paths}; -use sqlx::sqlite::SqlitePool; use std::{fs, path::PathBuf}; #[derive(Subcommand, Debug)] @@ -47,14 +47,7 @@ async fn slurp(opts: SlurpOpts) -> Result<()> { output_ceramic_path.display() ); - let pool: sqlx::Pool = SqlitePool::connect(&format!( - "sqlite:{}?mode=rwc", - output_ceramic_path - .to_str() - .expect("path should be utf8 compatible") - )) - .await - .unwrap(); + let pool = SqlitePool::connect(output_ceramic_path).await.unwrap(); let store = SQLiteBlockStore::new(pool).await.unwrap(); if let Some(input_ceramic_db) = opts.input_ceramic_db { diff --git a/one/src/lib.rs b/one/src/lib.rs index 1196e2885..fb28306e5 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -6,12 +6,11 @@ mod http; mod metrics; mod network; mod recon_loop; -mod sql; use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration}; use anyhow::{anyhow, Result}; -use ceramic_core::{EventId, Interest, PeerId}; +use ceramic_core::{EventId, Interest, PeerId, SqlitePool}; use ceramic_kubo_rpc::Multiaddr; use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; @@ -426,7 +425,7 @@ impl Daemon { // Connect to sqlite let sql_db_path: PathBuf = dir.join("db.sqlite3"); - let sql_pool = sql::connect(&sql_db_path).await?; + let sql_pool = SqlitePool::connect(&sql_db_path).await?; // Create recon store for interests. let interest_store = InterestStore::new(sql_pool.clone(), "interest".to_string()).await?; diff --git a/one/src/network.rs b/one/src/network.rs index 102a9c6ad..63d5590c0 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use anyhow::Result; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, SqlitePool}; use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService}; use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, SQLiteBlockStore}; use iroh_rpc_client::P2pClient; use iroh_rpc_types::{p2p::P2pAddr, Addr}; use libp2p::identity::Keypair; use recon::{libp2p::Recon, Sha256a}; -use sqlx::SqlitePool; use tokio::task::{self, JoinHandle}; use tracing::{debug, error}; diff --git a/one/src/sql.rs b/one/src/sql.rs deleted file mode 100644 index bf9e31a57..000000000 --- a/one/src/sql.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::path::Path; - -use anyhow::Result; -use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; - -pub async fn connect(path: impl AsRef) -> Result { - let pool = SqlitePoolOptions::new() - // sqlite performs best with a single active connection at a time. - .max_connections(1) - .connect(&format!("sqlite:{}?mode=rwc", path.as_ref().display())) - .await?; - - // set the WAL PRAGMA for faster writes - const SET_WAL_PRAGMA: &str = "PRAGMA journal_mode=wal;"; - sqlx::query(SET_WAL_PRAGMA).execute(&pool).await?; - - Ok(pool) -} diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 83f995f66..7b8133314 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration}; use ahash::AHashMap; use anyhow::{anyhow, bail, Context, Result}; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, SqlitePool}; use ceramic_metrics::{libp2p_metrics, Recorder}; use cid::Cid; use futures_util::stream::StreamExt; @@ -30,7 +30,6 @@ use libp2p::{ swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent}, PeerId, StreamProtocol, Swarm, }; -use sqlx::SqlitePool; use tokio::sync::oneshot::{self, Sender as OneShotSender}; use tokio::task::JoinHandle; use tokio::{ diff --git a/p2p/src/sqliteblockstore.rs b/p2p/src/sqliteblockstore.rs index 7964f4e72..c01bc0c1c 100644 --- a/p2p/src/sqliteblockstore.rs +++ b/p2p/src/sqliteblockstore.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use bytes::Bytes; +use ceramic_core::SqlitePool; use cid::{ multihash::Code::{Keccak256, Sha2_256}, multihash::MultihashDigest, @@ -9,7 +10,7 @@ use cid::{ use futures_util::stream::BoxStream; use iroh_bitswap::{Block, Store}; use multihash::Multihash; -use sqlx::{sqlite::Sqlite, Error, Row, SqlitePool}; +use sqlx::{sqlite::Sqlite, Error, Row}; #[derive(Debug, Clone)] pub struct SQLiteBlockStore { @@ -49,7 +50,7 @@ impl SQLiteBlockStore { ); ", ) - .execute(&self.pool) + .execute(self.pool.writer()) .await?; Ok(()) } @@ -76,13 +77,13 @@ impl SQLiteBlockStore { ) }; let hashes = hashes_query - .fetch_all(&self.pool) + .fetch_all(self.pool.reader()) .await? .into_iter() .map(|row| Multihash::from_bytes(row.get::<'_, &[u8], _>(0))) .collect::, multihash::Error>>()?; let remaining = remaining_query - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await? .get::<'_, i64, _>(0) // Do not count the hashes we just got in the remaining count. @@ -94,7 +95,7 @@ impl SQLiteBlockStore { Ok(Some( sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;") .bind(cid.hash().to_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await? .get::<'_, i64, _>(0) as u64, )) @@ -103,14 +104,14 @@ impl SQLiteBlockStore { pub async fn get(&self, cid: Cid) -> Result> { Ok(sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;") .bind(cid.hash().to_bytes()) - .fetch_optional(&self.pool) + .fetch_optional(self.pool.reader()) .await? .map(|row| row.get::<'_, Vec, _>(0).into())) } pub fn scan(&self) -> BoxStream> { sqlx::query_as::("SELECT multihash, bytes FROM blocks;") - .fetch(&self.pool) + .fetch(self.pool.reader()) } /// Store a DAG node into IPFS. @@ -138,7 +139,7 @@ impl SQLiteBlockStore { match sqlx::query("INSERT INTO blocks (multihash, bytes) VALUES (?, ?)") .bind(cid.hash().to_bytes()) .bind(blob.to_vec()) - .execute(&self.pool) + .execute(self.pool.writer()) .await { Ok(_) => Ok(true), @@ -158,7 +159,7 @@ impl SQLiteBlockStore { ", ) .bind(input_ceramic_db_filename) - .execute(&self.pool) + .execute(self.pool.writer()) .await?; Ok(()) } @@ -167,7 +168,7 @@ impl SQLiteBlockStore { pub async fn backup_to_sqlite(&self, output_ceramic_db_filename: &str) -> Result<()> { sqlx::query(".backup ?") .bind(output_ceramic_db_filename) - .execute(&self.pool) + .execute(self.pool.writer()) .await?; Ok(()) } @@ -182,7 +183,7 @@ impl Store for SQLiteBlockStore { Ok( sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;") .bind(cid.hash().to_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await? .get::<'_, i64, _>(0) as usize, ) @@ -195,7 +196,7 @@ impl Store for SQLiteBlockStore { Ok(Block::new( sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;") .bind(cid.hash().to_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await? .get::<'_, Vec, _>(0) .into(), @@ -210,7 +211,7 @@ impl Store for SQLiteBlockStore { Ok( sqlx::query("SELECT count(1) FROM blocks WHERE multihash = ?;") .bind(cid.hash().to_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await? .get::<'_, i64, _>(0) > 0, @@ -225,10 +226,10 @@ mod tests { use crate::SQLiteBlockStore; use anyhow::Error; use bytes::Bytes; + use ceramic_core::SqlitePool; use cid::{Cid, CidGeneric}; use expect_test::expect; use iroh_bitswap::Store; - use sqlx::SqlitePool; #[tokio::test] async fn test_store_block() { diff --git a/recon/src/recon/sqlitestore.rs b/recon/src/recon/sqlitestore.rs index 1144bccf7..28ef408ac 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/recon/src/recon/sqlitestore.rs @@ -4,7 +4,8 @@ use super::HashCount; use crate::{AssociativeHash, Key, Store}; use anyhow::Result; use async_trait::async_trait; -use sqlx::{Row, SqlitePool}; +use ceramic_core::SqlitePool; +use sqlx::Row; use std::marker::PhantomData; use std::result::Result::Ok; use tracing::{debug, instrument}; @@ -64,7 +65,9 @@ where PRIMARY KEY(sort_key, key) )"; - sqlx::query(CREATE_RECON_TABLE).execute(&self.pool).await?; + sqlx::query(CREATE_RECON_TABLE) + .execute(self.pool.writer()) + .await?; Ok(()) } } @@ -109,7 +112,7 @@ where .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) .bind(false) - .fetch_all(&self.pool) + .fetch_all(self.pool.writer()) .await; match resp { std::result::Result::Ok(_rows) => Ok(true), @@ -151,7 +154,7 @@ where .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await?; let bytes: [u32; 8] = [ row.get(0), @@ -204,7 +207,7 @@ where .bind(right_fencepost.as_bytes()) .bind(limit as i64) .bind(offset as i64) - .fetch_all(&self.pool) + .fetch_all(self.pool.reader()) .await?; debug!(count = rows.len(), "rows"); Ok(Box::new(rows.into_iter().map(|row| { @@ -234,7 +237,7 @@ where .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .fetch_one(&self.pool) + .fetch_one(self.pool.reader()) .await?; Ok(row.get::<'_, i64, _>(0) as usize) } @@ -265,7 +268,7 @@ where .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .fetch_all(&self.pool) + .fetch_all(self.pool.reader()) .await?; Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); @@ -298,7 +301,7 @@ where .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .fetch_all(&self.pool) + .fetch_all(self.pool.reader()) .await?; Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); @@ -344,7 +347,7 @@ where .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .fetch_all(&self.pool) + .fetch_all(self.pool.reader()) .await?; if let Some(row) = rows.first() { let first = K::from(row.get(0));