From 6e35bc7d2ef204a4124b7383017d9cd0724f1112 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Wed, 13 Nov 2024 02:22:53 +0800 Subject: [PATCH] Test/TPCC (#243) * test: add Tpcc * fix: stackoverflow with `TPCC` release mode * chore bump fnck_sql version to 0.0.6 --- Cargo.toml | 7 +- README.md | 19 +- src/binder/expr.rs | 4 +- src/binder/update.rs | 8 +- src/execution/dql/aggregate/avg.rs | 2 +- src/expression/evaluator.rs | 6 +- src/function/char_length.rs | 3 +- src/function/lower.rs | 3 +- src/function/upper.rs | 3 +- src/macros/mod.rs | 14 +- src/types/mod.rs | 2 +- src/types/tuple_builder.rs | 4 + src/types/value.rs | 2 +- tpcc/Cargo.toml | 15 + tpcc/src/delivery.rs | 84 ++++ tpcc/src/load.rs | 748 +++++++++++++++++++++++++++++ tpcc/src/main.rs | 167 +++++++ tpcc/src/new_ord.rs | 283 +++++++++++ tpcc/src/order_stat.rs | 117 +++++ tpcc/src/payment.rs | 194 ++++++++ tpcc/src/rt_hist.rs | 135 ++++++ tpcc/src/slev.rs | 69 +++ 22 files changed, 1860 insertions(+), 29 deletions(-) create mode 100644 tpcc/Cargo.toml create mode 100644 tpcc/src/delivery.rs create mode 100644 tpcc/src/load.rs create mode 100644 tpcc/src/main.rs create mode 100644 tpcc/src/new_ord.rs create mode 100644 tpcc/src/order_stat.rs create mode 100644 tpcc/src/payment.rs create mode 100644 tpcc/src/rt_hist.rs create mode 100644 tpcc/src/slev.rs diff --git a/Cargo.toml b/Cargo.toml index 0bbdaaf7..406e3290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "fnck_sql" -version = "0.0.4" +version = "0.0.6" edition = "2021" authors = ["Kould ", "Xwg "] description = "SQL as a Function for Rust" @@ -82,8 +82,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } [workspace] members = [ "tests/sqllogictest", - "tests/macros-test" -, "fnck_sql_serde_macros"] + "tests/macros-test", + "fnck_sql_serde_macros", + "tpcc"] [profile.release] lto = true diff --git a/README.md b/README.md index 5f7f2219..b9c56efd 100755 --- a/README.md +++ b/README.md @@ -54,6 +54,21 @@ let fnck_sql = DataBaseBuilder::path("./data").build()?; let tuples = fnck_sql.run("select * from t1")?; ``` +### TPCC +run `cargo run -p tpcc --release` to run tpcc + +- i9-13900HX +- 32.0 GB +- YMTC PC411-1024GB-B +```shell +<90th Percentile RT (MaxRT)> + New-Order : 0.882 (0.947) + Payment : 0.080 (0.095) +Order-Status : 0.235 (0.255) + Delivery : 5.386 (5.658) + Stock-Level : 0.001 (0.002) +``` + #### PG Wire Service run `cargo run --features="net"` to start server ![start](./static/images/start.gif) @@ -138,8 +153,8 @@ table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: Logi .map(|i| Ok(Tuple { id: None, values: vec![ - Arc::new(DataValue::Int32(Some(i))), - Arc::new(DataValue::Int32(Some(i))), + DataValue::Int32(Some(i)), + DataValue::Int32(Some(i)), ] }))) as Box>>) })); diff --git a/src/binder/expr.rs b/src/binder/expr.rs index e3c5181e..983468af 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -586,7 +586,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> { if !args.is_empty() { ty = args[0].return_type(); - for arg in args.iter() { + for arg in args.iter_mut() { let temp_ty = arg.return_type(); if temp_ty == LogicalType::SqlNull { @@ -595,7 +595,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> { if ty == LogicalType::SqlNull && temp_ty != LogicalType::SqlNull { ty = temp_ty; } else if ty != temp_ty { - return Err(DatabaseError::Incomparable(ty, temp_ty)); + ty = LogicalType::max_logical_type(&ty, &temp_ty)?; } } } diff --git a/src/binder/update.rs b/src/binder/update.rs index 0cd5f00e..b2cc49f6 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -40,7 +40,7 @@ impl Binder<'_, '_, T> { Some(table_name.to_string()), )? { ScalarExpression::ColumnRef(column) => { - let expr = if matches!(expression, ScalarExpression::Empty) { + let mut expr = if matches!(expression, ScalarExpression::Empty) { let default_value = column .default_value()? .ok_or(DatabaseError::DefaultNotExist)?; @@ -48,6 +48,12 @@ impl Binder<'_, '_, T> { } else { expression.clone() }; + if &expr.return_type() != column.datatype() { + expr = ScalarExpression::TypeCast { + expr: Box::new(expr), + ty: column.datatype().clone(), + } + } value_exprs.push((column, expr)); } _ => return Err(DatabaseError::InvalidColumn(ident.to_string())), diff --git a/src/execution/dql/aggregate/avg.rs b/src/execution/dql/aggregate/avg.rs index 404503b7..a18028cc 100644 --- a/src/execution/dql/aggregate/avg.rs +++ b/src/execution/dql/aggregate/avg.rs @@ -45,7 +45,7 @@ impl Accumulator for AvgAccumulator { let quantity_ty = quantity.logical_type(); if value_ty != quantity_ty { - value = DataValue::clone(&value).cast(&quantity_ty)? + value = value.cast(&quantity_ty)? } let evaluator = EvaluatorFactory::binary_create(quantity_ty, BinaryOperator::Divide)?; Ok(evaluator.0.binary_eval(&value, &quantity)) diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index 053cd462..cf8caabb 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -37,7 +37,7 @@ impl ScalarExpression { pub fn eval(&self, tuple: &Tuple, schema: &[ColumnRef]) -> Result { let check_cast = |value: DataValue, return_type: &LogicalType| { if value.logical_type() != *return_type { - return DataValue::clone(&value).cast(return_type); + return value.cast(return_type); } Ok(value) }; @@ -73,9 +73,7 @@ impl ScalarExpression { expr.eval(tuple, schema) } ScalarExpression::TypeCast { expr, ty, .. } => { - let value = expr.eval(tuple, schema)?; - - Ok(DataValue::clone(&value).cast(ty)?) + Ok(expr.eval(tuple, schema)?.cast(ty)?) } ScalarExpression::Binary { left_expr, diff --git a/src/function/char_length.rs b/src/function/char_length.rs index d23cc018..d4eeb834 100644 --- a/src/function/char_length.rs +++ b/src/function/char_length.rs @@ -41,8 +41,7 @@ impl ScalarFunctionImpl for CharLength { let value = exprs[0].eval(tuples, columns)?; let mut value = DataValue::clone(&value); if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { - value = DataValue::clone(&value) - .cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; + value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; } let mut length: u64 = 0; if let DataValue::Utf8 { diff --git a/src/function/lower.rs b/src/function/lower.rs index 86ed71e3..655c7506 100644 --- a/src/function/lower.rs +++ b/src/function/lower.rs @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Lower { let value = exprs[0].eval(tuples, columns)?; let mut value = DataValue::clone(&value); if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { - value = DataValue::clone(&value) - .cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; + value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; } if let DataValue::Utf8 { value: Some(value), diff --git a/src/function/upper.rs b/src/function/upper.rs index bc346aa2..531cc9b0 100644 --- a/src/function/upper.rs +++ b/src/function/upper.rs @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Upper { let value = exprs[0].eval(tuples, columns)?; let mut value = DataValue::clone(&value); if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { - value = DataValue::clone(&value) - .cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; + value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?; } if let DataValue::Utf8 { value: Some(value), diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 76545524..e6bb0b47 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -25,22 +25,20 @@ macro_rules! implement_from_tuple { ($struct_name:ident, ($($field_name:ident : $field_type:ty => $closure:expr),+)) => { impl From<(&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)> for $struct_name { - fn from((schema, tuple): (&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)) -> Self { - fn try_get(tuple: &::fnck_sql::types::tuple::Tuple, schema: &::fnck_sql::types::tuple::SchemaRef, field_name: &str) -> Option<::fnck_sql::types::value::DataValue> { + fn from((schema, mut tuple): (&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)) -> Self { + fn try_get(tuple: &mut ::fnck_sql::types::tuple::Tuple, schema: &::fnck_sql::types::tuple::SchemaRef, field_name: &str) -> Option<::fnck_sql::types::value::DataValue> { let ty = ::fnck_sql::types::LogicalType::type_trans::()?; let (idx, _) = schema .iter() .enumerate() .find(|(_, col)| col.name() == field_name)?; - ::fnck_sql::types::value::DataValue::clone(&tuple.values[idx]) - .cast(&ty) - .ok() + std::mem::replace(&mut tuple.values[idx], ::fnck_sql::types::value::DataValue::Null).cast(&ty).ok() } let mut struct_instance = $struct_name::default(); $( - if let Some(value) = try_get::<$field_type>(&tuple, schema, stringify!($field_name)) { + if let Some(value) = try_get::<$field_type>(&mut tuple, schema, stringify!($field_name)) { $closure( &mut struct_instance, value @@ -103,7 +101,7 @@ macro_rules! scala_function { _index += 1; if value.logical_type() != $arg_ty { - value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?; + value = value.cast(&$arg_ty)?; } value }, )*) @@ -197,7 +195,7 @@ macro_rules! table_function { _index += 1; if value.logical_type() != $arg_ty { - value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?; + value = value.cast(&$arg_ty)?; } value }, )*) diff --git a/src/types/mod.rs b/src/types/mod.rs index d4f0b492..803d6e66 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -335,7 +335,7 @@ impl LogicalType { LogicalType::Float | LogicalType::Double | LogicalType::Decimal(_, _) ), LogicalType::Float => matches!(to, LogicalType::Double | LogicalType::Decimal(_, _)), - LogicalType::Double => false, + LogicalType::Double => matches!(to, LogicalType::Decimal(_, _)), LogicalType::Char(..) => false, LogicalType::Varchar(..) => false, LogicalType::Date => matches!( diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs index 4fb4cc84..d8ecb5a5 100644 --- a/src/types/tuple_builder.rs +++ b/src/types/tuple_builder.rs @@ -34,6 +34,10 @@ impl TupleIdBuilder { } pub fn build(&mut self) -> Option { + if self.tmp_keys.len() != self.primary_indexes.len() { + self.tmp_keys.clear(); + return None; + } (!self.tmp_keys.is_empty()).then(|| { if self.tmp_keys.len() == 1 { self.tmp_keys.pop().unwrap().unwrap() diff --git a/src/types/value.rs b/src/types/value.rs index 27adc59e..01e2eb89 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -1388,7 +1388,7 @@ impl DataValue { LogicalType::Tuple(types) => Ok(if let Some(mut values) = values { for (i, value) in values.iter_mut().enumerate() { if types[i] != value.logical_type() { - *value = DataValue::clone(value).cast(&types[i])?; + *value = mem::replace(value, DataValue::Null).cast(&types[i])?; } } DataValue::Tuple(Some(values)) diff --git a/tpcc/Cargo.toml b/tpcc/Cargo.toml new file mode 100644 index 00000000..54102107 --- /dev/null +++ b/tpcc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tpcc" +version = "0.1.0" +edition = "2021" + +[dependencies] +clap = { version = "4", features = ["derive"] } +chrono = { version = "0.4" } +fnck_sql = { version = "0.0.6", path = "..", package = "fnck_sql" } +indicatif = { version = "0.17" } +ordered-float = { version = "4" } +rand = { version = "0.8" } +rust_decimal = { version = "1" } +tempfile = { version = "3" } +thiserror = { version = "1" } \ No newline at end of file diff --git a/tpcc/src/delivery.rs b/tpcc/src/delivery.rs new file mode 100644 index 00000000..25f32ccb --- /dev/null +++ b/tpcc/src/delivery.rs @@ -0,0 +1,84 @@ +use crate::load::DIST_PER_WARE; +use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; +use chrono::Utc; +use fnck_sql::db::DBTransaction; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; + +#[derive(Debug)] +pub(crate) struct DeliveryArgs { + w_id: usize, + o_carrier_id: usize, +} + +impl DeliveryArgs { + pub(crate) fn new(w_id: usize, o_carrier_id: usize) -> Self { + Self { w_id, o_carrier_id } + } +} + +pub(crate) struct Delivery; +pub(crate) struct DeliveryTest; + +impl TpccTransaction for Delivery { + type Args = DeliveryArgs; + + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + + for d_id in 1..DIST_PER_WARE + 1 { + // "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?" + let (_, tuple) = tx.run(format!("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = {} AND no_w_id = {}", d_id, args.w_id))?; + let no_o_id = tuple[0].values[0].i32().unwrap(); + + if no_o_id == 0 { + continue; + } + // "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?" + let _ = tx.run(format!( + "DELETE FROM new_orders WHERE no_o_id = {} AND no_d_id = {} AND no_w_id = {}", + no_o_id, d_id, args.w_id + ))?; + // "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" + let (_, tuple) = tx.run(format!( + "SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", + no_o_id, d_id, args.w_id + ))?; + let c_id = tuple[0].values[0].i32().unwrap(); + // "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" + let _ = tx.run(format!("UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", args.o_carrier_id, no_o_id, d_id, args.w_id))?; + // "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" + let _ = tx.run(format!("UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", now, no_o_id, d_id, args.w_id))?; + // "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" + let (_, tuple) = tx.run(format!("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", no_o_id, d_id, args.w_id))?; + let ol_total = tuple[0].values[0].decimal().unwrap(); + // "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?" + let _ = tx.run(format!("UPDATE customer SET c_balance = c_balance + {} , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}", ol_total, c_id, d_id, args.w_id))?; + } + + Ok(()) + } +} + +impl TpccTest for DeliveryTest { + fn name(&self) -> &'static str { + "Delivery" + } + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + _: &TpccArgs, + ) -> Result<(), TpccError> { + let w_id = rng.gen_range(0..num_ware) + 1; + let o_carrier_id = rng.gen_range(1..10); + + let args = DeliveryArgs::new(w_id, o_carrier_id); + Delivery::run(tx, &args)?; + + Ok(()) + } +} diff --git a/tpcc/src/load.rs b/tpcc/src/load.rs new file mode 100644 index 00000000..0e1a6d54 --- /dev/null +++ b/tpcc/src/load.rs @@ -0,0 +1,748 @@ +use crate::TpccError; +use chrono::Utc; +use fnck_sql::db::Database; +use fnck_sql::storage::Storage; +use indicatif::{ProgressBar, ProgressStyle}; +use rand::rngs::ThreadRng; +use rand::Rng; +use rust_decimal::Decimal; +use std::marker::PhantomData; +use std::ops::Add; +// https://github.com/AgilData/tpcc/blob/master/src/main/java/com/codefutures/tpcc/Load.java + +pub(crate) const MAX_ITEMS: usize = 100_000; +pub(crate) const CUST_PER_DIST: usize = 3_000; +pub(crate) const DIST_PER_WARE: usize = 10; +pub(crate) const ORD_PER_DIST: usize = 3000; + +pub(crate) static MAX_NUM_ITEMS: usize = 15; +pub(crate) static MAX_ITEM_LEN: usize = 24; + +fn generate_string(rng: &mut ThreadRng, min: usize, max: usize) -> String { + let chars: Vec = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + .chars() + .collect(); + + let max = if min == max { + min + } else { + rng.gen_range(min..max) + }; + (0..max) + .map(|_| chars[rng.gen_range(0..chars.len())]) + .collect() +} + +pub(crate) fn nu_rand(rng: &mut ThreadRng, a: usize, x: usize, y: usize) -> usize { + let c = match a { + 255 => 255, + 1023 => 1023, + 8191 => 8191, + _ => unreachable!(), + }; + + (((rng.gen_range(0..a) | rng.gen_range(x..y)) + c) % (y - x + 1)) + x +} + +pub(crate) fn last_name(num: usize) -> String { + let n = [ + "BAR", "OUGHT", "ABLE", "PRI", "PRES", "ESE", "ANTI", "CALLY", "ATION", "EING", + ]; + + let mut name = n[num / 100].to_string(); + name = name.add(n[(num / 10) % 10]); + name = name.add(n[num % 10]); + name +} + +fn init_permutation(rng: &mut ThreadRng) -> [usize; CUST_PER_DIST] { + let mut nums = [0; CUST_PER_DIST]; + let mut temp_nums = [0; CUST_PER_DIST]; + + for i in 0..ORD_PER_DIST { + nums[i] = i + 1; + temp_nums[i] = i + 1; + } + for i in 0..ORD_PER_DIST - 1 { + let j = if i + 1 >= ORD_PER_DIST - 1 { + i + 1 + } else { + rng.gen_range(i + 1..ORD_PER_DIST - 1) + }; + nums[j] = temp_nums[i]; + } + nums +} + +pub struct Load { + phantom: PhantomData, +} + +impl Load { + /// table: item + /// + /// i_id int not null + /// i_im_id int + /// i_name varchar(24), + /// i_price decimal(5, 2) + /// i_data varchar(50) + /// + /// primary key (i_id) + pub fn load_items(rng: &mut ThreadRng, db: &Database) -> Result<(), TpccError> { + let _ = db.run("drop table if exists item;")?; + let _ = db.run( + "create table item ( + i_id int not null, + i_im_id int, + i_name varchar(24), + i_price decimal(5,2), + i_data varchar(50), + PRIMARY KEY(i_id) );", + )?; + let pb = ProgressBar::new(MAX_ITEMS as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading items: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + let orig = Self::gen_orig(rng); + + for i_id in 1..MAX_ITEMS + 1 { + let i_im_id = rng.gen_range(1..10000); + let i_name = generate_string(rng, 14, 24); + let i_price = Decimal::from_f64_retain(rng.gen_range(1.0..1000.0)) + .unwrap() + .round_dp(2); + let mut i_data = generate_string(rng, 26, 50); + if orig[i_id] == 0 { + let pos = rng.gen_range(0..i_data.len() - 8); + let (prefix, suffix) = i_data.split_at(pos); + let (_, remainder) = suffix.split_at(8); + + i_data = format!("{}original{}", prefix, remainder); + } + + let _ = db.run(format!( + "insert into item values ({i_id}, {i_im_id}, '{i_name}', {i_price}, '{i_data}')" + ))?; + pb.set_position(i_id as u64); + } + pb.finish_with_message("load completed!"); + Ok(()) + } + + /// table: warehouse + /// + /// w_id smallint not null + /// w_name varchar(10) + /// w_street_1 varchar(20) + /// w_street_2 varchar(20) + /// w_city varchar(20) + /// w_state char(2) + /// w_zip char(9) + /// w_tax decimal(4, 2) + /// w_ytd decimal(12, 2) + /// + /// primary key (w_id) + pub fn load_warehouses( + rng: &mut ThreadRng, + db: &Database, + num_ware: usize, + ) -> Result<(), TpccError> { + let _ = db.run("drop table if exists warehouse;")?; + let _ = db.run( + "create table warehouse ( + w_id smallint not null, + w_name varchar(10), + w_street_1 varchar(20), + w_street_2 varchar(20), + w_city varchar(20), + w_state char(2), + w_zip char(9), + w_tax decimal(4,2), + w_ytd decimal(12,2), + PRIMARY KEY(w_id) );", + )?; + let _ = db.run("drop table if exists stock;")?; + let _ = db.run( + "create table stock ( + s_i_id int not null, + s_w_id smallint not null, + s_quantity smallint, + s_dist_01 char(24), + s_dist_02 char(24), + s_dist_03 char(24), + s_dist_04 char(24), + s_dist_05 char(24), + s_dist_06 char(24), + s_dist_07 char(24), + s_dist_08 char(24), + s_dist_09 char(24), + s_dist_10 char(24), + s_ytd decimal(8,0), + s_order_cnt smallint, + s_remote_cnt smallint, + s_data varchar(50), + PRIMARY KEY(s_w_id, s_i_id) );", + )?; + let _ = db.run("CREATE INDEX fkey_stock_2 ON stock (s_i_id);")?; + let _ = db.run("drop table if exists district;")?; + let _ = db.run( + "create table district ( + d_id tinyint not null, + d_w_id smallint not null, + d_name varchar(10), + d_street_1 varchar(20), + d_street_2 varchar(20), + d_city varchar(20), + d_state char(2), + d_zip char(9), + d_tax decimal(4,2), + d_ytd decimal(12,2), + d_next_o_id int, + primary key (d_w_id, d_id) );", + )?; + let pb = ProgressBar::new(num_ware as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading warehouses: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + for w_id in 1..num_ware + 1 { + let w_name = generate_string(rng, 6, 10); + let w_street_1 = generate_string(rng, 10, 20); + let w_street_2 = generate_string(rng, 10, 20); + let w_city = generate_string(rng, 10, 20); + let w_state = generate_string(rng, 2, 2); + let w_zip = generate_string(rng, 9, 9); + + let w_tax = Decimal::from_f64_retain(rng.gen_range(1.0..100.0)) + .unwrap() + .round_dp(2); + let w_ytd = Decimal::from_f64_retain(3000000.00).unwrap().round_dp(2); + + let _ = db.run(format!( + "insert into warehouse values({}, '{}', '{}', '{}', '{}', '{}', '{}', {}, {})", + w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd, + ))?; + Self::stock(rng, db, w_id)?; + Self::district(rng, db, w_id)?; + + pb.set_position(w_id as u64); + } + pb.finish_with_message("load completed!"); + println!("analyze stock"); + let _ = db.run("analyze table stock")?; + Ok(()) + } + + pub fn load_custs( + rng: &mut ThreadRng, + db: &Database, + num_ware: usize, + ) -> Result<(), TpccError> { + let _ = db.run("drop table if exists customer;")?; + let _ = db.run( + "create table customer ( + c_id int not null, + c_d_id tinyint not null, + c_w_id smallint not null, + c_first varchar(16), + c_middle char(2), + c_last varchar(16), + c_street_1 varchar(20), + c_street_2 varchar(20), + c_city varchar(20), + c_state char(2), + c_zip char(9), + c_phone char(16), + c_since datetime, + c_credit char(2), + c_credit_lim bigint, + c_discount decimal(4,2), + c_balance decimal(12,2), + c_ytd_payment decimal(12,2), + c_payment_cnt smallint, + c_delivery_cnt smallint, + c_data text, + PRIMARY KEY(c_w_id, c_d_id, c_id) );", + )?; + let _ = db.run("CREATE INDEX idx_customer ON customer (c_w_id,c_d_id,c_last,c_first);")?; + let _ = db.run("drop table if exists history;")?; + let _ = db.run( + "create table history ( + h_c_id int, + h_c_d_id tinyint, + h_c_w_id smallint, + h_d_id tinyint, + h_w_id smallint, + h_date datetime, + h_amount decimal(6,2), + h_data varchar(24), + PRIMARY KEY(h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id) );", + )?; + for w_id in 1..num_ware + 1 { + for d_id in 1..DIST_PER_WARE + 1 { + Self::load_customers(rng, db, d_id, w_id)?; + } + } + println!("analyze customer"); + let _ = db.run("analyze table customer")?; + + Ok(()) + } + + pub fn load_ord( + rng: &mut ThreadRng, + db: &Database, + num_ware: usize, + ) -> Result<(), TpccError> { + let _ = db.run("drop table if exists orders;")?; + let _ = db.run( + "create table orders ( + o_id int not null, + o_d_id tinyint not null, + o_w_id smallint not null, + o_c_id int, + o_entry_d datetime, + o_carrier_id tinyint, + o_ol_cnt tinyint, + o_all_local tinyint, + PRIMARY KEY(o_w_id, o_d_id, o_id) );", + )?; + let _ = db.run("CREATE INDEX idx_orders ON orders (o_w_id,o_d_id,o_c_id,o_id);")?; + let _ = db.run("drop table if exists new_orders;")?; + let _ = db.run( + "create table new_orders ( + no_o_id int not null, + no_d_id tinyint not null, + no_w_id smallint not null, + PRIMARY KEY(no_w_id, no_d_id, no_o_id));", + )?; + let _ = db.run("drop table if exists order_line;")?; + let _ = db.run( + "create table order_line ( + ol_o_id int not null, + ol_d_id tinyint not null, + ol_w_id smallint not null, + ol_number tinyint not null, + ol_i_id int, + ol_supply_w_id smallint, + ol_delivery_d datetime, + ol_quantity tinyint, + ol_amount decimal(6,2), + ol_dist_info char(24), + PRIMARY KEY(ol_w_id, ol_d_id, ol_o_id, ol_number) );", + )?; + let _ = db.run("CREATE INDEX fkey_order_line_2 ON order_line (ol_supply_w_id,ol_i_id);")?; + for w_id in 1..num_ware + 1 { + for d_id in 1..DIST_PER_WARE + 1 { + Self::load_orders(rng, db, d_id, w_id)?; + } + } + println!("analyze orders & order_line"); + let _ = db.run("analyze table orders")?; + let _ = db.run("analyze table order_line")?; + + Ok(()) + } + + /// table: stock + /// + /// s_i_id int not null + /// s_w_id smallint not null + /// s_quantity smallint + /// s_dist_01 char(24) + /// s_dist_02 char(24) + /// s_dist_03 char(24) + /// s_dist_04 char(24) + /// s_dist_05 char(24) + /// s_dist_06 char(24) + /// s_dist_07 char(24) + /// s_dist_08 char(24) + /// s_dist_09 char(24) + /// s_dist_10 char(24) + /// s_ytd decimal(8,0) + /// s_order_cnt smallint + /// s_remote_cnt smallint + /// s_data varchar(50) + /// + /// primary key(s_w_id, s_i_id) + pub fn stock(rng: &mut ThreadRng, db: &Database, w_id: usize) -> Result<(), TpccError> { + let pb = ProgressBar::new(MAX_ITEMS as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading stock: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + let s_w_id = w_id; + let orig = Self::gen_orig(rng); + + for s_i_id in 1..MAX_ITEMS + 1 { + let s_quantity = rng.gen_range(10..100); + let s_dist_01 = generate_string(rng, 24, 24); + let s_dist_02 = generate_string(rng, 24, 24); + let s_dist_03 = generate_string(rng, 24, 24); + let s_dist_04 = generate_string(rng, 24, 24); + let s_dist_05 = generate_string(rng, 24, 24); + let s_dist_06 = generate_string(rng, 24, 24); + let s_dist_07 = generate_string(rng, 24, 24); + let s_dist_08 = generate_string(rng, 24, 24); + let s_dist_09 = generate_string(rng, 24, 24); + let s_dist_10 = generate_string(rng, 24, 24); + + let s_data = if orig[s_i_id] != 0 { + "original".to_string() + } else { + generate_string(rng, 26, 50) + }; + let _ = db.run(format!( + "insert into stock values({}, {}, {}, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', {}, {}, {}, '{}')", + s_i_id, + s_w_id, + s_quantity, + s_dist_01, + s_dist_02, + s_dist_03, + s_dist_04, + s_dist_05, + s_dist_06, + s_dist_07, + s_dist_08, + s_dist_09, + s_dist_10, + 0, + 0, + 0, + s_data, + ))?; + pb.set_position(s_i_id as u64); + } + pb.finish_with_message("load completed!"); + + Ok(()) + } + + #[allow(unused_assignments)] + fn gen_orig(rng: &mut ThreadRng) -> Vec { + let mut orig = vec![0; MAX_ITEMS + 1]; + + for _ in 0..MAX_ITEMS / 10 { + let mut pos = 0; + loop { + pos = rng.gen_range(0..MAX_ITEMS); + + if orig[pos] == 0 { + break; + } + } + orig[pos] = 1; + } + orig + } + + /// table: district + /// + /// d_id tinyint not null + /// d_w_id smallint not null + /// d_name varchar(10) + /// d_street_1 varchar(20) + /// d_street_2 varchar(20) + /// d_city varchar(20) + /// d_state char(2) + /// d_zip char(9) + /// d_tax decimal(4,2) + /// d_ytd decimal(12,2) + /// d_next_o_id int + /// + /// + /// primary key (d_w_id, d_id) + pub fn district(rng: &mut ThreadRng, db: &Database, w_id: usize) -> Result<(), TpccError> { + let pb = ProgressBar::new(DIST_PER_WARE as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading district: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + let d_w_id = w_id; + let d_ytd = Decimal::from_f64_retain(30000.0).unwrap().round_dp(2); + let d_next_o_id = 3001; + + for d_id in 1..DIST_PER_WARE + 1 { + let d_name = generate_string(rng, 6, 10); + let d_street_1 = generate_string(rng, 10, 20); + let d_street_2 = generate_string(rng, 10, 20); + let d_city = generate_string(rng, 10, 20); + let d_state = generate_string(rng, 2, 2); + let d_zip = generate_string(rng, 9, 9); + + let d_tax = Decimal::from_f64_retain(rng.gen_range(0.1..0.2)) + .unwrap() + .round_dp(2); + + let _ = db.run(format!( + "insert into district values({}, {}, '{}', '{}', '{}', '{}', '{}', '{}', {}, {}, {})", + d_id, + d_w_id, + d_name, + d_street_1, + d_street_2, + d_city, + d_state, + d_zip, + d_tax, + d_ytd, + d_next_o_id, + ))?; + pb.set_position(d_id as u64); + } + pb.finish_with_message("load completed!"); + + Ok(()) + } + + /// table: customer + /// + /// c_id int not null + /// c_d_id tinyint not null + /// c_w_id smallint not null + /// c_first varchar(16) + /// c_middle char(2), + /// c_last varchar(16) + /// c_street_1 varchar(20) + /// c_street_2 varchar(20) + /// c_city varchar(20) + /// c_state char(2) + /// c_zip char(9) + /// c_phone char(16) + /// c_since datetime + /// c_credit char(2) + /// c_credit_lim bigint + /// c_discount decimal(4, 2) + /// c_balance decimal(12, 2) + /// c_ytd_payment decimal(12, 2) + /// c_payment_cnt smallint + /// c_delivery_cnt smallint + /// c_date text, + /// + /// primary key(c_w_id, c_d_id, c_id) + /// + /// + /// table: history + /// + /// h_c_id int + /// h_c_d_id tinyint + /// h_c_w_id smallint + /// h_d_id tinyint + /// h_w_id smallint + /// h_date datetime + /// h_amount decimal(6, 2) + /// h_data varchar(24) + pub fn load_customers( + rng: &mut ThreadRng, + db: &Database, + d_id: usize, + w_id: usize, + ) -> Result<(), TpccError> { + let pb = ProgressBar::new(CUST_PER_DIST as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading customers: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + let date = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + + for c_id in 1..CUST_PER_DIST + 1 { + let c_d_id = d_id; + let c_w_id = w_id; + + let c_first = generate_string(rng, 8, 16); + let c_middle = "OE"; + + let num = if c_id <= 1000 { + c_id - 1 + } else { + nu_rand(rng, 255, 0, 999) + }; + let c_last = last_name(num); + let c_street_1 = generate_string(rng, 10, 20); + let c_street_2 = generate_string(rng, 10, 20); + let c_city = generate_string(rng, 10, 20); + let c_state = generate_string(rng, 2, 2); + let c_zip = generate_string(rng, 9, 9); + + let c_phone = generate_string(rng, 16, 16); + let c_since = &date; + let c_credit = if rng.gen_range(0..1) == 1 { "GC" } else { "BC" }; + let c_credit_lim = 50000; + let c_discount = Decimal::from_f64_retain(rng.gen_range(0.0..0.5)) + .unwrap() + .round_dp(2); + let c_balance = "-10.00"; + + let c_ytd_payment = "10.00"; + let c_payment_cnt = 1; + let c_delivery_cnt = 0; + + let c_data = generate_string(rng, 300, 500); + + let _ = db.run(format!( + "insert into customer values({}, {}, {}, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', {}, {}, {}, {}, {}, {}, '{}')", + c_id, + c_d_id, + c_w_id, + c_first, + c_middle, + c_last, + c_street_1, + c_street_2, + c_city, + c_state, + c_zip, + c_phone, + c_since, + c_credit, + c_credit_lim, + c_discount, + c_balance, + c_ytd_payment, + c_payment_cnt, + c_delivery_cnt, + c_data, + ))?; + + let h_date = &date; + let h_amount = Decimal::from_f64_retain(10.0).unwrap().round_dp(2); + let h_data = generate_string(rng, 12, 24); + + let _ = db.run(format!( + "insert into history values({}, {}, {}, {}, {}, '{}', {}, '{}')", + c_id, c_d_id, c_w_id, c_d_id, c_w_id, h_date, h_amount, h_data, + ))?; + pb.set_position(c_id as u64); + } + pb.finish_with_message("load completed!"); + + Ok(()) + } + + /// table: order + /// + /// o_id int not null + /// o_d_id tinyint not null + /// o_w_id smallint not null + /// o_c_id int + /// o_entry_d datetime + /// o_carrier_id tinyint + /// o_ol_cnt tinyint + /// o_all_local tinyint + /// + /// primary key(o_w_id, o_d_id, o_id) + /// + /// + /// table: new_order + /// + /// no_o_id int not null, + /// no_d_id tinyint not null, + /// no_w_id smallint not null, + /// + /// primary key(no_w_id, no_d_id, no_o_id) + /// + /// + /// table: order_line + /// + /// ol_o_id int not null + /// ol_d_id tinyint not null + /// ol_w_id smallint not null + /// ol_number tinyint not null + /// ol_i_id int + /// ol_supply_w_id smallint + /// ol_delivery_d datetime + /// ol_quantity tinyint + /// ol_amount decimal(6,2) + /// ol_dist_info char(24) + /// + /// primary key(ol_w_id, ol_d_id, ol_o_id, ol_number) + pub fn load_orders( + rng: &mut ThreadRng, + db: &Database, + d_id: usize, + w_id: usize, + ) -> Result<(), TpccError> { + let pb = ProgressBar::new(ORD_PER_DIST as u64); + pb.set_style( + ProgressStyle::default_bar() + .template( + "[loading orders: {elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}", + ) + .unwrap(), + ); + let o_d_id = d_id; + let o_w_id = w_id; + + let nums = init_permutation(rng); + + for o_id in 1..ORD_PER_DIST + 1 { + let o_c_id = nums[o_id - 1]; + let o_carrier_id = rng.gen_range(1..10); + let o_ol_cnt = rng.gen_range(5..15); + + let date = format!("'{}'", Utc::now().format("%Y-%m-%d %H:%M:%S")); + + let o_carrier_id = if o_id > 2100 { + let _ = db.run(format!( + "insert into new_orders values({}, {}, {})", + o_id, o_d_id, o_w_id, + ))?; + "null".to_string() + } else { + o_carrier_id.to_string() + }; + let _ = db.run(format!( + "insert into orders values({}, {}, {}, {}, {}, {}, {}, {})", + o_id, o_d_id, o_w_id, o_c_id, date, o_carrier_id, o_ol_cnt, "1", + ))?; + + for ol in 1..o_ol_cnt + 1 { + let ol_i_id = rng.gen_range(1..MAX_ITEMS); + let ol_supply_w_id = o_w_id; + let ol_quantity = 4; + let ol_amount = 0.0; + + let ol_dist_info = generate_string(rng, 24, 24); + let (ol_delivery_d, ol_amount) = if o_id > 2100 { + ("null", ol_amount) + } else { + (date.as_str(), rng.gen_range(0.1..100.0)) + }; + let _ = db.run(format!( + "insert into order_line values({}, {}, {}, {}, {}, {}, {}, {}, {}, '{}')", + o_id, + o_d_id, + o_w_id, + ol, + ol_i_id, + ol_supply_w_id, + ol_delivery_d, + ol_quantity, + ol_amount, + ol_dist_info, + ))?; + } + pb.set_position((o_id - 1) as u64); + } + pb.finish_with_message("load completed!"); + + Ok(()) + } +} diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs new file mode 100644 index 00000000..03881bdf --- /dev/null +++ b/tpcc/src/main.rs @@ -0,0 +1,167 @@ +use crate::delivery::DeliveryTest; +use crate::load::Load; +use crate::new_ord::NewOrdTest; +use crate::order_stat::OrderStatTest; +use crate::payment::PaymentTest; +use crate::rt_hist::RtHist; +use crate::slev::SlevTest; +use clap::Parser; +use fnck_sql::db::{DBTransaction, DataBaseBuilder}; +use fnck_sql::errors::DatabaseError; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; +use std::time::{Duration, Instant}; + +mod delivery; +mod load; +mod new_ord; +mod order_stat; +mod payment; +mod rt_hist; +mod slev; + +pub(crate) const ALLOW_MULTI_WAREHOUSE_TX: bool = true; + +pub(crate) trait TpccTransaction { + type Args; + + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError>; +} + +pub(crate) trait TpccTest { + fn name(&self) -> &'static str; + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + args: &TpccArgs, + ) -> Result<(), TpccError>; +} + +struct TpccArgs { + joins: bool, +} + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + #[clap(long, default_value = "false")] + joins: bool, + #[clap(long, default_value = "fnck_sql_tpcc")] + path: String, + #[clap(long, default_value = "5")] + max_retry: usize, + #[clap(long, default_value = "1080")] + measure_time: u64, + #[clap(long, default_value = "1")] + num_ware: usize, +} + +fn main() -> Result<(), TpccError> { + let args = Args::parse(); + + let mut rng = rand::thread_rng(); + let database = DataBaseBuilder::path(&args.path).build()?; + + Load::load_items(&mut rng, &database)?; + Load::load_warehouses(&mut rng, &database, args.num_ware)?; + Load::load_custs(&mut rng, &database, args.num_ware)?; + Load::load_ord(&mut rng, &database, args.num_ware)?; + + let mut rt_hist = RtHist::new(); + let tests = vec![ + Box::new(NewOrdTest) as Box>, + Box::new(PaymentTest), + Box::new(OrderStatTest), + Box::new(DeliveryTest), + Box::new(SlevTest), + ]; + let tpcc_args = TpccArgs { joins: args.joins }; + + let tpcc_start = Instant::now(); + let duration = Duration::new(args.measure_time, 0); + let mut round_count = 0; + + while tpcc_start.elapsed() < duration { + for (i, tpcc_test) in tests.iter().enumerate() { + let mut is_succeed = false; + for j in 0..args.max_retry + 1 { + let transaction_start = Instant::now(); + let mut tx = database.new_transaction()?; + + if let Err(err) = + tpcc_test.do_transaction(&mut rng, &mut tx, args.num_ware, &tpcc_args) + { + eprintln!( + "[{}] Error while doing transaction: {}", + tpcc_test.name(), + err + ); + } else { + rt_hist.hist_inc(i, transaction_start.elapsed()); + is_succeed = true; + break; + } + if j < args.max_retry { + println!("[{}] Retry for the {}th time", tpcc_test.name(), j + 1); + } + } + if !is_succeed { + return Err(TpccError::MaxRetry); + } + } + if round_count != 0 && round_count % 4 == 0 { + println!( + "============ TPCC CheckPoint {} on round {round_count}: ===============", + round_count / 4 + ); + for (i, name) in vec![ + "New-Order", + "Payment", + "Order-Status", + "Delivery", + "Stock-Level", + ] + .into_iter() + .enumerate() + { + println!("{name} 90th Percentile RT: {:.3}", rt_hist.hist_ckp(i)); + } + println!("=========================================================="); + } + round_count += 1; + } + rt_hist.hist_report(); + + Ok(()) +} + +fn other_ware(rng: &mut ThreadRng, home_ware: usize, num_ware: usize) -> usize { + if num_ware == 1 { + return home_ware; + } + + loop { + let tmp = rng.gen_range(1..num_ware); + if tmp != home_ware { + return tmp; + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum TpccError { + #[error("fnck_sql: {0}")] + Database( + #[source] + #[from] + DatabaseError, + ), + #[error("tuples is empty")] + EmptyTuples, + #[error("maximum retries reached")] + MaxRetry, +} diff --git a/tpcc/src/new_ord.rs b/tpcc/src/new_ord.rs new file mode 100644 index 00000000..6ba87f44 --- /dev/null +++ b/tpcc/src/new_ord.rs @@ -0,0 +1,283 @@ +use crate::load::{nu_rand, CUST_PER_DIST, DIST_PER_WARE, MAX_ITEMS, MAX_NUM_ITEMS}; +use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; +use chrono::Utc; +use fnck_sql::db::DBTransaction; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; +use rust_decimal::Decimal; + +#[derive(Debug)] +pub(crate) struct NewOrdArgs { + joins: bool, + w_id: usize, + d_id: usize, + c_id: usize, + o_ol_cnt: usize, + o_all_local: u8, + item_id: Vec, + supware: Vec, + qty: Vec, +} + +impl NewOrdArgs { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + joins: bool, + w_id: usize, + d_id: usize, + c_id: usize, + o_ol_cnt: usize, + o_all_local: u8, + item_id: Vec, + supware: Vec, + qty: Vec, + ) -> Self { + Self { + joins, + w_id, + d_id, + c_id, + o_ol_cnt, + o_all_local, + item_id, + supware, + qty, + } + } +} + +pub(crate) struct NewOrd; +pub(crate) struct NewOrdTest; + +impl TpccTransaction for NewOrd { + type Args = NewOrdArgs; + + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + let mut price = vec![Decimal::default(); MAX_NUM_ITEMS]; + let mut iname = vec![String::new(); MAX_NUM_ITEMS]; + let mut stock = vec![0; MAX_NUM_ITEMS]; + let mut bg = vec![String::new(); MAX_NUM_ITEMS]; + let mut amt = vec![Decimal::default(); MAX_NUM_ITEMS]; + let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + + let (c_discount, c_last, c_credit, w_tax) = if args.joins { + // "SELECT c_discount, c_last, c_credit, w_tax FROM customer, warehouse WHERE w_id = ? AND c_w_id = w_id AND c_d_id = ? AND c_id = ?" + let (_, tuple) = tx.run(format!("SELECT c.c_discount, c.c_last, c.c_credit, w.w_tax FROM customer AS c JOIN warehouse AS w ON c.c_w_id = w_id AND w.w_id = {} AND c.c_w_id = {} AND c.c_d_id = {} AND c.c_id = {}", args.w_id, args.w_id, args.d_id, args.c_id))?; + let c_discount = tuple[0].values[0].decimal().unwrap(); + let c_last = tuple[0].values[1].utf8().unwrap(); + let c_credit = tuple[0].values[2].utf8().unwrap(); + let w_tax = tuple[0].values[3].decimal().unwrap(); + + (c_discount, c_last, c_credit, w_tax) + } else { + // "SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let (_, tuple) = tx.run(format!("SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.w_id, args.d_id, args.c_id))?; + let c_discount = tuple[0].values[0].decimal().unwrap(); + let c_last = tuple[0].values[1].utf8().unwrap(); + let c_credit = tuple[0].values[2].utf8().unwrap(); + // "SELECT w_tax FROM warehouse WHERE w_id = ?" + let (_, tuple) = tx.run(format!( + "SELECT w_tax FROM warehouse WHERE w_id = {}", + args.w_id + ))?; + let w_tax = tuple[0].values[0].decimal().unwrap(); + + (c_discount, c_last, c_credit, w_tax) + }; + // "SELECT d_next_o_id, d_tax FROM district WHERE d_id = ? AND d_w_id = ? FOR UPDATE" + let (_, tuple) = tx.run(format!( + "SELECT d_next_o_id, d_tax FROM district WHERE d_id = {} AND d_w_id = {}", + args.d_id, args.w_id + ))?; + let d_next_o_id = tuple[0].values[0].i32().unwrap(); + let d_tax = tuple[0].values[1].decimal().unwrap(); + // "UPDATE district SET d_next_o_id = ? + 1 WHERE d_id = ? AND d_w_id = ?" + let _ = tx.run(format!( + "UPDATE district SET d_next_o_id = {} + 1 WHERE d_id = {} AND d_w_id = {}", + d_next_o_id, args.d_id, args.w_id + ))?; + let o_id = d_next_o_id; + // "INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES(?, ?, ?, ?, ?, ?, ?)" + let _ = tx.run(format!("INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES({}, {}, {}, {}, '{}', {}, {})", o_id, args.d_id, args.w_id, args.c_id, now, args.o_ol_cnt, args.o_all_local))?; + // "INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES (?,?,?) + let _ = tx.run(format!( + "INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES ({},{},{})", + o_id, args.d_id, args.w_id + ))?; + let mut ol_num_seq = vec![0; MAX_NUM_ITEMS]; + + for i in 0..args.o_ol_cnt { + ol_num_seq[i] = i; + } + for i in 0..args.o_ol_cnt - 1 { + let mut tmp_0 = + (MAX_ITEMS + 1) * args.supware[ol_num_seq[i]] + args.item_id[ol_num_seq[i]]; + let mut min_num = i; + for j in i + 1..args.o_ol_cnt { + let tmp_1 = + (MAX_ITEMS + 1) * args.supware[ol_num_seq[j]] + args.item_id[ol_num_seq[j]]; + if tmp_1 < tmp_0 { + tmp_0 = tmp_1; + min_num = j; + } + } + if min_num != i { + let swp = ol_num_seq[min_num]; + ol_num_seq[min_num] = ol_num_seq[i]; + ol_num_seq[i] = swp; + } + } + for ol_number in 1..args.o_ol_cnt + 1 { + let ol_supply_w_id = args.supware[ol_num_seq[ol_number - 1]]; + let ol_i_id = args.item_id[ol_num_seq[ol_number - 1]]; + let ol_quantity = args.qty[ol_num_seq[ol_number - 1]]; + // "SELECT i_price, i_name, i_data FROM item WHERE i_id = ?" + let (_, tuples) = tx.run(format!( + "SELECT i_price, i_name, i_data FROM item WHERE i_id = {}", + ol_i_id + ))?; + if tuples.is_empty() { + return Err(TpccError::EmptyTuples); + } + let i_price = tuples[0].values[0].decimal().unwrap(); + let i_name = tuples[0].values[1].utf8().unwrap(); + let i_data = tuples[0].values[2].utf8().unwrap(); + + price[ol_num_seq[ol_number - 1]] = i_price; + iname[ol_num_seq[ol_number - 1]] = i_name; + + // "SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = ? AND s_w_id = ? FOR UPDATE" + let (_, tuples) = tx.run(format!("SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = {} AND s_w_id = {}", ol_i_id, ol_supply_w_id))?; + let mut s_quantity = tuples[0].values[0].i16().unwrap(); + let s_data = tuples[0].values[1].utf8().unwrap(); + let s_dist_01 = tuples[0].values[2].utf8().unwrap(); + let s_dist_02 = tuples[0].values[3].utf8().unwrap(); + let s_dist_03 = tuples[0].values[4].utf8().unwrap(); + let s_dist_04 = tuples[0].values[5].utf8().unwrap(); + let s_dist_05 = tuples[0].values[6].utf8().unwrap(); + let s_dist_06 = tuples[0].values[7].utf8().unwrap(); + let s_dist_07 = tuples[0].values[8].utf8().unwrap(); + let s_dist_08 = tuples[0].values[9].utf8().unwrap(); + let s_dist_09 = tuples[0].values[10].utf8().unwrap(); + let s_dist_10 = tuples[0].values[11].utf8().unwrap(); + + let ol_dist_info = pick_dist_info( + args.d_id, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, + s_dist_07, s_dist_08, s_dist_09, s_dist_10, + ); + stock[ol_num_seq[ol_number - 1]] = s_quantity; + bg[ol_num_seq[ol_number - 1]] = + if i_data.contains("original") && s_data.contains("original") { + "B" + } else { + "C" + } + .to_string(); + s_quantity = if s_quantity > ol_quantity as i16 { + s_quantity - ol_quantity as i16 + } else { + s_quantity - ol_quantity as i16 + 91 + }; + // "UPDATE stock SET s_quantity = ? WHERE s_i_id = ? AND s_w_id = ?" + let _ = tx.run(format!( + "UPDATE stock SET s_quantity = {} WHERE s_i_id = {} AND s_w_id = {}", + s_quantity, ol_i_id, ol_supply_w_id + ))?; + + // Tips: Integers always have 7 digits, so divide by 10 here + let mut ol_amount = Decimal::from(ol_quantity) + * i_price + * (Decimal::from(1) + w_tax + d_tax) + * (Decimal::from(1) - c_discount).round_dp(2); + while ol_amount.mantissa() > 4 { + ol_amount = ol_amount / Decimal::from(10); + } + + amt[ol_num_seq[ol_number - 1]] = ol_amount; + // "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + let _ = tx.run(format!("INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, '{}')", o_id, args.d_id, args.w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info))?; + } + + Ok(()) + } +} + +impl TpccTest for NewOrdTest { + fn name(&self) -> &'static str { + "New-Order" + } + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + args: &TpccArgs, + ) -> Result<(), TpccError> { + let mut all_local = 1; + let notfound = MAX_ITEMS + 1; + + let mut itemid = vec![0; MAX_NUM_ITEMS]; + let mut supware = vec![0; MAX_NUM_ITEMS]; + let mut qty = vec![0; MAX_NUM_ITEMS]; + + let w_id = rng.gen_range(0..num_ware) + 1; + let d_id = rng.gen_range(1..DIST_PER_WARE); + let c_id = nu_rand(rng, 1023, 1, CUST_PER_DIST); + let ol_cnt = rng.gen_range(5..15); + let rbk = rng.gen_range(1..100); + + for i in 0..ol_cnt { + itemid[i] = nu_rand(rng, 8191, 1, MAX_ITEMS); + if (i == ol_cnt - 1) && (rbk == 1) { + itemid[i] = notfound; + } + if ALLOW_MULTI_WAREHOUSE_TX { + if rng.gen_range(1..100) != 1 { + supware[i] = w_id; + } else { + supware[i] = other_ware(rng, w_id, num_ware); + all_local = 0; + } + } else { + supware[i] = w_id; + } + qty[i] = rng.gen_range(1..10); + } + let args = NewOrdArgs::new( + args.joins, w_id, d_id, c_id, ol_cnt, all_local, itemid, supware, qty, + ); + NewOrd::run(tx, &args)?; + + Ok(()) + } +} + +fn pick_dist_info( + ol_supply_w_id: usize, + s_dist_01: String, + s_dist_02: String, + s_dist_03: String, + s_dist_04: String, + s_dist_05: String, + s_dist_06: String, + s_dist_07: String, + s_dist_08: String, + s_dist_09: String, + s_dist_10: String, +) -> String { + match ol_supply_w_id { + 1 => s_dist_01, + 2 => s_dist_02, + 3 => s_dist_03, + 4 => s_dist_04, + 5 => s_dist_05, + 6 => s_dist_06, + 7 => s_dist_07, + 8 => s_dist_08, + 9 => s_dist_09, + 10 => s_dist_10, + _ => unreachable!(), + } +} diff --git a/tpcc/src/order_stat.rs b/tpcc/src/order_stat.rs new file mode 100644 index 00000000..73642953 --- /dev/null +++ b/tpcc/src/order_stat.rs @@ -0,0 +1,117 @@ +use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; +use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; +use fnck_sql::db::DBTransaction; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; +use rust_decimal::Decimal; + +#[derive(Debug)] +pub(crate) struct OrderStatArgs { + w_id: usize, + d_id: usize, + by_name: bool, + c_id: usize, + c_last: String, +} + +impl OrderStatArgs { + pub(crate) fn new( + w_id: usize, + d_id: usize, + by_name: bool, + c_id: usize, + c_last: String, + ) -> Self { + Self { + w_id, + d_id, + by_name, + c_id, + c_last, + } + } +} + +pub(crate) struct OrderStat; +pub(crate) struct OrderStatTest; + +impl TpccTransaction for OrderStat { + type Args = OrderStatArgs; + + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + let (c_balance, c_first, c_middle, c_last) = if args.by_name { + // SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" + let (_, tuples) = tx.run(format!("SELECT count(c_id) FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}'", args.w_id, args.d_id, args.c_last))?; + let mut name_cnt = tuples[0].values[0].i32().unwrap() as usize; + // SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" + let (_, tuples) = tx.run(format!("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first", args.w_id, args.d_id, args.c_last))?; + + if name_cnt % 2 == 1 { + name_cnt += 1; + } + let mut c_balance = Decimal::default(); + let mut c_first = String::new(); + let mut c_middle = String::new(); + let mut c_last = String::new(); + + for n in 0..name_cnt / 2 { + c_balance = tuples[n].values[0].decimal().unwrap(); + c_first = tuples[n].values[1].utf8().unwrap(); + c_middle = tuples[n].values[2].utf8().unwrap(); + c_last = tuples[n].values[3].utf8().unwrap(); + } + (c_balance, c_first, c_middle, c_last) + } else { + // "SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let (_, tuples) = tx.run(format!("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.w_id, args.d_id, args.c_id))?; + let c_balance = tuples[0].values[0].decimal().unwrap(); + let c_first = tuples[0].values[1].utf8().unwrap(); + let c_middle = tuples[0].values[2].utf8().unwrap(); + let c_last = tuples[0].values[3].utf8().unwrap(); + (c_balance, c_first, c_middle, c_last) + }; + // "SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = ? AND o_d_id = ? AND o_c_id = ? AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = ? AND o_d_id = ? AND o_c_id = ?)" + let (_, tuples) = tx.run(format!("SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = {} AND o_d_id = {} AND o_c_id = {} AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = {} AND o_d_id = {} AND o_c_id = {})", args.w_id, args.d_id, args.c_id, args.w_id, args.d_id, args.c_id))?; + if tuples.is_empty() { + return Err(TpccError::EmptyTuples); + } + let o_id = tuples[0].values[0].i32().unwrap(); + // let o_entry_d = tuples[0].values[1].datetime().unwrap(); + // let o_carrier_id = tuples[0].values[2].i32().unwrap(); + // "SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id = ?" + let (_, tuples) = tx.run(format!("SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = {} AND ol_d_id = {} AND ol_o_id = {}", args.w_id, args.d_id, o_id))?; + // let ol_i_id = tuples[0].values[0].i32(); + // let ol_supply_w_id = tuples[0].values[1].i16(); + // let ol_quantity = tuples[0].values[2].i8(); + // let ol_amount = tuples[0].values[3].decimal(); + // let ol_delivery_d = tuples[0].values[4].datetime(); + + Ok(()) + } +} + +impl TpccTest for OrderStatTest { + fn name(&self) -> &'static str { + "Order-Status" + } + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + _: &TpccArgs, + ) -> Result<(), TpccError> { + let w_id = rng.gen_range(0..num_ware) + 1; + let d_id = rng.gen_range(1..DIST_PER_WARE); + let c_id = nu_rand(rng, 1023, 1, CUST_PER_DIST); + let c_last = last_name(nu_rand(rng, 255, 0, 999)); + let by_name = rng.gen_range(1..100) <= 60; + + let args = OrderStatArgs::new(w_id, d_id, by_name, c_id, c_last); + OrderStat::run(tx, &args)?; + + Ok(()) + } +} diff --git a/tpcc/src/payment.rs b/tpcc/src/payment.rs new file mode 100644 index 00000000..4e6559ea --- /dev/null +++ b/tpcc/src/payment.rs @@ -0,0 +1,194 @@ +use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; +use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; +use chrono::Utc; +use fnck_sql::db::DBTransaction; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; +use rust_decimal::Decimal; + +#[derive(Debug)] +pub(crate) struct PaymentArgs { + w_id: usize, + d_id: usize, + by_name: bool, + c_w_id: usize, + c_d_id: usize, + c_id: usize, + c_last: String, + h_amount: Decimal, +} + +impl PaymentArgs { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + w_id: usize, + d_id: usize, + by_name: bool, + c_w_id: usize, + c_d_id: usize, + c_id: usize, + c_last: String, + h_amount: Decimal, + ) -> Self { + Self { + w_id, + d_id, + by_name, + c_w_id, + c_d_id, + c_id, + c_last, + h_amount, + } + } +} + +pub(crate) struct Payment; +pub(crate) struct PaymentTest; + +impl TpccTransaction for Payment { + type Args = PaymentArgs; + + #[allow(unused_variables)] + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + // "UPDATE warehouse SET w_ytd = w_ytd + ? WHERE w_id = ?" + let _ = tx.run(format!( + "UPDATE warehouse SET w_ytd = w_ytd + {} WHERE w_id = {}", + args.h_amount, args.w_id + ))?; + + // "SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = ?" + let (_, tuples) = tx.run(format!("SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = {}", args.w_id))?; + let w_street_1 = tuples[0].values[0].utf8().unwrap(); + let w_street_2 = tuples[0].values[1].utf8().unwrap(); + let w_city = tuples[0].values[2].utf8().unwrap(); + let w_state = tuples[0].values[3].utf8().unwrap(); + let w_zip = tuples[0].values[4].utf8().unwrap(); + let w_name = tuples[0].values[5].utf8().unwrap(); + + // "UPDATE district SET d_ytd = d_ytd + ? WHERE d_w_id = ? AND d_id = ?" + let _ = tx.run(format!( + "UPDATE district SET d_ytd = d_ytd + {} WHERE d_w_id = {} AND d_id = {}", + args.h_amount, args.w_id, args.d_id + ))?; + + // "SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = ? AND d_id = ?" + let (_, tuples) = tx.run(format!("SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = {} AND d_id = {}", args.w_id, args.d_id))?; + let d_street_1 = tuples[0].values[0].utf8().unwrap(); + let d_street_2 = tuples[0].values[1].utf8().unwrap(); + let d_city = tuples[0].values[2].utf8().unwrap(); + let d_state = tuples[0].values[3].utf8().unwrap(); + let d_zip = tuples[0].values[4].utf8().unwrap(); + let d_name = tuples[0].values[5].utf8().unwrap(); + + let mut c_id = args.c_id as i32; + if args.by_name { + // "SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" + let (_, tuples) = tx.run(format!("SELECT count(c_id) FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}'", args.c_w_id, args.c_d_id, args.c_last))?; + let mut name_cnt = tuples[0].values[0].i32().unwrap(); + + // "SELECT c_id FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ? ORDER BY c_first" + let (_, tuples) = tx.run(format!("SELECT c_id FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first", args.c_w_id, args.c_d_id, args.c_last))?; + if name_cnt % 2 == 1 { + name_cnt += 1; + } + for n in 0..name_cnt / 2 { + c_id = tuples[n as usize].values[0].i32().unwrap(); + } + } + // "SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ? FOR UPDATE" + let (_, tuples) = tx.run(format!("SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.c_w_id, args.c_d_id, c_id))?; + let c_first = tuples[0].values[0].utf8().unwrap(); + let c_middle = tuples[0].values[1].utf8().unwrap(); + let c_last = tuples[0].values[2].utf8().unwrap(); + let c_street_1 = tuples[0].values[3].utf8().unwrap(); + let c_street_2 = tuples[0].values[4].utf8().unwrap(); + let c_city = tuples[0].values[5].utf8().unwrap(); + let c_state = tuples[0].values[6].utf8().unwrap(); + let c_zip = tuples[0].values[7].utf8().unwrap(); + let c_phone = tuples[0].values[8].utf8().unwrap(); + let c_credit = tuples[0].values[9].utf8(); + let c_credit_lim = tuples[0].values[10].i64().unwrap(); + let c_discount = tuples[0].values[11].decimal().unwrap(); + let mut c_balance = tuples[0].values[12].decimal().unwrap(); + let c_since = tuples[0].values[13].datetime().unwrap(); + + c_balance += args.h_amount; + if let Some(c_credit) = c_credit { + if c_credit.contains("BC") { + // "SELECT c_data FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let (_, tuples) = tx.run(format!( + "SELECT c_data FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", + args.c_w_id, args.c_d_id, c_id + ))?; + let c_data = tuples[0].values[0].utf8().unwrap(); + + // https://github.com/AgilData/tpcc/blob/dfbabe1e35cc93b2bf2e107fc699eb29c2097e24/src/main/java/com/codefutures/tpcc/Payment.java#L284 + // let c_new_data = format!("| {} {} {} {} {} {} {}", c_id, args.c_d_id, args.c_w_id, args.d_id, args.w_id, args.h_amount, ) + + // "UPDATE customer SET c_balance = ?, c_data = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let _ = tx.run(format!("UPDATE customer SET c_balance = {}, c_data = '{}' WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, c_data, args.c_w_id, args.c_d_id, c_id))?; + } else { + // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let _ = tx.run(format!("UPDATE customer SET c_balance = {} WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, args.c_w_id, args.c_d_id, c_id))?; + } + } else { + // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" + let _ = tx.run(format!("UPDATE customer SET c_balance = {} WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, args.c_w_id, args.c_d_id, c_id))?; + } + let h_data = format!("\\0{d_name} \\0"); + + let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + // "INSERT INTO history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES(?, ?, ?, ?, ?, ?, ?, ?)" + let _ = tx.run(format!("INSERT OVERWRITE history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES({}, {}, {}, {}, {}, '{}', {}, '{}')", args.c_d_id, args.c_w_id, c_id, args.d_id, args.w_id, now, args.h_amount, h_data))?; + + Ok(()) + } +} + +impl TpccTest for PaymentTest { + fn name(&self) -> &'static str { + "Payment" + } + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + _: &TpccArgs, + ) -> Result<(), TpccError> { + let w_id = rng.gen_range(0..num_ware) + 1; + let d_id = rng.gen_range(1..DIST_PER_WARE); + let c_id = nu_rand(rng, 1023, 1, CUST_PER_DIST); + let c_last = last_name(nu_rand(rng, 255, 0, 999)); + let h_amount = rng.gen_range(1..5000); + let by_name = rng.gen_range(1..100) < 60; + let (c_w_id, c_d_id) = if ALLOW_MULTI_WAREHOUSE_TX { + if rng.gen_range(1..100) < 85 { + (w_id, d_id) + } else { + ( + other_ware(rng, w_id, num_ware), + rng.gen_range(1..DIST_PER_WARE), + ) + } + } else { + (w_id, d_id) + }; + let args = PaymentArgs::new( + w_id, + d_id, + by_name, + c_w_id, + c_d_id, + c_id, + c_last, + Decimal::from(h_amount), + ); + Payment::run(tx, &args)?; + + Ok(()) + } +} diff --git a/tpcc/src/rt_hist.rs b/tpcc/src/rt_hist.rs new file mode 100644 index 00000000..145783d8 --- /dev/null +++ b/tpcc/src/rt_hist.rs @@ -0,0 +1,135 @@ +use ordered_float::OrderedFloat; +use std::time::Duration; + +pub(crate) const MAX_REC: usize = 20; +pub(crate) const REC_PER_SEC: usize = 1000; +pub(crate) const NUM_TRANSACTIONS: usize = 5; + +#[derive(Clone)] +pub(crate) struct RtHist { + total_hist: Vec>, + cur_hist: Vec>, + max_rt: [f64; 5], + cur_max_rt: [f64; 5], +} + +impl RtHist { + pub(crate) fn new() -> Self { + let total_hist = vec![vec![0; MAX_REC * REC_PER_SEC]; 5]; + let cur_hist = vec![vec![0; MAX_REC * REC_PER_SEC]; 5]; + + let max_rt = [0.0; 5]; + let cur_max_rt = [0.0; 5]; + + Self { + total_hist, + cur_hist, + max_rt, + cur_max_rt, + } + } + + // Increment matched one + pub fn hist_inc(&mut self, transaction: usize, rtclk: Duration) { + let i = (rtclk.as_secs_f64() * REC_PER_SEC as f64) as usize; + let i = if i >= (MAX_REC * REC_PER_SEC) { + (MAX_REC * REC_PER_SEC) - 1 + } else { + i + }; + + if rtclk.as_secs_f64() > self.cur_max_rt[transaction] { + self.cur_max_rt[transaction] = rtclk.as_secs_f64(); + } + + self.cur_hist[transaction][i] += 1; + } + + // Checkpoint and add to the total histogram + pub fn hist_ckp(&mut self, transaction: usize) -> f64 { + let mut total = 0; + let mut tmp = 0; + let mut line = MAX_REC * REC_PER_SEC; + let mut line_set = false; + + for i in 0..(MAX_REC * REC_PER_SEC) { + total += self.cur_hist[transaction][i]; + } + + for i in 0..(MAX_REC * REC_PER_SEC) { + tmp += self.cur_hist[transaction][i]; + self.total_hist[transaction][i] += self.cur_hist[transaction][i]; + self.cur_hist[transaction][i] = 0; + + if tmp >= total * 99 / 100 && !line_set { + line = i; + line_set = true; + } + } + for i in 0..5 { + self.max_rt[i] = *OrderedFloat(self.cur_max_rt[i]).max(OrderedFloat(self.max_rt[i])); + self.cur_max_rt[i] = 0.0; + } + + line as f64 / REC_PER_SEC as f64 + } + + // Report histograms + pub fn hist_report(&self) { + let mut total = [0; NUM_TRANSACTIONS]; + let mut tmp = [0; NUM_TRANSACTIONS]; + let mut line = [MAX_REC * REC_PER_SEC; NUM_TRANSACTIONS]; + + for j in 0..NUM_TRANSACTIONS { + for i in 0..(MAX_REC * REC_PER_SEC) { + total[j] += self.total_hist[j][i]; + } + + for i in (0..(MAX_REC * REC_PER_SEC)).rev() { + tmp[j] += self.total_hist[j][i]; + if tmp[j] * 10 <= total[j] { + line[j] = i; + } + } + } + + println!("\n"); + for j in 0..NUM_TRANSACTIONS { + match j { + 0 => println!("\n1.New-Order\n"), + 1 => println!("\n2.Payment\n"), + 2 => println!("\n3.Order-Status\n"), + 3 => println!("\n4.Delivery\n"), + 4 => println!("\n5.Stock-Level\n"), + _ => (), + } + + for i in 0..(MAX_REC * REC_PER_SEC) { + if i <= line[j] * 4 && self.total_hist[j][i] > 0 { + println!( + "{:.3}, {:6}", + (i + 1) as f64 / REC_PER_SEC as f64, + self.total_hist[j][i] + ); + } + } + } + + println!("\n<90th Percentile RT (MaxRT)>"); + for j in 0..NUM_TRANSACTIONS { + match j { + 0 => print!(" New-Order : "), + 1 => print!(" Payment : "), + 2 => print!("Order-Status : "), + 3 => print!(" Delivery : "), + 4 => print!(" Stock-Level : "), + _ => (), + } + println!( + "{:.3} ({:.3})", + line[j] as f64 / REC_PER_SEC as f64, + self.max_rt[j] + ); + } + } +} diff --git a/tpcc/src/slev.rs b/tpcc/src/slev.rs new file mode 100644 index 00000000..00b09df8 --- /dev/null +++ b/tpcc/src/slev.rs @@ -0,0 +1,69 @@ +use crate::load::DIST_PER_WARE; +use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; +use fnck_sql::db::DBTransaction; +use fnck_sql::storage::Storage; +use rand::prelude::ThreadRng; +use rand::Rng; + +#[derive(Debug)] +pub(crate) struct SlevArgs { + w_id: usize, + d_id: usize, + level: usize, +} + +impl SlevArgs { + pub(crate) fn new(w_id: usize, d_id: usize, level: usize) -> Self { + Self { w_id, d_id, level } + } +} + +pub(crate) struct Slev; +pub(crate) struct SlevTest; + +impl TpccTransaction for Slev { + type Args = SlevArgs; + + fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + // "SELECT d_next_o_id FROM district WHERE d_id = ? AND d_w_id = ?" + let (_, tuples) = tx.run(format!( + "SELECT d_next_o_id FROM district WHERE d_id = {} AND d_w_id = {}", + args.d_id, args.w_id + ))?; + let d_next_o_id = tuples[0].values[0].i32().unwrap(); + // "SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id < ? AND ol_o_id >= (? - 20)" + let (_, tuples) = tx.run(format!("SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = {} AND ol_d_id = {} AND ol_o_id < {} AND ol_o_id >= ({} - 20)", args.w_id, args.d_id, d_next_o_id, d_next_o_id))?; + let ol_i_id = tuples[0].values[0].i32().unwrap(); + // "SELECT count(*) FROM stock WHERE s_w_id = ? AND s_i_id = ? AND s_quantity < ?" + let (_, tuples) = tx.run(format!( + "SELECT count(*) FROM stock WHERE s_w_id = {} AND s_i_id = {} AND s_quantity < {}", + args.w_id, ol_i_id, args.level + ))?; + // let i_count = tuples[0].values[0].i32().unwrap(); + + Ok(()) + } +} + +impl TpccTest for SlevTest { + fn name(&self) -> &'static str { + "Stock-Level" + } + + fn do_transaction( + &self, + rng: &mut ThreadRng, + tx: &mut DBTransaction, + num_ware: usize, + _: &TpccArgs, + ) -> Result<(), TpccError> { + let w_id = rng.gen_range(0..num_ware) + 1; + let d_id = rng.gen_range(1..DIST_PER_WARE); + let level = rng.gen_range(10..20); + + let args = SlevArgs::new(w_id, d_id, level); + Slev::run(tx, &args)?; + + Ok(()) + } +}