From d68869a9628dfde245a5712caf9209b53e5052af Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sun, 10 Oct 2021 15:44:05 +0300 Subject: [PATCH] Support basic RPC handling during initial sync Following https://github.com/romanz/electrs/issues/534#issuecomment-935982984 --- src/electrum.rs | 17 +++++++- src/index.rs | 101 +++++++++++++++++++++++++----------------------- src/server.rs | 15 +++++-- src/tracker.rs | 21 +++++++--- 4 files changed, 96 insertions(+), 58 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index 83418ce0a..0dc3d2830 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -88,6 +88,7 @@ enum RpcError { // Electrum-specific errors BadRequest(anyhow::Error), DaemonError(daemon::RpcError), + UnavailableIndex, } impl RpcError { @@ -107,6 +108,10 @@ impl RpcError { }, RpcError::BadRequest(err) => json!({"code": 1, "message": err.to_string()}), RpcError::DaemonError(err) => json!({"code": 2, "message": err.message}), + RpcError::UnavailableIndex => { + // Internal JSON-RPC error (https://www.jsonrpc.org/specification#error_object) + json!({"code": -32603, "message": "unavailable index"}) + } } } } @@ -155,7 +160,7 @@ impl Rpc { self.daemon.new_block_notification() } - pub fn sync(&mut self) -> Result<()> { + pub fn sync(&mut self) -> Result { self.tracker.sync(&self.daemon, self.signal.exit_flag()) } @@ -494,6 +499,16 @@ impl Rpc { Err(response) => return response, // params parsing may fail - the response contains request id }; self.rpc_duration.observe_duration(&call.method, || { + if self.tracker.status().is_err() { + // Allow only a few RPC (for sync status notification) not requiring index DB being compacted. + match &call.params { + Params::BlockHeader(_) + | Params::BlockHeaders(_) + | Params::HeadersSubscribe + | Params::Version(_) => (), + _ => return error_msg(&call.id, RpcError::UnavailableIndex), + }; + } let result = match &call.params { Params::Banner => Ok(json!(self.banner)), Params::BlockHeader(args) => self.block_header(*args), diff --git a/src/index.rs b/src/index.rs index 5b8a1e54a..037ddb161 100644 --- a/src/index.rs +++ b/src/index.rs @@ -103,6 +103,7 @@ pub struct Index { lookup_limit: Option, chain: Chain, stats: Stats, + is_ready: bool, } impl Index { @@ -133,6 +134,7 @@ impl Index { lookup_limit, chain, stats, + is_ready: false, }) } @@ -179,56 +181,59 @@ impl Index { .filter_map(move |height| self.chain.get_block_hash(height)) } - pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> { - self.stats.observe_db(&self.store); - loop { - let new_headers = self - .stats - .observe_duration("headers", || daemon.get_new_headers(&self.chain))?; - if new_headers.is_empty() { - break; - } - info!( - "indexing {} blocks: [{}..{}]", - new_headers.len(), - new_headers.first().unwrap().height(), - new_headers.last().unwrap().height() - ); - for chunk in new_headers.chunks(self.batch_size) { - exit_flag.poll().with_context(|| { - format!( - "indexing interrupted at height: {}", - chunk.first().unwrap().height() - ) - })?; - let blockhashes: Vec = chunk.iter().map(|h| h.hash()).collect(); - let mut heights = chunk.iter().map(|h| h.height()); + // Return `Ok(true)` when the chain is fully synced and the index is compacted. + pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { + let new_headers = self + .stats + .observe_duration("headers", || daemon.get_new_headers(&self.chain))?; + if new_headers.is_empty() { + self.store.flush(); // full compaction is performed on the first flush call + self.is_ready = true; + return Ok(true); // no more blocks to index (done for now) + } + info!( + "indexing {} blocks: [{}..{}]", + new_headers.len(), + new_headers.first().unwrap().height(), + new_headers.last().unwrap().height() + ); + for chunk in new_headers.chunks(self.batch_size) { + exit_flag.poll().with_context(|| { + format!( + "indexing interrupted at height: {}", + chunk.first().unwrap().height() + ) + })?; + let blockhashes: Vec = chunk.iter().map(|h| h.hash()).collect(); + let mut heights = chunk.iter().map(|h| h.height()); - let mut batch = WriteBatch::default(); - daemon.for_blocks(blockhashes, |_blockhash, block| { - let height = heights.next().expect("unexpected block"); - self.stats.observe_duration("block", || { - index_single_block(block, height).extend(&mut batch) - }); - self.stats.height.set("tip", height as f64); - })?; - let heights: Vec<_> = heights.collect(); - assert!( - heights.is_empty(), - "some blocks were not indexed: {:?}", - heights - ); - batch.sort(); - self.stats.observe_batch(&batch); - self.stats - .observe_duration("write", || self.store.write(&batch)); - self.stats.observe_db(&self.store); - } - self.chain.update(new_headers); - self.stats.observe_chain(&self.chain); + let mut batch = WriteBatch::default(); + daemon.for_blocks(blockhashes, |_blockhash, block| { + let height = heights.next().expect("unexpected block"); + self.stats.observe_duration("block", || { + index_single_block(block, height).extend(&mut batch) + }); + self.stats.height.set("tip", height as f64); + })?; + let heights: Vec<_> = heights.collect(); + assert!( + heights.is_empty(), + "some blocks were not indexed: {:?}", + heights + ); + batch.sort(); + self.stats.observe_batch(&batch); + self.stats + .observe_duration("write", || self.store.write(&batch)); + self.stats.observe_db(&self.store); } - self.store.flush(); - Ok(()) + self.chain.update(new_headers); + self.stats.observe_chain(&self.chain); + Ok(false) // sync is not done + } + + pub(crate) fn is_ready(&self) -> bool { + self.is_ready } } diff --git a/src/server.rs b/src/server.rs index 207fbf040..a13443c9d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -82,11 +82,18 @@ fn serve() -> Result<()> { let new_block_rx = rpc.new_block_notification(); let mut peers = HashMap::::new(); loop { - rpc.sync().context("sync failed")?; // initial sync and compaction may take a few hours - if config.sync_once { - return Ok(()); + // initial sync and compaction may take a few hours + while server_rx.is_empty() { + let done = rpc.sync().context("sync failed")?; // sync a batch of blocks + peers = notify_peers(&rpc, peers); // peers are disconnected on error + if !done { + continue; // more blocks to sync + } + if config.sync_once { + return Ok(()); // exit after initial sync is done + } + break; } - peers = notify_peers(&rpc, peers); // peers are disconnected on error. select! { // Handle signals for graceful shutdown recv(rpc.signal().receiver()) -> result => { diff --git a/src/tracker.rs b/src/tracker.rs index 054ec23b6..7f594072c 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -22,6 +22,10 @@ pub struct Tracker { ignore_mempool: bool, } +pub(crate) enum Error { + NotReady, +} + impl Tracker { pub fn new(config: &Config, metrics: Metrics) -> Result { let store = DBStore::open(&config.db_path, config.auto_reindex)?; @@ -58,13 +62,20 @@ impl Tracker { status.get_unspent(self.index.chain()) } - pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> { - self.index.sync(daemon, exit_flag)?; - if !self.ignore_mempool { + pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { + let done = self.index.sync(daemon, exit_flag)?; + if done && !self.ignore_mempool { self.mempool.sync(daemon); + // TODO: double check tip - and retry on diff + } + Ok(done) + } + + pub(crate) fn status(&self) -> Result<(), Error> { + if self.index.is_ready() { + return Ok(()); } - // TODO: double check tip - and retry on diff - Ok(()) + Err(Error::NotReady) } pub(crate) fn update_scripthash_status(