diff --git a/Cargo.lock b/Cargo.lock index e568571d1..280a9fd5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1078,6 +1078,8 @@ dependencies = [ "multibase 0.9.1", "prometheus-client", "recon", + "serde", + "serde_cbor", "swagger", "tokio", "tracing", @@ -1254,7 +1256,6 @@ name = "ceramic-metrics" version = "0.9.0" dependencies = [ "console-subscriber", - "iroh-util", "lazy_static", "libp2p", "names", @@ -1305,6 +1306,7 @@ dependencies = [ "prometheus-client", "recon", "serde", + "serde_cbor", "serde_json", "serde_repr", "signal-hook", @@ -1316,8 +1318,6 @@ dependencies = [ "tokio-prometheus-client", "tokio-util", "tracing", - "tracing-appender", - "tracing-subscriber", ] [[package]] @@ -1348,7 +1348,6 @@ dependencies = [ "iroh-bitswap", "iroh-rpc-client", "iroh-rpc-types", - "iroh-util", "libp2p", "libp2p-identity", "lru 0.10.1", @@ -1895,16 +1894,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "ctrlc" -version = "3.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf" -dependencies = [ - "nix 0.27.1", - "windows-sys", -] - [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -3433,15 +3422,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humansize" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" -dependencies = [ - "libm", -] - [[package]] name = "humantime" version = "1.3.0" @@ -3837,7 +3817,6 @@ dependencies = [ "cid 0.10.1", "futures", "iroh-rpc-types", - "iroh-util", "libp2p", "quic-rpc", "serde", @@ -3866,20 +3845,10 @@ dependencies = [ name = "iroh-util" version = "0.9.0" dependencies = [ - "anyhow", "cid 0.10.1", - "ctrlc", - "dirs-next", - "futures", - "humansize", "nix 0.26.4", - "rlimit", - 
"serde", - "sysinfo 0.27.8", "temp-env", "testdir", - "thiserror", - "tracing", ] [[package]] @@ -5287,6 +5256,7 @@ version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7005aaf257a59ff4de471a9d5538ec868a21586534fff7f85dd97d4043a6139" dependencies = [ + "half", "minicbor-derive", ] @@ -5635,17 +5605,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.1", - "cfg-if", - "libc", -] - [[package]] name = "nohash-hasher" version = "0.2.0" @@ -7196,15 +7155,6 @@ dependencies = [ "opaque-debug 0.3.0", ] -[[package]] -name = "rlimit" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8a29d87a652dc4d43c586328706bb5cdff211f3f39a530f240b53f7221dab8e" -dependencies = [ - "libc", -] - [[package]] name = "rsa" version = "0.6.1" @@ -8720,21 +8670,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "sysinfo" -version = "0.27.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a902e9050fca0a5d6877550b769abd2bd1ce8c04634b941dbe2809735e1a1e33" -dependencies = [ - "cfg-if", - "core-foundation-sys", - "libc", - "ntapi", - "once_cell", - "rayon", - "winapi", -] - [[package]] name = "system-configuration" version = "0.5.1" @@ -8831,7 +8766,7 @@ dependencies = [ "backtrace", "cargo_metadata", "once_cell", - "sysinfo 0.26.9", + "sysinfo", "whoami", ] @@ -9238,18 +9173,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time 0.3.30", - "tracing-subscriber", -] - [[package]] name = "tracing-attributes" version = 
"0.1.27" diff --git a/Cargo.toml b/Cargo.toml index b486d107f..69897e609 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,7 +102,7 @@ lru = "0.10" mime = "0.3" mime_classifier = "0.0.1" mime_guess = "2.0.4" -minicbor = { version = "0.19.1", features = ["alloc", "std"] } +minicbor = { version = "0.19.1", features = ["std", "derive", "half"] } mockall = "0.11.4" multiaddr = "0.18" multibase = "0.9" @@ -134,6 +134,7 @@ ruzstd = "0.3" serde = { version = "1.0", features = ["derive"] } serde-error = "0.1.2" serde_bytes = "0.11" +serde_cbor = "0.11.2" serde_ipld_dagcbor = "0.3" serde_json = "1.0.87" serde_qs = "0.10.1" diff --git a/api-server/.openapi-generator/FILES b/api-server/.openapi-generator/FILES index e6a6dfebe..af3db60ef 100644 --- a/api-server/.openapi-generator/FILES +++ b/api-server/.openapi-generator/FILES @@ -4,6 +4,7 @@ Cargo.toml README.md api/openapi.yaml docs/Event.md +docs/Ring.md docs/Version.md docs/default_api.md examples/ca.pem diff --git a/api-server/README.md b/api-server/README.md index c2f0f8d10..29726bfa4 100644 --- a/api-server/README.md +++ b/api-server/README.md @@ -100,6 +100,7 @@ Method | HTTP request | Description ------------- | ------------- | ------------- [****](docs/default_api.md#) | **POST** /events | Creates a new event [****](docs/default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node +[****](docs/default_api.md#) | **POST** /recon | Sends a Recon message [****](docs/default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream [****](docs/default_api.md#) | **POST** /version | Get the version of the Ceramic node @@ -107,6 +108,7 @@ Method | HTTP request | Description ## Documentation For Models - [Event](docs/Event.md) + - [Ring](docs/Ring.md) - [Version](docs/Version.md) diff --git a/api-server/api/openapi.yaml b/api-server/api/openapi.yaml index c52eaab56..442952e0b 100644 --- a/api-server/api/openapi.yaml +++ b/api-server/api/openapi.yaml @@ -96,6 +96,28 @@ paths: "204": 
description: success summary: Creates a new event + /recon: + post: + parameters: + - description: Recon ring + explode: true + in: query + name: ring + required: true + schema: + $ref: '#/components/schemas/Ring' + style: form + requestBody: + $ref: '#/components/requestBodies/Message' + responses: + "200": + content: + application/cbor-seq: + schema: + format: binary + type: string + description: success + summary: Sends a Recon message components: requestBodies: Event: @@ -105,6 +127,14 @@ components: $ref: '#/components/schemas/Event' description: Event to add to the node required: true + Message: + content: + application/cbor-seq: + schema: + format: byte + type: string + description: Recon message to send + required: true schemas: Version: description: "Version of the Ceramic node in semver format, e.g. 2.1.0" @@ -120,10 +150,15 @@ components: eventId: eventId properties: eventId: - description: Multibase encodeding of event id bytes. + description: Multibase encoding of event id bytes. type: string required: - eventId title: A Ceramic Event type: object + Ring: + enum: + - interest + - model + type: string diff --git a/api-server/docs/Event.md b/api-server/docs/Event.md index e2ce4cdae..2c34f13e5 100644 --- a/api-server/docs/Event.md +++ b/api-server/docs/Event.md @@ -3,7 +3,7 @@ ## Properties Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- -**event_id** | **String** | Multibase encodeding of event id bytes. | +**event_id** | **String** | Multibase encoding of event id bytes. 
| [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/api-server/docs/Message.md b/api-server/docs/Message.md new file mode 100644 index 000000000..a0c14d177 --- /dev/null +++ b/api-server/docs/Message.md @@ -0,0 +1,10 @@ +# Message + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**msg** | [***swagger::ByteArray**](file.md) | CBOR payload of Recon message | + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/Ring.md b/api-server/docs/Ring.md new file mode 100644 index 000000000..aa6d08181 --- /dev/null +++ b/api-server/docs/Ring.md @@ -0,0 +1,9 @@ +# Ring + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/default_api.md b/api-server/docs/default_api.md index 9ac9ca080..b7083961d 100644 --- a/api-server/docs/default_api.md +++ b/api-server/docs/default_api.md @@ -6,6 +6,7 @@ Method | HTTP request | Description ------------- | ------------- | ------------- ****](default_api.md#) | **POST** /events | Creates a new event ****](default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node +****](default_api.md#) | **POST** /recon | Sends a Recon message ****](default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream ****](default_api.md#) | **POST** /version | Get the version of the Ceramic node @@ -57,6 +58,32 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model 
list]](../README.md#documentation-for-models) [[Back to README]](../README.md) +# **** +> swagger::ByteArray (ring, body) +Sends a Recon message + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **ring** | [****](.md)| Recon ring | + **body** | **swagger::ByteArray**| Recon message to send | + +### Return type + +[**swagger::ByteArray**](file.md) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: application/cbor-seq + - **Accept**: application/cbor-seq + +[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) + # **** > Vec (sort_key, sort_value, optional) Get events for a stream diff --git a/api-server/examples/client/main.rs b/api-server/examples/client/main.rs index b5598ba86..e2ecf099d 100644 --- a/api-server/examples/client/main.rs +++ b/api-server/examples/client/main.rs @@ -3,7 +3,7 @@ #[allow(unused_imports)] use ceramic_api_server::{ models, Api, ApiNoContext, Client, ContextWrapperExt, EventsPostResponse, LivenessGetResponse, - SubscribeSortKeySortValueGetResponse, VersionPostResponse, + ReconPostResponse, SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use clap::{App, Arg}; #[allow(unused_imports)] @@ -103,6 +103,15 @@ fn main() { (client.context() as &dyn Has).get().clone() ); } + /* Disabled because there's no example. 
+ Some("ReconPost") => { + let result = rt.block_on(client.recon_post( + ???, + swagger::ByteArray(Vec::from("BYTE_ARRAY_DATA_HERE")) + )); + info!("{:?} (X-Span-ID: {:?})", result, (client.context() as &dyn Has).get().clone()); + }, + */ Some("SubscribeSortKeySortValueGet") => { let result = rt.block_on(client.subscribe_sort_key_sort_value_get( "sort_key_example".to_string(), diff --git a/api-server/examples/server/server.rs b/api-server/examples/server/server.rs index 9378df25a..d191c1bb5 100644 --- a/api-server/examples/server/server.rs +++ b/api-server/examples/server/server.rs @@ -101,8 +101,8 @@ impl Server { use ceramic_api_server::server::MakeService; use ceramic_api_server::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use std::error::Error; use swagger::ApiError; @@ -134,6 +134,23 @@ where Err(ApiError("Generic failure".into())) } + /// Sends a Recon message + async fn recon_post( + &self, + ring: models::Ring, + body: swagger::ByteArray, + context: &C, + ) -> Result { + let context = context.clone(); + info!( + "recon_post({:?}, {:?}) - X-Span-ID: {:?}", + ring, + body, + context.get().0.clone() + ); + Err(ApiError("Generic failure".into())) + } + /// Get events for a stream async fn subscribe_sort_key_sort_value_get( &self, diff --git a/api-server/src/client/mod.rs b/api-server/src/client/mod.rs index 375e33c3b..3067a084b 100644 --- a/api-server/src/client/mod.rs +++ b/api-server/src/client/mod.rs @@ -42,8 +42,8 @@ const FRAGMENT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS const ID_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'|'); use crate::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse, + 
SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; /// Convert input into a base path, e.g. "http://example:123". Also checks the scheme as it goes. @@ -547,6 +547,104 @@ where } } + async fn recon_post( + &self, + param_ring: models::Ring, + param_body: swagger::ByteArray, + context: &C, + ) -> Result { + let mut client_service = self.client_service.clone(); + let mut uri = format!("{}/ceramic/recon", self.base_path); + + // Query parameters + let query_string = { + let mut query_string = form_urlencoded::Serializer::new("".to_owned()); + query_string.append_pair("ring", ¶m_ring.to_string()); + query_string.finish() + }; + if !query_string.is_empty() { + uri += "?"; + uri += &query_string; + } + + let uri = match Uri::from_str(&uri) { + Ok(uri) => uri, + Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), + }; + + let mut request = match Request::builder() + .method("POST") + .uri(uri) + .body(Body::empty()) + { + Ok(req) => req, + Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), + }; + + let body = param_body.0; + *request.body_mut() = Body::from(body); + + let header = "application/cbor-seq"; + request.headers_mut().insert( + CONTENT_TYPE, + match HeaderValue::from_str(header) { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create header: {} - {}", + header, e + ))) + } + }, + ); + let header = HeaderValue::from_str(Has::::get(context).0.as_str()); + request.headers_mut().insert( + HeaderName::from_static("x-span-id"), + match header { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create X-Span ID header value: {}", + e + ))) + } + }, + ); + + let response = client_service + .call((request, context.clone())) + .map_err(|e| ApiError(format!("No response received: {}", e))) + .await?; + + match response.status().as_u16() { + 200 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read 
response: {}", e))) + .await?; + let body = swagger::ByteArray(body.to_vec()); + Ok(ReconPostResponse::Success(body)) + } + code => { + let headers = response.headers().clone(); + let body = response.into_body().take(100).into_raw().await; + Err(ApiError(format!( + "Unexpected response code {}:\n{:?}\n\n{}", + code, + headers, + match body { + Ok(body) => match String::from_utf8(body) { + Ok(body) => body, + Err(e) => format!("", e), + }, + Err(e) => format!("", e), + } + ))) + } + } + } + async fn subscribe_sort_key_sort_value_get( &self, param_sort_key: String, diff --git a/api-server/src/lib.rs b/api-server/src/lib.rs index 1a98b208d..6bd154470 100644 --- a/api-server/src/lib.rs +++ b/api-server/src/lib.rs @@ -34,6 +34,12 @@ pub enum LivenessGetResponse { Success, } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum ReconPostResponse { + /// success + Success(swagger::ByteArray), +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum SubscribeSortKeySortValueGetResponse { /// success @@ -67,6 +73,14 @@ pub trait Api { /// Test the liveness of the Ceramic node async fn liveness_get(&self, context: &C) -> Result; + /// Sends a Recon message + async fn recon_post( + &self, + ring: models::Ring, + body: swagger::ByteArray, + context: &C, + ) -> Result; + /// Get events for a stream async fn subscribe_sort_key_sort_value_get( &self, @@ -100,6 +114,13 @@ pub trait ApiNoContext { /// Test the liveness of the Ceramic node async fn liveness_get(&self) -> Result; + /// Sends a Recon message + async fn recon_post( + &self, + ring: models::Ring, + body: swagger::ByteArray, + ) -> Result; + /// Get events for a stream async fn subscribe_sort_key_sort_value_get( &self, @@ -152,6 +173,16 @@ impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for Contex self.api().liveness_get(&context).await } + /// Sends a Recon message + async fn recon_post( + &self, + ring: models::Ring, + body: swagger::ByteArray, + ) -> Result { + let context = 
self.context().clone(); + self.api().recon_post(ring, body, &context).await + } + /// Get events for a stream async fn subscribe_sort_key_sort_value_get( &self, diff --git a/api-server/src/models.rs b/api-server/src/models.rs index 9d2158671..81613d242 100644 --- a/api-server/src/models.rs +++ b/api-server/src/models.rs @@ -9,7 +9,7 @@ use crate::models; #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] pub struct Event { - /// Multibase encodeding of event id bytes. + /// Multibase encoding of event id bytes. #[serde(rename = "eventId")] pub event_id: String, } @@ -136,6 +136,43 @@ impl std::convert::TryFrom for header::IntoHeaderVal } } +/// Enumeration of values. +/// Since this enum's variants do not hold data, we can easily define them as `#[repr(C)]` +/// which helps with FFI. +#[allow(non_camel_case_types)] +#[repr(C)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize, +)] +#[cfg_attr(feature = "conversion", derive(frunk_enum_derive::LabelledGenericEnum))] +pub enum Ring { + #[serde(rename = "interest")] + Interest, + #[serde(rename = "model")] + Model, +} + +impl std::fmt::Display for Ring { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + Ring::Interest => write!(f, "interest"), + Ring::Model => write!(f, "model"), + } + } +} + +impl std::str::FromStr for Ring { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s { + "interest" => std::result::Result::Ok(Ring::Interest), + "model" => std::result::Result::Ok(Ring::Model), + _ => std::result::Result::Err(format!("Value not valid: {}", s)), + } + } +} + /// Version of the Ceramic node in semver format, e.g. 
2.1.0 #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] diff --git a/api-server/src/server/mod.rs b/api-server/src/server/mod.rs index 7da330f56..e8a27fdf7 100644 --- a/api-server/src/server/mod.rs +++ b/api-server/src/server/mod.rs @@ -22,8 +22,8 @@ pub use crate::context; type ServiceFuture = BoxFuture<'static, Result, crate::ServiceError>>; use crate::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; mod paths { @@ -33,6 +33,7 @@ mod paths { pub static ref GLOBAL_REGEX_SET: regex::RegexSet = regex::RegexSet::new(vec![ r"^/ceramic/events$", r"^/ceramic/liveness$", + r"^/ceramic/recon$", r"^/ceramic/subscribe/(?P[^/?#]*)/(?P[^/?#]*)$", r"^/ceramic/version$" ]) @@ -40,7 +41,8 @@ mod paths { } pub(crate) static ID_EVENTS: usize = 0; pub(crate) static ID_LIVENESS: usize = 1; - pub(crate) static ID_SUBSCRIBE_SORT_KEY_SORT_VALUE: usize = 2; + pub(crate) static ID_RECON: usize = 2; + pub(crate) static ID_SUBSCRIBE_SORT_KEY_SORT_VALUE: usize = 3; lazy_static! 
{ pub static ref REGEX_SUBSCRIBE_SORT_KEY_SORT_VALUE: regex::Regex = #[allow(clippy::invalid_regex)] @@ -49,7 +51,7 @@ mod paths { ) .expect("Unable to create regex for SUBSCRIBE_SORT_KEY_SORT_VALUE"); } - pub(crate) static ID_VERSION: usize = 3; + pub(crate) static ID_VERSION: usize = 4; } pub struct MakeService @@ -269,6 +271,100 @@ where Ok(response) } + // ReconPost - POST /recon + hyper::Method::POST if path.matched(paths::ID_RECON) => { + // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) + let query_params = + form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) + .collect::>(); + let param_ring = query_params + .iter() + .filter(|e| e.0 == "ring") + .map(|e| e.1.clone()) + .next(); + let param_ring = match param_ring { + Some(param_ring) => { + let param_ring = + ::from_str(¶m_ring); + match param_ring { + Ok(param_ring) => Some(param_ring), + Err(e) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't parse query parameter ring - doesn't match schema: {}", e))) + .expect("Unable to create Bad Request response for invalid query parameter ring")), + } + } + None => None, + }; + let param_ring = match param_ring { + Some(param_ring) => param_ring, + None => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Missing required query parameter ring")) + .expect("Unable to create Bad Request response for missing query parameter ring")), + }; + + // Body parameters (note that non-required body parameters will ignore garbage + // values, rather than causing a 400 response). Produce warning header and logs for + // any unused fields. 
+ let result = body.into_raw().await; + match result { + Ok(body) => { + let param_body: Option = if !body.is_empty() { + Some(swagger::ByteArray(body.to_vec())) + } else { + None + }; + let param_body = match param_body { + Some(param_body) => param_body, + None => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Missing required body parameter body")) + .expect("Unable to create Bad Request response for missing body parameter body")), + }; + + let result = api_impl.recon_post( + param_ring, + param_body, + &context + ).await; + let mut response = Response::new(Body::empty()); + response.headers_mut().insert( + HeaderName::from_static("x-span-id"), + HeaderValue::from_str((&context as &dyn Has).get().0.clone().as_str()) + .expect("Unable to create X-Span-ID header value")); + + match result { + Ok(rsp) => match rsp { + ReconPostResponse::Success + (body) + => { + *response.status_mut() = StatusCode::from_u16(200).expect("Unable to turn 200 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/cbor-seq") + .expect("Unable to create Content-Type header for RECON_POST_SUCCESS")); + let body = body.0; + *response.body_mut() = Body::from(body); + }, + }, + Err(_) => { + // Application code returned an error. This should not happen, as the implementation should + // return a valid response. 
+ *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *response.body_mut() = Body::from("An internal error occurred"); + }, + } + + Ok(response) + }, + Err(e) => Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't read body parameter body: {}", e))) + .expect("Unable to create Bad Request response due to unable to read body parameter body")), + } + } + // SubscribeSortKeySortValueGet - GET /subscribe/{sort_key}/{sort_value} hyper::Method::GET if path.matched(paths::ID_SUBSCRIBE_SORT_KEY_SORT_VALUE) => { // Path parameters @@ -479,6 +575,7 @@ where _ if path.matched(paths::ID_EVENTS) => method_not_allowed(), _ if path.matched(paths::ID_LIVENESS) => method_not_allowed(), + _ if path.matched(paths::ID_RECON) => method_not_allowed(), _ if path.matched(paths::ID_SUBSCRIBE_SORT_KEY_SORT_VALUE) => method_not_allowed(), _ if path.matched(paths::ID_VERSION) => method_not_allowed(), _ => Ok(Response::builder() @@ -501,6 +598,8 @@ impl RequestParser for ApiRequestParser { hyper::Method::POST if path.matched(paths::ID_EVENTS) => Some("EventsPost"), // LivenessGet - GET /liveness hyper::Method::GET if path.matched(paths::ID_LIVENESS) => Some("LivenessGet"), + // ReconPost - POST /recon + hyper::Method::POST if path.matched(paths::ID_RECON) => Some("ReconPost"), // SubscribeSortKeySortValueGet - GET /subscribe/{sort_key}/{sort_value} hyper::Method::GET if path.matched(paths::ID_SUBSCRIBE_SORT_KEY_SORT_VALUE) => { Some("SubscribeSortKeySortValueGet") diff --git a/api/Cargo.toml b/api/Cargo.toml index 83d8c64d1..903b3956d 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -19,6 +19,8 @@ futures.workspace = true hyper.workspace = true multibase.workspace = true recon.workspace = true +serde.workspace = true +serde_cbor.workspace = true swagger.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/api/ceramic.yaml b/api/ceramic.yaml index ae0661342..dfa4fa4bb 100644 --- a/api/ceramic.yaml +++ 
b/api/ceramic.yaml @@ -88,6 +88,26 @@ paths: responses: '204': description: success + /recon: + post: + parameters: + - name: ring + in: query + description: Recon ring + required: true + schema: + $ref: '#/components/schemas/Ring' + requestBody: + $ref: '#/components/requestBodies/Message' + responses: + '200': + description: success + content: + application/cbor-seq: + schema: + format: binary + type: string + summary: Sends a Recon message components: requestBodies: @@ -98,6 +118,14 @@ components: $ref: '#/components/schemas/Event' description: Event to add to the node required: true + Message: + content: + application/cbor-seq: + schema: + format: byte + type: string + description: Recon message to send + required: true schemas: Version: description: Version of the Ceramic node in semver format, e.g. 2.1.0 @@ -114,4 +142,9 @@ components: properties: eventId: type: string - description: Multibase encodeding of event id bytes. + description: Multibase encoding of event id bytes. + Ring: + type: string + enum: + - interest + - model diff --git a/api/src/metrics/api.rs b/api/src/metrics/api.rs index 0b05616f0..ded9fcb74 100644 --- a/api/src/metrics/api.rs +++ b/api/src/metrics/api.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; use ceramic_api_server::{ - models, Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + models, Api, EventsPostResponse, LivenessGetResponse, ReconPostResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use ceramic_metrics::Recorder; use futures::Future; -use swagger::ApiError; +use swagger::{ApiError, ByteArray}; use tokio::time::Instant; use crate::{metrics::Event, Metrics}; @@ -82,4 +82,14 @@ where self.record("/version", self.api.version_post(context)) .await } + + async fn recon_post( + &self, + ring: models::Ring, + body: ByteArray, + context: &C, + ) -> std::result::Result { + self.record("/recon", self.api.recon_post(ring, body, context)) + .await + } } 
diff --git a/api/src/server.rs b/api/src/server.rs index a15698425..de660438b 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -19,21 +19,24 @@ use async_trait::async_trait; use futures::{future, Stream, StreamExt, TryFutureExt, TryStreamExt}; use hyper::service::Service; use hyper::{server::conn::Http, Request}; -use recon::{AssociativeHash, InterestProvider, Key, Store}; -use swagger::{EmptyContext, XSpanIdString}; +use recon::{AssociativeHash, InterestProvider, Key, Message, Response, Store}; +use serde::{Deserialize, Serialize}; +use swagger::{ByteArray, EmptyContext, XSpanIdString}; use tokio::net::TcpListener; use tracing::{debug, info, instrument, Level}; use ceramic_api_server::{ models::{self, Event}, - EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + EventsPostResponse, LivenessGetResponse, ReconPostResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use ceramic_core::{EventId, Interest, Network, PeerId, StreamId}; #[async_trait] pub trait Recon: Clone + Send + Sync { type Key: Key; + type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>; + async fn insert(&self, key: Self::Key) -> Result<()>; async fn range( &self, @@ -42,15 +45,21 @@ pub trait Recon: Clone + Send + Sync { offset: usize, limit: usize, ) -> Result>; + + async fn process_messages( + &self, + received: Vec>, + ) -> Result>; } #[async_trait] impl Recon for recon::Client where K: Key, - H: AssociativeHash, + H: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, { type Key = K; + type Hash = H; async fn insert(&self, key: Self::Key) -> Result<()> { let _ = recon::Client::insert(self, key).await?; @@ -68,6 +77,13 @@ where .await? 
.collect()) } + + async fn process_messages( + &self, + received: Vec>, + ) -> Result> { + recon::Client::process_messages(self, received).await + } } #[derive(Clone)] @@ -138,6 +154,43 @@ where Ok(EventsPostResponse::Success) } + async fn recon_post( + &self, + ring: models::Ring, + body: ByteArray, + _context: &C, + ) -> std::result::Result { + let response = match ring { + models::Ring::Interest => { + let interest_messages: Vec::Key, ::Hash>> = + serde_cbor::de::from_slice(body.as_slice()).map_err(|e| { + ApiError(format!("failed to deserialize interest messages: {e}")) + })?; + let interest_response = self + .interest + .process_messages(interest_messages) + .await + .map_err(|e| ApiError(format!("failed to process interest messages: {e}")))?; + serde_cbor::to_vec(&interest_response.into_messages()) + .map_err(|e| ApiError(format!("failed to serialize interest response: {e}")))? + } + models::Ring::Model => { + let model_messages: Vec::Key, ::Hash>> = + serde_cbor::de::from_slice(body.as_slice()).map_err(|e| { + ApiError(format!("failed to deserialize model messages: {e}")) + })?; + let model_response = self + .model + .process_messages(model_messages) + .await + .map_err(|e| ApiError(format!("failed to process model messages: {e}")))?; + serde_cbor::to_vec(&model_response.into_messages()) + .map_err(|e| ApiError(format!("failed to serialize model response: {e}")))? 
+ } + }; + Ok(ReconPostResponse::Success(swagger::ByteArray(response))) + } + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn subscribe_sort_key_sort_value_get( &self, @@ -255,6 +308,7 @@ mod tests { use expect_test::expect; use mockall::{mock, predicate}; use multibase::Base; + use recon::Sha256a; use tracing_test::traced_test; struct Context; @@ -278,6 +332,7 @@ mod tests { #[async_trait] impl Recon for MockReconInterestTest { type Key = Interest; + type Hash = Sha256a; async fn insert(&self, key: Self::Key) -> Result<()> { self.insert(key) } @@ -290,6 +345,13 @@ mod tests { ) -> Result> { self.range(start, end, offset, limit) } + async fn process_messages( + &self, + received: Vec>, + ) -> Result> { + let _ = received; + todo!("not implemented") + } } mock! { @@ -311,6 +373,7 @@ mod tests { #[async_trait] impl Recon for MockReconModelTest { type Key = EventId; + type Hash = Sha256a; async fn insert(&self, key: Self::Key) -> Result<()> { self.insert(key) } @@ -323,6 +386,13 @@ mod tests { ) -> Result> { self.range(start, end, offset, limit) } + async fn process_messages( + &self, + received: Vec>, + ) -> Result> { + let _ = received; + todo!("not implemented") + } } #[tokio::test] diff --git a/beetle/iroh-rpc-client/Cargo.toml b/beetle/iroh-rpc-client/Cargo.toml index dc42df144..6ce259ae0 100644 --- a/beetle/iroh-rpc-client/Cargo.toml +++ b/beetle/iroh-rpc-client/Cargo.toml @@ -16,7 +16,6 @@ cid.workspace = true ceramic-core.workspace = true futures.workspace = true iroh-rpc-types.workspace = true -iroh-util.workspace = true libp2p = { workspace = true, features = ["gossipsub"] } quic-rpc = { workspace = true, features = ["http2"] } serde = { workspace = true, features = ["derive"] } diff --git a/beetle/iroh-util/Cargo.toml b/beetle/iroh-util/Cargo.toml index 71cdfadd2..d6debb0a4 100644 --- a/beetle/iroh-util/Cargo.toml +++ b/beetle/iroh-util/Cargo.toml @@ -9,17 +9,7 @@ repository.workspace = true publish = false 
[dependencies] -anyhow.workspace = true cid.workspace = true -ctrlc.workspace = true -dirs-next.workspace = true -futures.workspace = true -humansize.workspace = true -rlimit.workspace = true -serde = { workspace = true, features = ["derive"] } -sysinfo.workspace = true -thiserror.workspace = true -tracing.workspace = true [dev-dependencies] temp-env.workspace = true diff --git a/core/src/event_id.rs b/core/src/event_id.rs index 196255c02..f11686ce8 100644 --- a/core/src/event_id.rs +++ b/core/src/event_id.rs @@ -37,7 +37,25 @@ impl EventId { pub fn builder() -> Builder { Builder { state: Init } } - /// EventId.new builds a Vec with the event id data. + + /// EventId.new builds a Vec with the event id data. + /// ## Example + /// ``` + /// use std::str::FromStr; + /// use cid::Cid; + /// use ceramic_core::{EventId, Network}; + /// + /// let event = EventId::new( + /// &Network::Mainnet, + /// "model", + /// "kh4q0ozorrgaq2mezktnrmdwleo1d", + /// "did:key:z6MkgSV3tAuw7gUWqKCUY7ae6uWNxqYgdwPhUJbJhF9EFXm9", + /// &Cid::from_str("bagcqceraplay4erv6l32qrki522uhiz7rf46xccwniw7ypmvs3cvu2b3oulq").unwrap(), + /// 1, + /// &Cid::from_str("bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy").unwrap(), + /// ); + /// + /// ``` pub fn new( network: &Network, sort_key: &str, @@ -222,10 +240,10 @@ impl Builder { pub fn with_network(self, network: &Network) -> Builder { // Maximum EventId size is 72. // - // varint(0xce) + // streamid, 1 byte - // varint(0x05) + // cip-124 EventID, 1 byte - // varint(networkId), // 5 bytes for local network - //! 
last8Bytes(sha256(separator_key + "|" + separator_value)), // 16 bytes + // varint(0xce) + // streamid, 2 bytes b'\xce\x01' + // varint(0x05) + // cip-124 EventID, 1 byte b'\x05' + // varint(networkId), // 1-5 bytes for local network + // last8Bytes(sha256(separator_key + "|" + separator_value)), // 16 bytes // last8Bytes(sha256(stream_controller_DID)), // 8 bytes // last4Bytes(init_event_CID) // 4 bytes // cbor(eventHeight), // u64_max 9 bytes diff --git a/cspell.json b/cspell.json index 50688bfdb..0261ec03c 100644 --- a/cspell.json +++ b/cspell.json @@ -11,6 +11,7 @@ "addrs", "ahash", "ahashs", + "appender", "autonat", "behaviour", "bitswap", @@ -21,6 +22,7 @@ "cids", "cidv1", "codespan", + "consts", "dagcbor", "Dcutr", "Deque", @@ -36,6 +38,7 @@ "Kademlia", "kbucket", "kbuckets", + "Keccak", "Keychain", "Keypair", "kubo", diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 0acc6a966..2c85a18a4 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -9,7 +9,6 @@ publish = false [dependencies] console-subscriber = { workspace = true, optional = true } -iroh-util.workspace = true lazy_static.workspace = true names.workspace = true opentelemetry = { workspace = true, features = ["rt-tokio"] } diff --git a/one/Cargo.toml b/one/Cargo.toml index 51504a456..ae857e206 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -39,6 +39,7 @@ names.workspace = true prometheus-client.workspace = true recon.workspace = true serde = "1" +serde_cbor.workspace = true serde_json = "1" serde_repr = "0.1" signal-hook = "0.3.17" @@ -49,8 +50,6 @@ tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" tokio-util.workspace = true tokio.workspace = true -tracing-appender = "0.2.2" -tracing-subscriber.workspace = true tracing.workspace = true diff --git a/one/src/lib.rs b/one/src/lib.rs index 301ea1025..7a3c2119a 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -6,6 +6,7 @@ mod http; mod metrics; mod network; mod pubsub; +mod recon_loop; mod 
sql; use std::{env, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; @@ -208,6 +209,10 @@ struct DaemonOpts { env = "CERAMIC_ONE_KADEMLIA_PROVIDER_RECORD_TTL_SECS" )] kademlia_provider_record_ttl_secs: u64, + + /// When true Recon http will be used to synchronized events with peers. + #[arg(long, default_value_t = false, env = "CERAMIC_ONE_RECON_HTTP")] + recon_http: bool, } #[derive(ValueEnum, Debug, Clone, Default)] @@ -306,10 +311,9 @@ type ModelInterest = ReconInterestProvider; type ReconModel = Server; struct Daemon { + opts: DaemonOpts, peer_id: PeerId, network: ceramic_core::Network, - bind_address: String, - metrics_bind_address: String, ipfs: Ipfs, metrics_handle: MetricsHandle, metrics: Arc, @@ -369,7 +373,7 @@ impl Daemon { // 1 path from options // 2 path $HOME/.ceramic-one // 3 pwd/.ceramic-one - let dir = match opts.store_dir { + let dir = match opts.store_dir.clone() { Some(dir) => dir, None => match home::home_dir() { Some(home_dir) => home_dir.join(".ceramic-one"), @@ -478,10 +482,9 @@ impl Daemon { .await?; Ok(Daemon { + opts, peer_id, network, - bind_address: opts.bind_address, - metrics_bind_address: opts.metrics_bind_address, ipfs, metrics_handle, metrics, @@ -493,11 +496,11 @@ impl Daemon { async fn run(mut self) -> Result<()> { // Start metrics server debug!( - bind_address = self.metrics_bind_address, + bind_address = self.opts.metrics_bind_address, "starting prometheus metrics server" ); let (tx_metrics_server_shutdown, metrics_server_handle) = - metrics::start(&self.metrics_bind_address.parse()?); + metrics::start(&self.opts.metrics_bind_address.parse()?); // Build HTTP server let network = self.network.clone(); @@ -536,6 +539,13 @@ impl Daemon { ); // Start recon tasks + let recon_event_loop_handle = match self.opts.recon_http { + true => Some(tokio::spawn(recon_loop::recon_event_loop( + self.recon_interest.client(), + self.recon_model.client(), + ))), + false => None, + }; let recon_interest_handle = 
tokio::spawn(self.recon_interest.run()); let recon_model_handle = tokio::spawn(self.recon_model.run()); @@ -549,7 +559,7 @@ impl Daemon { // The server task blocks until we are ready to start shutdown debug!("starting api server"); - hyper::server::Server::bind(&self.bind_address.parse()?) + hyper::server::Server::bind(&self.opts.bind_address.parse()?) .serve(service) .with_graceful_shutdown(async { rx.await.ok(); @@ -572,6 +582,12 @@ impl Daemon { warn!(%err, "recon interest task error"); } debug!("recon interests server stopped"); + if let Some(handle) = recon_event_loop_handle { + if let Err(err) = handle.await { + warn!(%err, "recon event loop error"); + } + debug!("recon event loop stopped"); + } // Shutdown metrics server and collection handler tx_metrics_server_shutdown diff --git a/one/src/recon_loop.rs b/one/src/recon_loop.rs new file mode 100644 index 000000000..8598b738d --- /dev/null +++ b/one/src/recon_loop.rs @@ -0,0 +1,182 @@ +//! recon_event_loop synchronizes interests and models +//! To generate events for the loop to synchronize, use the following Ceramic Pocket Knife (cpk) command: +//! ```bash +//! model=k2t6wz4ylx0qnsxc7rroideki5ea96hy0m99gt9bhqhpjfnt7n5bydluk9l29x +//! controller=did:key:z5XyAyVKJpA2G4c1R52U4JBDSJX6qvBMHaseQVbbkLuYR +//! count=32000 +//! +//! # echo "Resetting dbs..." +//! # rm peer_1/db.sqlite3 +//! # rm peer_2/db.sqlite3 +//! +//! cpk sql-db-generate \ +//! --sort-key model \ +//! --sort-value $model \ +//! --controller $controller \ +//! --count $count \ +//! --path peer_1/db.sqlite3 +//! +//! cpk sql-db-generate \ +//! --sort-key model \ +//! --sort-value $model \ +//! --controller $controller \ +//! --count $count \ +//! --path peer_2/db.sqlite3 +//! +//! echo "Starting peer 1" +//! RUST_LOG=info,ceramic_p2p=debug,ceramic_api=debug,recon::libp2p=debug \ +//! CERAMIC_ONE_NETWORK=in-memory \ +//! CERAMIC_ONE_BIND_ADDRESS=127.0.0.1:5001 \ +//! CERAMIC_ONE_PEER_PORT=5002 \ +//! CERAMIC_ONE_RECON_HTTP=true \ +//! 
CERAMIC_ONE_STORE_DIR=peer_1 \ +//! CERAMIC_ONE_METRICS_BIND_ADDRESS=127.0.0.1:9091 \ +//! CERAMIC_ONE_SWARM_ADDRESSES=/ip4/0.0.0.0/tcp/0 \ +//! cargo run --release -p ceramic-one -- daemon +//! +//! echo "Starting peer 2" +//! RUST_LOG=info,ceramic_p2p=debug,ceramic_api=debug,recon::libp2p=debug \ +//! CERAMIC_ONE_NETWORK=in-memory \ +//! CERAMIC_ONE_BIND_ADDRESS=127.0.0.1:5002 \ +//! CERAMIC_ONE_PEER_PORT=5001 \ +//! CERAMIC_ONE_RECON_HTTP=true \ +//! CERAMIC_ONE_STORE_DIR=peer_2 \ +//! CERAMIC_ONE_METRICS_BIND_ADDRESS=127.0.0.1:9092 \ +//! CERAMIC_ONE_SWARM_ADDRESSES=/ip4/0.0.0.0/tcp/0 \ +//! cargo run --release -p ceramic-one -- daemon +//! +//! echo "Subscribing to model..." +//! echo "curl -s \"http://localhost:5001/ceramic/subscribe/model/$model?controller=$controller&limit=10\"" +//! curl -s "http://localhost:5001/ceramic/subscribe/model/$model?controller=$controller&limit=10" +//! curl -s "http://localhost:5002/ceramic/subscribe/model/$model?controller=$controller&limit=10" +//! ``` +use anyhow::{anyhow, Result}; +use ceramic_core::{EventId, Interest}; +use hyper::client::HttpConnector; +use hyper::{Method, Request}; +use recon::libp2p::Config; +use recon::{Client, Sha256a}; +use std::time::Instant; +use std::{env, time::Duration}; +use tracing::info; + +type ReconInterestClient = Client<Interest, Sha256a>; +type ReconModelClient = Client<EventId, Sha256a>; +type ReconInterestMessage = recon::Message<Interest, Sha256a>; +type ReconModelMessage = recon::Message<EventId, Sha256a>; + +struct Peer { + last_sync: Option<Instant>, + ip: String, + port: u16, + config: Config, +} + +pub async fn recon_event_loop( + interest_client: ReconInterestClient, + model_client: ReconModelClient, +) -> Result<()> { + info!("Started Recon event loop!"); + // TODO: Peer management + let peer_port = match env::var_os("CERAMIC_ONE_PEER_PORT") { + Some(v) => v.into_string().unwrap(), + None => panic!("Peer address is not set"), + }; + let mut peers = vec![Peer { + ip: "127.0.0.1".to_owned(), + port: peer_port.parse()?, + last_sync: None, + config:
Config::default(), + }]; + let client = hyper::Client::new(); + loop { + // Check each peer and start synchronization as needed. + for peer in &mut peers { + let should_sync = if let Some(last_sync) = &peer.last_sync { + last_sync.elapsed() > peer.config.per_peer_sync_timeout + } else { + true + }; + if should_sync { + match recon_sync_interests(peer, &interest_client, &client).await { + Ok(interest_messages) => { + info!( + "{}:{} interest messages: {:?}", + peer.ip, peer.port, interest_messages + ) + } + Err(err) => info!(?err, "failed to connect to peer {}:{}", peer.ip, peer.port), + } + match recon_sync_models(peer, &model_client, &client).await { + Ok(model_messages) => info!( + "{}:{} model messages: {:?}", + peer.ip, peer.port, model_messages + ), + Err(err) => info!(?err, "failed to connect to peer {}:{}", peer.ip, peer.port), + } + } + } + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +async fn recon_sync_interests( + peer: &mut Peer, + interest_client: &ReconInterestClient, + http_client: &hyper::Client<HttpConnector>, +) -> Result<Vec<ReconInterestMessage>> { + // TODO: This is a URL injection vulnerability, validate the IP address first.
+ let peer_addr = format!( + "http://{}:{}/ceramic/recon?ring=interest", + peer.ip, peer.port + ); + let messages_to_send = interest_client.initial_messages().await?; + let bytes_to_send: Vec<u8> = serde_cbor::to_vec(&messages_to_send)?; + // TODO: Support TLS certs for peers + let mut received_message = http_client + .request( + Request::builder() + .method(Method::POST) + .uri(peer_addr.as_str()) + .body(bytes_to_send.into())?, + ) + .await?; + let received_bytes = hyper::body::to_bytes(received_message.body_mut()).await?; + serde_cbor::from_slice(received_bytes.as_ref()) + .map_err(|e| anyhow!("failed to deserialize interest messages: {}", e)) +} + +async fn recon_sync_models( + peer: &mut Peer, + model_client: &ReconModelClient, + http_client: &hyper::Client<HttpConnector>, +) -> Result<Vec<ReconModelMessage>> { + // TODO: This is a URL injection vulnerability, validate the IP address first. + let peer_addr = format!("http://{}:{}/ceramic/recon?ring=model", peer.ip, peer.port); + let mut messages_to_send = model_client.initial_messages().await?; + loop { + // Serialize messages to send + let bytes_to_send: Vec<u8> = serde_cbor::to_vec(&messages_to_send)?; + // TODO: Support TLS certs for peers + let mut received_message = http_client + .request( + Request::builder() + .method(Method::POST) + .uri(peer_addr.as_str()) + .body(bytes_to_send.into())?, + ) + .await?; + let received_bytes = hyper::body::to_bytes(received_message.body_mut()).await?; + // Deserialize received messages + let received_messages: Vec<ReconModelMessage> = + serde_cbor::from_slice(received_bytes.as_ref()) + .map_err(|e| anyhow!("failed to deserialize model messages: {}", e))?; + let process_response = model_client.process_messages(received_messages).await?; + if process_response.is_synchronized() { + peer.last_sync = Some(Instant::now()); + break; + } + messages_to_send = process_response.into_messages(); + } + Ok(vec![]) +} diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 2d9904a07..e2e6ba02b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -32,7
+32,6 @@ hex.workspace = true iroh-bitswap.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true -iroh-util.workspace = true libp2p-identity.workspace = true lru.workspace = true multihash.workspace = true diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 486ea151c..04f862a98 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -16,6 +16,6 @@ pub use self::metrics::Metrics; pub use self::node::*; pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream}; pub use libp2p::PeerId; -pub use sqliteblockstore::SQLiteBlockStore; +pub use sqliteblockstore::{SQLiteBlock, SQLiteBlockStore}; pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/p2p/src/sqliteblockstore.rs b/p2p/src/sqliteblockstore.rs index 609419e17..7964f4e72 100644 --- a/p2p/src/sqliteblockstore.rs +++ b/p2p/src/sqliteblockstore.rs @@ -6,15 +6,22 @@ use cid::{ multihash::MultihashDigest, Cid, }; +use futures_util::stream::BoxStream; use iroh_bitswap::{Block, Store}; use multihash::Multihash; -use sqlx::{Row, SqlitePool}; +use sqlx::{sqlite::Sqlite, Error, Row, SqlitePool}; #[derive(Debug, Clone)] pub struct SQLiteBlockStore { pool: SqlitePool, } +#[derive(sqlx::FromRow)] +pub struct SQLiteBlock { + pub multihash: Vec, + pub bytes: Vec, +} + impl SQLiteBlockStore { /// ```sql /// CREATE TABLE IF NOT EXISTS blocks ( @@ -101,6 +108,11 @@ impl SQLiteBlockStore { .map(|row| row.get::<'_, Vec, _>(0).into())) } + pub fn scan(&self) -> BoxStream> { + sqlx::query_as::("SELECT multihash, bytes FROM blocks;") + .fetch(&self.pool) + } + /// Store a DAG node into IPFS. /// Reports true when the block does not previously exist in the store. 
pub async fn put(&self, cid: Cid, blob: Bytes, _links: Vec) -> Result { @@ -238,7 +250,7 @@ mod tests { expect![["7"]].assert_eq(&size.unwrap().to_string()); let block = Store::get(&store, &cid).await.unwrap(); - expect!["bafybeibazl2z4vqp2tmwcfag6wirmtpnomxknqcgrauj7m2yisrz3qjbom"] + expect!["bafybeibazl2z4vqp2tmwcfag6wirmtpnomxknqcgrauj7m2yisrz3qjbom"] // cspell:disable-line .assert_eq(&block.cid().to_string()); expect![["0A050001020304"]].assert_eq(&hex::encode_upper(block.data())); } @@ -268,13 +280,13 @@ mod tests { expect![["7"]].assert_eq(&size.unwrap().to_string()); let block = Store::get(&store, &cid).await.unwrap(); - expect!["bafybeibazl2z4vqp2tmwcfag6wirmtpnomxknqcgrauj7m2yisrz3qjbom"] + expect!["bafybeibazl2z4vqp2tmwcfag6wirmtpnomxknqcgrauj7m2yisrz3qjbom"] // cspell:disable-line .assert_eq(&block.cid().to_string()); expect![["0A050001020304"]].assert_eq(&hex::encode_upper(block.data())); } #[tokio::test] - async fn test_get_nonexistant_block() { + async fn test_get_nonexistent_block() { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); let store: SQLiteBlockStore = SQLiteBlockStore::new(pool).await.unwrap(); diff --git a/recon/Cargo.toml b/recon/Cargo.toml index 0025be278..a8295bd9f 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -38,7 +38,7 @@ libp2p-swarm-test = "0.3.0" pretty = "0.12.1" quickcheck = "1.0.3" regex = "1" -serde_cbor = "0.11.2" +serde_cbor.workspace = true tracing-test.workspace = true [build-dependencies] diff --git a/recon/src/client.rs b/recon/src/client.rs index e2088fe53..fd40a2a45 100644 --- a/recon/src/client.rs +++ b/recon/src/client.rs @@ -129,7 +129,7 @@ where S: Store + Send + 'static, I: InterestProvider + 'static, { - /// Consturct a [`Server`] from a [`Recon`] instance. + /// Construct a [`Server`] from a [`Recon`] instance. 
pub fn new(recon: Recon<K, H, S, I>) -> Self { let (tx, rx) = channel(1024); Self { diff --git a/recon/src/recon.rs b/recon/src/recon.rs index 3594a913a..a1443b4c9 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -80,9 +80,8 @@ where _ => { // -> (first h(middle) last) response.keys.push(first.to_owned()); - response - .hashes - .push(self.store.hash_range(&first, &last).await?); + let hash: H = self.store.hash_range(&first, &last).await?.hash; + response.hashes.push(hash); response.keys.push(last.to_owned()); } } @@ -93,6 +92,7 @@ where } /// Process an incoming message and respond with a message reply. + /// Returns a [`Response`] carrying the reply messages plus counts of keys found synchronized and still synchronizing. #[instrument(skip_all, ret)] pub async fn process_messages(&mut self, received: &[Message<K, H>]) -> Result<Response<K, H>> { // First we must find the intersection of interests. @@ -116,14 +116,12 @@ where is_synchronized: true, ..Default::default() }; + let mut synced = 0; + let mut syncing = 0; for (range, received) in intersections { trace!(?range, "processing range"); let mut response_message = Message::new(&range.start, &range.end); - for key in &received.keys { - if self.insert(key).await? { - response.is_synchronized = false; - } - } + self.insert_many(received.keys.iter()).await?; if let Some(mut left_fencepost) = self.store.first(&range.start, &range.end).await?
{ let mut received_hashs = received.hashes.iter(); @@ -145,7 +143,10 @@ where && left_fencepost < *received.keys.last().unwrap() && (response_message.keys.len() < 32 * 1024) { - response.is_synchronized &= response_message + let SynchronizedCount { + is_synchronized, + count, + } = response_message .process_range( &left_fencepost, &right_fencepost, @@ -153,6 +154,12 @@ where &mut self.store, ) .await?; + response.is_synchronized &= is_synchronized; + if is_synchronized { + synced += count; + } else { + syncing += count; + } left_fencepost = right_fencepost; right_fencepost = match received_keys.next() { Some(k) => k.to_owned(), @@ -165,7 +172,10 @@ where received_hash = received_hashs.next().unwrap_or(zero); } if !received.keys.is_empty() { - response.is_synchronized &= response_message + let SynchronizedCount { + is_synchronized, + count, + } = response_message .process_range( received.keys.last().unwrap(), &self @@ -177,6 +187,12 @@ where &mut self.store, ) .await?; + response.is_synchronized &= is_synchronized; + if is_synchronized { + synced += count; + } else { + syncing += count; + } } response_message .end_streak( @@ -190,6 +206,8 @@ where .await?; }; response.messages.push(response_message); + response.synchronized_count = synced; + response.synchronizing_count = syncing; } Ok(response) } @@ -204,6 +222,15 @@ where Ok(new_key) } + /// Insert many keys into the key space. + pub async fn insert_many<'a, IT>(&mut self, keys: IT) -> Result + where + IT: Iterator + Send, + { + let new_key = self.store.insert_many(keys).await?; + Ok(new_key) + } + /// Reports total number of keys pub async fn len(&mut self) -> Result { self.store.len().await @@ -240,6 +267,21 @@ where } } +#[derive(Debug)] +pub struct HashCount +where + H: AssociativeHash, +{ + hash: H, + count: u64, +} + +#[derive(Debug)] +pub struct SynchronizedCount { + is_synchronized: bool, + count: u64, +} + /// Store defines the API needed to store the Recon set. 
#[async_trait] pub trait Store: std::fmt::Debug { @@ -252,13 +294,27 @@ pub trait Store: std::fmt::Debug { /// Returns true if the key did not previously exist. async fn insert(&mut self, key: &Self::Key) -> Result; + /// Insert new keys into the key space. + /// Returns true if a key did not previously exist. + async fn insert_many<'a, I>(&mut self, keys: I) -> Result + where + I: Iterator + Send, + { + let mut new = false; + for key in keys { + new |= self.insert(key).await?; + } + Ok(new) + } + /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. + /// Returns Result<(Hash, count), Err> async fn hash_range( &mut self, left_fencepost: &Self::Key, right_fencepost: &Self::Key, - ) -> Result; + ) -> Result>; /// Return all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. @@ -672,7 +728,8 @@ where { let h = local_store .hash_range(self.keys.last().unwrap(), left_fencepost) - .await?; + .await? + .hash; // If the left fencepost has not been sent send it now if self.keys.last().unwrap() != left_fencepost { // Add the left_fencepost to end the match streak. @@ -682,27 +739,38 @@ where Ok(()) } - // Process keys within a specific range. Returns true if the ranges were already in sync. + // Process keys within a specific range. The returned value is a SynchronizedCount, where is_synchronized indicates + // whether the range is in sync, and count is the count of keys in the range. 
async fn process_range( &mut self, left_fencepost: &K, right_fencepost: &K, received_hash: &H, local_store: &mut S, - ) -> Result + ) -> Result where S: Store + Send, { if left_fencepost == right_fencepost { - return Ok(true); + return Ok(SynchronizedCount { + is_synchronized: true, + count: 0, + }); // zero size range is in sync } - let calculated_hash = local_store + let HashCount { + hash: calculated_hash, + count, + }: HashCount = local_store .hash_range(left_fencepost, right_fencepost) .await?; if &calculated_hash == received_hash { - return Ok(true); + // range is in sync, return sync count + return Ok(SynchronizedCount { + is_synchronized: true, + count, + }); } self.end_streak(left_fencepost, local_store).await?; @@ -739,7 +807,10 @@ where self.send_split(left_fencepost, right_fencepost, local_store) .await?; } - Ok(false) + Ok(SynchronizedCount { + is_synchronized: false, + count, + }) } #[instrument(skip(self, local_store))] @@ -771,11 +842,15 @@ where if let Some(mid_key) = mid_key { self.keys.push(mid_key.to_owned()); self.hashes - .push(local_store.hash_range(left_fencepost, &mid_key).await?); + .push(local_store.hash_range(left_fencepost, &mid_key).await?.hash); self.keys.push(right_fencepost.to_owned()); - self.hashes - .push(local_store.hash_range(&mid_key, right_fencepost).await?); + self.hashes.push( + local_store + .hash_range(&mid_key, right_fencepost) + .await? 
+ .hash, + ); } else { bail!("unable to find a split key") }; @@ -836,6 +911,8 @@ struct BoundedMessage { pub struct Response { messages: Vec>, is_synchronized: bool, + synchronized_count: u64, + synchronizing_count: u64, } impl std::fmt::Debug for Response @@ -862,6 +939,8 @@ impl Default for Response { Self { messages: Default::default(), is_synchronized: Default::default(), + synchronized_count: 0, + synchronizing_count: 0, } } } diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index 41010ff08..72651be97 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -7,6 +7,8 @@ use async_trait::async_trait; use crate::recon::{AssociativeHash, Key, MaybeHashedKey, Store}; +use super::HashCount; + /// An implementation of a Store that stores keys in an in-memory BTree #[derive(Clone, Debug)] pub struct BTreeStore @@ -49,19 +51,31 @@ where /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. - pub fn hash_range(&self, left_fencepost: &K, right_fencepost: &K) -> anyhow::Result { + pub fn hash_range( + &self, + left_fencepost: &K, + right_fencepost: &K, + ) -> anyhow::Result> { if left_fencepost >= right_fencepost { - return Ok(H::identity()); + return Ok(HashCount { + hash: H::identity(), + count: 0, + }); } let range = ( Bound::Excluded(left_fencepost), Bound::Excluded(right_fencepost), ); - Ok(H::identity().digest_many( + let hash: H = H::identity().digest_many( self.keys .range(range) .map(|(key, hash)| MaybeHashedKey::new(key, Some(hash))), - )) + ); + let count: usize = self.keys.range(range).count(); + Ok(HashCount { + hash, + count: count as u64, + }) } /// Return all keys in the range between left_fencepost and right_fencepost. 
@@ -108,7 +122,7 @@ where &mut self, left_fencepost: &Self::Key, right_fencepost: &Self::Key, - ) -> anyhow::Result { + ) -> anyhow::Result> { // Self does not need async to implement hash_range, so it exposes a pub non async hash_range function // and we delegate to its implementation here. BTreeStore::hash_range(self, left_fencepost, right_fencepost) diff --git a/recon/src/recon/sqlitestore.rs b/recon/src/recon/sqlitestore.rs index 56e57da20..1144bccf7 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/recon/src/recon/sqlitestore.rs @@ -1,10 +1,12 @@ #![warn(missing_docs, missing_debug_implementations, clippy::all)] +use super::HashCount; use crate::{AssociativeHash, Key, Store}; use anyhow::Result; use async_trait::async_trait; use sqlx::{Row, SqlitePool}; use std::marker::PhantomData; +use std::result::Result::Ok; use tracing::{debug, instrument}; /// ReconSQLite is a implementation of Recon store @@ -110,7 +112,7 @@ where .fetch_all(&self.pool) .await; match resp { - Ok(_) => Ok(true), + std::result::Result::Ok(_rows) => Ok(true), Err(sqlx::Error::Database(err)) => { if err.is_unique_violation() { Ok(false) @@ -122,14 +124,18 @@ where } } + /// return the hash and count for a range #[instrument(skip(self))] async fn hash_range( &mut self, left_fencepost: &Self::Key, right_fencepost: &Self::Key, - ) -> Result { + ) -> Result> { if left_fencepost >= right_fencepost { - return Ok(H::identity()); + return Ok(HashCount { + hash: H::identity(), + count: 0, + }); } let query = sqlx::query( @@ -137,7 +143,8 @@ where TOTAL(ahash_0) & 0xFFFFFFFF, TOTAL(ahash_1) & 0xFFFFFFFF, TOTAL(ahash_2) & 0xFFFFFFFF, TOTAL(ahash_3) & 0xFFFFFFFF, TOTAL(ahash_4) & 0xFFFFFFFF, TOTAL(ahash_5) & 0xFFFFFFFF, - TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF + TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF, + COUNT(1) FROM recon WHERE sort_key = ? AND key > ? 
AND key < ?;", ); let row = query @@ -156,7 +163,14 @@ where row.get(6), row.get(7), ]; - Ok(H::from(bytes)) + let count: i64 = row.get(8); // sql int type is signed + let count: u64 = count + .try_into() + .expect("COUNT(1) should never return a negative number"); + Ok(HashCount { + hash: H::from(bytes), + count, + }) } #[instrument(skip(self))] @@ -369,9 +383,10 @@ mod tests { let hash: Sha256a = store .hash_range(&b"a".as_slice().into(), &b"z".as_slice().into()) .await - .unwrap(); + .unwrap() + .hash; expect![[r#"7460F21C83815F5EDC682F7A4154BC09AA3A0AE5DD1A2DEDCD709888A12751CC"#]] - .assert_eq(&hash.to_hex()) + .assert_eq(&hash.to_hex()); } #[test] diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index ffaad1deb..dc3655b6c 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -231,6 +231,7 @@ where BTreeStore::from_set(set) .hash_range(&Bytes::min_value(), &Bytes::max_value()) .unwrap() + .hash }) .collect(), } @@ -527,7 +528,7 @@ async fn word_lists() { } #[tokio::test] async fn response_is_synchronized() { - let mut a = ReconMemoryBytes::new( + let mut client = ReconMemoryBytes::new( BTreeStore::from_set(BTreeSet::from_iter([ Bytes::from("a"), Bytes::from("b"), @@ -537,7 +538,7 @@ async fn response_is_synchronized() { FullInterests::default(), Metrics::register(&mut Registry::default()), ); - let mut x = ReconMemoryBytes::new( + let mut server = ReconMemoryBytes::new( BTreeStore::from_set(BTreeSet::from_iter([ Bytes::from("x"), Bytes::from("y"), @@ -547,21 +548,40 @@ async fn response_is_synchronized() { FullInterests::default(), Metrics::register(&mut Registry::default()), ); - let response = x - .process_messages(&a.initial_messages().await.unwrap()) + // Client -> Server: Init + let server_first_response = server + .process_messages(&client.initial_messages().await.unwrap()) .await .unwrap(); - assert!(!response.is_synchronized); - let response = a.process_messages(&response.messages).await.unwrap(); - 
assert!(!response.is_synchronized); - let response = x.process_messages(&response.messages).await.unwrap(); - assert!(!response.is_synchronized); + // Server -> Client: Not yet in sync + assert!(!server_first_response.is_synchronized); + let client_first_response = client + .process_messages(&server_first_response.messages) + .await + .unwrap(); + // Client -> Server: Not yet in sync + assert!(!client_first_response.is_synchronized); // After this message we should be synchronized - let response = a.process_messages(&response.messages).await.unwrap(); - assert!(response.is_synchronized); - let response = x.process_messages(&response.messages).await.unwrap(); - assert!(response.is_synchronized); + let server_second_response = server + .process_messages(&client_first_response.messages) + .await + .unwrap(); + // Server -> Client: Synced + assert!(server_second_response.is_synchronized); + let client_second_response = client + .process_messages(&server_second_response.messages) + .await + .unwrap(); + // Client -> Server: Synced + assert!(client_second_response.is_synchronized); + // Check that we remained in sync. This exchange is not needed for a real sync. + let server_third_response = server + .process_messages(&client_second_response.messages) + .await + .unwrap(); + // Once synced, always synced. + assert!(server_third_response.is_synchronized); } #[test]