Skip to content

Commit

Permalink
feat: adding benchmarker.
Browse files Browse the repository at this point in the history
  • Loading branch information
l-monninger committed Feb 10, 2024
1 parent efe64e5 commit 6cca760
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 2 deletions.
12 changes: 12 additions & 0 deletions m1/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion m1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
resolver = "2"
members = [
"subnet",
"tests/e2e"
"tests/e2e",
"e2e-benchmark"
]

[workspace.package]
Expand All @@ -21,6 +22,7 @@ anyhow = "1.0.62"
futures = "0.3.28"
rand = { version = "0.7.3" }
bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" }
url = "2.2.2"

actix-web = "4"
async-trait = "0.1.53"
Expand Down Expand Up @@ -247,6 +249,8 @@ aptos-vm-types = { path = "../vendors/aptos-core-v2/aptos-move/aptos-vm-types" }
aptos-vm-validator = { path = "../vendors/aptos-core-v2/vm-validator" }
aptos-warp-webserver = { path = "../vendors/aptos-core-v2/crates/aptos-warp-webserver" }
aptos-writeset-generator = { path = "../vendors/aptos-core-v2/aptos-move/writeset-transaction-generator" }
once_cell = "1.8.0"


[patch.crates-io]
serde-reflection = { git = "https://github.com/aptos-labs/serde-reflection", rev = "839aed62a20ddccf043c08961cfe74875741ccba" }
Expand Down
12 changes: 12 additions & 0 deletions m1/e2e-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "e2e-benchmark"
version = "0.1.0"
edition = "2021"

[dependencies]
aptos-sdk = {workspace = true }
anyhow = { workspace = true }
url = { workspace = true }
tokio = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
324 changes: 324 additions & 0 deletions m1/e2e-benchmark/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
use anyhow::{Context, Result};
use aptos_sdk::{
coin_client::CoinClient,
rest_client::{Client, FaucetClient},
types::LocalAccount,
};
use once_cell::sync::Lazy;
use std::{
collections::VecDeque,
fs::File,
io::Write,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{self, time as tokio_time};
use url::Url;
use std::str::FromStr;
use tokio::time::{self, sleep};
use tokio::sync::Mutex;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};

static NODE_URL: Lazy<Url> = Lazy::new(|| {
Url::from_str(
std::env::var("APTOS_NODE_URL")
.as_ref()
.map(|s| s.as_str())
.unwrap_or("https://fullnode.devnet.aptoslabs.com"),
)
.unwrap()
});

static FAUCET_URL: Lazy<Url> = Lazy::new(|| {
Url::from_str(
std::env::var("APTOS_FAUCET_URL")
.as_ref()
.map(|s| s.as_str())
.unwrap_or("https://faucet.devnet.aptoslabs.com"),
)
.unwrap()
});

struct Statistics {
records: Vec<(Instant, bool)>, // (Timestamp, Success)
max_tps: f64,
min_tps: f64,
avg_tps: f64,
}

impl Statistics {
fn new() -> Self {
Self {
records: vec![],
max_tps: 0.0,
min_tps: f64::MAX,
avg_tps: 0.0,
}
}

fn record_transaction(&mut self, success: bool) {
let now = Instant::now();
self.records.push((now, success));
}

fn analyze_tps(&mut self) {
let mut tps_values: Vec<f64> = Vec::new();
// Assuming we calculate TPS over 1-second windows for simplicity
let start_time = self.records.first().map(|x| x.0).unwrap_or(Instant::now());
let end_time = self.records.last().map(|x| x.0).unwrap_or(Instant::now());
let total_duration = end_time.duration_since(start_time).as_secs_f64();

if total_duration > 0.0 {
let mut current_time = start_time;
while current_time <= end_time {
let window_end = current_time + Duration::from_secs(15);
let count = self
.records
.iter()
.filter(|&&(time, _)| time >= current_time && time < window_end)
.count();
let tps = (count/15) as f64;
tps_values.push(tps);

current_time += Duration::from_secs(15);
}

if let Some(max_tps) = tps_values.iter().max_by(|x, y| x.partial_cmp(y).unwrap()) {
self.max_tps = *max_tps;
}
if let Some(min_tps) = tps_values.iter().min_by(|x, y| x.partial_cmp(y).unwrap()) {
self.min_tps = *min_tps;
}

// Calculate average TPS
let sum_tps: f64 = tps_values.iter().sum();
let avg_tps = if !tps_values.is_empty() { sum_tps / tps_values.len() as f64 } else { 0.0 };
self.avg_tps = avg_tps; // Make sure to add `avg_tps` to your Statistics struct
}

// Adjust min_tps if no transactions were recorded to avoid f64::MAX as a value
if self.min_tps == f64::MAX {
self.min_tps = 0.0;
}
}
}

#[tokio::main]
async fn main() -> Result<()> {
// Setup clients and statistics
let stats = Arc::new(Mutex::new(Statistics::new()));
// Setup for benchmarking, transaction sending, etc., goes here
run_simulation(
stats.clone(),
Duration::from_secs(60 * 120)
).await?;

// Wait for benchmark to finish
// Perform analysis
let mut stats = stats.lock().await;
stats.analyze_tps();

println!("Max TPS: {}, Min TPS: {}", stats.max_tps, stats.min_tps);

// Write statistics to a file
let mut file = File::create("benchmark_stats.dat")?;
writeln!(file, "Max TPS: {}", stats.max_tps)?;
writeln!(file, "Min TPS: {}", stats.min_tps)?;
for (timestamp, success) in &stats.records {
writeln!(file, "{}, {}", timestamp.elapsed().as_secs_f64(), if *success { "success" } else { "failure" })?;
}

Ok(())
}

// Dummy function to simulate transaction - replace with actual logic
async fn perform_transaction_batch(
stats: Arc<Mutex<Statistics>>,
) -> Result<()> {

// :!:>section_1a
let rest_client = Client::new(NODE_URL.clone());
let faucet_client = FaucetClient::new(FAUCET_URL.clone(), NODE_URL.clone()); // <:!:section_1a

// :!:>section_1b
let coin_client = CoinClient::new(&rest_client); // <:!:section_1b

// Create two accounts locally, Alice and Bob.
// :!:>section_2
let mut alice = LocalAccount::generate(&mut rand::rngs::OsRng);
let mut bob = LocalAccount::generate(&mut rand::rngs::OsRng); // <:!:section_2


// Create the accounts on chain, but only fund Alice.
// :!:>section_3
match faucet_client
.fund(alice.address(), 100_000_000)
.await
.context("Failed to fund Alice's account") {

Ok(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(true);
},
Err(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(false);
}
};

match faucet_client
.create_account(bob.address())
.await
.context("Failed to fund Bob's account") {

Ok(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(true);
},
Err(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(false);
}

};


// run 16 transfers back and forth
for _ in 0..16 {
let txn_hash = coin_client
.transfer(&mut alice, bob.address(), 1_000, None)
.await
.context("Failed to submit transaction to transfer coins")?;
match rest_client
.wait_for_transaction(&txn_hash)
.await
.context("Failed when waiting for the transfer transaction") {

Ok(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(true);
},
Err(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(false);
}

};

let txn_hash = coin_client
.transfer(&mut bob, alice.address(), 1_000, None)
.await
.context("Failed to submit transaction to transfer coins")?;
match rest_client
.wait_for_transaction(&txn_hash)
.await
.context("Failed when waiting for the transfer transaction") {

Ok(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(true);
},
Err(_) => {
let mut stats = stats.lock().await;
stats.record_transaction(false);
}

};
} // <:!:section_6

Ok(())

}


async fn run_simulation(stats: Arc<Mutex<Statistics>>, duration: Duration) -> Result<()> {
let run_flag = Arc::new(AtomicBool::new(true));
let max_tps = Arc::new(AtomicUsize::new(0));
let current_tasks = Arc::new(AtomicUsize::new(1024 * 64 * 4)); //

// Function to adjust tasks based on performance
let adjust_tasks = |max_tps: &AtomicUsize, current_tps: usize, current_tasks: &AtomicUsize| {
if current_tps > max_tps.load(Ordering::Relaxed) {
max_tps.store(current_tps, Ordering::Relaxed);
let increment = (current_tps as f64 * 0.1) as usize + 1;
current_tasks.fetch_add(increment, Ordering::Relaxed);
} else {
// If current TPS is significantly lower than max, consider reducing increase rate or stop increasing
let threshold = max_tps.load(Ordering::Relaxed) * 50 / 100; // 50% of max TPS as threshold
if current_tps < threshold {
current_tasks.store(
(current_tasks.load(Ordering::Relaxed) as f64 * 0.9) as usize + 4,
Ordering::Relaxed,
);
}
}
};

let mut handles = Vec::new();
let stats_clone = stats.clone();
let run_flag_clone = run_flag.clone();
let max_tps_clone = max_tps.clone();
let current_tasks_clone = current_tasks.clone();

// Background task to adjust the number of parallel tasks
let adjuster_handle = tokio::spawn(async move {
while run_flag_clone.load(Ordering::Relaxed) {
// Wait a bit between adjustments
sleep(Duration::from_secs(20)).await;

// Lock stats to read and calculate current TPS
let mut stats = stats_clone.lock().await;
stats.analyze_tps();
let current_tps = stats.avg_tps.round() as usize;

adjust_tasks(&max_tps_clone, current_tps, &current_tasks_clone);

// Print current strategy status
println!("Current TPS: {}, Max TPS: {}, Current Tasks: {}", current_tps, max_tps_clone.load(Ordering::Relaxed), current_tasks_clone.load(Ordering::Relaxed));
}
});

// Main loop to manage tasks based on current_tasks count
let now = Instant::now();
while run_flag.load(Ordering::Relaxed) && Instant::now() < now + duration{
let current_task_count = current_tasks.load(Ordering::Relaxed);
while handles.len() < current_task_count {
let stats_clone = stats.clone();
let run_flag_clone = run_flag.clone();

let handle = tokio::spawn(async move {
while run_flag_clone.load(Ordering::Relaxed) {
match perform_transaction_batch(stats_clone.clone()).await {
Ok(_) => {},
Err(_) => {}
}
}
});

handles.push(handle);
}

// Optionally remove excess handles if current_tasks decreased
while handles.len() > current_task_count {
if let Some(handle) = handles.pop() {
handle.abort(); // Stop the extra task
}
}

// Sleep a bit before next adjustment check
sleep(Duration::from_secs(1)).await;
}

// Signal all tasks to stop
run_flag.store(false, Ordering::Relaxed);

// Wait for all tasks to complete
for handle in handles {
let _ = handle.await;
}

// Ensure the adjuster task is also completed
let _ = adjuster_handle.await;

Ok(())
}
2 changes: 1 addition & 1 deletion m1/subnet/src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ impl Vm {

async fn build_block_data(&self) -> Result<Vec<u8>, anyhow::Error> {
let unix_now_micro = Utc::now().timestamp_micros() as u64;
let tx_arr = self.get_pending_tx(500).await?;
let tx_arr = self.get_pending_tx(512).await?;
log::info!("build_block pool tx count {}", tx_arr.len());
let executor = self.executor.as_ref().ok_or_else(|| anyhow::anyhow!("Executor not available"))?.read().await;
let signer = self.signer.as_ref().ok_or_else(|| anyhow::anyhow!("Signer not available"))?;
Expand Down

0 comments on commit 6cca760

Please sign in to comment.