Support basic RPC handling during initial sync
Following #534 (comment)
romanz committed Nov 19, 2021
1 parent 1b6e5a6 commit d68869a
Showing 4 changed files with 96 additions and 58 deletions.
17 changes: 16 additions & 1 deletion src/electrum.rs
@@ -88,6 +88,7 @@ enum RpcError {
     // Electrum-specific errors
     BadRequest(anyhow::Error),
     DaemonError(daemon::RpcError),
+    UnavailableIndex,
 }

 impl RpcError {
@@ -107,6 +108,10 @@ impl RpcError {
             },
             RpcError::BadRequest(err) => json!({"code": 1, "message": err.to_string()}),
             RpcError::DaemonError(err) => json!({"code": 2, "message": err.message}),
+            RpcError::UnavailableIndex => {
+                // Internal JSON-RPC error (https://www.jsonrpc.org/specification#error_object)
+                json!({"code": -32603, "message": "unavailable index"})
+            }
         }
     }
 }
@@ -155,7 +160,7 @@ impl Rpc {
         self.daemon.new_block_notification()
     }

-    pub fn sync(&mut self) -> Result<()> {
+    pub fn sync(&mut self) -> Result<bool> {
         self.tracker.sync(&self.daemon, self.signal.exit_flag())
     }

@@ -494,6 +499,16 @@ impl Rpc {
             Err(response) => return response, // params parsing may fail - the response contains request id
         };
         self.rpc_duration.observe_duration(&call.method, || {
+            if self.tracker.status().is_err() {
+                // Allow only a few RPCs (for sync status notification) that don't require the index DB to be compacted.
+                match &call.params {
+                    Params::BlockHeader(_)
+                    | Params::BlockHeaders(_)
+                    | Params::HeadersSubscribe
+                    | Params::Version(_) => (),
+                    _ => return error_msg(&call.id, RpcError::UnavailableIndex),
+                };
+            }
             let result = match &call.params {
                 Params::Banner => Ok(json!(self.banner)),
                 Params::BlockHeader(args) => self.block_header(*args),
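On the wire, the gate above surfaces as a standard JSON-RPC 2.0 error object. A minimal sketch of the response a gated call receives during initial sync, assuming serde_json and an illustrative request id of 1:

use serde_json::json;

fn main() {
    // Response for a gated method (e.g. blockchain.scripthash.subscribe)
    // while the index is still syncing; the error object carries the code
    // and message defined in RpcError::UnavailableIndex above.
    let response = json!({
        "id": 1,
        "jsonrpc": "2.0",
        "error": {"code": -32603, "message": "unavailable index"},
    });
    assert_eq!(response["error"]["code"], -32603);
    println!("{}", response);
}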
101 changes: 53 additions & 48 deletions src/index.rs
@@ -103,6 +103,7 @@ pub struct Index {
     lookup_limit: Option<usize>,
     chain: Chain,
     stats: Stats,
+    is_ready: bool,
 }

 impl Index {
@@ -133,6 +134,7 @@ impl Index {
             lookup_limit,
             chain,
             stats,
+            is_ready: false,
         })
     }

@@ -179,56 +181,59 @@ impl Index {
             .filter_map(move |height| self.chain.get_block_hash(height))
     }

-    pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> {
-        self.stats.observe_db(&self.store);
-        loop {
-            let new_headers = self
-                .stats
-                .observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
-            if new_headers.is_empty() {
-                break;
-            }
-            info!(
-                "indexing {} blocks: [{}..{}]",
-                new_headers.len(),
-                new_headers.first().unwrap().height(),
-                new_headers.last().unwrap().height()
-            );
-            for chunk in new_headers.chunks(self.batch_size) {
-                exit_flag.poll().with_context(|| {
-                    format!(
-                        "indexing interrupted at height: {}",
-                        chunk.first().unwrap().height()
-                    )
-                })?;
-                let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
-                let mut heights = chunk.iter().map(|h| h.height());
-
-                let mut batch = WriteBatch::default();
-                daemon.for_blocks(blockhashes, |_blockhash, block| {
-                    let height = heights.next().expect("unexpected block");
-                    self.stats.observe_duration("block", || {
-                        index_single_block(block, height).extend(&mut batch)
-                    });
-                    self.stats.height.set("tip", height as f64);
-                })?;
-                let heights: Vec<_> = heights.collect();
-                assert!(
-                    heights.is_empty(),
-                    "some blocks were not indexed: {:?}",
-                    heights
-                );
-                batch.sort();
-                self.stats.observe_batch(&batch);
-                self.stats
-                    .observe_duration("write", || self.store.write(&batch));
-                self.stats.observe_db(&self.store);
-            }
-            self.chain.update(new_headers);
-            self.stats.observe_chain(&self.chain);
-        }
-        self.store.flush();
-        Ok(())
+    // Return `Ok(true)` when the chain is fully synced and the index is compacted.
+    pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
+        let new_headers = self
+            .stats
+            .observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
+        if new_headers.is_empty() {
+            self.store.flush(); // full compaction is performed on the first flush call
+            self.is_ready = true;
+            return Ok(true); // no more blocks to index (done for now)
+        }
+        info!(
+            "indexing {} blocks: [{}..{}]",
+            new_headers.len(),
+            new_headers.first().unwrap().height(),
+            new_headers.last().unwrap().height()
+        );
+        for chunk in new_headers.chunks(self.batch_size) {
+            exit_flag.poll().with_context(|| {
+                format!(
+                    "indexing interrupted at height: {}",
+                    chunk.first().unwrap().height()
+                )
+            })?;
+            let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
+            let mut heights = chunk.iter().map(|h| h.height());
+
+            let mut batch = WriteBatch::default();
+            daemon.for_blocks(blockhashes, |_blockhash, block| {
+                let height = heights.next().expect("unexpected block");
+                self.stats.observe_duration("block", || {
+                    index_single_block(block, height).extend(&mut batch)
+                });
+                self.stats.height.set("tip", height as f64);
+            })?;
+            let heights: Vec<_> = heights.collect();
+            assert!(
+                heights.is_empty(),
+                "some blocks were not indexed: {:?}",
+                heights
+            );
+            batch.sort();
+            self.stats.observe_batch(&batch);
+            self.stats
+                .observe_duration("write", || self.store.write(&batch));
+            self.stats.observe_db(&self.store);
+        }
+        self.chain.update(new_headers);
+        self.stats.observe_chain(&self.chain);
+        Ok(false) // sync is not done
+    }
+
+    pub(crate) fn is_ready(&self) -> bool {
+        self.is_ready
     }
 }

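The key structural change in this file is that the batching loop moves out of Index::sync: each call now indexes at most one round of new headers and tells the caller whether the tip was reached, leaving the caller free to do other work between batches. A self-contained toy model of that contract (all names here are illustrative, not from the codebase):

// One batch per call; returns true once fully synced, mirroring the
// Ok(true)/Ok(false) contract of Index::sync above.
struct ToyIndexer {
    remaining_batches: u32,
    ready: bool,
}

impl ToyIndexer {
    fn sync(&mut self) -> bool {
        if self.remaining_batches == 0 {
            self.ready = true; // "flush + compaction done" in the real index
            return true;
        }
        self.remaining_batches -= 1;
        false
    }
}

fn main() {
    let mut index = ToyIndexer { remaining_batches: 3, ready: false };
    while !index.sync() {
        // the real server notifies peers and answers header RPCs here,
        // instead of blocking inside one long sync() call
    }
    assert!(index.ready);
}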
15 changes: 11 additions & 4 deletions src/server.rs
@@ -82,11 +82,18 @@ fn serve() -> Result<()> {
     let new_block_rx = rpc.new_block_notification();
     let mut peers = HashMap::<usize, Peer>::new();
     loop {
-        rpc.sync().context("sync failed")?; // initial sync and compaction may take a few hours
-        if config.sync_once {
-            return Ok(());
+        // initial sync and compaction may take a few hours
+        while server_rx.is_empty() {
+            let done = rpc.sync().context("sync failed")?; // sync a batch of blocks
+            peers = notify_peers(&rpc, peers); // peers are disconnected on error
+            if !done {
+                continue; // more blocks to sync
+            }
+            if config.sync_once {
+                return Ok(()); // exit after initial sync is done
+            }
+            break;
         }
-        peers = notify_peers(&rpc, peers); // peers are disconnected on error.
         select! {
             // Handle signals for graceful shutdown
             recv(rpc.signal().receiver()) -> result => {
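The new while server_rx.is_empty() guard syncs batches only while no server event is queued; as soon as a connection arrives, control falls through to the select! below. A minimal sketch of that pattern, assuming a crossbeam-channel receiver (consistent with the select!/recv usage in this file):

use crossbeam_channel::{unbounded, Receiver};

// Keep doing batch work until an event is queued (illustrative stand-in
// for the rpc.sync() loop above).
fn sync_until_event(events: &Receiver<&str>, mut batches_left: u32) -> u32 {
    while events.is_empty() {
        if batches_left == 0 {
            break; // fully synced; fall through to event handling
        }
        batches_left -= 1; // "index one batch of blocks"
    }
    batches_left
}

fn main() {
    let (tx, rx) = unbounded();
    tx.send("new peer").unwrap();
    // Returns immediately with work left over: a connection is waiting.
    assert_eq!(sync_until_event(&rx, 100), 100);
}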
21 changes: 16 additions & 5 deletions src/tracker.rs
@@ -22,6 +22,10 @@ pub struct Tracker {
     ignore_mempool: bool,
 }

+pub(crate) enum Error {
+    NotReady,
+}
+
 impl Tracker {
     pub fn new(config: &Config, metrics: Metrics) -> Result<Self> {
         let store = DBStore::open(&config.db_path, config.auto_reindex)?;
@@ -58,13 +62,20 @@ impl Tracker {
         status.get_unspent(self.index.chain())
     }

-    pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> {
-        self.index.sync(daemon, exit_flag)?;
-        if !self.ignore_mempool {
+    pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
+        let done = self.index.sync(daemon, exit_flag)?;
+        if done && !self.ignore_mempool {
             self.mempool.sync(daemon);
+            // TODO: double check tip - and retry on diff
         }
-        // TODO: double check tip - and retry on diff
-        Ok(())
+        Ok(done)
+    }
+
+    pub(crate) fn status(&self) -> Result<(), Error> {
+        if self.index.is_ready() {
+            return Ok(());
+        }
+        Err(Error::NotReady)
     }

     pub(crate) fn update_scripthash_status(
