Skip to content

Commit

Permalink
Refactor Index::sync() a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Nov 19, 2021
1 parent d68869a commit 9839a11
Showing 1 changed file with 43 additions and 33 deletions.
76 changes: 43 additions & 33 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use bitcoin::consensus::{deserialize, serialize};
use bitcoin::{Block, BlockHash, OutPoint, Txid};

use crate::{
chain::Chain,
chain::{Chain, NewHeader},
daemon::Daemon,
db::{DBStore, Row, WriteBatch},
metrics::{self, Gauge, Histogram, Metrics},
Expand Down Expand Up @@ -186,52 +186,62 @@ impl Index {
let new_headers = self
.stats
.observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
if new_headers.is_empty() {
self.store.flush(); // full compaction is performed on the first flush call
self.is_ready = true;
return Ok(true); // no more blocks to index (done for now)
match (new_headers.first(), new_headers.last()) {
(Some(first), Some(last)) => {
let count = new_headers.len();
info!(
"indexing {} blocks: [{}..{}]",
count,
first.height(),
last.height()
);
}
_ => {
self.store.flush(); // full compaction is performed on the first flush call
self.is_ready = true;
return Ok(true); // no more blocks to index (done for now)
}
}
info!(
"indexing {} blocks: [{}..{}]",
new_headers.len(),
new_headers.first().unwrap().height(),
new_headers.last().unwrap().height()
);
for chunk in new_headers.chunks(self.batch_size) {
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
})?;
let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
let mut heights = chunk.iter().map(|h| h.height());

let mut batch = WriteBatch::default();
daemon.for_blocks(blockhashes, |_blockhash, block| {
let height = heights.next().expect("unexpected block");
self.stats.observe_duration("block", || {
index_single_block(block, height).extend(&mut batch)
});
self.stats.height.set("tip", height as f64);
})?;
let heights: Vec<_> = heights.collect();
assert!(
heights.is_empty(),
"some blocks were not indexed: {:?}",
heights
);
batch.sort();
self.stats.observe_batch(&batch);
self.stats
.observe_duration("write", || self.store.write(&batch));
self.stats.observe_db(&self.store);
self.sync_blocks(daemon, chunk)?;
}
self.chain.update(new_headers);
self.stats.observe_chain(&self.chain);
Ok(false) // sync is not done
}

fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
let mut heights = chunk.iter().map(|h| h.height());

let mut batch = WriteBatch::default();
daemon.for_blocks(blockhashes, |_blockhash, block| {
let height = heights.next().expect("unexpected block");
self.stats.observe_duration("block", || {
index_single_block(block, height).extend(&mut batch)
});
self.stats.height.set("tip", height as f64);
})?;
let heights: Vec<_> = heights.collect();
assert!(
heights.is_empty(),
"some blocks were not indexed: {:?}",
heights
);
batch.sort();
self.stats.observe_batch(&batch);
self.stats
.observe_duration("write", || self.store.write(&batch));
self.stats.observe_db(&self.store);
Ok(())
}

pub(crate) fn is_ready(&self) -> bool {
self.is_ready
}
Expand Down

0 comments on commit 9839a11

Please sign in to comment.