diff --git a/mutiny-core/src/ldkstorage.rs b/mutiny-core/src/ldkstorage.rs index 5bb77fd4a..ff261a0b0 100644 --- a/mutiny-core/src/ldkstorage.rs +++ b/mutiny-core/src/ldkstorage.rs @@ -11,7 +11,6 @@ use crate::nodemanager::ChannelClosure; use crate::storage::{MutinyStorage, VersionedValue}; use crate::utils; use crate::utils::{sleep, spawn}; -use crate::vss::VssKeyValueItem; use crate::{chain::MutinyChain, scorer::HubPreferentialScorer}; use anyhow::anyhow; use bitcoin::hashes::hex::{FromHex, ToHex}; @@ -93,6 +92,15 @@ impl MutinyNodePersister { format!("{}_{}", key, self.node_id) } + pub(crate) fn get_monitor_key(&self, funding_txo: &OutPoint) -> String { + let key = format!( + "{MONITORS_PREFIX_KEY}{}_{}", + funding_txo.txid.to_hex(), + funding_txo.index + ); + self.get_key(&key) + } + fn init_persist_monitor( &self, key: String, @@ -105,47 +113,30 @@ impl MutinyNodePersister { let logger = self.logger.clone(); let object = object.encode(); - // currently we only retry storage to VSS because we don't have a way to detect - // for local storage if a higher version has been persisted. Without handling this - // we could end up with a previous state being persisted over a newer one. - // VSS does not have this problem because it verifies the version before storing - // and will not overwrite a newer version, so it is safe to retry. spawn(async move { - let mut is_retry = false; // Sleep before persisting to give chance for the manager to be persisted sleep(50).await; - loop { - match persist_monitor(&storage, &key, &object, Some(version), is_retry, &logger) - .await - { - Ok(()) => { - log_debug!(logger, "Persisted channel monitor: {update_id:?}"); - - // unwrap is safe, we set it up immediately - let chain_monitor = chain_monitor.lock().await; - let chain_monitor = chain_monitor.as_ref().unwrap(); - - // these errors are not fatal, so we don't return them just log - if let Err(e) = chain_monitor.channel_monitor_updated( - update_id.funding_txo, - update_id.monitor_update_id, - ) { - log_error!( - logger, - "Error notifying chain monitor of channel monitor update: {e:?}" - ); - } else { - break; // successful storage, no more attempts - } - } - Err(e) => { - log_error!(logger, "Error persisting channel monitor: {e}"); + match persist_monitor(&storage, &key, &object, Some(version), &logger).await { + Ok(()) => { + log_debug!(logger, "Persisted channel monitor: {update_id:?}"); + + // unwrap is safe, we set it up immediately + let chain_monitor = chain_monitor.lock().await; + let chain_monitor = chain_monitor.as_ref().unwrap(); + + // these errors are not fatal, so we don't return them just log + if let Err(e) = chain_monitor + .channel_monitor_updated(update_id.funding_txo, update_id.monitor_update_id) + { + log_error!( + logger, + "Error notifying chain monitor of channel monitor update: {e:?}" + ); } } - - // if we get here, we failed to persist, so we retry - is_retry = true; - sleep(1_000).await; + Err(e) => { + log_error!(logger, "Error persisting channel monitor: {e}"); + } } }); @@ -675,15 +666,10 @@ impl Persist, monitor_update_id: MonitorUpdateId, ) -> ChannelMonitorUpdateStatus { - let key = format!( - "{MONITORS_PREFIX_KEY}{}_{}", - funding_txo.txid.to_hex(), - funding_txo.index - ); - let key = self.get_key(&key); + let key = self.get_monitor_key(&funding_txo); let update_id = monitor.get_latest_update_id(); - debug_assert!(update_id == utils::get_monitor_version(monitor.encode())); + debug_assert!(update_id == utils::get_monitor_version(&monitor.encode())); // safely convert u64 to u32 let version = if update_id >= u32::MAX as u64 { @@ -707,14 +693,9 @@ impl Persist, monitor_update_id: MonitorUpdateId, ) -> ChannelMonitorUpdateStatus { - let key = format!( - "{MONITORS_PREFIX_KEY}{}_{}", - funding_txo.txid.to_hex(), - funding_txo.index - ); - let key = self.get_key(&key); + let key = self.get_monitor_key(&funding_txo); let update_id = monitor.get_latest_update_id(); - debug_assert!(update_id == utils::get_monitor_version(monitor.encode())); + debug_assert!(update_id == utils::get_monitor_version(&monitor.encode())); // safely convert u64 to u32 let version = if update_id >= u32::MAX as u64 { @@ -738,33 +719,14 @@ pub struct MonitorUpdateIdentifier { pub monitor_update_id: MonitorUpdateId, } -async fn persist_monitor( +pub(crate) async fn persist_monitor( storage: &impl MutinyStorage, key: &str, object: &Vec, version: Option, - vss_only: bool, logger: &MutinyLogger, ) -> Result<(), lightning::io::Error> { - let res = if vss_only { - // if we are only storing to VSS, we don't need to store to local storage - // just need to call put_objects on VSS - if let (Some(vss), Some(version)) = (storage.vss_client(), version) { - let value = - serde_json::to_value(object).map_err(|_| lightning::io::ErrorKind::Other)?; - let item = VssKeyValueItem { - key: key.to_string(), - value, - version, - }; - - vss.put_objects(vec![item]).await - } else { - Ok(()) - } - } else { - storage.set_data_async(key, object, version).await - }; + let res = storage.set_data_async(key, object, version).await; res.map_err(|e| { match e { diff --git a/mutiny-core/src/node.rs b/mutiny-core/src/node.rs index bd7ee3a99..a6e871080 100644 --- a/mutiny-core/src/node.rs +++ b/mutiny-core/src/node.rs @@ -1,5 +1,5 @@ use crate::labels::LabelStorage; -use crate::ldkstorage::ChannelOpenParams; +use crate::ldkstorage::{persist_monitor, ChannelOpenParams}; use crate::nodemanager::ChannelClosure; use crate::scb::StaticChannelBackup; use crate::{ @@ -39,6 +39,7 @@ use lightning::{ }; use crate::multiesplora::MultiEsploraClient; +use crate::utils::get_monitor_version; use bitcoin::util::bip32::ExtendedPrivKey; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; use lightning::ln::PaymentSecret; @@ -548,6 +549,80 @@ impl Node { keys_manager.get_node_id(Recipient::Node).unwrap() ); + // Here we re-attempt to persist any monitors that failed to persist previously. + let retry_logger = logger.clone(); + let retry_persister = persister.clone(); + let retry_stop = stop.clone(); + let retry_chain_monitor = chain_monitor.clone(); + utils::spawn(async move { + loop { + if retry_stop.load(Ordering::Relaxed) { + break; + } + + let updates = retry_chain_monitor.list_pending_monitor_updates(); + for (funding_txo, update_ids) in updates { + // if there are no updates, skip + if update_ids.is_empty() { + continue; + } + + log_debug!( + retry_logger, + "Retrying to persist monitor for outpoint: {funding_txo:?}" + ); + + match retry_chain_monitor.get_monitor(funding_txo) { + Ok(monitor) => { + let key = retry_persister.get_monitor_key(&funding_txo); + let object = monitor.encode(); + let update_id = monitor.get_latest_update_id(); + debug_assert_eq!(update_id, get_monitor_version(&object)); + + // safely convert u64 to u32 + let version = if update_id >= u32::MAX as u64 { + u32::MAX + } else { + update_id as u32 + }; + + let res = persist_monitor( + &retry_persister.storage, + &key, + &object, + Some(version), + &retry_logger, + ) + .await; + + match res { + Ok(_) => { + for id in update_ids { + if let Err(e) = retry_chain_monitor + .channel_monitor_updated(funding_txo, id) + { + log_error!(retry_logger, "Error notifying chain monitor of channel monitor update: {e:?}"); + } + } + } + Err(e) => log_error!( + retry_logger, + "Failed to persist monitor for outpoint: {funding_txo:?}, error: {e:?}", + ), + } + } + Err(_) => log_error!( + retry_logger, + "Failed to get monitor for outpoint: {funding_txo:?}" + ), + } + } + + // sleep 3 seconds + sleep(3_000).await; + } + }); + Ok(Node { _uuid: uuid, stopped_components, diff --git a/mutiny-core/src/utils.rs b/mutiny-core/src/utils.rs index 94660ac7c..9bfe6adeb 100644 --- a/mutiny-core/src/utils.rs +++ b/mutiny-core/src/utils.rs @@ -178,7 +178,7 @@ where /// Returns the version of a channel monitor from a serialized version /// of a channel monitor. -pub fn get_monitor_version(bytes: Vec) -> u64 { +pub fn get_monitor_version(bytes: &[u8]) -> u64 { // first two bytes are the version // next 8 bytes are the version number u64::from_be_bytes(bytes[2..10].try_into().unwrap()) diff --git a/mutiny-wasm/src/indexed_db.rs b/mutiny-wasm/src/indexed_db.rs index cac842595..3d513f4bf 100644 --- a/mutiny-wasm/src/indexed_db.rs +++ b/mutiny-wasm/src/indexed_db.rs @@ -258,12 +258,12 @@ impl IndexedDbStorage { match current.get::>(&key)? { Some(bytes) => { // check first byte is 1, then take u64 from next 8 bytes - let current_version = utils::get_monitor_version(bytes); + let current_version = utils::get_monitor_version(&bytes); let obj: Value = LocalStorage::get(&key).unwrap(); let value = decrypt_value(&key, obj, current.password())?; if let Ok(local_bytes) = serde_json::from_value::>(value.clone()) { - let local_version = utils::get_monitor_version(local_bytes); + let local_version = utils::get_monitor_version(&local_bytes); // if the current version is less than the version from local storage // then we want to use the local storage version @@ -372,7 +372,7 @@ impl IndexedDbStorage { // we can get versions from monitors, so we should compare match current.get::>(&kv.key)? { Some(bytes) => { - let current_version = utils::get_monitor_version(bytes); + let current_version = utils::get_monitor_version(&bytes); // if the current version is less than the version from vss, then we want to use the vss version if current_version < kv.version as u64 {