Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Move tx_validation inside mempool. #183

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod acl;
pub mod rpc_client;
pub mod types;

Expand Down
57 changes: 24 additions & 33 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@ mod networking;
mod rpc_server;
mod scheduler;
mod storage;
mod txvalidation;
mod vmm;
mod watchdog;
mod workflow;

use mempool::Mempool;
use storage::{database::entity, Database};

use crate::mempool::CallbackSender;
use crate::networking::WhitelistSyncer;
use crate::txvalidation::{CallbackSender, ValidatedTxReceiver};

fn start_logger(default_level: LevelFilter) {
let filter = match EnvFilter::try_from_default_env() {
Expand Down Expand Up @@ -148,7 +147,7 @@ fn generate_key(opts: KeyOptions) -> Result<()> {
}

#[async_trait]
impl txvalidation::ValidateStorage for storage::Database {
impl mempool::ValidateStorage for storage::Database {
async fn get_tx(&self, hash: &Hash) -> Result<Option<Transaction<Validated>>> {
self.find_transaction(hash).await
}
Expand Down Expand Up @@ -181,6 +180,8 @@ impl mempool::Storage for storage::Database {
}
}

impl crate::mempool::MempoolStorage for storage::Database {}

#[async_trait]
impl workflow::TransactionStore for storage::Database {
async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction<Validated>>> {
Expand Down Expand Up @@ -228,9 +229,22 @@ async fn run(config: Arc<Config>) -> Result<()> {
let (tx_sender, rcv_tx_event_rx) =
mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>();

//To show to idea. Should use your config definition
let new_validated_tx_receiver: Arc<RwLock<dyn ValidatedTxReceiver>> = if !config.no_execution {
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));
//Start Mempool
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));
let (txevent_loop_jh, p2p_stream) = mempool::Mempool::start_tx_validation_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
rcv_tx_event_rx,
mempool.clone(),
database.clone(),
database.clone(),
)
.await?;

if !config.no_execution {
//start execution scheduler.
let scheduler_watchdog_sender =
watchdog::start_healthcheck(config.http_healthcheck_listen_addr).await?;
let scheduler = scheduler::start_scheduler(
Expand All @@ -244,30 +258,7 @@ async fn run(config: Arc<Config>) -> Result<()> {

// Run Scheduler in its own task.
tokio::spawn(async move { scheduler.run(scheduler_watchdog_sender).await });
mempool
} else {
struct ArchiveMempool(Arc<Database>);
#[async_trait]
impl ValidatedTxReceiver for ArchiveMempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.0.as_ref().add_transaction(&tx).await
}
}
Arc::new(RwLock::new(ArchiveMempool(database.clone())))
};

// Start Tx process event loop.
let (txevent_loop_jh, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
rcv_tx_event_rx,
new_validated_tx_receiver.clone(),
database.clone(),
)
.await?;
}

let public_node_key = PublicKey::from_secret_key(&node_key);
let node_resources = scheduler::get_configured_resources(&config);
Expand All @@ -280,7 +271,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
Some(config.http_download_port),
config.p2p_advertised_listen_addr,
http_peer_list,
txvalidation::TxEventSender::<txvalidation::P2pSender>::build(tx_sender.clone()),
mempool::TxEventSender::<mempool::P2pSender>::build(tx_sender.clone()),
p2p_stream,
node_resources,
)
Expand Down Expand Up @@ -320,7 +311,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
let rpc_server = rpc_server::RpcServer::run(
config.clone(),
database.clone(),
txvalidation::TxEventSender::<txvalidation::RpcSender>::build(tx_sender),
mempool::TxEventSender::<mempool::RpcSender>::build(tx_sender),
)
.await?;

Expand Down Expand Up @@ -356,7 +347,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> {
None,
config.p2p_advertised_listen_addr,
http_peer_list,
txvalidation::TxEventSender::<txvalidation::P2pSender>::build(tx),
mempool::TxEventSender::<mempool::P2pSender>::build(tx),
p2p_stream,
(0, 0, 0), // P2P beacon node's resources aren't really important.
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use super::ValidatedTxReceiver;
use crate::txvalidation::acl::AclWhitelist;
use crate::txvalidation::download_manager;
use crate::txvalidation::EventProcessError;
use crate::txvalidation::ValidateStorage;
use crate::mempool::txvalidation::download_manager;
use crate::mempool::ValidateStorage;
use crate::types::Hash;
use crate::types::{
transaction::{Received, Validated},
Program, Transaction,
};
use futures::future::join_all;
use futures_util::TryFutureExt;
use gevulot_node::acl;
use lru::LruCache;
use std::collections::HashMap;
use std::fmt::Debug;
Expand All @@ -19,12 +18,30 @@ use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;

const MAX_CACHED_TX_FOR_VERIFICATION: usize = 50;
const MAX_WAITING_TX_FOR_VERIFICATION: usize = 100;
const MAX_WAITING_TIME_IN_MS: u64 = 3600 * 1000; // One hour.

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
pub enum EventProcessError {
#[error("Fail to send the Tx on the channel: {0}")]
PropagateTxError(#[from] Box<tokio::sync::mpsc::error::SendError<Transaction<Validated>>>),
#[error("validation fail: {0}")]
ValidateError(String),
#[error("Tx asset fail to download because {0}")]
DownloadAssetError(String),
#[error("Save Tx error: {0}")]
SaveTxError(String),
#[error("Storage access error: {0}")]
StorageError(String),
#[error("AclWhite list authenticate error: {0}")]
AclWhiteListAuthError(#[from] acl::AclWhiteListError),
}

//event type.
#[derive(Debug, Clone)]
pub struct ReceivedTx;
Expand Down Expand Up @@ -102,9 +119,9 @@ impl From<TxEvent<WaitTx>> for TxEvent<NewTx> {

//Processing of event that arrive: SourceTxType.
impl TxEvent<ReceivedTx> {
pub async fn process_event(
pub async fn verify_tx(
self,
acl_whitelist: &impl AclWhitelist,
acl_whitelist: &impl acl::AclWhitelist,
) -> Result<TxEvent<DownloadTx>, EventProcessError> {
match self.validate_tx(acl_whitelist).await {
Ok(()) => Ok(self.into()),
Expand All @@ -117,7 +134,7 @@ impl TxEvent<ReceivedTx> {
//Tx validation process.
async fn validate_tx(
&self,
acl_whitelist: &impl AclWhitelist,
acl_whitelist: &impl acl::AclWhitelist,
) -> Result<(), EventProcessError> {
self.tx.validate().map_err(|err| {
EventProcessError::ValidateError(format!("Error during transaction validation:{err}",))
Expand All @@ -136,7 +153,7 @@ impl TxEvent<ReceivedTx> {

//Download Tx processing
impl TxEvent<DownloadTx> {
pub async fn process_event(
pub async fn downlod_tx_assets(
self,
local_directory_path: &Path,
http_peer_list: Vec<(SocketAddr, Option<u16>)>,
Expand Down Expand Up @@ -174,7 +191,7 @@ impl TxEvent<DownloadTx> {

// Propagate Tx processing.
impl TxEvent<PropagateTx> {
pub async fn process_event(
pub async fn propagate_tx(
self,
p2p_sender: &UnboundedSender<Transaction<Validated>>,
) -> Result<(), EventProcessError> {
Expand Down Expand Up @@ -204,7 +221,7 @@ impl TxEvent<PropagateTx> {
// Manage Run Tx and put to wait depending on progam.
// Manage Proof and Verify Tx to wait depending on Run Tx.
impl TxEvent<WaitTx> {
pub async fn process_event(
pub async fn validate_tx_dep(
self,
programid_cache: &mut TxCache<Program>,
parent_cache: &mut TxCache<Transaction<Validated>>,
Expand Down Expand Up @@ -366,7 +383,7 @@ impl TxEvent<WaitTx> {
}

impl TxEvent<NewTx> {
pub async fn process_event(
pub async fn save_tx(
self,
newtx_receiver: &mut dyn ValidatedTxReceiver,
) -> Result<(), EventProcessError> {
Expand Down Expand Up @@ -485,7 +502,7 @@ impl<Kind> Default for TxCache<Kind> {
mod tests {

use super::*;
use crate::txvalidation::Created;
use crate::mempool::Created;
use crate::types::transaction::Payload;
use crate::types::transaction::ProgramMetadata;
use crate::types::transaction::{Workflow, WorkflowStep};
Expand Down Expand Up @@ -623,7 +640,7 @@ mod tests {

// Valide Run Tx. Put in Wait cache
let res = run1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Run Tx is waiting
Expand All @@ -634,7 +651,7 @@ mod tests {

// Deploy1 Tx, only return the deploy Tx. Wait for deploy2 program.
let res = deploy1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_progam_cache.waiting_tx.len(), 1);
Expand All @@ -644,7 +661,7 @@ mod tests {

// Deploy2 Tx, return 2 tx, deploy2 + Run.
let res = deploy2_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
//remove from cache
Expand Down Expand Up @@ -762,15 +779,15 @@ mod tests {
// New Tx that will wait.
let tx_event = new_proof_tx_event(parent1_hash);
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);

// New Tx that will wait.
let tx_event = new_proof_tx_event(parent2_hash);
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 2);
Expand All @@ -781,15 +798,15 @@ mod tests {
let tx_event = new_proof_tx_event(parent1_hash);
let tx_hash = tx_event.tx.hash;
let res = tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Evicted but new tx added => len=1.
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);

//Process the parent Tx. Do child Tx return because they have been evicted.
let res = parent1_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
let ret_txs = res.unwrap();
Expand All @@ -814,7 +831,7 @@ mod tests {
let tx1_hash = tx_event1.tx.hash;
let tx1 = tx_event1.tx.clone();
let res = tx_event1
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Save the Tx in db to test cache miss.
Expand All @@ -828,7 +845,7 @@ mod tests {
let tx2_hash = tx2_event.tx.hash;
let tx2 = tx2_event.tx.clone();
let res = tx2_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Save the Tx in db to test cache miss.
Expand All @@ -844,15 +861,15 @@ mod tests {
let tx_event3 = new_proof_tx_event(parent_tx_event.tx.hash);
let tx3_hash = tx_event3.tx.hash;
let res = tx_event3
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 1);
assert!(!wait_tx_cache.is_tx_cached(&tx3_hash));

// Test process parent Tx: Tx3 removed from waiting, Tx3 and parent added to cached. Return Tx3 and parent.
let res = parent_tx_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
assert_eq!(wait_tx_cache.waiting_tx.len(), 0);
Expand All @@ -868,7 +885,7 @@ mod tests {
let tx4_event = new_proof_tx_event(tx1_hash); // tx1_hash not in the cache but in the DB
let tx4_hash = tx4_event.tx.hash;
let res = tx4_event
.process_event(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.validate_tx_dep(&mut wait_progam_cache, &mut wait_tx_cache, &db)
.await;
assert!(res.is_ok());
// Not cached because parent in db
Expand Down
Loading
Loading