diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs
index 467ac52b54..caa13229b7 100644
--- a/roles/jd-client/src/lib/mod.rs
+++ b/roles/jd-client/src/lib/mod.rs
@@ -20,7 +20,7 @@ use std::{
     str::FromStr,
     sync::Arc,
 };
-use tokio::task::AbortHandle;
+use tokio::{sync::Notify, task::AbortHandle};
 
 use tracing::{error, info};
 
@@ -57,111 +57,131 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
 pub struct JobDeclaratorClient {
     /// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
     config: ProxyConfig,
+    shutdown: Arc<Notify>,
 }
 
 impl JobDeclaratorClient {
     pub fn new(config: ProxyConfig) -> Self {
-        Self { config }
+        Self {
+            config,
+            shutdown: Arc::new(Notify::new()),
+        }
     }
 
     pub async fn start(self) {
         let mut upstream_index = 0;
-        let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());
 
         // Channel used to manage failed tasks
         let (tx_status, rx_status) = unbounded();
 
         let task_collector = Arc::new(Mutex::new(vec![]));
 
-        let proxy_config = &self.config;
+        tokio::spawn({
+            let shutdown_signal = self.shutdown.clone();
+            async move {
+                if tokio::signal::ctrl_c().await.is_ok() {
+                    info!("Interrupt received");
+                    shutdown_signal.notify_one();
+                }
+            }
+        });
 
-        loop {
+        let proxy_config = self.config;
+        'outer: loop {
             let task_collector = task_collector.clone();
             let tx_status = tx_status.clone();
+            let proxy_config = proxy_config.clone();
 
             if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
-                self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone())
-                    .await;
+                let tx_status = tx_status.clone();
+                let task_collector = task_collector.clone();
+                let upstream = upstream.clone();
+                tokio::spawn(async move {
+                    Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await;
+                });
             } else {
-                self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone())
+                let tx_status = tx_status.clone();
+                let task_collector = task_collector.clone();
+                tokio::spawn(async move {
+                    Self::initialize_jd_as_solo_miner(
+                        proxy_config,
+                        tx_status.clone(),
+                        task_collector.clone(),
+                    )
                     .await;
+                });
             }
             // Check all tasks if is_finished() is true, if so exit
             loop {
-                let task_status = select! {
-                    task_status = rx_status.recv().fuse() => task_status,
-                    interrupt_signal = interrupt_signal_future => {
-                        match interrupt_signal {
-                            Ok(()) => {
-                                info!("Interrupt received");
-                            },
-                            Err(err) => {
-                                error!("Unable to listen for interrupt signal: {}", err);
-                                // we also shut down in case of error
-                            },
-                        }
-                        std::process::exit(0);
-                    }
-                };
-                let task_status: status::Status = task_status.unwrap();
-
-                match task_status.state {
-                    // Should only be sent by the downstream listener
-                    status::State::DownstreamShutdown(err) => {
-                        error!("SHUTDOWN from: {}", err);
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        task_collector
-                            .safe_lock(|s| {
-                                for handle in s {
-                                    handle.abort();
+                select! {
+                    task_status = rx_status.recv().fuse() => {
+                        if let Ok(task_status) = task_status {
+                            match task_status.state {
+                                // Should only be sent by the downstream listener
+                                status::State::DownstreamShutdown(err) => {
+                                    error!("SHUTDOWN from: {}", err);
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    task_collector
+                                        .safe_lock(|s| {
+                                            for handle in s {
+                                                handle.abort();
+                                            }
+                                        })
+                                        .unwrap();
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    break;
                                 }
-                            })
-                            .unwrap();
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        break;
-                    }
-                    status::State::UpstreamShutdown(err) => {
-                        error!("SHUTDOWN from: {}", err);
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        task_collector
-                            .safe_lock(|s| {
-                                for handle in s {
-                                    handle.abort();
+                                status::State::UpstreamShutdown(err) => {
+                                    error!("SHUTDOWN from: {}", err);
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    task_collector
+                                        .safe_lock(|s| {
+                                            for handle in s {
+                                                handle.abort();
+                                            }
+                                        })
+                                        .unwrap();
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    break;
                                 }
-                            })
-                            .unwrap();
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        break;
-                    }
-                    status::State::UpstreamRogue => {
-                        error!("Changin Pool");
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        task_collector
-                            .safe_lock(|s| {
-                                for handle in s {
-                                    handle.abort();
+                                status::State::UpstreamRogue => {
+                                    error!("Changing Pool");
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    task_collector
+                                        .safe_lock(|s| {
+                                            for handle in s {
+                                                handle.abort();
+                                            }
+                                        })
+                                        .unwrap();
+                                    upstream_index += 1;
+                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                                    break;
                                 }
-                            })
-                            .unwrap();
-                        upstream_index += 1;
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        break;
-                    }
-                    status::State::Healthy(msg) => {
-                        info!("HEALTHY message: {}", msg);
+                                status::State::Healthy(msg) => {
+                                    info!("HEALTHY message: {}", msg);
+                                }
+                            }
+                        } else {
+                            info!("Received unknown task. Shutting down.");
+                            break 'outer;
+                        }
+                    },
+                    _ = self.shutdown.notified().fuse() => {
+                        info!("Shutting down gracefully...");
+                        std::process::exit(0);
                     }
-                }
+                };
             }
         }
     }
 
     async fn initialize_jd_as_solo_miner(
-        &self,
+        proxy_config: ProxyConfig,
         tx_status: async_channel::Sender<status::Status<'static>>,
         task_collector: Arc<Mutex<Vec<AbortHandle>>>,
     ) {
-        let proxy_config = &self.config;
         let timeout = proxy_config.timeout;
-        let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap();
+        let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap();
 
         // When Downstream receive a share that meets bitcoin target it transformit in a
         // SubmitSolution and send it to the TemplateReceiver
@@ -211,12 +231,11 @@ impl JobDeclaratorClient {
     }
 
     async fn initialize_jd(
-        &self,
+        proxy_config: ProxyConfig,
         tx_status: async_channel::Sender<status::Status<'static>>,
         task_collector: Arc<Mutex<Vec<AbortHandle>>>,
         upstream_config: proxy_config::Upstream,
     ) {
-        let proxy_config = &self.config;
         let timeout = proxy_config.timeout;
         let test_only_do_not_send_solution_to_tp = proxy_config
             .test_only_do_not_send_solution_to_tp
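
Below is a minimal, self-contained sketch of the shutdown pattern this patch introduces: a shared tokio::sync::Notify that a spawned Ctrl+C listener signals, raced against a status channel in a select loop. It is illustrative only; the run function, the mpsc status channel, and the use of tokio::select! (the patch itself uses the futures-style select! with .fuse()) are assumptions for the sketch, not code from the repository. Assumes something like tokio = { version = "1", features = ["full"] }.

// shutdown_sketch.rs -- illustrative only, not part of the patch.
use std::sync::Arc;

use tokio::sync::{mpsc, Notify};

async fn run(mut rx_status: mpsc::Receiver<String>, shutdown: Arc<Notify>) {
    // Mirror of the patch's spawned Ctrl+C listener: it signals the shared
    // Notify instead of exiting from inside a select! arm.
    let shutdown_signal = shutdown.clone();
    tokio::spawn(async move {
        if tokio::signal::ctrl_c().await.is_ok() {
            println!("Interrupt received");
            shutdown_signal.notify_one();
        }
    });

    loop {
        tokio::select! {
            status = rx_status.recv() => {
                match status {
                    Some(msg) => println!("status: {msg}"),
                    // Channel closed: every sender dropped, nothing left to supervise.
                    None => break,
                }
            }
            _ = shutdown.notified() => {
                println!("Shutting down gracefully...");
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tx.send("healthy".to_string()).await.unwrap();
    drop(tx); // close the channel so `run` can also exit via the `None` branch
    run(rx, Arc::new(Notify::new())).await;
}

The design point the sketch demonstrates is the same one the diff makes: ownership of the interrupt handling moves out of the status-polling select! (where the old code called std::process::exit directly from a fused ctrl_c future) into a dedicated task that only notifies, so the main loop has a single, explicit shutdown branch.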