Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query engine integration #2074

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"crates/paths",
"crates/physical-plan",
"crates/primitives",
"crates/query",
"crates/sats",
"crates/schema",
"crates/sdk",
Expand Down Expand Up @@ -106,6 +107,7 @@ spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc3" }
spacetimedb-paths = { path = "crates/paths", version = "1.0.0-rc3" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "1.0.0-rc3" }
spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc3" }
spacetimedb-query = { path = "crates/query", version = "1.0.0-rc3" }
spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc3" }
spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc3" }
spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc3" }
Expand Down
2 changes: 2 additions & 0 deletions crates/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ bench = false
spacetimedb-client-api = { path = "../client-api" }
spacetimedb-core = { path = "../core", features = ["test"] }
spacetimedb-data-structures.workspace = true
spacetimedb-execution = { path = "../execution" }
spacetimedb-lib = { path = "../lib" }
spacetimedb-paths.workspace = true
spacetimedb-primitives = { path = "../primitives" }
spacetimedb-query = { path = "../query" }
spacetimedb-sats = { path = "../sats" }
spacetimedb-schema = { workspace = true, features = ["test"] }
spacetimedb-standalone = { path = "../standalone" }
Expand Down
38 changes: 37 additions & 1 deletion crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
use spacetimedb::sql::ast::SchemaViewer;
use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::subscription::tx::DeltaTx;
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_query::SubscribePlan;
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue};

fn create_table_location(db: &RelationalDB) -> Result<TableId, DBError> {
Expand Down Expand Up @@ -107,6 +110,23 @@ fn eval(c: &mut Criterion) {
let ins_rhs = insert_op(rhs, "location", new_rhs_row);
let update = [&ins_lhs, &ins_rhs];

// A benchmark runner for the new query engine
let bench_query = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Subscribe);
let auth = AuthCtx::for_testing();
let schema_viewer = &SchemaViewer::new(&tx, &auth);
let plan = SubscribePlan::compile(sql, schema_viewer).unwrap();
let tx = DeltaTx::from(&tx);

b.iter(|| {
drop(black_box(
plan.collect_table_update::<_, BsatnFormat>(Compression::None, &tx),
))
})
});
};

let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
Expand All @@ -124,6 +144,22 @@ fn eval(c: &mut Criterion) {
});
};

// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire footprint table.
let semijoin = format!(
r#"
select f.*
from footprint f join location l on f.entity_id = l.entity_id
where l.chunk_index = {chunk_index}
"#
);

let index_scan_multi = "select * from location WHERE x = 0 AND z = 10000 AND dimension = 0";

bench_query(c, "footprint-scan", "select * from footprint");
bench_query(c, "footprint-semijoin", &semijoin);
bench_query(c, "index-scan-multi", index_scan_multi);

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-scan --exact --profile-time=30
// Iterate 1M rows.
Expand All @@ -132,7 +168,7 @@ fn eval(c: &mut Criterion) {
// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-join --exact --profile-time=30
// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire lhs table.
// Note, this should use an index join so as not to read the entire footprint table.
let name = format!(
r#"
select footprint.*
Expand Down
3 changes: 3 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ spacetimedb-durability.workspace = true
spacetimedb-metrics.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-query.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-execution.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
Expand Down
33 changes: 24 additions & 9 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use spacetimedb_client_api_messages::websocket::{
CallReducerFlags, Compression, FormatSwitch, SubscribeSingle, Unsubscribe,
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeSingle, Unsubscribe, WebsocketFormat,
};
use spacetimedb_lib::identity::RequestId;
use tokio::sync::{mpsc, oneshot, watch};
Expand Down Expand Up @@ -314,26 +314,41 @@ impl ClientConnection {
.unwrap()
}

pub fn one_off_query(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let result = self.module.one_off_query(self.id.identity, query.to_owned());
pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<JsonFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
}

pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<BsatnFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
}

fn one_off_query<F: WebsocketFormat>(
&self,
query: &str,
message_id: &[u8],
timer: Instant,
) -> OneOffQueryResponseMessage<F> {
let result = self.module.one_off_query::<F>(self.id.identity, query.to_owned());
let message_id = message_id.to_owned();
let total_host_execution_duration = timer.elapsed().as_micros() as u64;
let response = match result {
match result {
Ok(results) => OneOffQueryResponseMessage {
message_id,
error: None,
results,
results: vec![results],
total_host_execution_duration,
},
Err(err) => OneOffQueryResponseMessage {
message_id,
error: Some(format!("{}", err)),
results: Vec::new(),
results: vec![],
total_host_execution_duration,
},
};
self.send_message(response)?;
Ok(())
}
}

pub async fn disconnect(self) {
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::messages::{SubscriptionUpdateMessage, SwitchedServerMessage, ToProtocol, TransactionUpdateMessage};
use super::{ClientConnection, DataMessage};
use super::{ClientConnection, DataMessage, Protocol};
use crate::energy::EnergyQuanta;
use crate::execution_context::WorkloadType;
use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
Expand Down Expand Up @@ -107,7 +107,10 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
query_string: query,
message_id,
}) => {
let res = client.one_off_query(&query, &message_id, timer);
let res = match client.config.protocol {
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
};
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Sql, &address, "")
Expand Down
Loading
Loading