Skip to content

Commit

Permalink
docs: prettify all things
Browse files Browse the repository at this point in the history
  • Loading branch information
sacha-l committed May 16, 2024
1 parent 8400fe1 commit 611d111
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 303 deletions.
49 changes: 26 additions & 23 deletions swarm_nl/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2024 Algorealm
// Apache 2.0 License
//
// This file is a part of SwarmNL

//! Core data structures and protocol implementations for building a swarm.
Expand Down Expand Up @@ -112,16 +110,22 @@ impl From<gossipsub::Event> for CoreEvent {

/// Structure containing necessary data to build [`Core`].
pub struct CoreBuilder<T: EventHandler + Clone + Send + Sync + 'static> {
/// The network ID of the network.
network_id: StreamProtocol,
/// The cryptographic keypair of the node.
keypair: Keypair,
/// The TCP and UDP ports to listen on.
tcp_udp_port: (Port, Port),
/// The bootnodes to connect to.
boot_nodes: HashMap<PeerIdString, MultiaddrString>,
/// The blacklist of peers to ignore.
blacklist: Blacklist,
/// the network event handler
/// The network event handler.
handler: T,
/// The size of the stream buffers to use to track application requests to the network layer
/// internally.
stream_size: usize,
/// The IP address to listen on.
ip_address: IpAddr,
/// Connection keep-alive duration while idle.
keep_alive_duration: Seconds,
Expand All @@ -137,7 +141,7 @@ pub struct CoreBuilder<T: EventHandler + Clone + Send + Sync + 'static> {
/// The `Behaviour` of the `Request-Response` protocol. The second field value is the function
/// to handle an incoming request from a peer.
request_response: Behaviour<Rpc, Rpc>,
/// The `Behaviour` of the `GossipSub` protocol
/// The `Behaviour` of the `GossipSub` protocol.
gossipsub: gossipsub::Behaviour,
}

Expand Down Expand Up @@ -243,7 +247,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {

/// Configure the size of the stream buffers to use to track application requests to the network
/// layer internally. This should be as large an possible to prevent dropping off requests to
/// the network layer. Defaults to [`usize::MAX`]
/// the network layer. Defaults to [`usize::MAX`].
pub fn with_stream_size(self, size: usize) -> Self {
CoreBuilder {
stream_size: size,
Expand Down Expand Up @@ -291,10 +295,11 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {
CoreBuilder { kademlia, ..self }
}

/// Configure the `Gossipsub` protocol for the network
/// Configure the `Gossipsub` protocol for the network.
///
/// # Panics
///
/// This function panics if `Gossipsub` cannot be configured properly
/// This function panics if `Gossipsub` cannot be configured properly.
pub fn with_gossipsub(
self,
config: gossipsub::Config,
Expand All @@ -317,7 +322,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {
CoreBuilder { handler, ..self }
}

/// Return the id of the network
/// Return the id of the network.
pub fn network_id(&self) -> String {
self.network_id.to_string()
}
Expand Down Expand Up @@ -601,16 +606,14 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {
let network_core = Core {
keypair: self.keypair,
application_sender,
// network_sender,
// application_receiver,
stream_request_buffer: stream_request_buffer.clone(),
stream_response_buffer: stream_response_buffer.clone(),
current_stream_id: Arc::new(Mutex::new(stream_id)),
// Save handler as the state of the application
state: self.handler,
};

// Spin up task to handle async operations and data on the network.
// Spin up task to handle async operations and data on the network
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(Core::handle_async_operations(
swarm,
Expand All @@ -620,7 +623,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {
network_core.clone(),
));

// Spin up task to handle async operations and data on the network.
// Spin up task to handle async operations and data on the network
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(Core::handle_async_operations(
swarm,
Expand Down Expand Up @@ -713,7 +716,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
false
}

/// Return the node's `PeerId`
/// Return the node's `PeerId`.
pub fn peer_id(&self) -> PeerId {
self.keypair.public().to_peer_id()
}
Expand Down Expand Up @@ -836,9 +839,9 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
///
/// If the internal buffer is full, it will return an error.
pub async fn query_network(&mut self, request: AppData) -> NetworkResult {
// send request
// Send request
if let Some(stream_id) = self.send_to_network(request).await {
// wait to recieve response from the network
// Wait to recieve response from the network
self.recv_from_network(stream_id).await
} else {
Err(NetworkError::StreamBufferOverflow)
Expand Down Expand Up @@ -899,10 +902,10 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
let mut exec_queue_3 = ExecQueue::new();
let mut exec_queue_4 = ExecQueue::new();

// Loop to handle incoming application streams indefinitely.
// Loop to handle incoming application streams indefinitely
loop {
select! {
// handle incoming stream data
// Handle incoming stream data
stream_data = receiver.next() => {
match stream_data {
Some(incoming_data) => {
Expand Down Expand Up @@ -931,7 +934,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
},
// Store a value in the DHT and (optionally) on explicit specific peers
AppData::KademliaStoreRecord { key, value, expiration_time, explicit_peers } => {
// create a kad record
// Create a kad record
let mut record = Record::new(key.clone(), value);

// Set (optional) expiration time
Expand Down Expand Up @@ -1341,12 +1344,12 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
// We just recieved an `Identify` info from a peer.s
network_core.state.identify_info_recieved(peer_id, info.clone());

// disconnect from peer of the network id is different
// Disconnect from peer of the network id is different
if info.protocol_version != network_info.id.as_ref() {
// disconnect
// Disconnect
let _ = swarm.disconnect_peer_id(peer_id);
} else {
// add to routing table if not present already
// Add to routing table if not present already
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, info.listen_addrs[0].clone());
}
}
Expand All @@ -1364,7 +1367,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
// Pass request data to configured request handler
let response_data = network_core.state.rpc_incoming_message_handled(data);

// construct an RPC
// Construct an RPC
let response_rpc = Rpc::ReqResponse { data: response_data };

// Send the response
Expand Down Expand Up @@ -1432,7 +1435,7 @@ impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
established_in,
} => {
// Before a node dails a peer, it firstg adds the peer to its routing table.
// To enable DHT operations, the listener must do the same on establishing a new connection
// To enable DHT operations, the listener must do the same on establishing a new connection.
if let ConnectedPoint::Listener { send_back_addr, .. } = endpoint.clone() {
// Add peer to routing table
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, send_back_addr);
Expand Down
44 changes: 23 additions & 21 deletions swarm_nl/src/core/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2024 Algorealm
// Apache 2.0 License

use libp2p::gossipsub::MessageId;
/// Copyright (c) 2024 Algorealm
use serde::{Deserialize, Serialize};
use std::{collections::VecDeque, time::Instant};
use thiserror::Error;
Expand Down Expand Up @@ -181,7 +183,7 @@ pub type NetworkResult = Result<AppResponse, NetworkError>;
/// This type has a maximum buffer size and will drop subsequent requests when full.
/// It is unlikely to be ever full as the default is usize::MAX except otherwise specified during
/// configuration. It is always good practice to read responses from the internal stream buffer
/// using `query_network()` or explicitly using `recv_from_network`
/// using `query_network()` or explicitly using `recv_from_network`.
#[derive(Clone, Debug)]
pub(super) struct StreamRequestBuffer {
/// Max requests we can keep track of.
Expand Down Expand Up @@ -426,48 +428,48 @@ pub trait EventHandler {

/// Event that announces the beginning of the filtering and authentication of the incoming gossip message.
/// It returns a boolean to specify whether the massage should be dropped or should reach the application.
/// All incoming messages are allowed in by default
/// All incoming messages are allowed in by default.
fn gossipsub_incoming_message_filtered(&mut self, propagation_source: PeerId, message_id: MessageId, source: Option<PeerId>, topic: String, data: Vec<String>) -> bool {
true
}
}

/// Default network event handler
/// Default network event handler.
#[derive(Clone)]
pub struct DefaultHandler;
/// Implement [`EventHandler`] for [`DefaultHandler`]
impl EventHandler for DefaultHandler {
/// Echo the message back to the sender
/// Echo the message back to the sender.
fn rpc_incoming_message_handled(&mut self, data: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
data
}

/// Echo the incoming gossip message to the console
/// Echo the incoming gossip message to the console.
fn gossipsub_incoming_message_handled(&mut self, _source: PeerId, _data: Vec<String>) {
// Default implementation
}

}

/// Important information to obtain from the [`CoreBuilder`], to properly handle network
/// operations
/// operations.
#[derive(Clone)]
pub(super) struct NetworkInfo {
/// The name/id of the network
/// The name/id of the network.
pub id: StreamProtocol,
/// Important information to manage `Ping` operations
/// Important information to manage `Ping` operations.
pub ping: PingInfo,
/// Important information to manage `Gossipsub` operations
/// Important information to manage `Gossipsub` operations.
pub gossipsub: gossipsub_cfg::GossipsubInfo,
}

/// Module that contains important data structures to manage `Ping` operations on the network
/// Module that contains important data structures to manage `Ping` operations on the network.
pub mod ping_config {
use libp2p_identity::PeerId;
use std::{collections::HashMap, time::Duration};

/// Policies to handle a `Ping` error
/// - All connections to peers are closed during a disconnect operation.
/// Policies to handle a `Ping` error.
/// All connections to peers are closed during a disconnect operation.
#[derive(Debug, Clone)]
pub enum PingErrorPolicy {
/// Do not disconnect under any circumstances.
Expand All @@ -481,13 +483,13 @@ pub mod ping_config {
/// Struct that stores critical information for the execution of the [`PingErrorPolicy`].
#[derive(Debug, Clone)]
pub struct PingManager {
/// The number of timeout errors encountered from a peer
/// The number of timeout errors encountered from a peer.
pub timeouts: HashMap<PeerId, u16>,
/// The number of outbound errors encountered from a peer
/// The number of outbound errors encountered from a peer.
pub outbound_errors: HashMap<PeerId, u16>,
}

/// The configuration for the `Ping` protocol
/// The configuration for the `Ping` protocol.
#[derive(Debug, Clone)]
pub struct PingConfig {
/// The interval between successive pings.
Expand All @@ -508,32 +510,32 @@ pub mod ping_config {
}
}

/// Module containing important state relating to the `Gossipsub` protocol
/// Module containing important state relating to the `Gossipsub` protocol.
pub(crate) mod gossipsub_cfg {
use super::*;

/// The struct containing the list of blacklisted peers
/// The struct containing the list of blacklisted peers.
#[derive(Clone, Debug, Default)]
pub struct Blacklist {
// Blacklist
pub list: HashSet<PeerId>,
}

impl Blacklist {
/// Return the inner list we're keeping track of
/// Return the inner list we're keeping track of.
pub fn into_inner(&self) -> HashSet<PeerId> {
self.list.clone()
}
}

/// Important information to manage `Gossipsub` operations
/// Important information to manage `Gossipsub` operations.
#[derive(Clone)]
pub struct GossipsubInfo {
pub blacklist: Blacklist,
}
}

/// Network queue that tracks the execution of application requests in the network layer
/// Network queue that tracks the execution of application requests in the network layer.
pub(super) struct ExecQueue {
buffer: Mutex<VecDeque<StreamId>>,
}
Expand Down
Loading

0 comments on commit 611d111

Please sign in to comment.