Skip to content

Commit

Permalink
Use alternative retry method for async storage
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Oct 15, 2023
1 parent 6aa37c3 commit 659c5de
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 77 deletions.
106 changes: 34 additions & 72 deletions mutiny-core/src/ldkstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::nodemanager::ChannelClosure;
use crate::storage::{MutinyStorage, VersionedValue};
use crate::utils;
use crate::utils::{sleep, spawn};
use crate::vss::VssKeyValueItem;
use crate::{chain::MutinyChain, scorer::HubPreferentialScorer};
use anyhow::anyhow;
use bitcoin::hashes::hex::{FromHex, ToHex};
Expand Down Expand Up @@ -93,6 +92,15 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
format!("{}_{}", key, self.node_id)
}

pub(crate) fn get_monitor_key(&self, funding_txo: &OutPoint) -> String {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
self.get_key(&key)
}

fn init_persist_monitor<W: Writeable>(
&self,
key: String,
Expand All @@ -105,47 +113,30 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
let logger = self.logger.clone();
let object = object.encode();

// currently we only retry storage to VSS because we don't have a way to detect
// for local storage if a higher version has been persisted. Without handling this
// we could end up with a previous state being persisted over a newer one.
// VSS does not have this problem because it verifies the version before storing
// and will not overwrite a newer version, so it is safe to retry.
spawn(async move {
let mut is_retry = false;
// Sleep before persisting to give chance for the manager to be persisted
sleep(50).await;
loop {
match persist_monitor(&storage, &key, &object, Some(version), is_retry, &logger)
.await
{
Ok(()) => {
log_debug!(logger, "Persisted channel monitor: {update_id:?}");

// unwrap is safe, we set it up immediately
let chain_monitor = chain_monitor.lock().await;
let chain_monitor = chain_monitor.as_ref().unwrap();

// these errors are not fatal, so we don't return them just log
if let Err(e) = chain_monitor.channel_monitor_updated(
update_id.funding_txo,
update_id.monitor_update_id,
) {
log_error!(
logger,
"Error notifying chain monitor of channel monitor update: {e:?}"
);
} else {
break; // successful storage, no more attempts
}
}
Err(e) => {
log_error!(logger, "Error persisting channel monitor: {e}");
match persist_monitor(&storage, &key, &object, Some(version), &logger).await {
Ok(()) => {
log_debug!(logger, "Persisted channel monitor: {update_id:?}");

// unwrap is safe, we set it up immediately
let chain_monitor = chain_monitor.lock().await;
let chain_monitor = chain_monitor.as_ref().unwrap();

// these errors are not fatal, so we don't return them just log
if let Err(e) = chain_monitor
.channel_monitor_updated(update_id.funding_txo, update_id.monitor_update_id)
{
log_error!(
logger,
"Error notifying chain monitor of channel monitor update: {e:?}"
);
}
}

// if we get here, we failed to persist, so we retry
is_retry = true;
sleep(1_000).await;
Err(e) => {
log_error!(logger, "Error persisting channel monitor: {e}");
}
}
});

Expand Down Expand Up @@ -675,15 +666,10 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
monitor: &ChannelMonitor<ChannelSigner>,
monitor_update_id: MonitorUpdateId,
) -> ChannelMonitorUpdateStatus {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
let key = self.get_key(&key);
let key = self.get_monitor_key(&funding_txo);

let update_id = monitor.get_latest_update_id();
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
Expand All @@ -707,14 +693,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
monitor: &ChannelMonitor<ChannelSigner>,
monitor_update_id: MonitorUpdateId,
) -> ChannelMonitorUpdateStatus {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
let key = self.get_key(&key);
let key = self.get_monitor_key(&funding_txo);
let update_id = monitor.get_latest_update_id();
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
Expand All @@ -738,33 +719,14 @@ pub struct MonitorUpdateIdentifier {
pub monitor_update_id: MonitorUpdateId,
}

async fn persist_monitor(
pub(crate) async fn persist_monitor(
storage: &impl MutinyStorage,
key: &str,
object: &Vec<u8>,
version: Option<u32>,
vss_only: bool,
logger: &MutinyLogger,
) -> Result<(), lightning::io::Error> {
let res = if vss_only {
// if we are only storing to VSS, we don't need to store to local storage
// just need to call put_objects on VSS
if let (Some(vss), Some(version)) = (storage.vss_client(), version) {
let value =
serde_json::to_value(object).map_err(|_| lightning::io::ErrorKind::Other)?;
let item = VssKeyValueItem {
key: key.to_string(),
value,
version,
};

vss.put_objects(vec![item]).await
} else {
Ok(())
}
} else {
storage.set_data_async(key, object, version).await
};
let res = storage.set_data_async(key, object, version).await;

res.map_err(|e| {
match e {
Expand Down
77 changes: 76 additions & 1 deletion mutiny-core/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::labels::LabelStorage;
use crate::ldkstorage::ChannelOpenParams;
use crate::ldkstorage::{persist_monitor, ChannelOpenParams};
use crate::nodemanager::ChannelClosure;
use crate::scb::StaticChannelBackup;
use crate::{
Expand Down Expand Up @@ -39,6 +39,7 @@ use lightning::{
};

use crate::multiesplora::MultiEsploraClient;
use crate::utils::get_monitor_version;
use bitcoin::util::bip32::ExtendedPrivKey;
use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
use lightning::ln::PaymentSecret;
Expand Down Expand Up @@ -548,6 +549,80 @@ impl<S: MutinyStorage> Node<S> {
keys_manager.get_node_id(Recipient::Node).unwrap()
);

// Here we re-attempt to persist any monitors that failed to persist previously.
let retry_logger = logger.clone();
let retry_persister = persister.clone();
let retry_stop = stop.clone();
let retry_chain_monitor = chain_monitor.clone();
utils::spawn(async move {
loop {
if retry_stop.load(Ordering::Relaxed) {
break;
}

let updates = retry_chain_monitor.list_pending_monitor_updates();
for (funding_txo, update_ids) in updates {
// if there are no updates, skip
if update_ids.is_empty() {
continue;
}

log_debug!(
retry_logger,
"Retrying to persist monitor for outpoint: {funding_txo:?}"
);

match retry_chain_monitor.get_monitor(funding_txo) {
Ok(monitor) => {
let key = retry_persister.get_monitor_key(&funding_txo);
let object = monitor.encode();
let update_id = monitor.get_latest_update_id();
debug_assert!(update_id == get_monitor_version(&object));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
u32::MAX
} else {
update_id as u32
};

let res = persist_monitor(
&retry_persister.storage,
&key,
&object,
Some(version),
&retry_logger,
)
.await;

match res {
Ok(_) => {
for id in update_ids {
if let Err(e) = retry_chain_monitor
.channel_monitor_updated(funding_txo, id)
{
log_error!(retry_logger, "Error notifying chain monitor of channel monitor update: {e:?}");
}
}
}
Err(e) => log_error!(
retry_logger,
"Failed to persist monitor for outpoint: {funding_txo:?}, error: {e:?}",
),
}
}
Err(_) => log_error!(
retry_logger,
"Failed to get monitor for outpoint: {funding_txo:?}"
),
}
}

// sleep 3 seconds
sleep(3_000).await;
}
});

Ok(Node {
_uuid: uuid,
stopped_components,
Expand Down
2 changes: 1 addition & 1 deletion mutiny-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where

/// Returns the version of a channel monitor from a serialized version
/// of a channel monitor.
pub fn get_monitor_version(bytes: Vec<u8>) -> u64 {
pub fn get_monitor_version(bytes: &[u8]) -> u64 {
// first two bytes are the version
// next 8 bytes are the version number
u64::from_be_bytes(bytes[2..10].try_into().unwrap())
Expand Down
6 changes: 3 additions & 3 deletions mutiny-wasm/src/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ impl IndexedDbStorage {
match current.get::<Vec<u8>>(&key)? {
Some(bytes) => {
// check first byte is 1, then take u64 from next 8 bytes
let current_version = utils::get_monitor_version(bytes);
let current_version = utils::get_monitor_version(&bytes);

let obj: Value = LocalStorage::get(&key).unwrap();
let value = decrypt_value(&key, obj, current.password())?;
if let Ok(local_bytes) = serde_json::from_value::<Vec<u8>>(value.clone()) {
let local_version = utils::get_monitor_version(local_bytes);
let local_version = utils::get_monitor_version(&local_bytes);

// if the current version is less than the version from local storage
// then we want to use the local storage version
Expand Down Expand Up @@ -372,7 +372,7 @@ impl IndexedDbStorage {
// we can get versions from monitors, so we should compare
match current.get::<Vec<u8>>(&kv.key)? {
Some(bytes) => {
let current_version = utils::get_monitor_version(bytes);
let current_version = utils::get_monitor_version(&bytes);

// if the current version is less than the version from vss, then we want to use the vss version
if current_version < kv.version as u64 {
Expand Down

0 comments on commit 659c5de

Please sign in to comment.