From d4b06533d33cc6e1e3ff49b01bce448634c63c1d Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Fri, 29 Nov 2024 02:52:57 -0700 Subject: [PATCH] chore: remove and reuse upstream network delivery wrapper --- Cargo.lock | 140 +++++++++++++++++++++++--- Cargo.toml | 2 +- src/keygen.rs | 5 +- src/rounds/delivery.rs | 218 ----------------------------------------- src/rounds/mod.rs | 2 - src/sign.rs | 7 +- 6 files changed, 136 insertions(+), 238 deletions(-) delete mode 100644 src/rounds/delivery.rs diff --git a/Cargo.lock b/Cargo.lock index bd4a430..eb5cd62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1922,7 +1922,7 @@ dependencies = [ "failure", "futures", "gadget-io", - "gadget-sdk", + "gadget-sdk 0.4.0", "hex", "itertools 0.13.0", "libp2p", @@ -1946,7 +1946,7 @@ checksum = "4f8754ff7a709cf0643f82093fff582192a36afe3b075cde8e752ad9e39d35ed" dependencies = [ "cargo_metadata", "fs2", - "gadget-blueprint-proc-macro-core", + "gadget-blueprint-proc-macro-core 0.1.5", "rustdoc-types", "serde", "serde_json", @@ -1972,7 +1972,7 @@ dependencies = [ "color-eyre", "futures", "gadget-io", - "gadget-sdk", + "gadget-sdk 0.4.0", "lazy_static", "libp2p", "log", @@ -2224,8 +2224,8 @@ dependencies = [ "clap-cargo", "color-eyre", "escargot", - "gadget-blueprint-proc-macro-core", - "gadget-sdk", + "gadget-blueprint-proc-macro-core 0.1.5", + "gadget-sdk 0.4.0", "hex", "k256", "serde_json", @@ -4522,7 +4522,7 @@ dependencies = [ "frost-core", "frost-ed25519", "frost-secp256k1", - "gadget-sdk", + "gadget-sdk 0.5.1", "hex", "proptest", "rand_chacha", @@ -4835,7 +4835,22 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9279fa5ac3579da5e1c5467c395aae4c910cc63a702fcae097d529687398a4b7" dependencies = [ - "gadget-blueprint-proc-macro-core", + "gadget-blueprint-proc-macro-core 0.1.5", + "indexmap 2.6.0", + "itertools 0.13.0", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.87", +] + +[[package]] +name = "gadget-blueprint-proc-macro" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f6acfb3136e908bfa36a0e68d5556298aec5dd743a4fd6f2190fccf7125c5dd" +dependencies = [ + "gadget-blueprint-proc-macro-core 0.2.0", "indexmap 2.6.0", "itertools 0.13.0", "proc-macro2", @@ -4856,10 +4871,32 @@ dependencies = [ ] [[package]] -name = "gadget-context-derive" +name = "gadget-blueprint-proc-macro-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "926d5a5d8c7706be67b50fee8e57d9e9a9c8e4b5430e6ed464c9943128b006bc" +dependencies = [ + "cid", + "ethereum-types", + "serde", +] + +[[package]] +name = "gadget-blueprint-serde" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f15fa82798ae66fec9bea17791a5f37f200ed9803158b08180fd6ba85737c1" +checksum = "0c70fbc91721439daf6b33678e83ebed8b2f48fe73a4ab20a050f91040de7764" +dependencies = [ + "paste", + "serde", + "tangle-subxt", +] + +[[package]] +name = "gadget-context-derive" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41828995faf594c8d7c2e1d825b542aa3500cbd6ad529055bd11c60603db0566" dependencies = [ "proc-macro2", "quote", @@ -4922,8 +4959,89 @@ dependencies = [ "elliptic-curve", "failure", "futures", - "gadget-blueprint-proc-macro", - "gadget-blueprint-proc-macro-core", + "gadget-blueprint-proc-macro 0.3.1", + "gadget-blueprint-proc-macro-core 0.1.5", + "gadget-context-derive", + "gadget-io", + "getrandom", + "hex", + "http-body-util", + "hyper 1.5.0", + "hyper-util", + "itertools 0.13.0", + "k256", + "lazy_static", + "libp2p", + "lock_api", + "log", + "lru-mem", + "nix 0.29.0", + "num-bigint 0.4.6", + "parking_lot 0.12.3", + "prometheus", + "rand", + "round-based", + "scale-info", + "schnorrkel", + "serde", + "serde_json", + "sp-core", + "sp-io", + "sqlx", + "subxt", + "subxt-core", + "subxt-signer", + "symbiotic-rs", + "sysinfo", + "tangle-subxt", + "thiserror", + "tokio", + "tokio-retry", + "tokio-stream", + "tracing", + "tracing-subscriber 0.3.18", + "url", + "uuid 1.11.0", + "w3f-bls", +] + +[[package]] +name = "gadget-sdk" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f104e44f6946ae10ae4c0b87c4a81d0d5c6cc2e59b50fecc5231a1f0c4ea3a" +dependencies = [ + "alloy-contract", + "alloy-json-abi", + "alloy-network", + "alloy-primitives 0.7.7", + "alloy-provider", + "alloy-rpc-types", + "alloy-signer", + "alloy-signer-local", + "alloy-sol-types 0.7.7", + "alloy-transport 0.1.4", + "alloy-transport-http 0.1.4", + "ark-bn254", + "ark-ec", + "ark-ff 0.4.2", + "ark-serialize 0.4.2", + "async-trait", + "auto_impl", + "backon", + "bincode", + "bollard", + "clap 4.5.20", + "color-eyre", + "dashmap 6.1.0", + "ed25519-zebra 4.0.3", + "eigensdk", + "elliptic-curve", + "failure", + "futures", + "gadget-blueprint-proc-macro 0.4.0", + "gadget-blueprint-proc-macro-core 0.2.0", + "gadget-blueprint-serde", "gadget-context-derive", "gadget-io", "getrandom", diff --git a/Cargo.toml b/Cargo.toml index c8d9f60..ff95cc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ round-based = { version = "0.3.0", default-features = false, features = ["derive [dependencies.gadget-sdk] -version = "0.4" +version = "0.5.1" default-features = false features = ["getrandom"] diff --git a/src/keygen.rs b/src/keygen.rs index aa71a27..ba28543 100644 --- a/src/keygen.rs +++ b/src/keygen.rs @@ -4,6 +4,7 @@ use api::services::events::JobCalled; use frost_core::keys::{KeyPackage, PublicKeyPackage}; use frost_core::{Ciphersuite, VerifyingKey}; use gadget_sdk::futures::TryFutureExt; +use gadget_sdk::network::round_based_compat::NetworkDeliveryWrapper; use gadget_sdk::network::Network; use gadget_sdk::subxt_core::ext::sp_core::{ecdsa, Pair}; use gadget_sdk::subxt_core::utils::AccountId32; @@ -14,7 +15,7 @@ use sdk::event_listener::tangle::{ }; use sdk::tangle_subxt::tangle_testnet_runtime::api; -use crate::rounds::{delivery, keygen as keygen_protocol}; +use crate::rounds::keygen as keygen_protocol; use crate::FrostContext; #[derive(Debug, thiserror::Error)] @@ -167,7 +168,7 @@ where .enumerate() .map(|(j, (_, ecdsa))| (j as u16, ecdsa)) .collect(); - let delivery = delivery::NetworkDeliveryWrapper::new(net, i, parties); + let delivery = NetworkDeliveryWrapper::new(net, i, parties); let party = round_based::MpcParty::connected(delivery); let (key_package, public_key_package) = keygen_protocol::run::(&mut rng, t, n, i, party, None).await?; diff --git a/src/rounds/delivery.rs b/src/rounds/delivery.rs deleted file mode 100644 index 7b94d07..0000000 --- a/src/rounds/delivery.rs +++ /dev/null @@ -1,218 +0,0 @@ -use core::pin::Pin; -use core::sync::atomic::AtomicU64; -use core::task::{ready, Context, Poll}; -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::sync::Arc; - -use gadget_sdk::futures::prelude::*; -use gadget_sdk::network::{ - self, IdentifierInfo, Network, NetworkMultiplexer, StreamKey, SubNetwork, -}; -use gadget_sdk::subxt_core::ext::sp_core::ecdsa; -use round_based::{Delivery, Incoming, Outgoing}; -use round_based::{MessageDestination, MessageType, MsgId, PartyIndex}; -use stream::{SplitSink, SplitStream}; - -pub struct NetworkDeliveryWrapper { - /// The wrapped network implementation. - network: NetworkWrapper, -} - -impl NetworkDeliveryWrapper -where - N: Network + Unpin, - M: Clone + Send + Unpin + 'static, - M: serde::Serialize, - M: serde::de::DeserializeOwned, -{ - /// Create a new NetworkDeliveryWrapper over a network implementation with the given party index. - pub fn new(network: N, i: PartyIndex, parties: BTreeMap) -> Self { - let mux = NetworkMultiplexer::new(network); - // By default, we create 4 substreams for each party. - let sub_streams = (1..5) - .map(|i| { - let key = StreamKey { - // This is a dummy task hash, it should be replaced with the actual task hash - task_hash: [0u8; 32], - round_id: i, - }; - let substream = mux.multiplex(key); - (key, substream) - }) - .collect(); - let network = NetworkWrapper { - me: i, - mux, - incoming_queue: VecDeque::new(), - outgoing_queue: VecDeque::new(), - sub_streams, - participants: parties, - next_msg_id: Arc::new(NextMessageId::default()), - _network: core::marker::PhantomData, - }; - NetworkDeliveryWrapper { network } - } -} - -/// A NetworkWrapper wraps a network implementation and implements [`Stream`] and [`Sink`] for -/// it. -pub struct NetworkWrapper { - /// The current party index. - me: PartyIndex, - /// Our network Multiplexer. - mux: NetworkMultiplexer, - /// A Map of substreams for each round. - sub_streams: HashMap, - /// A queue of incoming messages. - incoming_queue: VecDeque>, - /// A queue of outgoing messages. - outgoing_queue: VecDeque>, - /// Participants in the network with their corresponding ECDSA public keys. - // Note: This is a BTreeMap to ensure that the participants are sorted by their party index. - participants: BTreeMap, - next_msg_id: Arc, - _network: core::marker::PhantomData, -} - -impl Delivery for NetworkDeliveryWrapper -where - N: Network + Unpin, - M: Clone + Send + Unpin + 'static, - M: serde::Serialize + serde::de::DeserializeOwned, - M: round_based::ProtocolMessage, -{ - type Send = SplitSink, Outgoing>; - type Receive = SplitStream>; - type SendError = gadget_sdk::Error; - type ReceiveError = gadget_sdk::Error; - - fn split(self) -> (Self::Receive, Self::Send) { - let (sink, stream) = self.network.split(); - (stream, sink) - } -} - -impl Stream for NetworkWrapper -where - N: Network + Unpin, - M: serde::de::DeserializeOwned + Unpin, - M: round_based::ProtocolMessage, -{ - type Item = Result, gadget_sdk::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let sub_streams = self.sub_streams.values(); - // pull all substreams - let mut messages = Vec::new(); - for sub_stream in sub_streams { - let p = sub_stream.next_message().poll_unpin(cx); - let m = match p { - Poll::Ready(Some(msg)) => msg, - _ => continue, - }; - let msg = network::deserialize::(&m.payload)?; - messages.push((m.sender.user_id, m.recipient, msg)); - } - - // Sort the incoming messages by round. - messages.sort_by_key(|(_, _, msg)| msg.round()); - - let this = self.get_mut(); - // Push all messages to the incoming queue - messages - .into_iter() - .map(|(sender, recipient, msg)| Incoming { - id: this.next_msg_id.next(), - sender, - msg_type: match recipient { - Some(_) => MessageType::P2P, - None => MessageType::Broadcast, - }, - msg, - }) - .for_each(|m| this.incoming_queue.push_back(m)); - // Reorder the incoming queue by round message. - let maybe_msg = this.incoming_queue.pop_front(); - if let Some(msg) = maybe_msg { - Poll::Ready(Some(Ok(msg))) - } else { - // No message in the queue, and no message in the substreams. - // Tell the network to wake us up when a new message arrives. - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} - -impl Sink> for NetworkWrapper -where - N: Network + Unpin, - M: Unpin + serde::Serialize, - M: round_based::ProtocolMessage, -{ - type Error = gadget_sdk::Error; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, msg: Outgoing) -> Result<(), Self::Error> { - self.get_mut().outgoing_queue.push_back(msg); - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Dequeue all messages and send them one by one to the network - let this = self.get_mut(); - while let Some(out) = this.outgoing_queue.pop_front() { - // Get the substream to send the message to. - let key = StreamKey { - // TODO: Set the correct task hash from the network. - task_hash: [0u8; 32], - round_id: i32::from(out.msg.round()), - }; - let substream = this - .sub_streams - .entry(key) - .or_insert_with(|| this.mux.multiplex(key)); - // TODO: Set the correct identifier info from the network. - let identifier_info = IdentifierInfo { - block_id: None, - session_id: None, - retry_id: None, - task_id: None, - }; - let (to, to_network_id) = match out.recipient { - MessageDestination::AllParties => (None, None), - MessageDestination::OneParty(p) => (Some(p), this.participants.get(&p).cloned()), - }; - let protocol_message = N::build_protocol_message( - identifier_info, - this.me, - to, - &out.msg, - this.participants.get(&this.me).cloned(), - to_network_id, - ); - let p = substream.send_message(protocol_message).poll_unpin(cx); - match ready!(p) { - Ok(()) => continue, - Err(e) => return Poll::Ready(Err(e)), - } - } - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } -} - -#[derive(Default)] -struct NextMessageId(AtomicU64); - -impl NextMessageId { - pub fn next(&self) -> MsgId { - self.0.fetch_add(1, core::sync::atomic::Ordering::Relaxed) - } -} diff --git a/src/rounds/mod.rs b/src/rounds/mod.rs index e29fe75..2fba047 100644 --- a/src/rounds/mod.rs +++ b/src/rounds/mod.rs @@ -1,5 +1,3 @@ -/// A custom delivery implementation on top of the gossip network. -pub mod delivery; /// FROST Keygen Protocol Rounds pub mod keygen; /// FROST Signing Protocol Rounds diff --git a/src/sign.rs b/src/sign.rs index 73ffeef..a612fb3 100644 --- a/src/sign.rs +++ b/src/sign.rs @@ -1,12 +1,10 @@ -use std::collections::BTreeMap; - -use crate::rounds::delivery; use crate::rounds::sign as sign_protocol; use api::services::events::JobCalled; use color_eyre::eyre; use frost_core::keys::{KeyPackage, PublicKeyPackage}; use frost_core::{Ciphersuite, Signature}; use gadget_sdk::futures::TryFutureExt; +use gadget_sdk::network::round_based_compat::NetworkDeliveryWrapper; use gadget_sdk::network::Network; use gadget_sdk::random::rand::seq::IteratorRandom; use gadget_sdk::random::SeedableRng; @@ -20,6 +18,7 @@ use sdk::event_listener::tangle::{ TangleEventListener, }; use sdk::tangle_subxt::tangle_testnet_runtime::api; +use std::collections::BTreeMap; use crate::FrostContext; @@ -207,7 +206,7 @@ where "Invalid number of signers" ); - let delivery = delivery::NetworkDeliveryWrapper::new(net, i, selected_parties); + let delivery = NetworkDeliveryWrapper::new(net, i, selected_parties); let party = round_based::MpcParty::connected(delivery); let signature = sign_protocol::run::( &mut rng,