Skip to content

Commit

Permalink
-Fix jdc sigterm signalling issue
Browse files Browse the repository at this point in the history
It changes the structuring of the start method, to avoid its blocking nature in case of
disconnection from the upstream. Currently, when we are sending the termination signal, during the
stance of disconnection from upstream, due of blocking nature of initialize_jd it halts the main
thread runtime from executing select block, which listens for any termination signal and channel
responses.

-Modifications
1. Addition of new shutdown field, which can be used later in integration test to terminate the instances
2. Change argument type for methods initialize_jd and initialize_jd_solo, to make them movable.
3. Spawning of blocking process as a separate task
  • Loading branch information
Shourya742 committed Jan 13, 2025
1 parent 7483090 commit b1e6780
Showing 1 changed file with 96 additions and 72 deletions.
168 changes: 96 additions & 72 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
str::FromStr,
sync::Arc,
};
use tokio::task::AbortHandle;
use tokio::{sync::Notify, task::AbortHandle};

use tracing::{error, info};

Expand Down Expand Up @@ -57,111 +57,132 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
pub struct JobDeclaratorClient {
/// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
config: ProxyConfig,
// Used for notifying the [`JobDeclaratorClient`] to shutdown gracefully.
shutdown: Arc<Notify>,
}

impl JobDeclaratorClient {
pub fn new(config: ProxyConfig) -> Self {
Self { config }
Self {
config,
shutdown: Arc::new(Notify::new()),
}
}

pub async fn start(self) {
let mut upstream_index = 0;
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());

// Channel used to manage failed tasks
let (tx_status, rx_status) = unbounded();

let task_collector = Arc::new(Mutex::new(vec![]));

let proxy_config = &self.config;
tokio::spawn({
let shutdown_signal = self.shutdown();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
}
});

loop {
let proxy_config = self.config;
'outer: loop {
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let proxy_config = proxy_config.clone();
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone())
.await;
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
let upstream = upstream.clone();
tokio::spawn(async move {
Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await;
});
} else {
self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone())
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
tokio::spawn(async move {
Self::initialize_jd_as_solo_miner(
proxy_config,
tx_status.clone(),
task_collector.clone(),
)
.await;
});
}
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = interrupt_signal_future => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
std::process::exit(0);
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
select! {
task_status = rx_status.recv().fuse() => {
if let Ok(task_status) = task_status {
match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamRogue => {
error!("Changin Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
status::State::UpstreamRogue => {
error!("Changing Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
} else {
info!("Received unknown task. Shutting down.");
break 'outer;
}
},
_ = self.shutdown.notified().fuse() => {
info!("Shutting down gracefully...");
std::process::exit(0);
}
}
};
}
}
}

async fn initialize_jd_as_solo_miner(
&self,
proxy_config: ProxyConfig,

Check warning on line 180 in roles/jd-client/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-client/src/lib/mod.rs#L180

Added line #L180 was not covered by tests
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap();
let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap();

// When Downstream receive a share that meets bitcoin target it transformit in a
// SubmitSolution and send it to the TemplateReceiver
Expand Down Expand Up @@ -211,12 +232,11 @@ impl JobDeclaratorClient {
}

async fn initialize_jd(
&self,
proxy_config: ProxyConfig,

Check warning on line 235 in roles/jd-client/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-client/src/lib/mod.rs#L235

Added line #L235 was not covered by tests
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
upstream_config: proxy_config::Upstream,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let test_only_do_not_send_solution_to_tp = proxy_config
.test_only_do_not_send_solution_to_tp
Expand Down Expand Up @@ -346,6 +366,10 @@ impl JobDeclaratorClient {
)
.await;
}

pub fn shutdown(&self) -> Arc<Notify> {
self.shutdown.clone()
}
}

#[derive(Debug)]
Expand Down

0 comments on commit b1e6780

Please sign in to comment.