Skip to content

Commit

Permalink
perf: reduce the overhead incurred when deserializing Tuple (#126)
Browse files Browse the repository at this point in the history
* perf: only deserialize the given column on `Tuple::deserialize_from`

* perf: change Tuple's Column to SchemaRef to avoid repeated collection
  • Loading branch information
KKould authored Feb 2, 2024
1 parent 280b360 commit ac94e3c
Show file tree
Hide file tree
Showing 25 changed files with 333 additions and 182 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ahash = "0.8.3"
lazy_static = "1.4.0"
comfy-table = "7.0.1"
bytes = "1.5.0"
kip_db = "0.1.2-alpha.24"
kip_db = "0.1.2-alpha.25"
rust_decimal = "1"
csv = "1"
regex = "1.10.2"
Expand Down
19 changes: 18 additions & 1 deletion src/catalog/table.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;

use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::errors::DatabaseError;
use crate::types::index::{IndexMeta, IndexMetaRef};
use crate::types::ColumnId;
use crate::types::{ColumnId, LogicalType};

pub type TableName = Arc<String>;

Expand Down Expand Up @@ -58,6 +59,22 @@ impl TableCatalog {
self.columns.values().map(Arc::clone).collect()
}

/// Returns the primary-key column together with its positional index among
/// this table's columns (in `BTreeMap` key order).
///
/// # Errors
/// Returns `DatabaseError::PrimaryKeyNotFound` when no column has
/// `desc.is_primary` set.
pub(crate) fn primary_key(&self) -> Result<(usize, &ColumnRef), DatabaseError> {
    // `values()` is the idiomatic (and locally consistent — see `all_columns`)
    // way to iterate a BTreeMap's values; same order as `iter()`.
    self.columns
        .values()
        .enumerate()
        .find(|(_, column)| column.desc.is_primary)
        .ok_or(DatabaseError::PrimaryKeyNotFound)
}

/// Collects the logical type of every column, in column (`BTreeMap` key) order.
///
/// Used to avoid re-deriving per-column types on every tuple deserialization.
pub(crate) fn types(&self) -> Vec<LogicalType> {
    // `values()` over `iter().map(|(_, c)| …)` for consistency with the other
    // accessors; plain `collect()` suffices — the return type fixes the target.
    self.columns
        .values()
        .map(|column| *column.datatype())
        .collect()
}

/// Add a column to the table catalog.
pub(crate) fn add_column(&mut self, mut col: ColumnCatalog) -> Result<ColumnId, DatabaseError> {
if self.column_idxs.contains_key(col.name()) {
Expand Down
9 changes: 8 additions & 1 deletion src/execution/volcano/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ impl AddColumn {
if_not_exists,
} = &self.op;
let mut unique_values = column.desc().is_unique.then(|| Vec::new());
let mut tuple_columns = None;
let mut tuples = Vec::new();

#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;

tuple.columns.push(Arc::new(column.clone()));
let tuples_columns = tuple_columns.get_or_insert_with(|| {
let mut columns = Vec::clone(&tuple.columns);

columns.push(Arc::new(column.clone()));
Arc::new(columns)
});
if let Some(value) = column.default_value() {
if let Some(unique_values) = &mut unique_values {
unique_values.push((tuple.id.clone().unwrap(), value.clone()));
Expand All @@ -51,6 +57,7 @@ impl AddColumn {
} else {
tuple.values.push(Arc::new(DataValue::Null));
}
tuple.columns = tuples_columns.clone();
tuples.push(tuple);
}
for tuple in tuples {
Expand Down
17 changes: 11 additions & 6 deletions src/execution/volcano/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct DropColumn {
op: DropColumnOperator,
Expand All @@ -32,14 +33,14 @@ impl DropColumn {
column_name,
if_exists,
} = &self.op;
let mut option_column_i = None;
let mut tuple_columns = None;
let mut tuples = Vec::new();

#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;

if option_column_i.is_none() {
if tuple_columns.is_none() {
if let Some((column_index, is_primary)) = tuple
.columns
.iter()
Expand All @@ -52,16 +53,20 @@ impl DropColumn {
"drop of primary key column is not allowed.".to_owned(),
))?;
}
option_column_i = Some(column_index);
let mut columns = Vec::clone(&tuple.columns);
let _ = columns.remove(column_index);

tuple_columns = Some((column_index, Arc::new(columns)));
}
}
if option_column_i.is_none() && *if_exists {
if tuple_columns.is_none() && *if_exists {
return Ok(());
}
let column_i = option_column_i
let (column_i, columns) = tuple_columns
.clone()
.ok_or_else(|| DatabaseError::InvalidColumn("not found column".to_string()))?;

let _ = tuple.columns.remove(column_i);
tuple.columns = columns;
let _ = tuple.values.remove(column_i);

tuples.push(tuple);
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Analyze {

yield Tuple {
id: None,
columns,
columns: Arc::new(columns),
values,
};
}
Expand Down
33 changes: 19 additions & 14 deletions src/execution/volcano/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::index::Index;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::DataValue;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -51,9 +53,11 @@ impl Insert {
} = self;
let mut primary_key_index = None;
let mut unique_values = HashMap::new();
let mut tuples = Vec::new();
let mut tuple_values = Vec::new();

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let all_columns = table_catalog.all_columns_with_id();

#[for_await]
for tuple in build_read(input, transaction) {
let Tuple {
Expand All @@ -74,14 +78,10 @@ impl Insert {
.map(|col| col.id().unwrap())
.unwrap()
});
let all_columns = table_catalog.all_columns_with_id();
let tuple_id = tuple_map.get(primary_col_id).cloned().unwrap();
let mut tuple = Tuple {
id: Some(tuple_id.clone()),
columns: Vec::with_capacity(all_columns.len()),
values: Vec::with_capacity(all_columns.len()),
};
for (col_id, col) in all_columns {
let mut values = Vec::with_capacity(all_columns.len());

for (col_id, col) in &all_columns {
let value = tuple_map
.remove(col_id)
.or_else(|| col.default_value())
Expand All @@ -96,14 +96,19 @@ impl Insert {
if value.is_null() && !col.nullable {
return Err(DatabaseError::NotNull);
}

tuple.columns.push(col.clone());
tuple.values.push(value)
values.push(value)
}

tuples.push(tuple);
tuple_values.push((tuple_id, values));
}
for tuple in tuples {
let tuple_columns = all_columns
.into_iter()
.map(|(_, column)| column.clone())
.collect_vec();
let tuple_builder = TupleBuilder::new(tuple_columns);

for (tuple_id, values) in tuple_values {
let tuple = tuple_builder.build(Some(tuple_id), values)?;

transaction.append(&table_name, tuple, is_overwrite)?;
}
// Unique Index
Expand Down
6 changes: 5 additions & 1 deletion src/execution/volcano/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::types::value::ValueRef;
use ahash::HashMap;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::mem;
use std::sync::Arc;

pub struct HashAggExecutor {
agg_calls: Vec<ScalarExpression>,
Expand Down Expand Up @@ -108,6 +110,8 @@ impl HashAggStatus {
}

pub(crate) fn to_tuples(&mut self) -> Result<Vec<Tuple>, DatabaseError> {
let group_columns = Arc::new(mem::replace(&mut self.group_columns, vec![]));

Ok(self
.group_hash_accs
.drain()
Expand All @@ -121,7 +125,7 @@ impl HashAggStatus {

Ok::<Tuple, DatabaseError>(Tuple {
id: None,
columns: self.group_columns.clone(),
columns: group_columns.clone(),
values,
})
})
Expand Down
3 changes: 2 additions & 1 deletion src/execution/volcano/dql/aggregate/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::tuple::Tuple;
use crate::types::value::ValueRef;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::sync::Arc;

pub struct SimpleAggExecutor {
agg_calls: Vec<ScalarExpression>,
Expand Down Expand Up @@ -65,7 +66,7 @@ impl SimpleAggExecutor {

yield Tuple {
id: None,
columns,
columns: Arc::new(columns),
values,
};
}
Expand Down
6 changes: 2 additions & 4 deletions src/execution/volcano/dql/explain.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::catalog::ColumnCatalog;
use crate::catalog::ColumnRef;
use crate::errors::DatabaseError;
use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::types::value::ValueRef;
use futures_async_stream::try_stream;
use std::sync::Arc;

Expand All @@ -29,8 +27,8 @@ impl<T: Transaction> ReadExecutor<T> for Explain {
impl Explain {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute(self) {
let columns: Vec<ColumnRef> = vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))];
let values: Vec<ValueRef> = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))];
let columns = Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))]);
let values = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))];

yield Tuple {
id: None,
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl IndexScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
projection_columns: columns,
columns,
limit,
..
} = self.op;
Expand Down
31 changes: 21 additions & 10 deletions src/execution/volcano/dql/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl HashJoinStatus {
Self::columns_filling(&tuple, join_columns, *right_force_nullable);
let _ = mem::replace(right_init_flag, true);
}
let join_columns = Arc::new(join_columns.clone());

let mut join_tuples = if let Some(tuples) = build_map.get(&hash) {
let _ = used_set.insert(hash);
Expand Down Expand Up @@ -234,6 +235,8 @@ impl HashJoinStatus {

matches!(ty, JoinType::Left | JoinType::Full)
.then(|| {
let mut buf = None;

build_map
.drain()
.filter(|(hash, _)| !used_set.contains(hash))
Expand All @@ -245,16 +248,24 @@ impl HashJoinStatus {
} in tuples.iter_mut()
{
let _ = mem::replace(id, None);
let (mut right_values, mut right_columns): (
Vec<ValueRef>,
Vec<ColumnRef>,
) = join_columns[columns.len()..]
.iter()
.map(|col| (Arc::new(DataValue::none(col.datatype())), col.clone()))
.unzip();

values.append(&mut right_values);
columns.append(&mut right_columns);
let (right_values, full_columns) = buf.get_or_insert_with(|| {
let (right_values, mut right_columns): (
Vec<ValueRef>,
Vec<ColumnRef>,
) = join_columns[columns.len()..]
.iter()
.map(|col| {
(Arc::new(DataValue::none(col.datatype())), col.clone())
})
.unzip();
let mut full_columns = Vec::clone(columns);
full_columns.append(&mut right_columns);

(right_values, Arc::new(full_columns))
});

values.append(&mut right_values.clone());
*columns = full_columns.clone();
}
tuples
})
Expand Down
16 changes: 11 additions & 5 deletions src/execution/volcano/dql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct Projection {
exprs: Vec<ScalarExpression>,
Expand All @@ -28,20 +29,25 @@ impl Projection {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let Projection { exprs, input } = self;
let mut columns = None;

#[for_await]
for tuple in build_read(input, transaction) {
let mut tuple = tuple?;

let mut columns = Vec::with_capacity(exprs.len());
let mut values = Vec::with_capacity(exprs.len());
let columns = columns.get_or_insert_with(|| {
let mut columns = Vec::with_capacity(exprs.len());

for expr in exprs.iter() {
columns.push(expr.output_column());
}
Arc::new(columns)
});

for expr in exprs.iter() {
values.push(expr.eval(&tuple)?);
columns.push(expr.output_column());
}

tuple.columns = columns;
tuple.columns = columns.clone();
tuple.values = values;

yield tuple;
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SeqScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
projection_columns: columns,
columns,
limit,
..
} = self.op;
Expand Down
Loading

0 comments on commit ac94e3c

Please sign in to comment.