From a1be37c835b559b4dd7a6da85d38b637d98a4723 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 14 Jan 2025 10:59:27 -0700 Subject: [PATCH 1/4] fix: update built dep The older version of built does not understand new Cargo.lock files. --- Cargo.lock | 70 ++++++++------------------------------------- metadata/Cargo.toml | 3 +- metadata/build.rs | 8 +----- 3 files changed, 14 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 917a0da6c..1db6193cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1890,11 +1890,10 @@ dependencies = [ [[package]] name = "built" -version = "0.6.1" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b99c4cdc7b2c2364182331055623bdf45254fcb679fea565c40c3c11c101889a" +checksum = "c360505aed52b7ec96a3636c3f039d99103c37d1d9b4f7a8c743d3ea9ffcd03b" dependencies = [ - "cargo-lock", "git2", ] @@ -2009,18 +2008,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cargo-lock" -version = "9.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e11c675378efb449ed3ce8de78d75d0d80542fc98487c26aba28eb3b82feac72" -dependencies = [ - "semver 1.0.23", - "serde", - "toml", - "url", -] - [[package]] name = "cargo-platform" version = "0.1.8" @@ -2438,7 +2425,6 @@ name = "ceramic-metadata" version = "0.47.3" dependencies = [ "built", - "project-root", "serde", ] @@ -4890,11 +4876,11 @@ dependencies = [ [[package]] name = "git2" -version = "0.17.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b989d6a7ca95a362cf2cfc5ad688b3a467be1f87e480b8dad07fee8c79b0044" +checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", "libc", "libgit2-sys", "log", @@ -5318,7 +5304,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -6414,9 +6400,9 @@ checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" [[package]] name = "libgit2-sys" -version = "0.15.2+1.6.4" +version = "0.17.0+1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a80df2e11fb4a61f4ba2ab42dbe7f74468da143f1a75c74e11dee7c813f694fa" +checksum = "10472326a8a6477c3c20a64547b0059e4b0d086869eee31e6d7da728a8eb7224" dependencies = [ "cc", "libc", @@ -8813,12 +8799,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "project-root" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df" - [[package]] name = "prometheus-client" version = "0.22.3" @@ -8900,7 +8880,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes 1.7.2", "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -8946,7 +8926,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.90", @@ -10111,15 +10091,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_spanned" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" -dependencies = [ - "serde", -] - [[package]] name = "serde_tokenstream" version = "0.2.2" @@ -11792,26 +11763,11 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.19.15", -] - [[package]] name = "toml_datetime" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -11820,8 +11776,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap 2.5.0", - "serde", - "serde_spanned", "toml_datetime", "winnow 0.5.40", ] @@ -12571,7 +12525,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/metadata/Cargo.toml b/metadata/Cargo.toml index eeda6c890..26b3b588c 100644 --- a/metadata/Cargo.toml +++ b/metadata/Cargo.toml @@ -12,5 +12,4 @@ publish = false serde.workspace = true [build-dependencies] -built = { version = "0.6.0", features = ["git2"] } -project-root = "0.2.2" +built = { version = "0.7", features = ["git2"] } diff --git a/metadata/build.rs b/metadata/build.rs index 3cf7cd4a6..d8f91cb91 100644 --- a/metadata/build.rs +++ b/metadata/build.rs @@ -1,9 +1,3 @@ fn main() { - let mut opts = built::Options::default(); - opts.set_dependencies(true); - - let src = project_root::get_project_root().unwrap(); - let dst = std::path::Path::new(&std::env::var("OUT_DIR").unwrap()).join("built.rs"); - built::write_built_file_with_opts(&opts, src.as_ref(), &dst) - .expect("Failed to acquire build-time information"); + built::write_built_file().expect("Failed to acquire build-time information"); } From f21b8a3e10b616ec19a1349100aaeb4c0f24464d Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 14 Jan 2025 11:10:10 -0700 Subject: [PATCH 2/4] chore: make clippy happy about lifetimes --- core/src/peer.rs | 6 +++--- core/src/signer.rs | 2 +- core/src/stream_id.rs | 2 +- event/src/bytes.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/peer.rs b/core/src/peer.rs index 24a0640e6..a300a6137 100644 --- a/core/src/peer.rs +++ b/core/src/peer.rs @@ -184,7 +184,7 @@ pub struct WithId<'a> { node_key: &'a NodeKey, expiration: u64, } -impl<'a> BuilderState for WithId<'a> {} +impl BuilderState for WithId<'_> {} /// Build state where the addresses are known. pub struct WithAddresses<'a> { @@ -192,7 +192,7 @@ pub struct WithAddresses<'a> { expiration: u64, addresses: Vec, } -impl<'a> BuilderState for WithAddresses<'a> {} +impl BuilderState for WithAddresses<'_> {} impl Builder { /// Set the expiration to earliest possible value. @@ -245,7 +245,7 @@ impl<'a> Builder> { } } } -impl<'a> Builder> { +impl Builder> { /// Finish the build producing a [`PeerKey`]. pub fn build(self) -> PeerKey { let entry = PeerEntry::new( diff --git a/core/src/signer.rs b/core/src/signer.rs index c37a3a741..e16eae000 100644 --- a/core/src/signer.rs +++ b/core/src/signer.rs @@ -15,7 +15,7 @@ pub trait Signer { fn sign_jws(&self, payload: &str) -> anyhow::Result; } -impl<'a, S: Signer + Sync> Signer for &'a S { +impl Signer for &'_ S { fn algorithm(&self) -> Algorithm { (*self).algorithm() } diff --git a/core/src/stream_id.rs b/core/src/stream_id.rs index 06de2f368..bb56a5d30 100644 --- a/core/src/stream_id.rs +++ b/core/src/stream_id.rs @@ -226,7 +226,7 @@ impl<'de> Deserialize<'de> for StreamId { struct StreamIdVisitor; -impl<'de> serde::de::Visitor<'de> for StreamIdVisitor { +impl serde::de::Visitor<'_> for StreamIdVisitor { type Value = StreamId; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { diff --git a/event/src/bytes.rs b/event/src/bytes.rs index af019f464..f327588bb 100644 --- a/event/src/bytes.rs +++ b/event/src/bytes.rs @@ -44,7 +44,7 @@ impl<'de> Deserialize<'de> for Bytes { } struct BytesVisitor; -impl<'de> Visitor<'de> for BytesVisitor { +impl Visitor<'_> for BytesVisitor { type Value = Bytes; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { From 636f4c0844e2e71d2f8cbb75d8728108249931cd Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 14 Jan 2025 12:23:17 -0700 Subject: [PATCH 3/4] chore: add missing docs --- event/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/event/src/lib.rs b/event/src/lib.rs index a50a387f3..d6ea63d60 100644 --- a/event/src/lib.rs +++ b/event/src/lib.rs @@ -8,12 +8,14 @@ pub mod unvalidated; pub use ceramic_core::*; +/// Shared testing logic with the crate. #[cfg(test)] pub mod tests { use ceramic_core::DidDocument; use crate::unvalidated::signed::JwkSigner; + /// Pretty print json pub fn to_pretty_json(json_data: &[u8]) -> String { let json: serde_json::Value = match serde_json::from_slice(json_data) { Ok(r) => r, @@ -27,10 +29,12 @@ pub mod tests { serde_json::to_string_pretty(&json).unwrap() } + /// Serialize to pretty json pub fn serialize_to_pretty_json(data: &T) -> String { serde_json::to_string_pretty(data).unwrap() } + /// Construct a signer with a hardcoded private key pub async fn signer() -> JwkSigner { JwkSigner::new( DidDocument::new("did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw"), From 2f28a8075fe116a5f5233bfc65628f89e16c3ed3 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 14 Jan 2025 10:55:08 -0700 Subject: [PATCH 4/4] refactor: add simpler shutdown handling This change adds a shutdown crate that simplifies shutdown handling. The API removes the need to create as many async move blocks and clone broadcast channels. --- Cargo.lock | 13 +++++++ Cargo.toml | 2 ++ anchor-service/Cargo.toml | 3 ++ anchor-service/src/anchor_batch.rs | 34 ++++++------------ api/Cargo.toml | 1 + api/src/server.rs | 7 ++-- api/src/tests.rs | 5 +-- flight/Cargo.toml | 8 +++-- one/Cargo.toml | 1 + one/src/daemon.rs | 55 ++++++++++-------------------- one/src/lib.rs | 9 +++-- shutdown/Cargo.toml | 11 ++++++ shutdown/src/lib.rs | 39 +++++++++++++++++++++ 13 files changed, 115 insertions(+), 73 deletions(-) create mode 100644 shutdown/Cargo.toml create mode 100644 shutdown/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 1db6193cf..db17f490d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2100,6 +2100,7 @@ dependencies = [ "multibase 0.9.1", "multihash-codetable", "serde", + "shutdown", "sqlx", "tokio", "tracing", @@ -2131,6 +2132,7 @@ dependencies = [ "serde", "serde_ipld_dagcbor", "serde_json", + "shutdown", "swagger", "test-log", "tikv-jemalloc-ctl", @@ -2317,11 +2319,13 @@ dependencies = [ "http 1.1.0", "mockall", "object_store", + "shutdown", "test-log", "tokio", "tokio-stream", "tonic 0.12.3", "tracing", + "tracing-subscriber", ] [[package]] @@ -2496,6 +2500,7 @@ dependencies = [ "prometheus-client", "recon", "serde_ipld_dagcbor", + "shutdown", "signal-hook", "signal-hook-tokio", "swagger", @@ -10249,6 +10254,14 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "shutdown" +version = "0.47.3" +dependencies = [ + "futures", + "tokio", +] + [[package]] name = "signal-hook" version = "0.3.17" diff --git a/Cargo.toml b/Cargo.toml index 1d5864af5..ba3eb6f78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "peer-svc", "pipeline", "recon", + "shutdown", "sql", "validation", "beetle/iroh-bitswap", @@ -182,6 +183,7 @@ serde_qs = "0.10.1" serde_with = "2.1" sha2 = { version = "0.10", default-features = false } sha3 = "0.10" +shutdown = { path = "./shutdown/" } smallvec = "1.10" # pragma optimize hangs forver on 0.8, possibly due to libsqlite-sys upgrade sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] } diff --git a/anchor-service/Cargo.toml b/anchor-service/Cargo.toml index b86861569..5199c5f48 100644 --- a/anchor-service/Cargo.toml +++ b/anchor-service/Cargo.toml @@ -29,3 +29,6 @@ chrono.workspace = true [features] test-network = [] + +[dev-dependencies] +shutdown.workspace = true diff --git a/anchor-service/src/anchor_batch.rs b/anchor-service/src/anchor_batch.rs index 142726317..283a958b9 100644 --- a/anchor-service/src/anchor_batch.rs +++ b/anchor-service/src/anchor_batch.rs @@ -72,7 +72,7 @@ impl AnchorService { /// - Store the TimeEvents using the AnchorClient /// /// This function will run indefinitely, or until the process is shutdown. - pub async fn run(&mut self, shutdown_signal: impl Future) { + pub async fn run(mut self, shutdown_signal: impl Future) { let shutdown_signal = shutdown_signal.fuse(); pin_mut!(shutdown_signal); @@ -235,7 +235,8 @@ mod tests { use ceramic_core::NodeKey; use ceramic_sql::sqlite::SqlitePool; use expect_test::expect_file; - use tokio::{sync::broadcast, time::sleep}; + use shutdown::Shutdown; + use tokio::time::sleep; use super::AnchorService; use crate::{MockAnchorEventService, MockCas}; @@ -248,7 +249,7 @@ mod tests { let node_id = NodeKey::random().id(); let anchor_interval = Duration::from_millis(5); let anchor_batch_size = 1000000; - let mut anchor_service = AnchorService::new( + let anchor_service = AnchorService::new( tx_manager, event_service.clone(), pool, @@ -256,20 +257,14 @@ mod tests { anchor_interval, anchor_batch_size, ); - let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1); - tokio::spawn(async move { - anchor_service - .run(async move { - let _ = shutdown_signal.recv().await; - }) - .await - }); + let shutdown = Shutdown::new(); + tokio::spawn(anchor_service.run(shutdown.wait_fut())); while event_service.events.lock().unwrap().is_empty() { sleep(Duration::from_millis(1)).await; } expect_file!["./test-data/test_anchor_service_run.txt"] .assert_debug_eq(&event_service.events.lock().unwrap()); - shutdown_signal_tx.send(()).unwrap(); + shutdown.shutdown(); } #[tokio::test] @@ -280,7 +275,7 @@ mod tests { let node_id = NodeKey::random().id(); let anchor_interval = Duration::from_millis(5); let anchor_batch_size = 1000000; - let mut anchor_service = AnchorService::new( + let anchor_service = AnchorService::new( tx_manager, event_service.clone(), pool, @@ -288,20 +283,13 @@ mod tests { anchor_interval, anchor_batch_size, ); - let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1); - // let mut shutdown_signal = shutdown_signal_rx.resubscribe(); - tokio::spawn(async move { - anchor_service - .run(async move { - let _ = shutdown_signal.recv().await; - }) - .await - }); + let shutdown = Shutdown::new(); + tokio::spawn(anchor_service.run(shutdown.wait_fut())); while event_service.events.lock().unwrap().is_empty() { sleep(Duration::from_millis(1)).await; } expect_file!["./test-data/test_anchor_service_run_1.txt"] .assert_debug_eq(&event_service.events.lock().unwrap()); - shutdown_signal_tx.send(()).unwrap(); + shutdown.shutdown(); } } diff --git a/api/Cargo.toml b/api/Cargo.toml index d95b690d8..c84837ff5 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -26,6 +26,7 @@ recon.workspace = true serde.workspace = true serde_ipld_dagcbor.workspace = true serde_json.workspace = true +shutdown.workspace = true swagger.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/api/src/server.rs b/api/src/server.rs index 1f5d4a734..53bf827ed 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -52,6 +52,7 @@ use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFuncti use futures::TryFutureExt; use multiaddr::Protocol; use recon::Key; +use shutdown::Shutdown; use swagger::{ApiError, ByteArray}; #[cfg(not(target_env = "msvc"))] use tikv_jemalloc_ctl::epoch; @@ -401,7 +402,7 @@ where model: Arc, p2p: P, pipeline: Option, - shutdown_signal: broadcast::Receiver<()>, + shutdown_signal: Shutdown, ) -> Self { let (tx, event_rx) = tokio::sync::mpsc::channel::(1024); let event_store = model.clone(); @@ -433,7 +434,7 @@ where event_store: Arc, mut event_rx: tokio::sync::mpsc::Receiver, node_id: NodeId, - mut shutdown_signal: broadcast::Receiver<()>, + shutdown_signal: Shutdown, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS)); @@ -455,7 +456,7 @@ where events.extend(buf); } } - _ = shutdown_signal.recv() => { + _ = shutdown_signal.wait_fut() => { tracing::debug!("Insert many task got shutdown signal"); shutdown = true; } diff --git a/api/src/tests.rs b/api/src/tests.rs index f145a27b2..bf090a1ff 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -32,6 +32,7 @@ use mockall::{mock, predicate}; use multiaddr::Multiaddr; use multibase::Base; use recon::Key; +use shutdown::Shutdown; use test_log::test; use tokio::join; @@ -202,8 +203,8 @@ where M: EventService + 'static, P: P2PService, { - let (_, rx) = tokio::sync::broadcast::channel(1); - Server::new(node_id, network, interest, model, p2p, pipeline, rx) + let shutdown = Shutdown::new(); + Server::new(node_id, network, interest, model, p2p, pipeline, shutdown) } #[test(tokio::test)] diff --git a/flight/Cargo.toml b/flight/Cargo.toml index 46bb40902..30030598a 100644 --- a/flight/Cargo.toml +++ b/flight/Cargo.toml @@ -26,12 +26,14 @@ tracing.workspace = true ceramic-arrow-test.workspace = true ceramic-pipeline.workspace = true expect-test.workspace = true -tokio = { workspace = true, features = ["macros", "rt"] } -test-log.workspace = true http.workspace = true -tokio-stream = { workspace = true, features = ["net"] } mockall.workspace = true object_store.workspace = true +shutdown.workspace = true +test-log.workspace = true +tokio = { workspace = true, features = ["macros", "rt"] } +tokio-stream = { workspace = true, features = ["net"] } +tracing-subscriber.workspace = true [package.metadata.cargo-machete] ignored = [ diff --git a/one/Cargo.toml b/one/Cargo.toml index c84f58acd..b9449cedb 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -51,6 +51,7 @@ object_store.workspace = true prometheus-client.workspace = true recon.workspace = true serde_ipld_dagcbor.workspace = true +shutdown.workspace = true signal-hook = "0.3.17" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } swagger.workspace = true diff --git a/one/src/daemon.rs b/one/src/daemon.rs index f5ac7d9c7..8ce1fe400 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -20,11 +20,11 @@ use clap::Args; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; use recon::{Recon, ReconInterestProvider}; +use shutdown::{Shutdown, ShutdownSignal}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use std::sync::Arc; use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext}; -use tokio::sync::broadcast; use tracing::{debug, error, info, warn}; #[derive(Args, Debug)] @@ -338,14 +338,14 @@ async fn get_eth_rpc_providers( fn spawn_database_optimizer( sqlite_pool: SqlitePool, - mut shutdown: tokio::sync::broadcast::Receiver<()>, + mut shutdown: ShutdownSignal, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut duration = std::time::Duration::from_secs(60 * 60 * 24); // once daily loop { // recreate interval in case it's been shortened due to error tokio::select! { - _ = shutdown.recv() => { + _ = &mut shutdown => { break; } _ = tokio::time::sleep(duration) => { @@ -408,11 +408,11 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { debug!(dir = %opts.p2p_key_dir.display(), "using p2p key directory"); // Setup shutdown signal - let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1); + let shutdown = Shutdown::new(); let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?; let handle = signals.handle(); debug!("starting signal handler task"); - let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx)); + let signals_handle = tokio::spawn(handle_signals(signals, shutdown.clone())); // Construct sqlite_pool let sqlite_pool = opts @@ -424,8 +424,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { // spawn (and run) optimize right before we start using the database (e.g. ordering events) info!("running initial sqlite database optimize, this may take quite a while on large databases."); sqlite_pool.optimize(true).await?; - let ss = shutdown_signal.resubscribe(); - Some(spawn_database_optimizer(sqlite_pool.clone(), ss)) + Some(spawn_database_optimizer( + sqlite_pool.clone(), + shutdown.wait_fut(), + )) } else { None }; @@ -436,14 +438,11 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone())); let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone())); let event_validation = opts.event_validation.unwrap_or(true); - let mut ss = shutdown_signal.resubscribe(); let event_svc = Arc::new( EventService::try_new( sqlite_pool.clone(), ceramic_event_svc::UndeliveredEventReview::Process { - shutdown_signal: Box::new(async move { - let _ = ss.recv().await; - }), + shutdown_signal: Box::new(shutdown.wait_fut()), }, event_validation, rpc_providers, @@ -636,14 +635,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { // Start aggregator let aggregator_handle = if opts.aggregator.unwrap_or_default() { - let mut ss = shutdown_signal.resubscribe(); let ctx = ctx.clone(); + let s = shutdown.wait_fut(); Some(tokio::spawn(async move { - if let Err(err) = ceramic_pipeline::aggregator::run(ctx, async move { - let _ = ss.recv().await; - }) - .await - { + if let Err(err) = ceramic_pipeline::aggregator::run(ctx, s).await { error!(%err, "aggregator task failed"); } })) @@ -651,14 +646,9 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { None }; - let mut ss = shutdown_signal.resubscribe(); let pipeline_ctx = ctx.clone(); - let flight_handle = tokio::spawn(async move { - ceramic_flight::server::run(ctx, addr, async move { - let _ = ss.recv().await; - }) - .await - }); + let flight_handle = + tokio::spawn(ceramic_flight::server::run(ctx, addr, shutdown.wait_fut())); (Some(pipeline_ctx), aggregator_handle, Some(flight_handle)) } else { (None, None, None) @@ -679,7 +669,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { Duration::from_secs(opts.anchor_poll_interval), opts.anchor_poll_retry_count, ); - let mut anchor_service = AnchorService::new( + let anchor_service = AnchorService::new( Arc::new(remote_cas), event_svc.clone(), sqlite_pool.clone(), @@ -688,14 +678,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { opts.anchor_batch_size, ); - let mut shutdown_signal = shutdown_signal.resubscribe(); - Some(tokio::spawn(async move { - anchor_service - .run(async move { - let _ = shutdown_signal.recv().await; - }) - .await - })) + Some(tokio::spawn(anchor_service.run(shutdown.wait_fut()))) } else { None }; @@ -708,7 +691,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { Arc::new(model_svc), ipfs.client(), pipeline_ctx, - shutdown_signal.resubscribe(), + shutdown.clone(), ); if opts.authentication { ceramic_server.with_authentication(true); @@ -740,9 +723,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { hyper::server::Server::try_bind(&opts.bind_address.parse()?) .map_err(|e| anyhow!("Failed to bind address: {}. {}", opts.bind_address, e))? .serve(service) - .with_graceful_shutdown(async move { - let _ = shutdown_signal.recv().await; - }) + .with_graceful_shutdown(shutdown.wait_fut()) .await?; debug!("api server finished, starting shutdown..."); diff --git a/one/src/lib.rs b/one/src/lib.rs index a9ccaa0bd..dd68ece39 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -20,10 +20,11 @@ use multibase::Base; use multihash::Multihash; use multihash_codetable::Code; use multihash_derive::Hasher; +use shutdown::Shutdown; use signal_hook_tokio::Signals; use std::str::FromStr; use std::{env, path::PathBuf}; -use tokio::{io::AsyncReadExt, sync::broadcast}; +use tokio::io::AsyncReadExt; use tracing::{debug, error, info, warn}; #[derive(Parser, Debug)] @@ -343,15 +344,13 @@ impl DBOpts { } } -async fn handle_signals(mut signals: Signals, shutdown: broadcast::Sender<()>) { +async fn handle_signals(mut signals: Signals, shutdown: Shutdown) { let mut shutdown = Some(shutdown); while let Some(signal) = signals.next().await { debug!(?signal, "signal received"); if let Some(shutdown) = shutdown.take() { info!("sending shutdown message"); - shutdown - .send(()) - .expect("should be able to send shutdown message"); + shutdown.shutdown(); } } } diff --git a/shutdown/Cargo.toml b/shutdown/Cargo.toml new file mode 100644 index 000000000..fd84fcc77 --- /dev/null +++ b/shutdown/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "shutdown" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +tokio.workspace = true +futures.workspace = true diff --git a/shutdown/src/lib.rs b/shutdown/src/lib.rs new file mode 100644 index 000000000..b5f6d9a7c --- /dev/null +++ b/shutdown/src/lib.rs @@ -0,0 +1,39 @@ +use futures::future::BoxFuture; +use tokio::sync::broadcast; + +/// A shutdown signal is a future that resolve to unit. +pub type ShutdownSignal = BoxFuture<'static, ()>; + +/// Shutdown can be used to signal shutdown across many different tasks. +/// Shutdown is cheaply clonable so it can be shared with as many tasks as needed. +#[derive(Clone)] +pub struct Shutdown { + tx: broadcast::Sender<()>, +} + +impl Default for Shutdown { + fn default() -> Self { + Self::new() + } +} + +impl Shutdown { + pub fn new() -> Self { + let (tx, _rx) = broadcast::channel(1); + Self { tx } + } + /// Signal that all listeners should shutdown. + /// Shutdown can be called from any clone. + pub fn shutdown(&self) { + let _ = self.tx.send(()); + } + /// Construct a future that resolves when the shutdown signal is sent. + /// + /// The future is cancel safe. + pub fn wait_fut(&self) -> ShutdownSignal { + let mut sub = self.tx.subscribe(); + Box::pin(async move { + let _ = sub.recv().await; + }) + } +}