Skip to content

Commit

Permalink
more type safety for delta plan compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime committed Jan 9, 2025
1 parent 8d94b29 commit cfb0a7b
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Deref;

use spacetimedb_execution::{Datastore, DeltaStore};
use spacetimedb_lib::ProductValue;
use spacetimedb_lib::{query::Delta, ProductValue};
use spacetimedb_primitives::TableId;
use spacetimedb_table::{blob_store::BlobStore, table::Table};

Expand Down Expand Up @@ -44,19 +44,19 @@ impl Datastore for DeltaTx<'_> {
}

impl DeltaStore for DeltaTx<'_> {
fn has_inserts(&self, table_id: TableId) -> Option<usize> {
fn has_inserts(&self, table_id: TableId) -> Option<Delta> {
self.data.and_then(|data| {
data.inserts()
.find(|(id, rows)| **id == table_id && !rows.is_empty())
.map(|(_, rows)| rows.len())
.map(|(_, rows)| Delta::Inserts(rows.len()))
})
}

fn has_deletes(&self, table_id: TableId) -> Option<usize> {
fn has_deletes(&self, table_id: TableId) -> Option<Delta> {
self.data.and_then(|data| {
data.deletes()
.find(|(id, rows)| **id == table_id && !rows.is_empty())
.map(|(_, rows)| rows.len())
.map(|(_, rows)| Delta::Deletes(rows.len()))
})
}

Expand Down
3 changes: 1 addition & 2 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::collections::{HashMap, HashSet};

use anyhow::{anyhow, bail, Result};
use spacetimedb_expr::expr::Delta;
use spacetimedb_lib::{AlgebraicValue, ProductValue};
use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue};
use spacetimedb_physical_plan::plan::{
HashJoin, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectPlan, Sarg, Semi, TupleField,
};
Expand Down
5 changes: 3 additions & 2 deletions crates/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result};
use iter::PlanIter;
use spacetimedb_lib::{
bsatn::{EncodeError, ToBsatn},
query::Delta,
ser::Serialize,
AlgebraicValue, ProductValue,
};
Expand Down Expand Up @@ -50,8 +51,8 @@ pub trait Datastore {
}

pub trait DeltaStore {
fn has_inserts(&self, table_id: TableId) -> Option<usize>;
fn has_deletes(&self, table_id: TableId) -> Option<usize>;
fn has_inserts(&self, table_id: TableId) -> Option<Delta>;
fn has_deletes(&self, table_id: TableId) -> Option<Delta>;

fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;
Expand Down
8 changes: 1 addition & 7 deletions crates/expr/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use spacetimedb_lib::{AlgebraicType, AlgebraicValue};
use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue};
use spacetimedb_primitives::TableId;
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
Expand Down Expand Up @@ -87,12 +87,6 @@ pub struct Relvar {
pub delta: Option<Delta>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delta {
Inserts(usize),
Deletes(usize),
}

impl RelExpr {
/// The number of fields this expression returns
pub fn nfields(&self) -> usize {
Expand Down
1 change: 1 addition & 0 deletions crates/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod db;
pub mod error;
pub mod identity;
pub mod operator;
pub mod query;
pub mod relation;
pub mod scheduler;
pub mod version;
Expand Down
6 changes: 6 additions & 0 deletions crates/lib/src/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/// A type used by the query planner for incremental evaluation
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delta {
Inserts(usize),
Deletes(usize),
}
4 changes: 2 additions & 2 deletions crates/physical-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
};

use derive_more::From;
use spacetimedb_expr::{expr::Delta, StatementSource};
use spacetimedb_lib::AlgebraicValue;
use spacetimedb_expr::StatementSource;
use spacetimedb_lib::{query::Delta, AlgebraicValue};
use spacetimedb_primitives::{ColId, ColSet, IndexId};
use spacetimedb_schema::schema::{IndexSchema, TableSchema};
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
Expand Down
1 change: 1 addition & 0 deletions crates/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rayon.workspace = true
spacetimedb-client-api-messages.workspace = true
spacetimedb-execution.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-lib.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-sql-parser.workspace = true
Expand Down
72 changes: 26 additions & 46 deletions crates/query/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::ops::Deref;
use anyhow::{bail, Result};
use itertools::Either;
use spacetimedb_execution::{iter::ProjectIter, Datastore, DeltaStore, Row};
use spacetimedb_expr::{
check::{type_subscription, SchemaView},
expr::Delta,
};
use spacetimedb_expr::check::{type_subscription, SchemaView};
use spacetimedb_lib::query::Delta;
use spacetimedb_physical_plan::{
compile::compile_project_plan,
plan::{HashJoin, IxJoin, Label, PhysicalPlan, ProjectPlan},
Expand Down Expand Up @@ -205,12 +203,10 @@ impl SelectPlan {
DeltaPlanEvaluator {
insert_plans: tx
.has_inserts(self.table_id)
.map(Delta::Inserts)
.map(|delta| delta_plan(&self.plan, self.table_id, delta))
.unwrap_or_default(),
delete_plans: tx
.has_deletes(self.table_id)
.map(Delta::Deletes)
.map(|delta| delta_plan(&self.plan, self.table_id, delta))
.unwrap_or_default(),
}
Expand Down Expand Up @@ -331,101 +327,85 @@ impl JoinPlan {
});
}

/// Instantiate an optimized delta plan
fn delta_plan_opt(plan: &ProjectPlan, label: Label, delta: Delta) -> ProjectPlan {
/// Instantiate and optimize a delta plan
fn delta_plan_opt1(plan: &ProjectPlan, label: Label, delta: Delta) -> ProjectPlan {
let mut plan = plan.clone();
delta_plan(&mut plan, label, delta);
plan.optimize()
}

/// Instantiate and optimize a delta plan
fn delta_plan_opt2(plan: &ProjectPlan, lhs: Label, n: Delta, rhs: Label, m: Delta) -> ProjectPlan {
let mut plan = plan.clone();
delta_plan(&mut plan, lhs, n);
delta_plan(&mut plan, rhs, m);
plan.optimize()
}

// dr(+)S'
let dr_ins = tx
.has_inserts(self.lhs_table)
.map(Delta::Inserts)
.map(|delta| delta_plan_opt(&self.plan, self.lhs_label, delta))
.map(|delta| delta_plan_opt1(&self.plan, self.lhs_label, delta))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// dr(-)S'
let dr_del = tx
.has_deletes(self.lhs_table)
.map(Delta::Deletes)
.map(|delta| delta_plan_opt(&self.plan, self.lhs_label, delta))
.map(|delta| delta_plan_opt1(&self.plan, self.lhs_label, delta))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// R'ds(+)
let ds_ins = tx
.has_inserts(self.rhs_table)
.map(Delta::Inserts)
.map(|delta| delta_plan_opt(&self.plan, self.rhs_label, delta))
.has_inserts(self.lhs_table)
.map(|delta| delta_plan_opt1(&self.plan, self.rhs_label, delta))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// R'ds(-)
let ds_del = tx
.has_deletes(self.rhs_table)
.map(Delta::Deletes)
.map(|delta| delta_plan_opt(&self.plan, self.rhs_label, delta))
.has_deletes(self.lhs_table)
.map(|delta| delta_plan_opt1(&self.plan, self.rhs_label, delta))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// dr(+)ds(+)
let dr_ins_ds_ins = tx
.has_inserts(self.lhs_table)
.and_then(|n| tx.has_inserts(self.rhs_table).map(|m| (n, m)))
.map(|(n, m)| {
let mut plan = self.plan.clone();
delta_plan(&mut plan, self.lhs_label, Delta::Inserts(n));
delta_plan(&mut plan, self.rhs_label, Delta::Inserts(m));
plan.optimize()
})
.zip(tx.has_inserts(self.rhs_table))
.map(|(n, m)| delta_plan_opt2(&self.plan, self.lhs_label, n, self.rhs_label, m))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// dr(+)ds(-)
let dr_ins_ds_del = tx
.has_inserts(self.lhs_table)
.and_then(|n| tx.has_deletes(self.rhs_table).map(|m| (n, m)))
.map(|(n, m)| {
let mut plan = self.plan.clone();
delta_plan(&mut plan, self.lhs_label, Delta::Inserts(n));
delta_plan(&mut plan, self.rhs_label, Delta::Deletes(m));
plan.optimize()
})
.zip(tx.has_deletes(self.rhs_table))
.map(|(n, m)| delta_plan_opt2(&self.plan, self.lhs_label, n, self.rhs_label, m))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// dr(-)ds(+)
let dr_del_ds_ins = tx
.has_deletes(self.lhs_table)
.and_then(|n| tx.has_inserts(self.rhs_table).map(|m| (n, m)))
.map(|(n, m)| {
let mut plan = self.plan.clone();
delta_plan(&mut plan, self.lhs_label, Delta::Deletes(n));
delta_plan(&mut plan, self.rhs_label, Delta::Inserts(m));
plan.optimize()
})
.zip(tx.has_inserts(self.rhs_table))
.map(|(n, m)| delta_plan_opt2(&self.plan, self.lhs_label, n, self.rhs_label, m))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));

// dr(-)ds(-)
let dr_del_ds_del = tx
.has_deletes(self.lhs_table)
.and_then(|n| tx.has_deletes(self.rhs_table).map(|m| (n, m)))
.map(|(n, m)| {
let mut plan = self.plan.clone();
delta_plan(&mut plan, self.lhs_label, Delta::Deletes(n));
delta_plan(&mut plan, self.rhs_label, Delta::Deletes(m));
plan.optimize()
})
.zip(tx.has_deletes(self.rhs_table))
.map(|(n, m)| delta_plan_opt2(&self.plan, self.lhs_label, n, self.rhs_label, m))
.map(std::iter::once)
.map(Either::Left)
.unwrap_or_else(|| Either::Right(std::iter::empty()));
Expand Down

0 comments on commit cfb0a7b

Please sign in to comment.