Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add simpler shutdown handling #643

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 25 additions & 58 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"peer-svc",
"pipeline",
"recon",
"shutdown",
"sql",
"validation",
"beetle/iroh-bitswap",
Expand Down Expand Up @@ -182,6 +183,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
sha3 = "0.10"
shutdown = { path = "./shutdown/" }
smallvec = "1.10"
# pragma optimize hangs forver on 0.8, possibly due to libsqlite-sys upgrade
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
Expand Down
3 changes: 3 additions & 0 deletions anchor-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ chrono.workspace = true

[features]
test-network = []

[dev-dependencies]
shutdown.workspace = true
34 changes: 11 additions & 23 deletions anchor-service/src/anchor_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AnchorService {
/// - Store the TimeEvents using the AnchorClient
///
/// This function will run indefinitely, or until the process is shutdown.
pub async fn run(&mut self, shutdown_signal: impl Future<Output = ()>) {
pub async fn run(mut self, shutdown_signal: impl Future<Output = ()>) {
let shutdown_signal = shutdown_signal.fuse();
pin_mut!(shutdown_signal);

Expand Down Expand Up @@ -235,7 +235,8 @@ mod tests {
use ceramic_core::NodeKey;
use ceramic_sql::sqlite::SqlitePool;
use expect_test::expect_file;
use tokio::{sync::broadcast, time::sleep};
use shutdown::Shutdown;
use tokio::time::sleep;

use super::AnchorService;
use crate::{MockAnchorEventService, MockCas};
Expand All @@ -248,28 +249,22 @@ mod tests {
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
let anchor_service = AnchorService::new(
tx_manager,
event_service.clone(),
pool,
node_id,
anchor_interval,
anchor_batch_size,
);
let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
tokio::spawn(async move {
anchor_service
.run(async move {
let _ = shutdown_signal.recv().await;
})
.await
});
let shutdown = Shutdown::new();
tokio::spawn(anchor_service.run(shutdown.wait_fut()));
while event_service.events.lock().unwrap().is_empty() {
sleep(Duration::from_millis(1)).await;
}
expect_file!["./test-data/test_anchor_service_run.txt"]
.assert_debug_eq(&event_service.events.lock().unwrap());
shutdown_signal_tx.send(()).unwrap();
shutdown.shutdown();
}

#[tokio::test]
Expand All @@ -280,28 +275,21 @@ mod tests {
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
let anchor_service = AnchorService::new(
tx_manager,
event_service.clone(),
pool,
node_id,
anchor_interval,
anchor_batch_size,
);
let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
// let mut shutdown_signal = shutdown_signal_rx.resubscribe();
tokio::spawn(async move {
anchor_service
.run(async move {
let _ = shutdown_signal.recv().await;
})
.await
});
let shutdown = Shutdown::new();
tokio::spawn(anchor_service.run(shutdown.wait_fut()));
while event_service.events.lock().unwrap().is_empty() {
sleep(Duration::from_millis(1)).await;
}
expect_file!["./test-data/test_anchor_service_run_1.txt"]
.assert_debug_eq(&event_service.events.lock().unwrap());
shutdown_signal_tx.send(()).unwrap();
shutdown.shutdown();
}
}
1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ recon.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
shutdown.workspace = true
swagger.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFuncti
use futures::TryFutureExt;
use multiaddr::Protocol;
use recon::Key;
use shutdown::Shutdown;
use swagger::{ApiError, ByteArray};
#[cfg(not(target_env = "msvc"))]
use tikv_jemalloc_ctl::epoch;
Expand Down Expand Up @@ -401,7 +402,7 @@ where
model: Arc<M>,
p2p: P,
pipeline: Option<SessionContext>,
shutdown_signal: broadcast::Receiver<()>,
shutdown_signal: Shutdown,
) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
let event_store = model.clone();
Expand Down Expand Up @@ -433,7 +434,7 @@ where
event_store: Arc<M>,
mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
node_id: NodeId,
mut shutdown_signal: broadcast::Receiver<()>,
shutdown_signal: Shutdown,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
Expand All @@ -455,7 +456,7 @@ where
events.extend(buf);
}
}
_ = shutdown_signal.recv() => {
_ = shutdown_signal.wait_fut() => {
tracing::debug!("Insert many task got shutdown signal");
shutdown = true;
}
Expand Down
5 changes: 3 additions & 2 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use mockall::{mock, predicate};
use multiaddr::Multiaddr;
use multibase::Base;
use recon::Key;
use shutdown::Shutdown;
use test_log::test;
use tokio::join;

Expand Down Expand Up @@ -202,8 +203,8 @@ where
M: EventService + 'static,
P: P2PService,
{
let (_, rx) = tokio::sync::broadcast::channel(1);
Server::new(node_id, network, interest, model, p2p, pipeline, rx)
let shutdown = Shutdown::new();
Server::new(node_id, network, interest, model, p2p, pipeline, shutdown)
}

#[test(tokio::test)]
Expand Down
Loading
Loading