From 9cb880a9596afb707b1699340e7e3d809f44f6f6 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Tue, 27 Oct 2020 03:34:38 +0200 Subject: [PATCH] elements: Implement in-memory asset store and `GET /assets/registry` --- src/bin/electrs.rs | 9 ++- src/elements/asset.rs | 9 ++- src/elements/mod.rs | 2 +- src/elements/registry.rs | 158 +++++++++++++++++++++++++++++++++++---- src/new_index/query.rs | 31 +++++++- src/rest.rs | 27 ++++++- 6 files changed, 210 insertions(+), 26 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 5cc9a7eb..14f563ad 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -84,10 +84,11 @@ fn run_server(config: Arc) -> Result<()> { mempool.write().unwrap().update(&daemon)?; #[cfg(feature = "liquid")] - let asset_db = config - .asset_db_path - .as_ref() - .map(|dir| AssetRegistry::new(dir.clone())); + let asset_db = config.asset_db_path.as_ref().map(|db_dir| { + let asset_db = Arc::new(RwLock::new(AssetRegistry::new(db_dir.clone()))); + AssetRegistry::spawn_sync(asset_db.clone()); + asset_db + }); let query = Arc::new(Query::new( Arc::clone(&chain), diff --git a/src/elements/asset.rs b/src/elements/asset.rs index 610d5b9b..188e939f 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, RwLock}; use bitcoin::hashes::{hex::FromHex, sha256, Hash}; use bitcoin::{BlockHash, Txid}; @@ -343,8 +344,9 @@ fn asset_history_row( pub fn lookup_asset( query: &Query, - registry: Option<&AssetRegistry>, + registry: Option<&Arc>>, asset_id: &AssetId, + meta: Option<&AssetMeta>, // may optionally be provided if already known ) -> Result> { if asset_id == query.network().native_asset() { let (chain_stats, mempool_stats) = native_asset_stats(query); @@ -370,7 +372,10 @@ pub fn lookup_asset( Ok(if let Some(row) = row { let reissuance_token = parse_asset_id(&row.reissuance_token); - let meta = registry.map_or_else(|| Ok(None), |r| r.load(asset_id))?; + let registry = registry.map(|r| r.read().unwrap()); + let meta = meta + .or_else(|| registry.as_ref().and_then(|r| r.get(asset_id))) + .cloned(); let stats = issued_asset_stats(query, asset_id, &reissuance_token); let status = query.get_tx_status(&deserialize(&row.issuance_txid).unwrap()); diff --git a/src/elements/mod.rs b/src/elements/mod.rs index 55f57c8f..3da54f0d 100644 --- a/src/elements/mod.rs +++ b/src/elements/mod.rs @@ -7,7 +7,7 @@ mod registry; use asset::get_issuance_entropy; pub use asset::{lookup_asset, LiquidAsset}; -pub use registry::AssetRegistry; +pub use registry::{AssetRegistry, AssetSorting}; #[derive(Serialize, Deserialize, Clone)] pub struct IssuanceValue { diff --git a/src/elements/registry.rs b/src/elements/registry.rs index eef658c1..50507054 100644 --- a/src/elements/registry.rs +++ b/src/elements/registry.rs @@ -1,8 +1,13 @@ -use std::fs; -use std::path; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, SystemTime}; +use std::{cmp, fs, path, thread}; use serde_json::Value as JsonValue; +use elements::bitcoin_hashes::hex::FromHex; +use elements::AssetId; + use crate::errors::*; // length of asset id prefix to use for sub-directory partitioning @@ -11,22 +16,91 @@ use crate::errors::*; const DIR_PARTITION_LEN: usize = 2; pub struct AssetRegistry { directory: path::PathBuf, + assets_cache: HashMap, } +pub type AssetEntry<'a> = (&'a AssetId, &'a AssetMeta); + impl AssetRegistry { pub fn new(directory: path::PathBuf) -> Self { - Self { directory } - } - - pub fn load(&self, asset_id: A) -> Result> { - let name = format!("{}.json", asset_id.to_string()); - let subdir = self.directory.join(&name[0..DIR_PARTITION_LEN]); - let path = subdir.join(name); - Ok(if path.exists() { - let contents = fs::read_to_string(path).chain_err(|| "failed reading file")?; - Some(serde_json::from_str(&contents).chain_err(|| "failed parsing file")?) - } else { - None + Self { + directory, + assets_cache: Default::default(), + } + } + + pub fn get(&self, asset_id: &AssetId) -> Option<&AssetMeta> { + self.assets_cache + .get(asset_id) + .map(|(_, metadata)| metadata) + } + + pub fn list(&self, start_index: usize, limit: usize, sorting: AssetSorting) -> Vec { + let mut assets: Vec = self + .assets_cache + .iter() + .map(|(asset_id, (_, metadata))| (asset_id, metadata)) + .collect(); + assets.sort_by(sorting.as_comparator()); + assets.into_iter().skip(start_index).take(limit).collect() + } + + pub fn fs_sync(&mut self) -> Result<()> { + for entry in fs::read_dir(&self.directory).chain_err(|| "failed reading asset dir")? { + let entry = entry.chain_err(|| "invalid fh")?; + let filetype = entry.file_type().chain_err(|| "failed getting file type")?; + if !filetype.is_dir() || entry.file_name().len() != DIR_PARTITION_LEN { + continue; + } + + for file_entry in + fs::read_dir(entry.path()).chain_err(|| "failed reading asset subdir")? + { + let file_entry = file_entry.chain_err(|| "invalid fh")?; + let path = file_entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("json") { + continue; + } + + let asset_id = AssetId::from_hex( + path.file_stem() + .unwrap() // cannot fail if extension() succeeded + .to_str() + .chain_err(|| "invalid filename")?, + ) + .chain_err(|| "invalid filename")?; + + let modified = file_entry + .metadata() + .chain_err(|| "failed reading metadata")? + .modified() + .chain_err(|| "metadata modified failed")?; + + if let Some((last_update, _)) = self.assets_cache.get(&asset_id) { + if *last_update == modified { + continue; + } + } + + let metadata: AssetMeta = serde_json::from_str( + &fs::read_to_string(path).chain_err(|| "failed reading file")?, + ) + .chain_err(|| "failed parsing file")?; + + self.assets_cache.insert(asset_id, (modified, metadata)); + } + } + Ok(()) + } + + pub fn spawn_sync(asset_db: Arc>) -> thread::JoinHandle<()> { + thread::spawn(move || loop { + if let Err(e) = asset_db.write().unwrap().fs_sync() { + error!("registry fs_sync failed: {:?}", e); + } + + thread::sleep(Duration::from_secs(15)); + // TODO handle shutdowm }) } } @@ -42,3 +116,59 @@ pub struct AssetMeta { #[serde(skip_serializing_if = "Option::is_none")] pub ticker: Option, } + +impl AssetMeta { + fn domain(&self) -> Option<&str> { + self.entity["domain"].as_str() + } +} + +pub struct AssetSorting(AssetSortField, AssetSortDir); + +pub enum AssetSortField { + Name, + Domain, + Ticker, +} +pub enum AssetSortDir { + Descending, + Ascending, +} + +impl AssetSorting { + fn as_comparator(self) -> Box cmp::Ordering> { + let sort_fn: Box cmp::Ordering> = match self.0 { + AssetSortField::Name => { + // Order by name first, use asset id as a tie breaker. the other sorting fields + // don't require this because they're guaranteed to be unique. + Box::new(|a, b| a.1.name.cmp(&b.1.name).then_with(|| a.0.cmp(b.0))) + } + AssetSortField::Domain => Box::new(|a, b| a.1.domain().cmp(&b.1.domain())), + AssetSortField::Ticker => Box::new(|a, b| a.1.ticker.cmp(&b.1.ticker)), + }; + + match self.1 { + AssetSortDir::Ascending => sort_fn, + AssetSortDir::Descending => Box::new(move |a, b| sort_fn(a, b).reverse()), + } + } + + pub fn from_query_params(query: &HashMap) -> Result { + let field = match query.get("sort_field").map(String::as_str) { + None => AssetSortField::Ticker, + Some("name") => AssetSortField::Name, + Some("domain") => AssetSortField::Domain, + Some("ticker") => AssetSortField::Ticker, + _ => bail!("invalid sort field"), + }; + + let dir = match query.get("sort_dir").map(String::as_str) { + None => AssetSortDir::Ascending, + Some("asc") => AssetSortDir::Ascending, + Some("desc") => AssetSortDir::Descending, + _ => bail!("invalid sort direction"), + }; + + Ok(Self(field, dir)) + } +} diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 605f2cc1..fa1ddf6e 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -16,7 +16,7 @@ use bitcoin::Txid; #[cfg(feature = "liquid")] use crate::{ chain::AssetId, - elements::{lookup_asset, AssetRegistry, LiquidAsset}, + elements::{lookup_asset, AssetRegistry, AssetSorting, LiquidAsset}, }; const FEE_ESTIMATES_TTL: u64 = 60; // seconds @@ -34,7 +34,7 @@ pub struct Query { cached_estimates: RwLock<(HashMap, Option)>, cached_relayfee: RwLock>, #[cfg(feature = "liquid")] - asset_db: Option, + asset_db: Option>>, } impl Query { @@ -221,7 +221,7 @@ impl Query { mempool: Arc>, daemon: Arc, config: Arc, - asset_db: Option, + asset_db: Option>>, ) -> Self { Query { chain, @@ -236,6 +236,29 @@ impl Query { #[cfg(feature = "liquid")] pub fn lookup_asset(&self, asset_id: &AssetId) -> Result> { - lookup_asset(&self, self.asset_db.as_ref(), asset_id) + lookup_asset(&self, self.asset_db.as_ref(), asset_id, None) + } + + #[cfg(feature = "liquid")] + pub fn list_registry_assets( + &self, + start_index: usize, + limit: usize, + sorting: AssetSorting, + ) -> Result> { + let asset_db = match &self.asset_db { + None => return Ok(vec![]), + Some(db) => db.read().unwrap(), + }; + Ok(asset_db + .list(start_index, limit, sorting) + .into_iter() + .filter_map(|(asset_id, metadata)| { + // Attach on-chain information alongside the registry metadata + lookup_asset(&self, None, asset_id, Some(metadata)) + .ok() + .flatten() + }) + .collect()) } } diff --git a/src/rest.rs b/src/rest.rs index fbd25e1d..fee6aba8 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -22,7 +22,7 @@ use hyperlocal::UnixServerExt; use std::fs; #[cfg(feature = "liquid")] use { - crate::elements::{peg::PegoutValue, IssuanceValue}, + crate::elements::{peg::PegoutValue, AssetSorting, IssuanceValue}, elements::{ confidential::{Asset, Nonce, Value}, encode, AssetId, @@ -44,6 +44,11 @@ const MAX_MEMPOOL_TXS: usize = 50; const BLOCK_LIMIT: usize = 10; const ADDRESS_SEARCH_LIMIT: usize = 10; +#[cfg(feature = "liquid")] +const ASSETS_PER_PAGE: usize = 25; +#[cfg(feature = "liquid")] +const ASSETS_MAX_PER_PAGE: usize = 100; + const TTL_LONG: u32 = 157_784_630; // ttl for static resources (5 years) const TTL_SHORT: u32 = 10; // ttl for volatie resources const TTL_MEMPOOL_RECENT: u32 = 5; // ttl for GET /mempool/recent @@ -1015,6 +1020,26 @@ fn handle_request( json_response(query.estimate_fee_map(), TTL_SHORT) } + #[cfg(feature = "liquid")] + (&Method::GET, Some(&"assets"), Some(&"registry"), None, None, None) => { + let start_index: usize = query_params + .get("start_index") + .and_then(|n| n.parse().ok()) + .unwrap_or(0); + + let limit: usize = query_params + .get("limit") + .and_then(|n| n.parse().ok()) + .map(|n: usize| n.min(ASSETS_MAX_PER_PAGE)) + .unwrap_or(ASSETS_PER_PAGE); + + let sorting = AssetSorting::from_query_params(&query_params)?; + + let assets = query.list_registry_assets(start_index, limit, sorting)?; + + json_response(assets, TTL_SHORT) + } + #[cfg(feature = "liquid")] (&Method::GET, Some(&"asset"), Some(asset_str), None, None, None) => { let asset_id = AssetId::from_hex(asset_str)?;