diff --git a/src/index.rs b/src/index.rs index 037ddb161..143ce89b0 100644 --- a/src/index.rs +++ b/src/index.rs @@ -3,7 +3,7 @@ use bitcoin::consensus::{deserialize, serialize}; use bitcoin::{Block, BlockHash, OutPoint, Txid}; use crate::{ - chain::Chain, + chain::{Chain, NewHeader}, daemon::Daemon, db::{DBStore, Row, WriteBatch}, metrics::{self, Gauge, Histogram, Metrics}, @@ -186,17 +186,22 @@ impl Index { let new_headers = self .stats .observe_duration("headers", || daemon.get_new_headers(&self.chain))?; - if new_headers.is_empty() { - self.store.flush(); // full compaction is performed on the first flush call - self.is_ready = true; - return Ok(true); // no more blocks to index (done for now) + match (new_headers.first(), new_headers.last()) { + (Some(first), Some(last)) => { + let count = new_headers.len(); + info!( + "indexing {} blocks: [{}..{}]", + count, + first.height(), + last.height() + ); + } + _ => { + self.store.flush(); // full compaction is performed on the first flush call + self.is_ready = true; + return Ok(true); // no more blocks to index (done for now) + } } - info!( - "indexing {} blocks: [{}..{}]", - new_headers.len(), - new_headers.first().unwrap().height(), - new_headers.last().unwrap().height() - ); for chunk in new_headers.chunks(self.batch_size) { exit_flag.poll().with_context(|| { format!( @@ -204,34 +209,39 @@ impl Index { chunk.first().unwrap().height() ) })?; - let blockhashes: Vec = chunk.iter().map(|h| h.hash()).collect(); - let mut heights = chunk.iter().map(|h| h.height()); - - let mut batch = WriteBatch::default(); - daemon.for_blocks(blockhashes, |_blockhash, block| { - let height = heights.next().expect("unexpected block"); - self.stats.observe_duration("block", || { - index_single_block(block, height).extend(&mut batch) - }); - self.stats.height.set("tip", height as f64); - })?; - let heights: Vec<_> = heights.collect(); - assert!( - heights.is_empty(), - "some blocks were not indexed: {:?}", - heights - ); - batch.sort(); - self.stats.observe_batch(&batch); - self.stats - .observe_duration("write", || self.store.write(&batch)); - self.stats.observe_db(&self.store); + self.sync_blocks(daemon, chunk)?; } self.chain.update(new_headers); self.stats.observe_chain(&self.chain); Ok(false) // sync is not done } + fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> { + let blockhashes: Vec = chunk.iter().map(|h| h.hash()).collect(); + let mut heights = chunk.iter().map(|h| h.height()); + + let mut batch = WriteBatch::default(); + daemon.for_blocks(blockhashes, |_blockhash, block| { + let height = heights.next().expect("unexpected block"); + self.stats.observe_duration("block", || { + index_single_block(block, height).extend(&mut batch) + }); + self.stats.height.set("tip", height as f64); + })?; + let heights: Vec<_> = heights.collect(); + assert!( + heights.is_empty(), + "some blocks were not indexed: {:?}", + heights + ); + batch.sort(); + self.stats.observe_batch(&batch); + self.stats + .observe_duration("write", || self.store.write(&batch)); + self.stats.observe_db(&self.store); + Ok(()) + } + pub(crate) fn is_ready(&self) -> bool { self.is_ready }