From 874529a9055cfe2f6e3b4cfbeb8a2c28be770321 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Wed, 27 Nov 2024 01:01:49 +0800 Subject: [PATCH] Feat/prepare statment (#249) * feat: impl `PrepareStatement` * chore: TPCC use `PrepareStatement` on Test --- .gitignore | 1 + README.md | 2 +- src/binder/aggregate.rs | 26 +-- src/binder/create_table.rs | 3 + src/binder/expr.rs | 19 +- src/binder/insert.rs | 5 +- src/binder/mod.rs | 14 +- src/binder/select.rs | 1 + src/db.rs | 315 +++++++++++++++++++--------- src/errors.rs | 2 + src/execution/dml/copy_from_file.rs | 6 +- src/execution/dml/copy_to_file.rs | 10 +- src/execution/dql/values.rs | 15 +- src/optimizer/core/memo.rs | 11 +- src/storage/rocksdb.rs | 6 +- src/types/mod.rs | 18 +- src/types/value.rs | 41 +++- tests/slt/copy.slt | 4 +- tpcc/src/README.md | 36 ++-- tpcc/src/delivery.rs | 91 ++++++-- tpcc/src/main.rs | 63 +++++- tpcc/src/new_ord.rs | 152 ++++++++++---- tpcc/src/order_stat.rs | 63 +++++- tpcc/src/payment.rs | 136 +++++++++--- tpcc/src/slev.rs | 45 ++-- 25 files changed, 816 insertions(+), 269 deletions(-) diff --git a/.gitignore b/.gitignore index 8f4bce36..4e1a71d3 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,6 @@ fncksql_data fncksql_bench sqlite_bench fnck_sql_tpcc +copy.csv tests/data/row_20000.csv \ No newline at end of file diff --git a/README.md b/README.md index a7f77617..04ea8fff 100755 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ Order-Status : 0.492 (0.575) Delivery : 6.109 (6.473) Stock-Level : 0.001 (0.001) -89.9205557572134 Tpmc +98 Tpmc ``` #### PG Wire Service run `cargo run --features="net"` to start server diff --git a/src/binder/aggregate.rs b/src/binder/aggregate.rs index 3cd1e63a..9a5ccef8 100644 --- a/src/binder/aggregate.rs +++ b/src/binder/aggregate.rs @@ -41,11 +41,15 @@ impl Binder<'_, '_, T> { select_list: &mut [ScalarExpression], groupby: &[Expr], ) -> Result<(), DatabaseError> { - self.validate_groupby_illegal_column(select_list, groupby)?; + let mut group_by_exprs = Vec::with_capacity(groupby.len()); + for expr in groupby.iter() { + group_by_exprs.push(self.bind_expr(expr)?); + } + + self.validate_groupby_illegal_column(select_list, &group_by_exprs)?; - for gb in groupby { - let mut expr = self.bind_expr(gb)?; - self.visit_group_by_expr(select_list, &mut expr); + for expr in group_by_exprs.iter_mut() { + self.visit_group_by_expr(select_list, expr); } Ok(()) } @@ -213,33 +217,31 @@ impl Binder<'_, '_, T> { fn validate_groupby_illegal_column( &mut self, select_items: &[ScalarExpression], - groupby: &[Expr], + groupby: &[ScalarExpression], ) -> Result<(), DatabaseError> { let mut group_raw_exprs = vec![]; for expr in groupby { - let expr = self.bind_expr(expr)?; - if let ScalarExpression::Alias { alias, .. } = expr { let alias_expr = select_items.iter().find(|column| { if let ScalarExpression::Alias { alias: inner_alias, .. } = &column { - alias == *inner_alias + alias == inner_alias } else { false } }); if let Some(inner_expr) = alias_expr { - group_raw_exprs.push(inner_expr.clone()); + group_raw_exprs.push(inner_expr); } } else { group_raw_exprs.push(expr); } } let mut group_raw_set: HashSet<&ScalarExpression, RandomState> = - HashSet::from_iter(group_raw_exprs.iter()); + HashSet::from_iter(group_raw_exprs.iter().copied()); for expr in select_items { if expr.has_agg_call() { @@ -247,7 +249,7 @@ impl Binder<'_, '_, T> { } group_raw_set.remove(expr); - if !group_raw_exprs.iter().contains(expr) { + if !group_raw_exprs.iter().contains(&expr) { return Err(DatabaseError::AggMiss(format!( "`{}` must appear in the GROUP BY clause or be used in an aggregate function", expr @@ -257,7 +259,7 @@ impl Binder<'_, '_, T> { if !group_raw_set.is_empty() { return Err(DatabaseError::AggMiss( - "In the GROUP BY clause the field must be in the select clause".to_string(), + "in the GROUP BY clause the field must be in the select clause".to_string(), )); } diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 54695b6e..abdf0128 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -157,6 +157,7 @@ mod tests { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use sqlparser::ast::CharLengthUnits; + use std::cell::RefCell; use std::hash::RandomState; use std::sync::atomic::AtomicUsize; use tempfile::TempDir; @@ -172,6 +173,7 @@ mod tests { let table_functions = Default::default(); let sql = "create table t1 (id int primary key, name varchar(10) null)"; + let args = RefCell::new(Vec::new()); let mut binder = Binder::new( BinderContext::new( &table_cache, @@ -181,6 +183,7 @@ mod tests { &table_functions, Arc::new(AtomicUsize::new(0)), ), + &args, None, ); let stmt = crate::parser::parse_sql(sql).unwrap(); diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 983468af..c147ed76 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -5,7 +5,7 @@ use crate::expression::agg::AggKind; use itertools::Itertools; use sqlparser::ast::{ BinaryOperator, CharLengthUnits, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, - Query, UnaryOperator, + Query, UnaryOperator, Value, }; use std::collections::HashMap; use std::slice; @@ -48,7 +48,21 @@ impl<'a, T: Transaction> Binder<'a, '_, T> { } Expr::CompoundIdentifier(idents) => self.bind_column_ref_from_identifiers(idents, None), Expr::BinaryOp { left, right, op } => self.bind_binary_op_internal(left, right, op), - Expr::Value(v) => Ok(ScalarExpression::Constant(v.into())), + Expr::Value(v) => { + let value = if let Value::Placeholder(name) = v { + let (i, _) = self + .args + .borrow() + .iter() + .enumerate() + .find(|(_, (key, _))| key == name) + .ok_or_else(|| DatabaseError::ParametersNotFound(name.to_string()))?; + self.args.borrow_mut().remove(i).1 + } else { + v.into() + }; + Ok(ScalarExpression::Constant(value)) + } Expr::Function(func) => self.bind_function(func), Expr::Nested(expr) => self.bind_expr(expr), Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op), @@ -266,6 +280,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> { table_functions, temp_table_id.clone(), ), + self.args, Some(self), ); let mut sub_query = binder.bind_query(subquery)?; diff --git a/src/binder/insert.rs b/src/binder/insert.rs index babe2360..810037b0 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -72,14 +72,11 @@ impl Binder<'_, '_, T> { expression.constant_calculation()?; match expression { - ScalarExpression::Constant(mut value) => { + ScalarExpression::Constant(value) => { let ty = schema_ref[i].datatype(); // Check if the value length is too long value.check_len(ty)?; - if value.logical_type() != *ty { - value = value.cast(ty)?; - } row.push(value); } ScalarExpression::Empty => { diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 9114eefb..068d6f81 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -19,13 +19,14 @@ mod truncate; mod update; use sqlparser::ast::{Ident, ObjectName, ObjectType, SetExpr, Statement}; +use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use crate::catalog::view::View; use crate::catalog::{ColumnRef, TableCatalog, TableName}; -use crate::db::{ScalaFunctions, TableFunctions}; +use crate::db::{Args, ScalaFunctions, TableFunctions}; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; use crate::planner::operator::join::JoinType; @@ -321,14 +322,20 @@ impl<'a, T: Transaction> BinderContext<'a, T> { pub struct Binder<'a, 'b, T: Transaction> { context: BinderContext<'a, T>, table_schema_buf: HashMap>, + args: &'a RefCell, pub(crate) parent: Option<&'b Binder<'a, 'b, T>>, } impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> { - pub fn new(context: BinderContext<'a, T>, parent: Option<&'b Binder<'a, 'b, T>>) -> Self { + pub fn new( + context: BinderContext<'a, T>, + args: &'a RefCell, + parent: Option<&'b Binder<'a, 'b, T>>, + ) -> Self { Binder { context, table_schema_buf: Default::default(), + args, parent, } } @@ -487,6 +494,7 @@ pub mod test { use crate::types::ColumnId; use crate::types::LogicalType::Integer; use crate::utils::lru::SharedLruCache; + use std::cell::RefCell; use std::hash::RandomState; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; @@ -505,6 +513,7 @@ pub mod test { let scala_functions = Default::default(); let table_functions = Default::default(); let transaction = self.storage.transaction()?; + let args = RefCell::new(Vec::new()); let mut binder = Binder::new( BinderContext::new( &self.table_cache, @@ -514,6 +523,7 @@ pub mod test { &table_functions, Arc::new(AtomicUsize::new(0)), ), + &args, None, ); let stmt = crate::parser::parse_sql(sql)?; diff --git a/src/binder/select.rs b/src/binder/select.rs index 6ea394be..eb23b3dd 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -573,6 +573,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> { table_functions, temp_table_id.clone(), ), + self.args, Some(self), ); let mut right = binder.bind_single_table_ref(relation, Some(join_type))?; diff --git a/src/db.rs b/src/db.rs index fe224845..79920274 100644 --- a/src/db.rs +++ b/src/db.rs @@ -18,12 +18,14 @@ use crate::planner::LogicalPlan; use crate::storage::rocksdb::RocksStorage; use crate::storage::{StatisticsMetaCache, Storage, TableCache, Transaction, ViewCache}; use crate::types::tuple::{SchemaRef, Tuple}; +use crate::types::value::DataValue; use crate::utils::lru::SharedLruCache; use ahash::HashMap; use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard}; use parking_lot::{RawRwLock, RwLock}; -use sqlparser::ast::Statement; +use std::cell::RefCell; use std::hash::RandomState; +use std::marker::PhantomData; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -31,6 +33,9 @@ use std::sync::Arc; pub(crate) type ScalaFunctions = HashMap>; pub(crate) type TableFunctions = HashMap>; +pub type Args = Vec<(&'static str, DataValue)>; +pub type Statement = sqlparser::ast::Statement; + #[allow(dead_code)] pub(crate) enum MetaDataLock { Read(ArcRwLockReadGuard), @@ -76,87 +81,55 @@ impl DataBaseBuilder { pub fn build(self) -> Result, DatabaseError> { let storage = RocksStorage::new(self.path)?; - let meta_cache = Arc::new(SharedLruCache::new(256, 8, RandomState::new())?); - let table_cache = Arc::new(SharedLruCache::new(48, 4, RandomState::new())?); - let view_cache = Arc::new(SharedLruCache::new(12, 4, RandomState::new())?); + let meta_cache = SharedLruCache::new(256, 8, RandomState::new())?; + let table_cache = SharedLruCache::new(48, 4, RandomState::new())?; + let view_cache = SharedLruCache::new(12, 4, RandomState::new())?; Ok(Database { storage, - scala_functions: Arc::new(self.scala_functions), - table_functions: Arc::new(self.table_functions), - mdl: Arc::new(RwLock::new(())), - meta_cache, - table_cache, - view_cache, + mdl: Default::default(), + state: Arc::new(State { + scala_functions: self.scala_functions, + table_functions: self.table_functions, + meta_cache, + table_cache, + view_cache, + _p: Default::default(), + }), }) } } -pub struct Database { - pub(crate) storage: S, - scala_functions: Arc, - table_functions: Arc, - mdl: Arc>, - pub(crate) meta_cache: Arc, - pub(crate) table_cache: Arc, - pub(crate) view_cache: Arc, +pub(crate) struct State { + scala_functions: ScalaFunctions, + table_functions: TableFunctions, + meta_cache: StatisticsMetaCache, + table_cache: TableCache, + view_cache: ViewCache, + _p: PhantomData, } -impl Database { - /// Run SQL queries. - pub fn run>(&self, sql: T) -> Result<(SchemaRef, Vec), DatabaseError> { - // parse - let stmts = parse_sql(sql)?; - if stmts.is_empty() { - return Err(DatabaseError::EmptyStatement); - } - let stmt = &stmts[0]; - let _guard = if matches!(command_type(stmt)?, CommandType::DDL) { - MetaDataLock::Write(self.mdl.write_arc()) - } else { - MetaDataLock::Read(self.mdl.read_arc()) - }; - let mut transaction = self.storage.transaction()?; - let mut plan = Self::build_plan( - stmt, - &self.table_cache, - &self.view_cache, - &self.meta_cache, - &transaction, - &self.scala_functions, - &self.table_functions, - )?; - - let schema = plan.output_schema().clone(); - let iterator = build_write( - plan, - (&self.table_cache, &self.view_cache, &self.meta_cache), - &mut transaction, - ); - let tuples = try_collect(iterator)?; - - transaction.commit()?; - - Ok((schema, tuples)) +impl State { + fn scala_functions(&self) -> &ScalaFunctions { + &self.scala_functions } - - pub fn new_transaction(&self) -> Result, DatabaseError> { - let guard = self.mdl.read_arc(); - let transaction = self.storage.transaction()?; - - Ok(DBTransaction { - inner: transaction, - scala_functions: self.scala_functions.clone(), - table_functions: self.table_functions.clone(), - _guard: guard, - meta_cache: self.meta_cache.clone(), - table_cache: self.table_cache.clone(), - view_cache: self.view_cache.clone(), - }) + fn table_functions(&self) -> &TableFunctions { + &self.table_functions + } + pub(crate) fn meta_cache(&self) -> &StatisticsMetaCache { + &self.meta_cache + } + pub(crate) fn table_cache(&self) -> &TableCache { + &self.table_cache + } + pub(crate) fn view_cache(&self) -> &ViewCache { + &self.view_cache } + #[allow(clippy::too_many_arguments)] pub(crate) fn build_plan( stmt: &Statement, + args: &RefCell, table_cache: &TableCache, view_cache: &ViewCache, meta_cache: &StatisticsMetaCache, @@ -173,6 +146,7 @@ impl Database { table_functions, Arc::new(AtomicUsize::new(0)), ), + args, None, ); /// Build a logical plan. @@ -271,48 +245,118 @@ impl Database { ImplementationRuleImpl::Truncate, ]) } + + fn prepare>(&self, sql: T) -> Result { + let mut stmts = parse_sql(sql)?; + stmts.pop().ok_or(DatabaseError::EmptyStatement) + } + + fn execute( + &self, + transaction: &mut S::TransactionType<'_>, + stmt: &Statement, + args: Args, + ) -> Result<(SchemaRef, Vec), DatabaseError> { + let args = RefCell::new(args); + + let mut plan = Self::build_plan( + stmt, + &args, + self.table_cache(), + self.view_cache(), + self.meta_cache(), + transaction, + self.scala_functions(), + self.table_functions(), + )?; + let schema = plan.output_schema().clone(); + let iterator = build_write( + plan, + (&self.table_cache, &self.view_cache, &self.meta_cache), + transaction, + ); + let tuples = try_collect(iterator)?; + + Ok((schema, tuples)) + } +} + +pub struct Database { + pub(crate) storage: S, + mdl: Arc>, + pub(crate) state: Arc>, +} + +impl Database { + /// Run SQL queries. + pub fn run>(&self, sql: T) -> Result<(SchemaRef, Vec), DatabaseError> { + let statement = self.prepare(sql)?; + + self.execute(&statement, vec![]) + } + + pub fn prepare>(&self, sql: T) -> Result { + self.state.prepare(sql) + } + + fn execute( + &self, + statement: &Statement, + args: Args, + ) -> Result<(SchemaRef, Vec), DatabaseError> { + let _guard = if matches!(command_type(statement)?, CommandType::DDL) { + MetaDataLock::Write(self.mdl.write_arc()) + } else { + MetaDataLock::Read(self.mdl.read_arc()) + }; + let mut transaction = self.storage.transaction()?; + let (schema, tuples) = self.state.execute(&mut transaction, statement, args)?; + transaction.commit()?; + + Ok((schema, tuples)) + } + + pub fn new_transaction(&self) -> Result, DatabaseError> { + let guard = self.mdl.read_arc(); + let transaction = self.storage.transaction()?; + let state = self.state.clone(); + + Ok(DBTransaction { + inner: transaction, + _guard: guard, + state, + }) + } } pub struct DBTransaction<'a, S: Storage + 'a> { inner: S::TransactionType<'a>, - scala_functions: Arc, - table_functions: Arc, _guard: ArcRwLockReadGuard, - pub(crate) meta_cache: Arc, - pub(crate) table_cache: Arc, - pub(crate) view_cache: Arc, + state: Arc>, } impl DBTransaction<'_, S> { pub fn run>(&mut self, sql: T) -> Result<(SchemaRef, Vec), DatabaseError> { - let stmts = parse_sql(sql)?; - if stmts.is_empty() { - return Err(DatabaseError::EmptyStatement); - } - let stmt = &stmts[0]; - if matches!(command_type(stmt)?, CommandType::DDL) { + let statement = self.state.prepare(sql)?; + + self.execute(&statement, vec![]) + } + + pub fn prepare>(&self, sql: T) -> Result { + self.state.prepare(sql) + } + + pub fn execute( + &mut self, + statement: &Statement, + args: Args, + ) -> Result<(SchemaRef, Vec), DatabaseError> { + if matches!(command_type(statement)?, CommandType::DDL) { return Err(DatabaseError::UnsupportedStmt( "`DDL` is not allowed to execute within a transaction".to_string(), )); } - let mut plan = Database::::build_plan( - stmt, - &self.table_cache, - &self.view_cache, - &self.meta_cache, - &self.inner, - &self.scala_functions, - &self.table_functions, - )?; - - let schema = plan.output_schema().clone(); - let executor = build_write( - plan, - (&self.table_cache, &self.view_cache, &self.meta_cache), - &mut self.inner, - ); - - Ok((schema, try_collect(executor)?)) + self.state.execute(&mut self.inner, statement, args) } pub fn commit(self) -> Result<(), DatabaseError> { @@ -367,7 +411,7 @@ pub(crate) mod test { let database = DataBaseBuilder::path(temp_dir.path()).build()?; let mut transaction = database.storage.transaction()?; - build_table(&database.table_cache, &mut transaction)?; + build_table(&database.state.table_cache(), &mut transaction)?; transaction.commit()?; let batch = database.run("select * from t1")?; @@ -437,6 +481,81 @@ pub(crate) mod test { Ok(()) } + #[test] + fn test_prepare_statment() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?; + + let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?; + let _ = fnck_sql.run("insert into t1 values(0, 0)")?; + let _ = fnck_sql.run("insert into t1 values(1, 1)")?; + let _ = fnck_sql.run("insert into t1 values(2, 2)")?; + + // Filter + { + let statement = fnck_sql.prepare("explain select * from t1 where b > ?1")?; + + let (_, tuples) = + fnck_sql.execute(&statement, vec![("?1", DataValue::Int32(Some(0)))])?; + + assert_eq!( + tuples[0].values[0].utf8().unwrap(), + "Projection [t1.a, t1.b] [Project] + Filter (t1.b > 0), Is Having: false [Filter] + TableScan t1 -> [a, b] [SeqScan]" + ) + } + // Aggregate + { + let statement = fnck_sql.prepare( + "explain select a + ?1, max(b + ?2) from t1 where b > ?3 group by a + ?4", + )?; + + let (_, tuples) = fnck_sql.execute( + &statement, + vec![ + ("?1", DataValue::Int32(Some(0))), + ("?2", DataValue::Int32(Some(0))), + ("?3", DataValue::Int32(Some(1))), + ("?4", DataValue::Int32(Some(0))), + ], + )?; + assert_eq!( + tuples[0].values[0].utf8().unwrap(), + "Projection [(t1.a + 0), Max((t1.b + 0))] [Project] + Aggregate [Max((t1.b + 0))] -> Group By [(t1.a + 0)] [HashAggregate] + Filter (t1.b > 1), Is Having: false [Filter] + TableScan t1 -> [a, b] [SeqScan]" + ) + } + { + let statement = fnck_sql.prepare("explain select *, ?1 from (select * from t1 where b > ?2) left join (select * from t1 where a > ?3) on a > ?4")?; + + let (_, tuples) = fnck_sql.execute( + &statement, + vec![ + ("?1", DataValue::Int32(Some(9))), + ("?2", DataValue::Int32(Some(0))), + ("?3", DataValue::Int32(Some(1))), + ("?4", DataValue::Int32(Some(0))), + ], + )?; + assert_eq!( + tuples[0].values[0].utf8().unwrap(), + "Projection [t1.a, t1.b, 9] [Project] + LeftOuter Join Where (t1.a > 0) [NestLoopJoin] + Projection [t1.a, t1.b] [Project] + Filter (t1.b > 0), Is Having: false [Filter] + TableScan t1 -> [a, b] [SeqScan] + Projection [t1.a, t1.b] [Project] + Filter (t1.a > 1), Is Having: false [Filter] + TableScan t1 -> [a, b] [SeqScan]" + ) + } + + Ok(()) + } + #[test] fn test_transaction_sql() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); diff --git a/src/errors.rs b/src/errors.rs index f5d7e753..1ae98579 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -81,6 +81,8 @@ pub enum DatabaseError { MisMatch(&'static str, &'static str), #[error("add column must be nullable or specify a default value")] NeedNullAbleOrDefault, + #[error("parameter: {0} not found")] + ParametersNotFound(String), #[error("no transaction begin")] NoTransactionBegin, #[error("cannot be Null")] diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs index b5fc68d7..0e308c45 100644 --- a/src/execution/dml/copy_from_file.rs +++ b/src/execution/dml/copy_from_file.rs @@ -202,7 +202,11 @@ mod tests { let mut transaction = storage.transaction()?; let mut coroutine = executor.execute_mut( - (&db.table_cache, &db.view_cache, &db.meta_cache), + ( + db.state.table_cache(), + db.state.view_cache(), + db.state.meta_cache(), + ), &mut transaction, ); let tuple = match Pin::new(&mut coroutine).resume(()) { diff --git a/src/execution/dml/copy_to_file.rs b/src/execution/dml/copy_to_file.rs index 412ef0b4..c5c100b6 100644 --- a/src/execution/dml/copy_to_file.rs +++ b/src/execution/dml/copy_to_file.rs @@ -49,10 +49,10 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for CopyToFile { .map(|v| v.to_string()) .collect::>() ) - .map_err(|e| DatabaseError::from(e))); + .map_err(DatabaseError::from)); } - throw!(writer.flush().map_err(|e| DatabaseError::from(e))); + throw!(writer.flush().map_err(DatabaseError::from)); yield Ok(TupleBuilder::build_result(format!("{}", self.op))); }, @@ -183,7 +183,11 @@ mod tests { let executor = CopyToFile { op: op.clone() }; let mut coroutine = executor.execute( - (&db.table_cache, &db.view_cache, &db.meta_cache), + ( + db.state.table_cache(), + db.state.view_cache(), + db.state.meta_cache(), + ), &mut transaction, ); diff --git a/src/execution/dql/values.rs b/src/execution/dql/values.rs index 84a73dd8..c7a3a8d3 100644 --- a/src/execution/dql/values.rs +++ b/src/execution/dql/values.rs @@ -1,7 +1,10 @@ use crate::execution::{Executor, ReadExecutor}; use crate::planner::operator::values::ValuesOperator; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; +use crate::throw; use crate::types::tuple::Tuple; +use crate::types::value::DataValue; +use std::mem; pub struct Values { op: ValuesOperator, @@ -22,9 +25,17 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Values { Box::new( #[coroutine] move || { - let ValuesOperator { rows, .. } = self.op; + let ValuesOperator { rows, schema_ref } = self.op; + + for mut values in rows { + for (i, value) in values.iter_mut().enumerate() { + let ty = schema_ref[i].datatype().clone(); + + if value.logical_type() != ty { + *value = throw!(mem::replace(value, DataValue::Null).cast(&ty)); + } + } - for values in rows { yield Ok(Tuple { id: None, values }); } }, diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index 36321169..bbeb6b6a 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -98,6 +98,7 @@ mod tests { use crate::types::value::DataValue; use crate::types::LogicalType; use petgraph::stable_graph::NodeIndex; + use std::cell::RefCell; use std::ops::Bound; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -118,22 +119,24 @@ mod tests { let transaction = database.storage.transaction()?; let c1_column_id = { transaction - .table(&database.table_cache, Arc::new("t1".to_string()))? + .table(database.state.table_cache(), Arc::new("t1".to_string()))? .unwrap() .get_column_id_by_name("c1") .unwrap() }; let scala_functions = Default::default(); let table_functions = Default::default(); + let args = RefCell::new(Vec::new()); let mut binder = Binder::new( BinderContext::new( - &database.table_cache, - &database.view_cache, + database.state.table_cache(), + database.state.view_cache(), &transaction, &scala_functions, &table_functions, Arc::new(AtomicUsize::new(0)), ), + &args, None, ); // where: c1 => 2, (40, +inf) @@ -167,7 +170,7 @@ mod tests { let memo = Memo::new( &graph, - &transaction.meta_loader(&database.meta_cache), + &transaction.meta_loader(database.state.meta_cache()), &rules, )?; let best_plan = graph.into_plan(Some(&memo)); diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 8d606a90..59a71e6c 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -234,7 +234,7 @@ mod test { let table_name = Arc::new("t1".to_string()); let table = transaction - .table(&fnck_sql.table_cache, table_name.clone())? + .table(fnck_sql.state.table_cache(), table_name.clone())? .unwrap() .clone(); let a_column_id = table.get_column_id_by_name("a").unwrap(); @@ -294,13 +294,13 @@ mod test { let transaction = fnck_sql.storage.transaction().unwrap(); let table = transaction - .table(&fnck_sql.table_cache, Arc::new("t1".to_string()))? + .table(fnck_sql.state.table_cache(), Arc::new("t1".to_string()))? .unwrap() .clone(); let columns = table.columns().cloned().enumerate().collect_vec(); let mut iter = transaction .read_by_index( - &fnck_sql.table_cache, + fnck_sql.state.table_cache(), Arc::new("t1".to_string()), (Some(0), Some(1)), columns, diff --git a/src/types/mod.rs b/src/types/mod.rs index 803d6e66..d23630d8 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -260,6 +260,20 @@ impl LogicalType { (LogicalType::Integer, _) | (_, LogicalType::UInteger) => Ok(LogicalType::Bigint), (LogicalType::Smallint, _) | (_, LogicalType::USmallint) => Ok(LogicalType::Integer), (LogicalType::Tinyint, _) | (_, LogicalType::UTinyint) => Ok(LogicalType::Smallint), + ( + LogicalType::Decimal(precision_0, scale_0), + LogicalType::Decimal(precision_1, scale_1), + ) => { + let fn_option = |num_0: &Option, num_1: &Option| match (num_0, num_1) { + (Some(num_0), Some(num_1)) => Some(*cmp::max(num_0, num_1)), + (Some(num), None) | (None, Some(num)) => Some(*num), + (None, None) => None, + }; + Ok(LogicalType::Decimal( + fn_option(precision_0, precision_1), + fn_option(scale_0, scale_1), + )) + } _ => Err(DatabaseError::Incomparable(left.clone(), right.clone())), } } @@ -440,7 +454,9 @@ impl TryFrom for LogicalType { } } } - other => Err(DatabaseError::UnsupportedStmt(other.to_string())), + other => Err(DatabaseError::UnsupportedStmt(format!( + "unsupported data type: {other}" + ))), } } } diff --git a/src/types/value.rs b/src/types/value.rs index 01e2eb89..71733260 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -1496,6 +1496,7 @@ impl_scalar!(u8, UInt8); impl_scalar!(u16, UInt16); impl_scalar!(u32, UInt32); impl_scalar!(u64, UInt64); +impl_scalar!(Decimal, Decimal); impl From for DataValue { fn from(value: String) -> Self { @@ -1517,6 +1518,42 @@ impl From> for DataValue { } } +impl From<&NaiveDate> for DataValue { + fn from(value: &NaiveDate) -> Self { + DataValue::Date32(Some(value.num_days_from_ce())) + } +} + +impl From> for DataValue { + fn from(value: Option<&NaiveDate>) -> Self { + DataValue::Date32(value.map(|d| d.num_days_from_ce())) + } +} + +impl From<&NaiveDateTime> for DataValue { + fn from(value: &NaiveDateTime) -> Self { + DataValue::Date64(Some(value.and_utc().timestamp())) + } +} + +impl From> for DataValue { + fn from(value: Option<&NaiveDateTime>) -> Self { + DataValue::Date64(value.map(|d| d.and_utc().timestamp())) + } +} + +impl From<&NaiveTime> for DataValue { + fn from(value: &NaiveTime) -> Self { + DataValue::Time(Some(value.num_seconds_from_midnight())) + } +} + +impl From> for DataValue { + fn from(value: Option<&NaiveTime>) -> Self { + DataValue::Time(value.map(|d| d.num_seconds_from_midnight())) + } +} + impl From<&sqlparser::ast::Value> for DataValue { fn from(v: &sqlparser::ast::Value) -> Self { match v { @@ -1534,8 +1571,8 @@ impl From<&sqlparser::ast::Value> for DataValue { panic!("unsupported number {:?}", n) } } - sqlparser::ast::Value::SingleQuotedString(s) => s.clone().into(), - sqlparser::ast::Value::DoubleQuotedString(s) => s.clone().into(), + sqlparser::ast::Value::SingleQuotedString(s) + | sqlparser::ast::Value::DoubleQuotedString(s) => s.clone().into(), sqlparser::ast::Value::Boolean(b) => (*b).into(), sqlparser::ast::Value::Null => Self::Null, _ => todo!("unsupported parsed scalar value {:?}", v), diff --git a/tests/slt/copy.slt b/tests/slt/copy.slt index 87889fbb..5f6c73a1 100644 --- a/tests/slt/copy.slt +++ b/tests/slt/copy.slt @@ -14,6 +14,6 @@ SELECT * FROM test_copy 1 2.5 two query I -COPY test_copy TO '/tmp/copy.csv' ( DELIMITER ',' ); +COPY test_copy TO './copy.csv' ( DELIMITER ',' ); ---- -Copy test_copy -> /tmp/copy.csv [a, b, c] \ No newline at end of file +Copy test_copy -> ./copy.csv [a, b, c] \ No newline at end of file diff --git a/tpcc/src/README.md b/tpcc/src/README.md index f0afa6f3..df4a4313 100644 --- a/tpcc/src/README.md +++ b/tpcc/src/README.md @@ -6,12 +6,12 @@ run `cargo run -p tpcc --release` to run tpcc - YMTC PC411-1024GB-B - Tips: TPCC currently only supports single thread ```shell -|New-Order| sc: 1084 lt: 0 fl: 11 -|Payment| sc: 1062 lt: 0 fl: 0 -|Order-Status| sc: 102 lt: 4 fl: 36 -|Delivery| sc: 107 lt: 0 fl: 0 -|Stock-Level| sc: 106 lt: 0 fl: 0 -in 723 sec. +|New-Order| sc: 1182 lt: 0 fl: 13 +|Payment| sc: 1155 lt: 0 fl: 0 +|Order-Status| sc: 115 lt: 1 fl: 29 +|Delivery| sc: 114 lt: 2 fl: 0 +|Stock-Level| sc: 115 lt: 0 fl: 0 +in 720 sec. (all must be [OK]) [transaction percentage] Payment: 43.0% (>=43.0%) [Ok] @@ -21,23 +21,23 @@ in 723 sec. [response time (at least 90%% passed)] New-Order: 100.0 [OK] Payment: 100.0 [OK] - Order-Status: 96.2 [OK] - Delivery: 100.0 [OK] + Order-Status: 99.1 [OK] + Delivery: 98.3 [OK] Stock-Level: 100.0 [OK] - New-Order Total: 1084 - Payment Total: 1062 - Order-Status Total: 106 - Delivery Total: 107 - Stock-Level Total: 106 + New-Order Total: 1182 + Payment Total: 1155 + Order-Status Total: 116 + Delivery Total: 116 + Stock-Level Total: 115 <90th Percentile RT (MaxRT)> - New-Order : 0.005 (0.007) - Payment : 0.084 (0.141) -Order-Status : 0.492 (0.575) - Delivery : 6.109 (6.473) + New-Order : 0.003 (0.011) + Payment : 0.078 (0.470) +Order-Status : 0.227 (0.240) + Delivery : 5.439 (27.702) Stock-Level : 0.001 (0.001) -89.9205557572134 Tpmc +98 Tpmc ``` ## Refer to diff --git a/tpcc/src/delivery.rs b/tpcc/src/delivery.rs index 25f32ccb..50ae0751 100644 --- a/tpcc/src/delivery.rs +++ b/tpcc/src/delivery.rs @@ -1,8 +1,9 @@ use crate::load::DIST_PER_WARE; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; use chrono::Utc; -use fnck_sql::db::DBTransaction; +use fnck_sql::db::{DBTransaction, Statement}; use fnck_sql::storage::Storage; +use fnck_sql::types::value::DataValue; use rand::prelude::ThreadRng; use rand::Rng; @@ -24,37 +25,86 @@ pub(crate) struct DeliveryTest; impl TpccTransaction for Delivery { type Args = DeliveryArgs; - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { - let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError> { + let now = Utc::now().naive_utc(); for d_id in 1..DIST_PER_WARE + 1 { // "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?" - let (_, tuple) = tx.run(format!("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = {} AND no_w_id = {}", d_id, args.w_id))?; - let no_o_id = tuple[0].values[0].i32().unwrap(); + let (_, tuples) = tx.execute( + &statements[0], + vec![ + ("?1", DataValue::Int8(Some(d_id as i8))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; + let no_o_id = tuples[0].values[0].i32().unwrap(); if no_o_id == 0 { continue; } // "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?" - let _ = tx.run(format!( - "DELETE FROM new_orders WHERE no_o_id = {} AND no_d_id = {} AND no_w_id = {}", - no_o_id, d_id, args.w_id - ))?; + let (_, tuples) = tx.execute( + &statements[1], + vec![ + ("?1", DataValue::Int32(Some(no_o_id))), + ("?2", DataValue::Int8(Some(d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; // "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" - let (_, tuple) = tx.run(format!( - "SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", - no_o_id, d_id, args.w_id - ))?; - let c_id = tuple[0].values[0].i32().unwrap(); + let (_, tuples) = tx.execute( + &statements[2], + vec![ + ("?1", DataValue::Int32(Some(no_o_id))), + ("?2", DataValue::Int8(Some(d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; + let c_id = tuples[0].values[0].i32().unwrap(); // "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" - let _ = tx.run(format!("UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", args.o_carrier_id, no_o_id, d_id, args.w_id))?; + let (_, tuples) = tx.execute( + &statements[3], + vec![ + ("?1", DataValue::Int8(Some(args.o_carrier_id as i8))), + ("?2", DataValue::Int32(Some(no_o_id))), + ("?3", DataValue::Int8(Some(d_id as i8))), + ("?4", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; // "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" - let _ = tx.run(format!("UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", now, no_o_id, d_id, args.w_id))?; + let (_, tuples) = tx.execute( + &statements[4], + vec![ + ("?1", DataValue::from(&now)), + ("?2", DataValue::Int32(Some(no_o_id))), + ("?3", DataValue::Int8(Some(d_id as i8))), + ("?4", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; // "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" - let (_, tuple) = tx.run(format!("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", no_o_id, d_id, args.w_id))?; - let ol_total = tuple[0].values[0].decimal().unwrap(); + let (_, tuples) = tx.execute( + &statements[5], + vec![ + ("?1", DataValue::Int32(Some(no_o_id))), + ("?2", DataValue::Int8(Some(d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; + let ol_total = tuples[0].values[0].decimal().unwrap(); // "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?" - let _ = tx.run(format!("UPDATE customer SET c_balance = c_balance + {} , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}", ol_total, c_id, d_id, args.w_id))?; + let (_, tuples) = tx.execute( + &statements[6], + vec![ + ("?1", DataValue::Decimal(Some(ol_total))), + ("?2", DataValue::Int32(Some(c_id))), + ("?3", DataValue::Int8(Some(d_id as i8))), + ("?4", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; } Ok(()) @@ -72,12 +122,13 @@ impl TpccTest for DeliveryTest { tx: &mut DBTransaction, num_ware: usize, _: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let o_carrier_id = rng.gen_range(1..10); let args = DeliveryArgs::new(w_id, o_carrier_id); - Delivery::run(tx, &args)?; + Delivery::run(tx, &args, statements)?; Ok(()) } diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs index 637131d5..87282036 100644 --- a/tpcc/src/main.rs +++ b/tpcc/src/main.rs @@ -7,7 +7,7 @@ use crate::rt_hist::RtHist; use crate::slev::SlevTest; use crate::utils::SeqGen; use clap::Parser; -use fnck_sql::db::{DBTransaction, DataBaseBuilder}; +use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement}; use fnck_sql::errors::DatabaseError; use fnck_sql::storage::Storage; use rand::prelude::ThreadRng; @@ -35,7 +35,11 @@ pub(crate) const RT_LIMITS: [Duration; 5] = [ pub(crate) trait TpccTransaction { type Args; - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError>; + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError>; } pub(crate) trait TpccTest { @@ -47,6 +51,7 @@ pub(crate) trait TpccTest { tx: &mut DBTransaction, num_ware: usize, args: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError>; } @@ -81,6 +86,56 @@ fn main() -> Result<(), TpccError> { Load::load_custs(&mut rng, &database, args.num_ware)?; Load::load_ord(&mut rng, &database, args.num_ware)?; + let test_statements = vec![ + vec![ + database.prepare("SELECT c.c_discount, c.c_last, c.c_credit, w.w_tax FROM customer AS c JOIN warehouse AS w ON c.c_w_id = w_id AND w.w_id = ?1 AND c.c_w_id = ?2 AND c.c_d_id = ?3 AND c.c_id = ?4")?, + database.prepare("SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?, + database.prepare("SELECT w_tax FROM warehouse WHERE w_id = ?1")?, + database.prepare("SELECT d_next_o_id, d_tax FROM district WHERE d_id = ?1 AND d_w_id = ?2")?, + database.prepare("UPDATE district SET d_next_o_id = ?1 + 1 WHERE d_id = ?2 AND d_w_id = ?3")?, + database.prepare("INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)")?, + database.prepare("INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES (?1,?2,?3)")?, + database.prepare("SELECT i_price, i_name, i_data FROM item WHERE i_id = ?1")?, + database.prepare("SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = ?1 AND s_w_id = ?2")?, + database.prepare("UPDATE stock SET s_quantity = ?1 WHERE s_i_id = ?2 AND s_w_id = ?3")?, + database.prepare("INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)")?, + ], + vec![ + database.prepare("UPDATE warehouse SET w_ytd = w_ytd + ?1 WHERE w_id = ?2")?, + database.prepare("SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = ?1")?, + database.prepare("UPDATE district SET d_ytd = d_ytd + ?1 WHERE d_w_id = ?2 AND d_id = ?3")?, + database.prepare("SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = ?1 AND d_id = ?2")?, + database.prepare("SELECT count(c_id) FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3")?, + database.prepare("SELECT c_id FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3 ORDER BY c_first")?, + database.prepare("SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?, + database.prepare("SELECT c_data FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?, + database.prepare("UPDATE customer SET c_balance = ?1, c_data = ?2 WHERE c_w_id = ?3 AND c_d_id = ?4 AND c_id = ?5")?, + database.prepare("UPDATE customer SET c_balance = ?1 WHERE c_w_id = ?2 AND c_d_id = ?3 AND c_id = ?4")?, + database.prepare("INSERT OVERWRITE history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?, + ], + vec![ + database.prepare("SELECT count(c_id) FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3")?, + database.prepare("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3 ORDER BY c_first")?, + database.prepare("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?, + database.prepare("SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = ?1 AND o_d_id = ?2 AND o_c_id = ?3 AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = ?4 AND o_d_id = ?5 AND o_c_id = ?6)")?, + database.prepare("SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 AND ol_o_id = ?3")? + ], + vec![ + database.prepare("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ?1 AND no_w_id = ?2")?, + database.prepare("DELETE FROM new_orders WHERE no_o_id = ?1 AND no_d_id = ?2 AND no_w_id = ?3")?, + database.prepare("SELECT o_c_id FROM orders WHERE o_id = ?1 AND o_d_id = ?2 AND o_w_id = ?3")?, + database.prepare("UPDATE orders SET o_carrier_id = ?1 WHERE o_id = ?2 AND o_d_id = ?3 AND o_w_id = ?4")?, + database.prepare("UPDATE order_line SET ol_delivery_d = ?1 WHERE ol_o_id = ?2 AND ol_d_id = ?3 AND ol_w_id = ?4")?, + database.prepare("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ?1 AND ol_d_id = ?2 AND ol_w_id = ?3")?, + database.prepare("UPDATE customer SET c_balance = c_balance + ?1 , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ?2 AND c_d_id = ?3 AND c_w_id = ?4")?, + ], + vec![ + database.prepare("SELECT d_next_o_id FROM district WHERE d_id = ?1 AND d_w_id = ?2")?, + database.prepare("SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 AND ol_o_id < ?3 AND ol_o_id >= (?4 - 20)")?, + database.prepare("SELECT count(*) FROM stock WHERE s_w_id = ?1 AND s_i_id = ?2 AND s_quantity < ?3")?, + ], + ]; + let mut rt_hist = RtHist::new(); let mut success = [0usize; 5]; let mut late = [0usize; 5]; @@ -102,13 +157,15 @@ fn main() -> Result<(), TpccError> { while tpcc_start.elapsed() < duration { let i = seq_gen.get(); let tpcc_test = &tests[i]; + let statement = &test_statements[i]; let mut is_succeed = false; for j in 0..args.max_retry + 1 { let transaction_start = Instant::now(); let mut tx = database.new_transaction()?; - if let Err(err) = tpcc_test.do_transaction(&mut rng, &mut tx, args.num_ware, &tpcc_args) + if let Err(err) = + tpcc_test.do_transaction(&mut rng, &mut tx, args.num_ware, &tpcc_args, &statement) { failure[i] += 1; eprintln!( diff --git a/tpcc/src/new_ord.rs b/tpcc/src/new_ord.rs index 6ba87f44..3b1fea55 100644 --- a/tpcc/src/new_ord.rs +++ b/tpcc/src/new_ord.rs @@ -1,8 +1,9 @@ use crate::load::{nu_rand, CUST_PER_DIST, DIST_PER_WARE, MAX_ITEMS, MAX_NUM_ITEMS}; use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; use chrono::Utc; -use fnck_sql::db::DBTransaction; +use fnck_sql::db::{DBTransaction, Statement}; use fnck_sql::storage::Storage; +use fnck_sql::types::value::DataValue; use rand::prelude::ThreadRng; use rand::Rng; use rust_decimal::Decimal; @@ -53,58 +54,99 @@ pub(crate) struct NewOrdTest; impl TpccTransaction for NewOrd { type Args = NewOrdArgs; - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError> { let mut price = vec![Decimal::default(); MAX_NUM_ITEMS]; let mut iname = vec![String::new(); MAX_NUM_ITEMS]; let mut stock = vec![0; MAX_NUM_ITEMS]; let mut bg = vec![String::new(); MAX_NUM_ITEMS]; let mut amt = vec![Decimal::default(); MAX_NUM_ITEMS]; - let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + let now = Utc::now().naive_utc(); let (c_discount, c_last, c_credit, w_tax) = if args.joins { // "SELECT c_discount, c_last, c_credit, w_tax FROM customer, warehouse WHERE w_id = ? AND c_w_id = w_id AND c_d_id = ? AND c_id = ?" - let (_, tuple) = tx.run(format!("SELECT c.c_discount, c.c_last, c.c_credit, w.w_tax FROM customer AS c JOIN warehouse AS w ON c.c_w_id = w_id AND w.w_id = {} AND c.c_w_id = {} AND c.c_d_id = {} AND c.c_id = {}", args.w_id, args.w_id, args.d_id, args.c_id))?; - let c_discount = tuple[0].values[0].decimal().unwrap(); - let c_last = tuple[0].values[1].utf8().unwrap(); - let c_credit = tuple[0].values[2].utf8().unwrap(); - let w_tax = tuple[0].values[3].decimal().unwrap(); + let (_, tuples) = tx.execute( + &statements[0], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ("?3", DataValue::Int8(Some(args.d_id as i8))), + ("?4", DataValue::Int64(Some(args.c_id as i64))), + ], + )?; + let c_discount = tuples[0].values[0].decimal().unwrap(); + let c_last = tuples[0].values[1].utf8().unwrap(); + let c_credit = tuples[0].values[2].utf8().unwrap(); + let w_tax = tuples[0].values[3].decimal().unwrap(); (c_discount, c_last, c_credit, w_tax) } else { // "SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let (_, tuple) = tx.run(format!("SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.w_id, args.d_id, args.c_id))?; - let c_discount = tuple[0].values[0].decimal().unwrap(); - let c_last = tuple[0].values[1].utf8().unwrap(); - let c_credit = tuple[0].values[2].utf8().unwrap(); + let (_, tuples) = tx.execute( + &statements[1], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int32(Some(args.c_id as i32))), + ], + )?; + let c_discount = tuples[0].values[0].decimal().unwrap(); + let c_last = tuples[0].values[1].utf8().unwrap(); + let c_credit = tuples[0].values[2].utf8().unwrap(); // "SELECT w_tax FROM warehouse WHERE w_id = ?" - let (_, tuple) = tx.run(format!( - "SELECT w_tax FROM warehouse WHERE w_id = {}", - args.w_id - ))?; - let w_tax = tuple[0].values[0].decimal().unwrap(); + let (_, tuples) = tx.execute( + &statements[2], + vec![("?1", DataValue::Int16(Some(args.w_id as i16)))], + )?; + let w_tax = tuples[0].values[0].decimal().unwrap(); (c_discount, c_last, c_credit, w_tax) }; // "SELECT d_next_o_id, d_tax FROM district WHERE d_id = ? AND d_w_id = ? FOR UPDATE" - let (_, tuple) = tx.run(format!( - "SELECT d_next_o_id, d_tax FROM district WHERE d_id = {} AND d_w_id = {}", - args.d_id, args.w_id - ))?; - let d_next_o_id = tuple[0].values[0].i32().unwrap(); - let d_tax = tuple[0].values[1].decimal().unwrap(); + let (_, tuples) = tx.execute( + &statements[3], + vec![ + ("?1", DataValue::Int8(Some(args.d_id as i8))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; + let d_next_o_id = tuples[0].values[0].i32().unwrap(); + let d_tax = tuples[0].values[1].decimal().unwrap(); // "UPDATE district SET d_next_o_id = ? + 1 WHERE d_id = ? AND d_w_id = ?" - let _ = tx.run(format!( - "UPDATE district SET d_next_o_id = {} + 1 WHERE d_id = {} AND d_w_id = {}", - d_next_o_id, args.d_id, args.w_id - ))?; + let (_, tuples) = tx.execute( + &statements[4], + vec![ + ("?1", DataValue::Int32(Some(d_next_o_id))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; let o_id = d_next_o_id; // "INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES(?, ?, ?, ?, ?, ?, ?)" - let _ = tx.run(format!("INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES({}, {}, {}, {}, '{}', {}, {})", o_id, args.d_id, args.w_id, args.c_id, now, args.o_ol_cnt, args.o_all_local))?; + let (_, tuples) = tx.execute( + &statements[5], + vec![ + ("?1", DataValue::Int32(Some(o_id))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ("?4", DataValue::Int32(Some(args.c_id as i32))), + ("?5", DataValue::from(&now)), + ("?6", DataValue::Int8(Some(args.o_ol_cnt as i8))), + ("?7", DataValue::Int8(Some(args.o_all_local as i8))), + ], + )?; // "INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES (?,?,?) - let _ = tx.run(format!( - "INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES ({},{},{})", - o_id, args.d_id, args.w_id - ))?; + let (_, tuples) = tx.execute( + &statements[6], + vec![ + ("?1", DataValue::Int32(Some(o_id))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; let mut ol_num_seq = vec![0; MAX_NUM_ITEMS]; for i in 0..args.o_ol_cnt { @@ -133,10 +175,10 @@ impl TpccTransaction for NewOrd { let ol_i_id = args.item_id[ol_num_seq[ol_number - 1]]; let ol_quantity = args.qty[ol_num_seq[ol_number - 1]]; // "SELECT i_price, i_name, i_data FROM item WHERE i_id = ?" - let (_, tuples) = tx.run(format!( - "SELECT i_price, i_name, i_data FROM item WHERE i_id = {}", - ol_i_id - ))?; + let (_, tuples) = tx.execute( + &statements[7], + vec![("?1", DataValue::Int32(Some(ol_i_id as i32)))], + )?; if tuples.is_empty() { return Err(TpccError::EmptyTuples); } @@ -148,7 +190,13 @@ impl TpccTransaction for NewOrd { iname[ol_num_seq[ol_number - 1]] = i_name; // "SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = ? AND s_w_id = ? FOR UPDATE" - let (_, tuples) = tx.run(format!("SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = {} AND s_w_id = {}", ol_i_id, ol_supply_w_id))?; + let (_, tuples) = tx.execute( + &statements[8], + vec![ + ("?1", DataValue::Int32(Some(ol_i_id as i32))), + ("?2", DataValue::Int16(Some(ol_supply_w_id as i16))), + ], + )?; let mut s_quantity = tuples[0].values[0].i16().unwrap(); let s_data = tuples[0].values[1].utf8().unwrap(); let s_dist_01 = tuples[0].values[2].utf8().unwrap(); @@ -180,10 +228,14 @@ impl TpccTransaction for NewOrd { s_quantity - ol_quantity as i16 + 91 }; // "UPDATE stock SET s_quantity = ? WHERE s_i_id = ? AND s_w_id = ?" - let _ = tx.run(format!( - "UPDATE stock SET s_quantity = {} WHERE s_i_id = {} AND s_w_id = {}", - s_quantity, ol_i_id, ol_supply_w_id - ))?; + let (_, tuples) = tx.execute( + &statements[9], + vec![ + ("?1", DataValue::Int16(Some(s_quantity))), + ("?2", DataValue::Int32(Some(ol_i_id as i32))), + ("?3", DataValue::Int16(Some(ol_supply_w_id as i16))), + ], + )?; // Tips: Integers always have 7 digits, so divide by 10 here let mut ol_amount = Decimal::from(ol_quantity) @@ -196,7 +248,20 @@ impl TpccTransaction for NewOrd { amt[ol_num_seq[ol_number - 1]] = ol_amount; // "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - let _ = tx.run(format!("INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, '{}')", o_id, args.d_id, args.w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info))?; + let (_, tuples) = tx.execute( + &statements[10], + vec![ + ("?1", DataValue::Int32(Some(o_id))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int16(Some(args.w_id as i16))), + ("?4", DataValue::Int8(Some(ol_number as i8))), + ("?5", DataValue::Int32(Some(ol_i_id as i32))), + ("?6", DataValue::Int16(Some(ol_supply_w_id as i16))), + ("?7", DataValue::Int8(Some(ol_quantity as i8))), + ("?8", DataValue::Decimal(Some(ol_amount.round_dp(2)))), + ("?9", DataValue::from(ol_dist_info)), + ], + )?; } Ok(()) @@ -214,6 +279,7 @@ impl TpccTest for NewOrdTest { tx: &mut DBTransaction, num_ware: usize, args: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError> { let mut all_local = 1; let notfound = MAX_ITEMS + 1; @@ -248,7 +314,7 @@ impl TpccTest for NewOrdTest { let args = NewOrdArgs::new( args.joins, w_id, d_id, c_id, ol_cnt, all_local, itemid, supware, qty, ); - NewOrd::run(tx, &args)?; + NewOrd::run(tx, &args, statements)?; Ok(()) } diff --git a/tpcc/src/order_stat.rs b/tpcc/src/order_stat.rs index 73642953..982b7a69 100644 --- a/tpcc/src/order_stat.rs +++ b/tpcc/src/order_stat.rs @@ -1,7 +1,8 @@ use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; -use fnck_sql::db::DBTransaction; +use fnck_sql::db::{DBTransaction, Statement}; use fnck_sql::storage::Storage; +use fnck_sql::types::value::DataValue; use rand::prelude::ThreadRng; use rand::Rng; use rust_decimal::Decimal; @@ -39,13 +40,31 @@ pub(crate) struct OrderStatTest; impl TpccTransaction for OrderStat { type Args = OrderStatArgs; - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError> { let (c_balance, c_first, c_middle, c_last) = if args.by_name { // SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" - let (_, tuples) = tx.run(format!("SELECT count(c_id) FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}'", args.w_id, args.d_id, args.c_last))?; + let (_, tuples) = tx.execute( + &statements[0], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::from(args.c_last.clone())), + ], + )?; let mut name_cnt = tuples[0].values[0].i32().unwrap() as usize; - // SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" - let (_, tuples) = tx.run(format!("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first", args.w_id, args.d_id, args.c_last))?; + // SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ? ORDER BY c_first" + let (_, tuples) = tx.execute( + &statements[1], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::from(args.c_last.clone())), + ], + )?; if name_cnt % 2 == 1 { name_cnt += 1; @@ -64,15 +83,33 @@ impl TpccTransaction for OrderStat { (c_balance, c_first, c_middle, c_last) } else { // "SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let (_, tuples) = tx.run(format!("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.w_id, args.d_id, args.c_id))?; + let (_, tuples) = tx.execute( + &statements[2], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int32(Some(args.c_id as i32))), + ], + )?; let c_balance = tuples[0].values[0].decimal().unwrap(); let c_first = tuples[0].values[1].utf8().unwrap(); let c_middle = tuples[0].values[2].utf8().unwrap(); let c_last = tuples[0].values[3].utf8().unwrap(); (c_balance, c_first, c_middle, c_last) }; + // TODO: Join Eq // "SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = ? AND o_d_id = ? AND o_c_id = ? AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = ? AND o_d_id = ? AND o_c_id = ?)" - let (_, tuples) = tx.run(format!("SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = {} AND o_d_id = {} AND o_c_id = {} AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = {} AND o_d_id = {} AND o_c_id = {})", args.w_id, args.d_id, args.c_id, args.w_id, args.d_id, args.c_id))?; + let (_, tuples) = tx.execute( + &statements[3], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int32(Some(args.c_id as i32))), + ("?4", DataValue::Int16(Some(args.w_id as i16))), + ("?5", DataValue::Int8(Some(args.d_id as i8))), + ("?6", DataValue::Int32(Some(args.c_id as i32))), + ], + )?; if tuples.is_empty() { return Err(TpccError::EmptyTuples); } @@ -80,7 +117,14 @@ impl TpccTransaction for OrderStat { // let o_entry_d = tuples[0].values[1].datetime().unwrap(); // let o_carrier_id = tuples[0].values[2].i32().unwrap(); // "SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id = ?" - let (_, tuples) = tx.run(format!("SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = {} AND ol_d_id = {} AND ol_o_id = {}", args.w_id, args.d_id, o_id))?; + let (_, tuples) = tx.execute( + &statements[4], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int32(Some(o_id))), + ], + )?; // let ol_i_id = tuples[0].values[0].i32(); // let ol_supply_w_id = tuples[0].values[1].i16(); // let ol_quantity = tuples[0].values[2].i8(); @@ -102,6 +146,7 @@ impl TpccTest for OrderStatTest { tx: &mut DBTransaction, num_ware: usize, _: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE); @@ -110,7 +155,7 @@ impl TpccTest for OrderStatTest { let by_name = rng.gen_range(1..100) <= 60; let args = OrderStatArgs::new(w_id, d_id, by_name, c_id, c_last); - OrderStat::run(tx, &args)?; + OrderStat::run(tx, &args, statements)?; Ok(()) } diff --git a/tpcc/src/payment.rs b/tpcc/src/payment.rs index 4e6559ea..93ba8586 100644 --- a/tpcc/src/payment.rs +++ b/tpcc/src/payment.rs @@ -1,8 +1,9 @@ use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; use chrono::Utc; -use fnck_sql::db::DBTransaction; +use fnck_sql::db::{DBTransaction, Statement}; use fnck_sql::storage::Storage; +use fnck_sql::types::value::DataValue; use rand::prelude::ThreadRng; use rand::Rng; use rust_decimal::Decimal; @@ -51,15 +52,25 @@ impl TpccTransaction for Payment { type Args = PaymentArgs; #[allow(unused_variables)] - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError> { + let now = Utc::now(); // "UPDATE warehouse SET w_ytd = w_ytd + ? WHERE w_id = ?" - let _ = tx.run(format!( - "UPDATE warehouse SET w_ytd = w_ytd + {} WHERE w_id = {}", - args.h_amount, args.w_id - ))?; - + let (_, tuples) = tx.execute( + &statements[0], + vec![ + ("?1", DataValue::Decimal(Some(args.h_amount))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; // "SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = ?" - let (_, tuples) = tx.run(format!("SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = {}", args.w_id))?; + let (_, tuples) = tx.execute( + &statements[1], + vec![("?1", DataValue::Int16(Some(args.w_id as i16)))], + )?; let w_street_1 = tuples[0].values[0].utf8().unwrap(); let w_street_2 = tuples[0].values[1].utf8().unwrap(); let w_city = tuples[0].values[2].utf8().unwrap(); @@ -68,13 +79,23 @@ impl TpccTransaction for Payment { let w_name = tuples[0].values[5].utf8().unwrap(); // "UPDATE district SET d_ytd = d_ytd + ? WHERE d_w_id = ? AND d_id = ?" - let _ = tx.run(format!( - "UPDATE district SET d_ytd = d_ytd + {} WHERE d_w_id = {} AND d_id = {}", - args.h_amount, args.w_id, args.d_id - ))?; + let (_, tuples) = tx.execute( + &statements[2], + vec![ + ("?1", DataValue::Decimal(Some(args.h_amount))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ("?3", DataValue::Int8(Some(args.d_id as i8))), + ], + )?; // "SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = ? AND d_id = ?" - let (_, tuples) = tx.run(format!("SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = {} AND d_id = {}", args.w_id, args.d_id))?; + let (_, tuples) = tx.execute( + &statements[3], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ], + )?; let d_street_1 = tuples[0].values[0].utf8().unwrap(); let d_street_2 = tuples[0].values[1].utf8().unwrap(); let d_city = tuples[0].values[2].utf8().unwrap(); @@ -85,11 +106,25 @@ impl TpccTransaction for Payment { let mut c_id = args.c_id as i32; if args.by_name { // "SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" - let (_, tuples) = tx.run(format!("SELECT count(c_id) FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}'", args.c_w_id, args.c_d_id, args.c_last))?; + let (_, tuples) = tx.execute( + &statements[4], + vec![ + ("?1", DataValue::Int16(Some(args.c_w_id as i16))), + ("?2", DataValue::Int8(Some(args.c_d_id as i8))), + ("?3", DataValue::from(args.c_last.clone())), + ], + )?; let mut name_cnt = tuples[0].values[0].i32().unwrap(); // "SELECT c_id FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ? ORDER BY c_first" - let (_, tuples) = tx.run(format!("SELECT c_id FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first", args.c_w_id, args.c_d_id, args.c_last))?; + let (_, tuples) = tx.execute( + &statements[5], + vec![ + ("?1", DataValue::Int16(Some(args.c_w_id as i16))), + ("?2", DataValue::Int8(Some(args.c_d_id as i8))), + ("?3", DataValue::from(args.c_last.clone())), + ], + )?; if name_cnt % 2 == 1 { name_cnt += 1; } @@ -98,7 +133,14 @@ impl TpccTransaction for Payment { } } // "SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ? FOR UPDATE" - let (_, tuples) = tx.run(format!("SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", args.c_w_id, args.c_d_id, c_id))?; + let (_, tuples) = tx.execute( + &statements[6], + vec![ + ("?1", DataValue::Int16(Some(args.c_w_id as i16))), + ("?2", DataValue::Int8(Some(args.c_d_id as i8))), + ("?3", DataValue::Int32(Some(c_id))), + ], + )?; let c_first = tuples[0].values[0].utf8().unwrap(); let c_middle = tuples[0].values[1].utf8().unwrap(); let c_last = tuples[0].values[2].utf8().unwrap(); @@ -118,30 +160,69 @@ impl TpccTransaction for Payment { if let Some(c_credit) = c_credit { if c_credit.contains("BC") { // "SELECT c_data FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let (_, tuples) = tx.run(format!( - "SELECT c_data FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", - args.c_w_id, args.c_d_id, c_id - ))?; + let (_, tuples) = tx.execute( + &statements[7], + vec![ + ("?1", DataValue::Int16(Some(args.c_w_id as i16))), + ("?2", DataValue::Int8(Some(args.c_d_id as i8))), + ("?3", DataValue::Int32(Some(c_id))), + ], + )?; let c_data = tuples[0].values[0].utf8().unwrap(); // https://github.com/AgilData/tpcc/blob/dfbabe1e35cc93b2bf2e107fc699eb29c2097e24/src/main/java/com/codefutures/tpcc/Payment.java#L284 // let c_new_data = format!("| {} {} {} {} {} {} {}", c_id, args.c_d_id, args.c_w_id, args.d_id, args.w_id, args.h_amount, ) // "UPDATE customer SET c_balance = ?, c_data = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let _ = tx.run(format!("UPDATE customer SET c_balance = {}, c_data = '{}' WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, c_data, args.c_w_id, args.c_d_id, c_id))?; + let (_, tuples) = tx.execute( + &statements[8], + vec![ + ("?1", DataValue::Decimal(Some(c_balance))), + ("?2", DataValue::from(c_data)), + ("?3", DataValue::Int16(Some(args.c_w_id as i16))), + ("?4", DataValue::Int8(Some(args.c_d_id as i8))), + ("?5", DataValue::Int32(Some(c_id))), + ], + )?; } else { // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let _ = tx.run(format!("UPDATE customer SET c_balance = {} WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, args.c_w_id, args.c_d_id, c_id))?; + let (_, tuples) = tx.execute( + &statements[9], + vec![ + ("?1", DataValue::Decimal(Some(c_balance))), + ("?2", DataValue::Int16(Some(args.c_w_id as i16))), + ("?3", DataValue::Int8(Some(args.c_d_id as i8))), + ("?4", DataValue::Int32(Some(c_id))), + ], + )?; } } else { // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" - let _ = tx.run(format!("UPDATE customer SET c_balance = {} WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}", c_balance, args.c_w_id, args.c_d_id, c_id))?; + let (_, tuples) = tx.execute( + &statements[9], + vec![ + ("?1", DataValue::Decimal(Some(c_balance))), + ("?2", DataValue::Int16(Some(args.c_w_id as i16))), + ("?3", DataValue::Int8(Some(args.c_d_id as i8))), + ("?4", DataValue::Int32(Some(c_id))), + ], + )?; } let h_data = format!("\\0{d_name} \\0"); - - let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); // "INSERT INTO history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES(?, ?, ?, ?, ?, ?, ?, ?)" - let _ = tx.run(format!("INSERT OVERWRITE history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES({}, {}, {}, {}, {}, '{}', {}, '{}')", args.c_d_id, args.c_w_id, c_id, args.d_id, args.w_id, now, args.h_amount, h_data))?; + let (_, tuples) = tx.execute( + &statements[10], + vec![ + ("?1", DataValue::Int8(Some(args.c_d_id as i8))), + ("?2", DataValue::Int16(Some(args.c_w_id as i16))), + ("?3", DataValue::Int32(Some(c_id))), + ("?4", DataValue::Int8(Some(args.d_id as i8))), + ("?5", DataValue::Int16(Some(args.w_id as i16))), + ("?6", DataValue::from(&now.naive_utc())), + ("?7", DataValue::Decimal(Some(args.h_amount))), + ("?8", DataValue::from(h_data)), + ], + )?; Ok(()) } @@ -158,6 +239,7 @@ impl TpccTest for PaymentTest { tx: &mut DBTransaction, num_ware: usize, _: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE); @@ -187,7 +269,7 @@ impl TpccTest for PaymentTest { c_last, Decimal::from(h_amount), ); - Payment::run(tx, &args)?; + Payment::run(tx, &args, statements)?; Ok(()) } diff --git a/tpcc/src/slev.rs b/tpcc/src/slev.rs index 00b09df8..042c0c53 100644 --- a/tpcc/src/slev.rs +++ b/tpcc/src/slev.rs @@ -1,7 +1,8 @@ use crate::load::DIST_PER_WARE; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; -use fnck_sql::db::DBTransaction; +use fnck_sql::db::{DBTransaction, Statement}; use fnck_sql::storage::Storage; +use fnck_sql::types::value::DataValue; use rand::prelude::ThreadRng; use rand::Rng; @@ -24,21 +25,40 @@ pub(crate) struct SlevTest; impl TpccTransaction for Slev { type Args = SlevArgs; - fn run(tx: &mut DBTransaction, args: &Self::Args) -> Result<(), TpccError> { + fn run( + tx: &mut DBTransaction, + args: &Self::Args, + statements: &[Statement], + ) -> Result<(), TpccError> { // "SELECT d_next_o_id FROM district WHERE d_id = ? AND d_w_id = ?" - let (_, tuples) = tx.run(format!( - "SELECT d_next_o_id FROM district WHERE d_id = {} AND d_w_id = {}", - args.d_id, args.w_id - ))?; + let (_, tuples) = tx.execute( + &statements[0], + vec![ + ("?1", DataValue::Int8(Some(args.d_id as i8))), + ("?2", DataValue::Int16(Some(args.w_id as i16))), + ], + )?; let d_next_o_id = tuples[0].values[0].i32().unwrap(); // "SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id < ? AND ol_o_id >= (? - 20)" - let (_, tuples) = tx.run(format!("SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = {} AND ol_d_id = {} AND ol_o_id < {} AND ol_o_id >= ({} - 20)", args.w_id, args.d_id, d_next_o_id, d_next_o_id))?; + let (_, tuples) = tx.execute( + &statements[1], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(args.d_id as i8))), + ("?3", DataValue::Int32(Some(d_next_o_id))), + ("?4", DataValue::Int32(Some(d_next_o_id))), + ], + )?; let ol_i_id = tuples[0].values[0].i32().unwrap(); // "SELECT count(*) FROM stock WHERE s_w_id = ? AND s_i_id = ? AND s_quantity < ?" - let (_, tuples) = tx.run(format!( - "SELECT count(*) FROM stock WHERE s_w_id = {} AND s_i_id = {} AND s_quantity < {}", - args.w_id, ol_i_id, args.level - ))?; + let (_, tuples) = tx.execute( + &statements[2], + vec![ + ("?1", DataValue::Int16(Some(args.w_id as i16))), + ("?2", DataValue::Int8(Some(ol_i_id as i8))), + ("?3", DataValue::Int16(Some(args.level as i16))), + ], + )?; // let i_count = tuples[0].values[0].i32().unwrap(); Ok(()) @@ -56,13 +76,14 @@ impl TpccTest for SlevTest { tx: &mut DBTransaction, num_ware: usize, _: &TpccArgs, + statements: &[Statement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE); let level = rng.gen_range(10..20); let args = SlevArgs::new(w_id, d_id, level); - Slev::run(tx, &args)?; + Slev::run(tx, &args, statements)?; Ok(()) }