Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: synchronize value for synchronized ranges #238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 41 additions & 48 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ pub trait Recon: Clone + Send + Sync {
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()>;
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>>;
) -> Result<Vec<(Self::Key, Vec<u8>)>>;

async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>>;
}
Expand All @@ -63,16 +63,18 @@ where
Ok(())
}

async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
Ok(recon::Client::range(self, start, end, offset, limit)
.await?
.collect())
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
Ok(
recon::Client::range_with_values(self, start, end, offset, limit)
.await?
.collect(),
)
}
async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
recon::Client::value_for_key(self, key).await
Expand Down Expand Up @@ -132,13 +134,12 @@ where
Ok(resp)
}

#[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
#[instrument(skip(self, _context, event), fields(event.id = event.event_id, event.data.len = event.event_data.len()), ret(level = Level::DEBUG), err(level = Level::ERROR))]
async fn events_post(
&self,
event: Event,
_context: &C,
) -> Result<EventsPostResponse, ApiError> {
debug!(event_id = event.event_id, "events_post");
let event_id = decode_event_id(&event.event_id)?;
let event_data = decode_event_data(&event.event_data)?;
self.model
Expand Down Expand Up @@ -233,33 +234,24 @@ where
.with_not_after(0)
.build();
self.interest
.insert(interest, None)
// We must store a value for the interest otherwise Recon will try forever to
// synchronize the value.
// In the case of interests an empty value is sufficient.
.insert(interest, Some(vec![]))
.await
.map_err(|err| ApiError(format!("failed to update interest: {err}")))?;

let mut events = Vec::new();
for id in self
let events = self
.model
.range(start, stop, offset, limit)
.range_with_values(start, stop, offset, limit)
.await
.map_err(|err| ApiError(format!("failed to get keys: {err}")))?
.into_iter()
{
let event_data = self
.model
.value_for_key(id.clone())
.await
.map_err(|err| ApiError(format!("failed to get event data: {err}")))?;
events.push(Event {
.map(|(id, data)| Event {
event_id: multibase::encode(multibase::Base::Base16Lower, id.as_bytes()),
event_data: multibase::encode(
multibase::Base::Base64,
// Use the empty bytes for keys with no value.
// This way we are explicit there is no value rather that its just missing.
&event_data.unwrap_or_default(),
),
});
}
event_data: multibase::encode(multibase::Base::Base64, data),
})
.collect();
Ok(SubscribeSortKeySortValueGetResponse::Success(events))
}
}
Expand Down Expand Up @@ -291,13 +283,13 @@ mod tests {
mock! {
pub ReconInterestTest {
fn insert(&self, key: Interest, value: Option<Vec<u8>>) -> Result<()>;
fn range(
fn range_with_values(
&self,
start: Interest,
end: Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>>;
) -> Result<Vec<(Interest, Vec<u8>)>>;
}

impl Clone for ReconInterestTest {
Expand All @@ -312,14 +304,14 @@ mod tests {
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
self.range(start, end, offset, limit)
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
self.range_with_values(start, end, offset, limit)
}
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
Expand All @@ -329,13 +321,13 @@ mod tests {
mock! {
pub ReconModelTest {
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<()>;
fn range(
fn range_with_values(
&self,
start: EventId,
end: EventId,
offset: usize,
limit: usize,
) -> Result<Vec<EventId>>;
) -> Result<Vec<(EventId,Vec<u8>)>>;
}
impl Clone for ReconModelTest {
fn clone(&self) -> Self;
Expand All @@ -349,14 +341,14 @@ mod tests {
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<()> {
self.insert(key, value)
}
async fn range(
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
self.range(start, end, offset, limit)
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
self.range_with_values(start, end, offset, limit)
}
async fn value_for_key(&self, _key: Self::Key) -> Result<Option<Vec<u8>>> {
Ok(None)
Expand Down Expand Up @@ -444,9 +436,10 @@ mod tests {
.unwrap(),
)
.build();
let event_data = b"hello world";
let event = models::Event {
event_id: multibase::encode(multibase::Base::Base16Lower, event_id.as_slice()),
event_data: multibase::encode(multibase::Base::Base64, b""),
event_data: multibase::encode(multibase::Base::Base64, event_data),
};
// Setup mock expectations
let mut mock_interest = MockReconInterestTest::new();
Expand All @@ -461,21 +454,21 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
predicate::eq(0),
predicate::eq(usize::MAX),
)
.times(1)
.returning(move |_, _, _, _| Ok(vec![event_id.clone()]));
.returning(move |_, _, _, _| Ok(vec![(event_id.clone(), event_data.into())]));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let resp = server
.subscribe_sort_key_sort_value_get(
Expand Down Expand Up @@ -527,13 +520,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down Expand Up @@ -593,13 +586,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down Expand Up @@ -659,13 +652,13 @@ mod tests {
.with_not_after(0)
.build(),
),
predicate::eq(None),
predicate::eq(Some(vec![])),
)
.times(1)
.returning(|_, _| Ok(()));
let mut mock_model = MockReconModelTest::new();
mock_model
.expect_range()
.expect_range_with_values()
.with(
predicate::eq(start),
predicate::eq(end),
Expand Down
Loading
Loading