Skip to content

Commit

Permalink
fix sigterm issue with translator sv2
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 5, 2025
1 parent 1f2c5e8 commit 04455f0
Showing 1 changed file with 61 additions and 53 deletions.
114 changes: 61 additions & 53 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use tokio::{
sync::broadcast,
sync::{broadcast, Notify},
task::{self, AbortHandle},
};
use tracing::{debug, error, info, warn};
Expand All @@ -32,6 +32,7 @@ pub mod utils;
pub struct TranslatorSv2 {
config: ProxyConfig,
reconnect_wait_time: u64,
shutdown: Arc<Notify>,
}

impl TranslatorSv2 {
Expand All @@ -41,6 +42,7 @@ impl TranslatorSv2 {
Self {
config,
reconnect_wait_time: wait_time,
shutdown: Arc::new(Notify::new()),
}
}

Expand All @@ -58,7 +60,8 @@ impl TranslatorSv2 {
let task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>> =
Arc::new(Mutex::new(Vec::new()));

self.internal_start(
Self::internal_start(
self.config.clone(),
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
Expand All @@ -72,74 +75,79 @@ impl TranslatorSv2 {
debug!("Starting up status listener");
let wait_time = self.reconnect_wait_time;
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = tokio::select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = tokio::signal::ctrl_c().fuse() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
State::BridgeShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
tokio::spawn({
let shutdown_signal = self.shutdown.clone();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);
}
});

// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;
loop {
tokio::select! {
task_status = rx_status.recv().fuse() => {
if let Ok(task_status_) = task_status {
match task_status_.state {
State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
self.shutdown.notify_one();
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);
let task_collector1 = task_collector_.clone();
let tx_sv1_notify1 = tx_sv1_notify.clone();
let target = target.clone();
let tx_status = tx_status.clone();
let proxy_config = self.config.clone();
tokio::spawn (async move {
// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;

// kill al the tasks
let task_collector_aborting = task_collector_.clone();
kill_tasks(task_collector_aborting.clone());
// kill al the tasks
let task_collector_aborting = task_collector1.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
self.internal_start(
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
task_collector_.clone(),
)
.await;
warn!("Trying reconnecting to upstream");
Self::internal_start(
proxy_config,
tx_sv1_notify1,
target.clone(),
tx_status.clone(),
task_collector1,
)
.await;
});
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
self.shutdown.notify_one();
}
}
} else {
info!("Channel closed");
break; // Channel closed
}
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
_ = self.shutdown.notified() => {
info!("Shutting down gracefully...");
break;
}
}
}
}

async fn internal_start(
&self,
proxy_config: ProxyConfig,
tx_sv1_notify: broadcast::Sender<server_to_client::Notify<'static>>,
target: Arc<Mutex<Vec<u8>>>,
tx_status: async_channel::Sender<Status<'static>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let proxy_config = self.config.clone();
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10);
Expand Down

0 comments on commit 04455f0

Please sign in to comment.