Skip to content

Commit

Permalink
Make KeyValueStoreDispatcher async.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Bruijnzeels committed Nov 20, 2023
1 parent e046858 commit 5b7c872
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 117 deletions.
4 changes: 4 additions & 0 deletions src/commons/api/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl ImportCa {
ImportCa { handle, parents, roas }
}

pub fn handle(&self) -> &CaHandle {
&self.handle
}

pub fn unpack(self) -> (CaHandle, Vec<ImportParent>, Vec<RoaConfiguration>) {
(self.handle, self.parents, self.roas)
}
Expand Down
22 changes: 11 additions & 11 deletions src/commons/eventsourcing/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where

let init_command_key = Self::key_for_command(&handle, 0);

if kv.has(&init_command_key)? {
if kv.has(&init_command_key).await? {
// This is no good.. this aggregate already exists.
Ok(Err(A::Error::from(AggregateStoreError::DuplicateAggregate(handle))))
} else {
Expand All @@ -155,7 +155,7 @@ where
let processed_command = processed_command_builder.finish_with_init_event(init_event);

let json = serde_json::to_value(&processed_command)?;
kv.store(&init_command_key, json)?;
kv.store(&init_command_key, json).await?;

let arc = Arc::new(aggregate);

Expand Down Expand Up @@ -235,14 +235,14 @@ where
changed_from_cached = true;

let snapshot_key = Self::key_for_snapshot(handle);
match kv.get(&snapshot_key)? {
match kv.get(&snapshot_key).await? {
Some(value) => {
let agg: A = serde_json::from_value(value)?;
Ok(Arc::new(agg))
}
None => {
let init_key = Self::key_for_command(handle, 0);
match kv.get(&init_key)? {
match kv.get(&init_key).await? {
Some(value) => {
let init_command: StoredCommand<A> = serde_json::from_value(value)?;

Expand Down Expand Up @@ -273,7 +273,7 @@ where
// to mark the aggregate as changed so that the we can update the cache
// later.
let next_command = Self::key_for_command(handle, agg.version());
if kv.has(&next_command)? {
if kv.has(&next_command).await? {
let aggregate = Arc::make_mut(&mut agg);

// check and apply any applicable processed commands until:
Expand All @@ -284,7 +284,7 @@ where

let key = Self::key_for_command(handle, version);

match kv.get(&key)? {
match kv.get(&key).await? {
None => break,
Some(value) => {
let command: StoredCommand<A> = serde_json::from_value(value)?;
Expand Down Expand Up @@ -317,7 +317,7 @@ where
// from. So, exit here, as there is nothing sensible we can do with this error.
//
// See issue: https://github.com/NLnetLabs/krill/issues/322
if kv.has(&command_key)? {
if kv.has(&command_key).await? {
error!("Command key for '{handle}' version '{version}' already exists.");
error!("This is a bug. Please report this issue to [email protected].");
error!("Krill will exit. If this issue repeats, consider removing {}.", handle);
Expand All @@ -333,7 +333,7 @@ where
aggregate.apply_command(processed_command);

changed_from_cached = true;
kv.store(&command_key, json)?;
kv.store(&command_key, json).await?;

Err(e)
}
Expand Down Expand Up @@ -371,7 +371,7 @@ where
} else {
// Save the latest command.
let json = serde_json::to_value(&processed_command)?;
kv.store(&command_key, json)?;
kv.store(&command_key, json).await?;

// Now send the events to the 'post-save' listeners.
if let Some(events) = processed_command.events() {
Expand All @@ -398,7 +398,7 @@ where
if save_snapshot {
let key = Self::key_for_snapshot(handle);
let value = serde_json::to_value(agg.as_ref())?;
kv.store(&key, value)?;
kv.store(&key, value).await?;
}

if let Err(e) = res {
Expand Down Expand Up @@ -546,7 +546,7 @@ where
let scope = Self::scope_for_agg(id);

self.kv
.execute(&Scope::global(), |kv| async move { kv.delete_scope(&scope) })
.execute(&Scope::global(), |kv| async move { kv.delete_scope(&scope).await })
.await?;

self.cache_remove(id).await;
Expand Down
18 changes: 9 additions & 9 deletions src/commons/eventsourcing/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl<T: WalSupport> WalStore<T> {
.execute(&scope, |kv| async move {
let key = Self::key_for_snapshot(handle);
let json = serde_json::to_value(instance.as_ref())?;
kv.store(&key, json)?;
kv.store(&key, json).await?;

self.cache_update(handle, instance.clone());

Expand Down Expand Up @@ -189,7 +189,7 @@ impl<T: WalSupport> WalStore<T> {

self.kv
.execute(&Scope::global(), |kv| async move {
kv.delete_scope(&scope)?;
kv.delete_scope(&scope).await?;
self.cache_remove(handle);
Ok(())
})
Expand Down Expand Up @@ -248,7 +248,7 @@ impl<T: WalSupport> WalStore<T> {

let key = Self::key_for_snapshot(handle);

match kv.get(&key)? {
match kv.get(&key).await? {
Some(value) => {
trace!("Deserializing stored instance for '{handle}'");
let latest: T = serde_json::from_value(value)?;
Expand Down Expand Up @@ -284,7 +284,7 @@ impl<T: WalSupport> WalStore<T> {
let revision = latest_inner.revision();
let key = Self::key_for_wal_set(handle, revision);

if let Some(value) = kv.get(&key)? {
if let Some(value) = kv.get(&key).await? {
let set: WalSet<T> = serde_json::from_value(value)?;
trace!("applying revision '{revision}' to '{handle}'");
latest_inner.apply(set);
Expand Down Expand Up @@ -329,7 +329,7 @@ impl<T: WalSupport> WalStore<T> {

let key_for_wal_set = Self::key_for_wal_set(handle, revision);

if kv.has(&key_for_wal_set)? {
if kv.has(&key_for_wal_set).await? {
error!("Change set for '{handle}' version '{revision}' already exists.");
error!("This is a bug. Please report this issue to [email protected].");
error!("Krill will exit. If this issue repeats, consider removing {}.", handle);
Expand All @@ -340,7 +340,7 @@ impl<T: WalSupport> WalStore<T> {

latest_inner.apply(set);

kv.store(&key_for_wal_set, json)?;
kv.store(&key_for_wal_set, json).await?;
}
}
}
Expand All @@ -355,16 +355,16 @@ impl<T: WalSupport> WalStore<T> {
// Save the latest version as snapshot
let key = Self::key_for_snapshot(handle);
let value = serde_json::to_value(latest.as_ref())?;
kv.store(&key, value)?;
kv.store(&key, value).await?;

// Delete all wal sets (changes), since we are doing
// this inside a transaction or locked scope we can
// assume that all changes were applied, and there
// are no other threads creating additional changes
// that we were not aware of.
for key in kv.list_keys(&Self::scope_for_handle(handle))? {
for key in kv.list_keys(&Self::scope_for_handle(handle)).await? {
if key.name().as_str().starts_with("wal-") {
kv.delete(&key)?;
kv.delete(&key).await?;
}
}
}
Expand Down
Loading

0 comments on commit 5b7c872

Please sign in to comment.