From f730c43abc7088435456e9f049987450a151bc15 Mon Sep 17 00:00:00 2001 From: Mononaut <github@monospace.live> Date: Sun, 23 Jun 2024 07:54:23 +0000 Subject: [PATCH] Add recent utxos rest API endpoint --- src/config.rs | 8 ++ src/new_index/query.rs | 25 +++++ src/new_index/schema.rs | 219 ++++++++++++++++++++++++++++++++++++++++ src/rest.rs | 25 +++++ 4 files changed, 277 insertions(+) diff --git a/src/config.rs b/src/config.rs index a5e903ce..aa539627 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,6 +53,7 @@ pub struct Config { pub precache_scripts: Option<String>, pub precache_threads: usize, pub utxos_limit: usize, + pub utxos_history_limit: usize, pub electrum_txs_limit: usize, pub electrum_banner: String, pub mempool_backlog_stats_ttl: u64, @@ -218,6 +219,12 @@ impl Config { .help("Maximum number of utxos to process per address. Lookups for addresses with more utxos will fail. Applies to the Electrum and HTTP APIs.") .default_value("500") ) + .arg( + Arg::with_name("utxos_history_limit") + .long("utxos-history-limit") + .help("Maximum number of history entries to process per address when looking up recent utxos.") + .default_value("20000") + ) .arg( Arg::with_name("mempool_backlog_stats_ttl") .long("mempool-backlog-stats-ttl") @@ -514,6 +521,7 @@ impl Config { daemon_rpc_addr, cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), + utxos_history_limit: value_t_or_exit!(m, "utxos_history_limit", usize), electrum_rpc_addr, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 3e314fd1..5f59efb2 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -107,6 +107,31 @@ impl Query { Ok(utxos) } + pub fn recent_utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> { + let mut utxos = self.chain.recent_utxo( + scripthash, + self.config.utxos_limit, + self.config.utxos_history_limit, + super::db::DBFlush::Enable, + )?; + let mempool = self.mempool(); + utxos.retain(|utxo| !mempool.has_spend(&OutPoint::from(utxo))); + utxos.extend(mempool.utxo(scripthash)); + utxos.sort_by(|a, b| match (&a.confirmed, &b.confirmed) { + (Some(block_a), Some(block_b)) => { + if block_a.height == block_b.height { + a.txid.cmp(&b.txid) + } else { + block_b.height.cmp(&block_a.height) + } + } + (Some(_), None) => std::cmp::Ordering::Greater, + (None, Some(_)) => std::cmp::Ordering::Less, + (None, None) => a.txid.cmp(&b.txid), + }); + Ok(utxos) + } + pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, Option<BlockId>)> { let confirmed_txids = self.chain.history_txids(scripthash, limit); let confirmed_len = confirmed_txids.len(); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index eba60588..912a6452 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -796,6 +796,181 @@ impl ChainQuery { Ok((utxos, lastblock, processed_items)) } + // get the *most recent* limit utxos + pub fn recent_utxo( + &self, + scripthash: &[u8], + limit: usize, + entries_limit: usize, + flush: DBFlush, + ) -> Result<Vec<Utxo>> { + let _timer = self.start_timer("recent_utxo"); + + // get the last known utxo set and the blockhash it was updated for. + // invalidates the cache if the block was orphaned. + let cache: Option<(UtxoMap, usize, bool, usize, usize)> = self + .store + .cache_db + .get(&RecentUtxoCacheRow::key(scripthash)) + .map(|c| bincode_util::deserialize_little(&c).unwrap()) + .and_then(|(utxos_cache, blockhash, limited, limit, entries_limit)| { + self.height_by_hash(&blockhash) + .map(|height| (utxos_cache, height, limited, limit, entries_limit)) + }) + .map(|(utxos_cache, height, limited, limit, entries_limit)| { + ( + from_utxo_cache(utxos_cache, self), + height, + limited, + limit, + entries_limit, + ) + }); + let had_cache = cache.is_some(); + + // get utxos set for new transactions + let (newutxos, lastblock, processed_items, limited) = cache.map_or_else( + || self.recent_utxo_delta(scripthash, HashMap::new(), None, limit, entries_limit), + |(oldutxos, blockheight, limited, cache_limit, cache_entries_limit)| { + // invalidate the cache if it was constructed with a lower resource limit + let start_height = + if limited && (cache_limit < limit || cache_entries_limit < entries_limit) { + None + } else { + Some(blockheight as u32) + }; + self.recent_utxo_delta(scripthash, oldutxos, start_height, limit, entries_limit) + }, + )?; + + // save updated utxo set to cache + if let Some(lastblock) = lastblock { + if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE { + self.store.cache_db.write( + vec![RecentUtxoCacheRow::new( + scripthash, + &newutxos, + &lastblock, + limited, + limit, + entries_limit, + ) + .into_row()], + flush, + ); + } + } + + // format as Utxo objects + Ok(newutxos + .into_iter() + .map(|(outpoint, (blockid, value))| { + // in elements/liquid chains, we have to lookup the txo in order to get its + // associated asset. the asset information could be kept in the db history rows + // alongside the value to avoid this. + #[cfg(feature = "liquid")] + let txo = self.lookup_txo(&outpoint).expect("missing utxo"); + + Utxo { + txid: outpoint.txid, + vout: outpoint.vout, + value, + confirmed: Some(blockid), + + #[cfg(feature = "liquid")] + asset: txo.asset, + #[cfg(feature = "liquid")] + nonce: txo.nonce, + #[cfg(feature = "liquid")] + witness: txo.witness, + } + }) + .collect()) + } + + fn recent_utxo_delta( + &self, + scripthash: &[u8], + init_utxos: UtxoMap, + start_height: Option<u32>, + limit: usize, + entries_limit: usize, + ) -> Result<(UtxoMap, Option<BlockHash>, usize, bool)> { + // iterate over history in reverse until we reach the utxo limit or meet the last known utxo + let _timer = self.start_timer("recent_utxo_delta"); + let history_iter = self + .history_iter_scan_reverse(b'H', scripthash) + .map(TxHistoryRow::from_row) + .take_while(|row| { + if let Some(height) = start_height { + row.key.confirmed_height > height + } else { + true + } + }) + .filter_map(|history| { + self.tx_confirming_block(&history.get_txid()) + // drop history entries that were previously confirmed in a re-orged block and later + // confirmed again at a different height + .filter(|blockid| blockid.height == history.key.confirmed_height as usize) + .map(|b| (history, b)) + }); + + let mut utxos = UtxoMap::new(); + let mut spent: HashSet<OutPoint> = HashSet::new(); + let mut processed_items = 0; + let mut lastblock = None; + let mut limited = false; + + for (history, blockid) in history_iter { + processed_items += 1; + if lastblock.is_none() { + lastblock = Some(blockid.hash); + } + + match history.key.txinfo { + TxHistoryInfo::Funding(ref info) => { + if !spent.contains(&history.get_funded_outpoint()) { + utxos.insert(history.get_funded_outpoint(), (blockid, info.value)); + } + } + TxHistoryInfo::Spending(_) => { + utxos.remove(&history.get_funded_outpoint()); + spent.insert(history.get_funded_outpoint()); + } + #[cfg(feature = "liquid")] + TxHistoryInfo::Issuing(_) + | TxHistoryInfo::Burning(_) + | TxHistoryInfo::Pegin(_) + | TxHistoryInfo::Pegout(_) => unreachable!(), + }; + + // finish as soon as the utxo set size exceeds the limit + if utxos.len() >= limit || processed_items >= entries_limit { + limited = true; + break; + } + } + + // copy across unspent txos from cache + let mut utxo_entries: Vec<(&OutPoint, &(BlockId, Value))> = init_utxos.iter().collect(); + utxo_entries.sort_by(|a, b| a.1 .0.height.cmp(&b.1 .0.height)); + for (&outpoint, &(ref blockid, value)) in utxo_entries { + if lastblock.is_none() { + lastblock = Some(blockid.hash); + } + if !spent.contains(&outpoint) { + utxos.insert(outpoint, (blockid.clone(), value)); + } + if utxos.len() >= limit { + limited = true; + break; + } + } + + Ok((utxos, lastblock, processed_items, limited)) + } + pub fn stats(&self, scripthash: &[u8], flush: DBFlush) -> ScriptStats { let _timer = self.start_timer("stats"); @@ -1830,6 +2005,50 @@ impl UtxoCacheRow { } } +struct RecentUtxoCacheRow { + key: ScriptCacheKey, + value: Bytes, +} + +impl RecentUtxoCacheRow { + fn new( + scripthash: &[u8], + utxos: &UtxoMap, + blockhash: &BlockHash, + limited: bool, + limit: usize, + entries_limit: usize, + ) -> Self { + let utxos_cache = make_utxo_cache(utxos); + + RecentUtxoCacheRow { + key: ScriptCacheKey { + code: b'R', + scripthash: full_hash(scripthash), + }, + value: bincode_util::serialize_little(&( + utxos_cache, + blockhash, + limited, + limit, + entries_limit, + )) + .unwrap(), + } + } + + pub fn key(scripthash: &[u8]) -> Bytes { + [b"R", scripthash].concat() + } + + fn into_row(self) -> DBRow { + DBRow { + key: bincode_util::serialize_little(&self.key).unwrap(), + value: self.value, + } + } +} + // keep utxo cache with just the block height (the hash/timestamp are read later from the headers to reconstruct BlockId) // and use a (txid,vout) tuple instead of OutPoints (they don't play nicely with bincode serialization) fn make_utxo_cache(utxos: &UtxoMap) -> CachedUtxoMap { diff --git a/src/rest.rs b/src/rest.rs index ce8bef96..9a9fe723 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -1046,6 +1046,31 @@ fn handle_request( // XXX paging? json_response(utxos, TTL_SHORT) } + ( + &Method::GET, + Some(script_type @ &"address"), + Some(script_str), + Some(&"utxo"), + Some(&"recent"), + None, + ) + | ( + &Method::GET, + Some(script_type @ &"scripthash"), + Some(script_str), + Some(&"utxo"), + Some(&"recent"), + None, + ) => { + let script_hash = to_scripthash(script_type, script_str, config.network_type)?; + let utxos: Vec<UtxoValue> = query + .recent_utxo(&script_hash[..])? + .into_iter() + .map(UtxoValue::from) + .collect(); + // XXX paging? + json_response(utxos, TTL_SHORT) + } (&Method::GET, Some(&"address-prefix"), Some(prefix), None, None, None) => { if !config.address_search { return Err(HttpError::from("address search disabled".to_string()));