diff --git a/migrations/2023-12-06-020636_deletes/down.sql b/migrations/2023-12-06-020636_deletes/down.sql new file mode 100644 index 0000000..867b7ce --- /dev/null +++ b/migrations/2023-12-06-020636_deletes/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE vss_db + DROP COLUMN deleted; diff --git a/migrations/2023-12-06-020636_deletes/up.sql b/migrations/2023-12-06-020636_deletes/up.sql new file mode 100644 index 0000000..eb2e1eb --- /dev/null +++ b/migrations/2023-12-06-020636_deletes/up.sql @@ -0,0 +1,68 @@ +ALTER TABLE vss_db + ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE; + +-- modify upsert_vss_db to set deleted to false +CREATE OR REPLACE FUNCTION upsert_vss_db( + p_store_id TEXT, + p_key TEXT, + p_value bytea, + p_version BIGINT +) RETURNS VOID AS +$$ +BEGIN + + WITH new_values (store_id, key, value, version) AS (VALUES (p_store_id, p_key, p_value, p_version)) + INSERT + INTO vss_db + (store_id, key, value, version) + SELECT new_values.store_id, + new_values.key, + new_values.value, + new_values.version + FROM new_values + LEFT JOIN vss_db AS existing + ON new_values.store_id = existing.store_id + AND new_values.key = existing.key + WHERE CASE + WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1) + ELSE new_values.version > COALESCE(existing.version, -1) + END + ON CONFLICT (store_id, key) + DO UPDATE SET value = excluded.value, + version = excluded.version, + deleted = false; + +END; +$$ LANGUAGE plpgsql; + +-- modified upsert_vss_db but to delete +CREATE OR REPLACE FUNCTION delete_item( + p_store_id TEXT, + p_key TEXT, + p_version BIGINT +) RETURNS VOID AS +$$ +BEGIN + + WITH new_values (store_id, key, version) AS (VALUES (p_store_id, p_key, p_version)) + INSERT + INTO vss_db + (store_id, key, version) + SELECT new_values.store_id, + new_values.key, + new_values.version + FROM new_values + LEFT JOIN vss_db AS existing + ON new_values.store_id = existing.store_id + AND new_values.key = existing.key + WHERE CASE + WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1) + ELSE new_values.version > COALESCE(existing.version, -1) + END + ON CONFLICT (store_id, key) + DO UPDATE SET value = NULL, + version = excluded.version, + deleted = true; + +END; +$$ LANGUAGE plpgsql; diff --git a/src/main.rs b/src/main.rs index 5e78799..3989ead 100644 --- a/src/main.rs +++ b/src/main.rs @@ -119,6 +119,8 @@ async fn main() -> anyhow::Result<()> { .route("/v2/putObjects", put(put_objects)) .route("/listKeyVersions", post(list_key_versions)) .route("/v2/listKeyVersions", post(list_key_versions)) + .route("/deleteObject", post(delete_object)) + .route("/v2/deleteObject", post(delete_object)) .route("/migration", get(migration::migration)) .fallback(fallback) .layer( diff --git a/src/models/mod.rs b/src/models/mod.rs index 0da8931..7610383 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -31,6 +31,8 @@ pub struct VssItem { created_date: chrono::NaiveDateTime, updated_date: chrono::NaiveDateTime, + + pub deleted: bool, } impl VssItem { @@ -68,6 +70,21 @@ impl VssItem { Ok(()) } + pub fn delete_item( + conn: &mut PgConnection, + store_id: &str, + key: &str, + version: i64, + ) -> anyhow::Result<()> { + sql_query("SELECT delete_item($1, $2, $3)") + .bind::(store_id) + .bind::(key) + .bind::(version) + .execute(conn)?; + + Ok(()) + } + pub fn list_key_versions( conn: &mut PgConnection, store_id: &str, @@ -209,6 +226,54 @@ mod test { clear_database(&state); } + #[tokio::test] + async fn test_delete() { + let state = init_state(); + clear_database(&state); + + let store_id = "test_store_id"; + let key = "test"; + let value = [1, 2, 3]; + let version = 0; + + let mut conn = state.db_pool.get().unwrap(); + VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap(); + + let versions = VssItem::list_key_versions(&mut conn, store_id, None).unwrap(); + + assert_eq!(versions.len(), 1); + assert_eq!(versions[0].0, key); + assert_eq!(versions[0].1, version); + + // delete item + let new_version = version + 1; + VssItem::delete_item(&mut conn, store_id, key, new_version).unwrap(); + let item = VssItem::get_item(&mut conn, store_id, key) + .unwrap() + .unwrap(); + assert!(item.value.is_none()); + assert!(item.deleted); + + // bring item back with higher version + + let final_value = [4, 5, 6]; + let final_version = new_version + 1; + + VssItem::put_item(&mut conn, store_id, key, &final_value, final_version).unwrap(); + + let item = VssItem::get_item(&mut conn, store_id, key) + .unwrap() + .unwrap(); + + assert_eq!(item.store_id, store_id); + assert_eq!(item.key, key); + assert_eq!(item.value.unwrap(), final_value); + assert_eq!(item.version, final_version); + assert!(!item.deleted); + + clear_database(&state); + } + #[tokio::test] async fn test_list_key_versions() { let state = init_state(); diff --git a/src/models/schema.rs b/src/models/schema.rs index de1aed0..db62d6d 100644 --- a/src/models/schema.rs +++ b/src/models/schema.rs @@ -8,5 +8,6 @@ diesel::table! { version -> Int8, created_date -> Timestamp, updated_date -> Timestamp, + deleted -> Bool, } } diff --git a/src/routes.rs b/src/routes.rs index 7003a73..fccd83b 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -216,6 +216,46 @@ pub async fn list_key_versions( } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteObjectRequest { + pub store_id: Option, + pub key: String, + pub version: i64, +} + +async fn delete_object_impl(req: DeleteObjectRequest, state: &State) -> anyhow::Result<()> { + let store_id = req.store_id.expect("must have"); + + let mut conn = state.db_pool.get()?; + + VssItem::delete_item(&mut conn, &store_id, &req.key, req.version)?; + + Ok(()) +} + +pub async fn delete_object( + origin: Option>, + auth: Option>>, + Extension(state): Extension, + Json(mut payload): Json, +) -> Result, (StatusCode, String)> { + if !state.self_hosted { + validate_cors(origin)?; + } + + let store_id = auth + .map(|TypedHeader(token)| verify_token(token.token(), &state)) + .transpose()? + .flatten(); + + ensure_store_id!(payload, store_id); + + match delete_object_impl(payload, &state).await { + Ok(res) => Ok(Json(res)), + Err(e) => Err(handle_anyhow_error("delete_object", e)), + } +} + pub async fn health_check() -> Result, (StatusCode, String)> { Ok(Json(())) }