diff --git a/src/connectors/onchain_events/mod.rs b/src/connectors/onchain_events/mod.rs index e722e6b..f35ae7e 100644 --- a/src/connectors/onchain_events/mod.rs +++ b/src/connectors/onchain_events/mod.rs @@ -20,6 +20,7 @@ use crate::{ ValidatorMessage, }, storage::store::engine::MempoolMessage, + utils::statsd_wrapper::StatsdClientWrapper, }; sol!( @@ -144,6 +145,7 @@ pub struct Subscriber { mempool_tx: mpsc::Sender, start_block_number: u64, stop_block_number: u64, + statsd_client: StatsdClientWrapper, } // TODO(aditi): Wait for 1 confirmation before "committing" an onchain event. @@ -151,6 +153,7 @@ impl Subscriber { pub fn new( config: Config, mempool_tx: mpsc::Sender, + statsd_client: StatsdClientWrapper, ) -> Result { if config.rpc_url.is_empty() { return Err(SubscribeError::EmptyRpcUrl); @@ -163,9 +166,15 @@ impl Subscriber { mempool_tx, start_block_number: config.start_block_number, stop_block_number: config.stop_block_number, + statsd_client, }) } + fn count(&self, key: &str, value: u64) { + self.statsd_client + .count(format!("onchain_events.{}", key).as_str(), value); + } + async fn add_onchain_event( &mut self, fid: u64, @@ -200,6 +209,21 @@ impl Subscriber { log_index = event.log_index, "Processed onchain event" ); + match event_type { + OnChainEventType::EventTypeNone => {} + OnChainEventType::EventTypeSigner => { + self.count("num_signer_events", 1); + } + OnChainEventType::EventTypeSignerMigrated => { + self.count("num_signer_migrated_events", 1); + } + OnChainEventType::EventTypeIdRegister => { + self.count("num_id_register_events", 1); + } + OnChainEventType::EventTypeStorageRent => { + self.count("num_storage_events", 1); + } + }; let events = self.onchain_events_by_block.get_mut(&block_number); match events { None => { diff --git a/src/main.rs b/src/main.rs index c2fec39..fa9062e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -160,6 +160,7 @@ async fn main() -> Result<(), Box> { let mut onchain_events_subscriber = snapchain::connectors::onchain_events::Subscriber::new( app_config.onchain_events, mempool_tx.clone(), + statsd_client.clone(), )?; tokio::spawn(async move { let result = onchain_events_subscriber.run(false).await;