Skip to content

Commit

Permalink
Add async mutex around syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Oct 30, 2023
1 parent c6e7576 commit 0170c23
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
12 changes: 11 additions & 1 deletion mutiny-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use lightning::{
use crate::multiesplora::MultiEsploraClient;
use crate::utils::get_monitor_version;
use bitcoin::util::bip32::ExtendedPrivKey;
use futures_util::lock::Mutex;
use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
use lightning::ln::PaymentSecret;
use lightning::sign::{EntropySource, InMemorySigner, NodeSigner, Recipient};
Expand Down Expand Up @@ -162,6 +163,7 @@ pub(crate) struct Node<S: MutinyStorage> {
wallet: Arc<OnChainWallet<S>>,
logger: Arc<MutinyLogger>,
pub(crate) lsp_client: Option<LspClient>,
pub(crate) sync_lock: Arc<Mutex<()>>,
stop: Arc<AtomicBool>,
#[cfg(target_arch = "wasm32")]
websocket_proxy_addr: String,
Expand Down Expand Up @@ -549,11 +551,14 @@ impl<S: MutinyStorage> Node<S> {
keys_manager.get_node_id(Recipient::Node).unwrap()
);

let sync_lock = Arc::new(Mutex::new(()));

// Here we re-attempt to persist any monitors that failed to persist previously.
let retry_logger = logger.clone();
let retry_persister = persister.clone();
let retry_stop = stop.clone();
let retry_chain_monitor = chain_monitor.clone();
let retry_sync_lock = sync_lock.clone();
utils::spawn(async move {
// sleep 3 seconds before checking, we won't have any pending updates on startup
sleep(3_000).await;
Expand All @@ -563,7 +568,11 @@ impl<S: MutinyStorage> Node<S> {
break;
}

let updates = retry_chain_monitor.list_pending_monitor_updates();
let updates = {
let _lock = retry_sync_lock.lock().await;
retry_chain_monitor.list_pending_monitor_updates()
};

for (funding_txo, update_ids) in updates {
// if there are no updates, skip
if update_ids.is_empty() {
Expand Down Expand Up @@ -642,6 +651,7 @@ impl<S: MutinyStorage> Node<S> {
wallet,
logger,
lsp_client,
sync_lock,
stop,
#[cfg(target_arch = "wasm32")]
websocket_proxy_addr,
Expand Down
8 changes: 8 additions & 0 deletions mutiny-core/src/nodemanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,14 @@ impl<S: MutinyStorage> NodeManager<S> {
async fn sync_ldk(&self) -> Result<(), MutinyError> {
let nodes = self.nodes.lock().await;

// Lock all the nodes so we can sync them, make sure we keep the locks
// in scope so they don't get dropped and unlocked.
let futs = nodes
.iter()
.map(|(_, node)| node.sync_lock.lock())
.collect::<Vec<_>>();
let _locks = join_all(futs).await;

let confirmables: Vec<&(dyn Confirm)> = nodes
.iter()
.flat_map(|(_, node)| {
Expand Down

0 comments on commit 0170c23

Please sign in to comment.