Skip to content

Commit

Permalink
Add delete_object
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Dec 6, 2023
1 parent af3c3b5 commit 0081a69
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 0 deletions.
2 changes: 2 additions & 0 deletions migrations/2023-12-06-020636_deletes/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE vss_db
DROP COLUMN deleted;
68 changes: 68 additions & 0 deletions migrations/2023-12-06-020636_deletes/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
ALTER TABLE vss_db
ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE;

-- modify upsert_vss_db to set deleted to false
CREATE OR REPLACE FUNCTION upsert_vss_db(
p_store_id TEXT,
p_key TEXT,
p_value bytea,
p_version BIGINT
) RETURNS VOID AS
$$
BEGIN

WITH new_values (store_id, key, value, version) AS (VALUES (p_store_id, p_key, p_value, p_version))
INSERT
INTO vss_db
(store_id, key, value, version)
SELECT new_values.store_id,
new_values.key,
new_values.value,
new_values.version
FROM new_values
LEFT JOIN vss_db AS existing
ON new_values.store_id = existing.store_id
AND new_values.key = existing.key
WHERE CASE
WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1)
ELSE new_values.version > COALESCE(existing.version, -1)
END
ON CONFLICT (store_id, key)
DO UPDATE SET value = excluded.value,
version = excluded.version,
deleted = false;

END;
$$ LANGUAGE plpgsql;

-- modified upsert_vss_db but to delete
CREATE OR REPLACE FUNCTION delete_item(
p_store_id TEXT,
p_key TEXT,
p_version BIGINT
) RETURNS VOID AS
$$
BEGIN

WITH new_values (store_id, key, version) AS (VALUES (p_store_id, p_key, p_version))
INSERT
INTO vss_db
(store_id, key, version)
SELECT new_values.store_id,
new_values.key,
new_values.version
FROM new_values
LEFT JOIN vss_db AS existing
ON new_values.store_id = existing.store_id
AND new_values.key = existing.key
WHERE CASE
WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1)
ELSE new_values.version > COALESCE(existing.version, -1)
END
ON CONFLICT (store_id, key)
DO UPDATE SET value = NULL,
version = excluded.version,
deleted = true;

END;
$$ LANGUAGE plpgsql;
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ async fn main() -> anyhow::Result<()> {
.route("/v2/putObjects", put(put_objects))
.route("/listKeyVersions", post(list_key_versions))
.route("/v2/listKeyVersions", post(list_key_versions))
.route("/deleteObject", post(delete_object))
.route("/v2/deleteObject", post(delete_object))
.route("/migration", get(migration::migration))
.fallback(fallback)
.layer(
Expand Down
65 changes: 65 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct VssItem {

created_date: chrono::NaiveDateTime,
updated_date: chrono::NaiveDateTime,

pub deleted: bool,
}

impl VssItem {
Expand Down Expand Up @@ -68,6 +70,21 @@ impl VssItem {
Ok(())
}

pub fn delete_item(
conn: &mut PgConnection,
store_id: &str,
key: &str,
version: i64,
) -> anyhow::Result<()> {
sql_query("SELECT delete_item($1, $2, $3)")
.bind::<Text, _>(store_id)
.bind::<Text, _>(key)
.bind::<BigInt, _>(version)
.execute(conn)?;

Ok(())
}

pub fn list_key_versions(
conn: &mut PgConnection,
store_id: &str,
Expand Down Expand Up @@ -209,6 +226,54 @@ mod test {
clear_database(&state);
}

#[tokio::test]
async fn test_delete() {
let state = init_state();
clear_database(&state);

let store_id = "test_store_id";
let key = "test";
let value = [1, 2, 3];
let version = 0;

let mut conn = state.db_pool.get().unwrap();
VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap();

let versions = VssItem::list_key_versions(&mut conn, store_id, None).unwrap();

assert_eq!(versions.len(), 1);
assert_eq!(versions[0].0, key);
assert_eq!(versions[0].1, version);

// delete item
let new_version = version + 1;
VssItem::delete_item(&mut conn, store_id, key, new_version).unwrap();
let item = VssItem::get_item(&mut conn, store_id, key)
.unwrap()
.unwrap();
assert!(item.value.is_none());
assert!(item.deleted);

// bring item back with higher version

let final_value = [4, 5, 6];
let final_version = new_version + 1;

VssItem::put_item(&mut conn, store_id, key, &final_value, final_version).unwrap();

let item = VssItem::get_item(&mut conn, store_id, key)
.unwrap()
.unwrap();

assert_eq!(item.store_id, store_id);
assert_eq!(item.key, key);
assert_eq!(item.value.unwrap(), final_value);
assert_eq!(item.version, final_version);
assert!(!item.deleted);

clear_database(&state);
}

#[tokio::test]
async fn test_list_key_versions() {
let state = init_state();
Expand Down
1 change: 1 addition & 0 deletions src/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ diesel::table! {
version -> Int8,
created_date -> Timestamp,
updated_date -> Timestamp,
deleted -> Bool,
}
}
40 changes: 40 additions & 0 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,46 @@ pub async fn list_key_versions(
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteObjectRequest {
pub store_id: Option<String>,
pub key: String,
pub version: i64,
}

async fn delete_object_impl(req: DeleteObjectRequest, state: &State) -> anyhow::Result<()> {
let store_id = req.store_id.expect("must have");

let mut conn = state.db_pool.get()?;

VssItem::delete_item(&mut conn, &store_id, &req.key, req.version)?;

Ok(())
}

pub async fn delete_object(
origin: Option<TypedHeader<Origin>>,
auth: Option<TypedHeader<Authorization<Bearer>>>,
Extension(state): Extension<State>,
Json(mut payload): Json<DeleteObjectRequest>,
) -> Result<Json<()>, (StatusCode, String)> {
if !state.self_hosted {
validate_cors(origin)?;
}

let store_id = auth
.map(|TypedHeader(token)| verify_token(token.token(), &state))
.transpose()?
.flatten();

ensure_store_id!(payload, store_id);

match delete_object_impl(payload, &state).await {
Ok(res) => Ok(Json(res)),
Err(e) => Err(handle_anyhow_error("delete_object", e)),
}
}

pub async fn health_check() -> Result<Json<()>, (StatusCode, String)> {
Ok(Json(()))
}
Expand Down

0 comments on commit 0081a69

Please sign in to comment.