Skip to content

Commit

Permalink
elements: Implement in-memory asset store and GET /assets/registry
Browse files Browse the repository at this point in the history
  • Loading branch information
shesek committed Oct 27, 2020
1 parent d604320 commit 9cb880a
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 26 deletions.
9 changes: 5 additions & 4 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
mempool.write().unwrap().update(&daemon)?;

#[cfg(feature = "liquid")]
let asset_db = config
.asset_db_path
.as_ref()
.map(|dir| AssetRegistry::new(dir.clone()));
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
let asset_db = Arc::new(RwLock::new(AssetRegistry::new(db_dir.clone())));
AssetRegistry::spawn_sync(asset_db.clone());
asset_db
});

let query = Arc::new(Query::new(
Arc::clone(&chain),
Expand Down
9 changes: 7 additions & 2 deletions src/elements/asset.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use bitcoin::hashes::{hex::FromHex, sha256, Hash};
use bitcoin::{BlockHash, Txid};
Expand Down Expand Up @@ -343,8 +344,9 @@ fn asset_history_row(

pub fn lookup_asset(
query: &Query,
registry: Option<&AssetRegistry>,
registry: Option<&Arc<RwLock<AssetRegistry>>>,
asset_id: &AssetId,
meta: Option<&AssetMeta>, // may optionally be provided if already known
) -> Result<Option<LiquidAsset>> {
if asset_id == query.network().native_asset() {
let (chain_stats, mempool_stats) = native_asset_stats(query);
Expand All @@ -370,7 +372,10 @@ pub fn lookup_asset(
Ok(if let Some(row) = row {
let reissuance_token = parse_asset_id(&row.reissuance_token);

let meta = registry.map_or_else(|| Ok(None), |r| r.load(asset_id))?;
let registry = registry.map(|r| r.read().unwrap());
let meta = meta
.or_else(|| registry.as_ref().and_then(|r| r.get(asset_id)))
.cloned();
let stats = issued_asset_stats(query, asset_id, &reissuance_token);
let status = query.get_tx_status(&deserialize(&row.issuance_txid).unwrap());

Expand Down
2 changes: 1 addition & 1 deletion src/elements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod registry;

use asset::get_issuance_entropy;
pub use asset::{lookup_asset, LiquidAsset};
pub use registry::AssetRegistry;
pub use registry::{AssetRegistry, AssetSorting};

#[derive(Serialize, Deserialize, Clone)]
pub struct IssuanceValue {
Expand Down
158 changes: 144 additions & 14 deletions src/elements/registry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::fs;
use std::path;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use std::{cmp, fs, path, thread};

use serde_json::Value as JsonValue;

use elements::bitcoin_hashes::hex::FromHex;
use elements::AssetId;

use crate::errors::*;

// length of asset id prefix to use for sub-directory partitioning
Expand All @@ -11,22 +16,91 @@ use crate::errors::*;
const DIR_PARTITION_LEN: usize = 2;
pub struct AssetRegistry {
directory: path::PathBuf,
assets_cache: HashMap<AssetId, (SystemTime, AssetMeta)>,
}

pub type AssetEntry<'a> = (&'a AssetId, &'a AssetMeta);

impl AssetRegistry {
pub fn new(directory: path::PathBuf) -> Self {
Self { directory }
}

pub fn load<A: ToString>(&self, asset_id: A) -> Result<Option<AssetMeta>> {
let name = format!("{}.json", asset_id.to_string());
let subdir = self.directory.join(&name[0..DIR_PARTITION_LEN]);
let path = subdir.join(name);
Ok(if path.exists() {
let contents = fs::read_to_string(path).chain_err(|| "failed reading file")?;
Some(serde_json::from_str(&contents).chain_err(|| "failed parsing file")?)
} else {
None
Self {
directory,
assets_cache: Default::default(),
}
}

pub fn get(&self, asset_id: &AssetId) -> Option<&AssetMeta> {
self.assets_cache
.get(asset_id)
.map(|(_, metadata)| metadata)
}

pub fn list(&self, start_index: usize, limit: usize, sorting: AssetSorting) -> Vec<AssetEntry> {
let mut assets: Vec<AssetEntry> = self
.assets_cache
.iter()
.map(|(asset_id, (_, metadata))| (asset_id, metadata))
.collect();
assets.sort_by(sorting.as_comparator());
assets.into_iter().skip(start_index).take(limit).collect()
}

pub fn fs_sync(&mut self) -> Result<()> {
for entry in fs::read_dir(&self.directory).chain_err(|| "failed reading asset dir")? {
let entry = entry.chain_err(|| "invalid fh")?;
let filetype = entry.file_type().chain_err(|| "failed getting file type")?;
if !filetype.is_dir() || entry.file_name().len() != DIR_PARTITION_LEN {
continue;
}

for file_entry in
fs::read_dir(entry.path()).chain_err(|| "failed reading asset subdir")?
{
let file_entry = file_entry.chain_err(|| "invalid fh")?;
let path = file_entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}

let asset_id = AssetId::from_hex(
path.file_stem()
.unwrap() // cannot fail if extension() succeeded
.to_str()
.chain_err(|| "invalid filename")?,
)
.chain_err(|| "invalid filename")?;

let modified = file_entry
.metadata()
.chain_err(|| "failed reading metadata")?
.modified()
.chain_err(|| "metadata modified failed")?;

if let Some((last_update, _)) = self.assets_cache.get(&asset_id) {
if *last_update == modified {
continue;
}
}

let metadata: AssetMeta = serde_json::from_str(
&fs::read_to_string(path).chain_err(|| "failed reading file")?,
)
.chain_err(|| "failed parsing file")?;

self.assets_cache.insert(asset_id, (modified, metadata));
}
}
Ok(())
}

pub fn spawn_sync(asset_db: Arc<RwLock<AssetRegistry>>) -> thread::JoinHandle<()> {
thread::spawn(move || loop {
if let Err(e) = asset_db.write().unwrap().fs_sync() {
error!("registry fs_sync failed: {:?}", e);
}

thread::sleep(Duration::from_secs(15));
// TODO handle shutdowm
})
}
}
Expand All @@ -42,3 +116,59 @@ pub struct AssetMeta {
#[serde(skip_serializing_if = "Option::is_none")]
pub ticker: Option<String>,
}

impl AssetMeta {
fn domain(&self) -> Option<&str> {
self.entity["domain"].as_str()
}
}

pub struct AssetSorting(AssetSortField, AssetSortDir);

pub enum AssetSortField {
Name,
Domain,
Ticker,
}
pub enum AssetSortDir {
Descending,
Ascending,
}

impl AssetSorting {
fn as_comparator(self) -> Box<dyn Fn(&AssetEntry, &AssetEntry) -> cmp::Ordering> {
let sort_fn: Box<dyn Fn(&AssetEntry, &AssetEntry) -> cmp::Ordering> = match self.0 {
AssetSortField::Name => {
// Order by name first, use asset id as a tie breaker. the other sorting fields
// don't require this because they're guaranteed to be unique.
Box::new(|a, b| a.1.name.cmp(&b.1.name).then_with(|| a.0.cmp(b.0)))
}
AssetSortField::Domain => Box::new(|a, b| a.1.domain().cmp(&b.1.domain())),
AssetSortField::Ticker => Box::new(|a, b| a.1.ticker.cmp(&b.1.ticker)),
};

match self.1 {
AssetSortDir::Ascending => sort_fn,
AssetSortDir::Descending => Box::new(move |a, b| sort_fn(a, b).reverse()),
}
}

pub fn from_query_params(query: &HashMap<String, String>) -> Result<Self> {
let field = match query.get("sort_field").map(String::as_str) {
None => AssetSortField::Ticker,
Some("name") => AssetSortField::Name,
Some("domain") => AssetSortField::Domain,
Some("ticker") => AssetSortField::Ticker,
_ => bail!("invalid sort field"),
};

let dir = match query.get("sort_dir").map(String::as_str) {
None => AssetSortDir::Ascending,
Some("asc") => AssetSortDir::Ascending,
Some("desc") => AssetSortDir::Descending,
_ => bail!("invalid sort direction"),
};

Ok(Self(field, dir))
}
}
31 changes: 27 additions & 4 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use bitcoin::Txid;
#[cfg(feature = "liquid")]
use crate::{
chain::AssetId,
elements::{lookup_asset, AssetRegistry, LiquidAsset},
elements::{lookup_asset, AssetRegistry, AssetSorting, LiquidAsset},
};

const FEE_ESTIMATES_TTL: u64 = 60; // seconds
Expand All @@ -34,7 +34,7 @@ pub struct Query {
cached_estimates: RwLock<(HashMap<u16, f64>, Option<Instant>)>,
cached_relayfee: RwLock<Option<f64>>,
#[cfg(feature = "liquid")]
asset_db: Option<AssetRegistry>,
asset_db: Option<Arc<RwLock<AssetRegistry>>>,
}

impl Query {
Expand Down Expand Up @@ -221,7 +221,7 @@ impl Query {
mempool: Arc<RwLock<Mempool>>,
daemon: Arc<Daemon>,
config: Arc<Config>,
asset_db: Option<AssetRegistry>,
asset_db: Option<Arc<RwLock<AssetRegistry>>>,
) -> Self {
Query {
chain,
Expand All @@ -236,6 +236,29 @@ impl Query {

#[cfg(feature = "liquid")]
pub fn lookup_asset(&self, asset_id: &AssetId) -> Result<Option<LiquidAsset>> {
lookup_asset(&self, self.asset_db.as_ref(), asset_id)
lookup_asset(&self, self.asset_db.as_ref(), asset_id, None)
}

#[cfg(feature = "liquid")]
pub fn list_registry_assets(
&self,
start_index: usize,
limit: usize,
sorting: AssetSorting,
) -> Result<Vec<LiquidAsset>> {
let asset_db = match &self.asset_db {
None => return Ok(vec![]),
Some(db) => db.read().unwrap(),
};
Ok(asset_db
.list(start_index, limit, sorting)
.into_iter()
.filter_map(|(asset_id, metadata)| {
// Attach on-chain information alongside the registry metadata
lookup_asset(&self, None, asset_id, Some(metadata))
.ok()
.flatten()
})
.collect())
}
}
27 changes: 26 additions & 1 deletion src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hyperlocal::UnixServerExt;
use std::fs;
#[cfg(feature = "liquid")]
use {
crate::elements::{peg::PegoutValue, IssuanceValue},
crate::elements::{peg::PegoutValue, AssetSorting, IssuanceValue},
elements::{
confidential::{Asset, Nonce, Value},
encode, AssetId,
Expand All @@ -44,6 +44,11 @@ const MAX_MEMPOOL_TXS: usize = 50;
const BLOCK_LIMIT: usize = 10;
const ADDRESS_SEARCH_LIMIT: usize = 10;

#[cfg(feature = "liquid")]
const ASSETS_PER_PAGE: usize = 25;
#[cfg(feature = "liquid")]
const ASSETS_MAX_PER_PAGE: usize = 100;

const TTL_LONG: u32 = 157_784_630; // ttl for static resources (5 years)
const TTL_SHORT: u32 = 10; // ttl for volatie resources
const TTL_MEMPOOL_RECENT: u32 = 5; // ttl for GET /mempool/recent
Expand Down Expand Up @@ -1015,6 +1020,26 @@ fn handle_request(
json_response(query.estimate_fee_map(), TTL_SHORT)
}

#[cfg(feature = "liquid")]
(&Method::GET, Some(&"assets"), Some(&"registry"), None, None, None) => {
let start_index: usize = query_params
.get("start_index")
.and_then(|n| n.parse().ok())
.unwrap_or(0);

let limit: usize = query_params
.get("limit")
.and_then(|n| n.parse().ok())
.map(|n: usize| n.min(ASSETS_MAX_PER_PAGE))
.unwrap_or(ASSETS_PER_PAGE);

let sorting = AssetSorting::from_query_params(&query_params)?;

let assets = query.list_registry_assets(start_index, limit, sorting)?;

json_response(assets, TTL_SHORT)
}

#[cfg(feature = "liquid")]
(&Method::GET, Some(&"asset"), Some(asset_str), None, None, None) => {
let asset_id = AssetId::from_hex(asset_str)?;
Expand Down

0 comments on commit 9cb880a

Please sign in to comment.