From ab1d4b865a223a22f2403de9e2ff0707639e746a Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Mon, 13 Jan 2025 19:19:29 +0530 Subject: [PATCH] fix sigterm issue with translator sv --- roles/translator/src/lib/mod.rs | 123 ++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 55 deletions(-) diff --git a/roles/translator/src/lib/mod.rs b/roles/translator/src/lib/mod.rs index ed0f41eae2..5a36d1db38 100644 --- a/roles/translator/src/lib/mod.rs +++ b/roles/translator/src/lib/mod.rs @@ -10,7 +10,8 @@ use std::{ }; use tokio::{ - sync::broadcast, + select, + sync::{broadcast, Notify}, task::{self, AbortHandle}, }; use tracing::{debug, error, info, warn}; @@ -32,6 +33,7 @@ pub mod utils; pub struct TranslatorSv2 { config: ProxyConfig, reconnect_wait_time: u64, + shutdown: Arc, } impl TranslatorSv2 { @@ -41,6 +43,7 @@ impl TranslatorSv2 { Self { config, reconnect_wait_time: wait_time, + shutdown: Arc::new(Notify::new()), } } @@ -58,7 +61,8 @@ impl TranslatorSv2 { let task_collector: Arc>> = Arc::new(Mutex::new(Vec::new())); - self.internal_start( + Self::internal_start( + self.config.clone(), tx_sv1_notify.clone(), target.clone(), tx_status.clone(), @@ -72,74 +76,79 @@ impl TranslatorSv2 { debug!("Starting up status listener"); let wait_time = self.reconnect_wait_time; // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = tokio::select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = tokio::signal::ctrl_c().fuse() => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: Status = task_status.unwrap(); - match task_status.state { - // Should only be sent by the downstream listener - State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; + tokio::spawn({ + let shutdown_signal = self.shutdown(); + async move { + if tokio::signal::ctrl_c().await.is_ok() { + info!("Interrupt received"); + shutdown_signal.notify_one(); } - State::BridgeShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; + } + }); + + loop { + select! { + task_status = rx_status.recv().fuse() => { + if let Ok(task_status_) = task_status { + match task_status_.state { + State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + self.shutdown().notify_one(); + } + State::UpstreamTryReconnect(err) => { + error!("Trying to reconnect the Upstream because of: {}", err); + let task_collector1 = task_collector_.clone(); + let tx_sv1_notify1 = tx_sv1_notify.clone(); + let target = target.clone(); + let tx_status = tx_status.clone(); + let proxy_config = self.config.clone(); + tokio::spawn (async move { + // wait a random amount of time between 0 and 3000ms + // if all the downstreams try to reconnect at the same time, the upstream may + // fail + tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await; + + // kill al the tasks + let task_collector_aborting = task_collector1.clone(); + kill_tasks(task_collector_aborting.clone()); + + warn!("Trying reconnecting to upstream"); + Self::internal_start( + proxy_config, + tx_sv1_notify1, + target.clone(), + tx_status.clone(), + task_collector1, + ) + .await; + }); + } + State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + self.shutdown().notify_one(); + } + } + } else { + info!("Channel closed"); + break; // Channel closed + } } - State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); + _ = self.shutdown.notified() => { + info!("Shutting down gracefully..."); break; } - State::UpstreamTryReconnect(err) => { - error!("Trying to reconnect the Upstream because of: {}", err); - - // wait a random amount of time between 0 and 3000ms - // if all the downstreams try to reconnect at the same time, the upstream may - // fail - tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await; - - // kill al the tasks - let task_collector_aborting = task_collector_.clone(); - kill_tasks(task_collector_aborting.clone()); - - warn!("Trying reconnecting to upstream"); - self.internal_start( - tx_sv1_notify.clone(), - target.clone(), - tx_status.clone(), - task_collector_.clone(), - ) - .await; - } - State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } } } } async fn internal_start( - &self, + proxy_config: ProxyConfig, tx_sv1_notify: broadcast::Sender>, target: Arc>>, tx_status: async_channel::Sender>, task_collector: Arc>>, ) { - let proxy_config = self.config.clone(); // Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream` // (Sender>, Receiver>) let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10); @@ -278,6 +287,10 @@ impl TranslatorSv2 { let _ = task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string()))); } + + pub fn shutdown(&self) -> Arc { + self.shutdown.clone() + } } fn kill_tasks(task_collector: Arc>>) {