Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Transaction file propagation (#58)
Browse files Browse the repository at this point in the history
Transaction file propagation enables distribution of transaction output files
among other nodes.
  • Loading branch information
musitdev authored Jan 25, 2024
1 parent da15d10 commit 2cb71b6
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 79 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.txt
.DS_Store
.idea
*.pki
crates/tests/e2e-tests/files
crates/tests/e2e-tests/prover
crates/tests/e2e-tests/verifier
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions crates/cli/src/keyfile.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use libsecp256k1::SecretKey;
use libsecp256k1::{PublicKey, SecretKey};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::fs;
use std::path::PathBuf;

pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult<()> {
pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult<PublicKey> {
let key = SecretKey::random(&mut StdRng::from_entropy());
let key_array = key.serialize();
if !file_path.as_path().exists() {
Ok(fs::write(file_path, &key_array[..])?)
fs::write(file_path, &key_array[..])?;
let pubkey = PublicKey::from_secret_key(&key);
Ok(pubkey)
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
Expand Down
5 changes: 3 additions & 2 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ async fn main() {

match args.command {
ConfCommands::GenerateKey => match gevulot_cli::keyfile::create_key_file(&args.keyfile) {
Ok(()) => println!(
"Key generated and saved in file:{}",
Ok(pubkey) => println!(
"Key generated pubkey:{} and saved in file:{}",
hex::encode(pubkey.serialize()),
args.keyfile.to_str().unwrap_or("")
),
Err(err) => println!("Error during key file creation:{err}"),
Expand Down
3 changes: 3 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ eyre = "0.6.8"
futures-util = "0.3"
hex = "0.4"
home = "0.5"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
jsonrpsee = { version = "0.20", features = [ "client", "server" ] }
libsecp256k1 = "0.7"
num-bigint = { version = "0.4", features = [ "serde" ] }
Expand Down
88 changes: 78 additions & 10 deletions crates/node/src/asset_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{path::PathBuf, sync::Arc, time::Duration};

use eyre::Result;
use eyre::{eyre, Result};
use gevulot_node::types::{
self,
transaction::{Payload, ProgramData},
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::{path::PathBuf, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::{io::AsyncWriteExt, time::sleep};

Expand Down Expand Up @@ -36,14 +37,20 @@ pub struct AssetManager {
config: Arc<Config>,
database: Arc<Database>,
http_client: reqwest::Client,
http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
}

impl AssetManager {
pub fn new(config: Arc<Config>, database: Arc<Database>) -> Self {
pub fn new(
config: Arc<Config>,
database: Arc<Database>,
http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
) -> Self {
AssetManager {
config,
database,
http_client: reqwest::Client::new(),
http_peer_list,
}
}

Expand Down Expand Up @@ -160,21 +167,82 @@ impl AssetManager {
// TODO: Blocking operation.
std::fs::create_dir_all(file_path.parent().unwrap())?;

let mut resp = self.http_client.get(url.clone()).send().await?;
let mut resp = match self.http_client.get(url.clone()).send().await {
Ok(resp) => resp,
Err(err) => {
let uri = file_path
.as_path()
.components()
.rev()
.take(2)
.map(|c| c.as_os_str().to_os_string())
.reduce(|acc, mut el| {
el.push("/");
el.push(&acc);
el
})
.ok_or_else(|| eyre!("Download bad file path: {:?}", file_path))
.and_then(|s| {
s.into_string()
.map_err(|err| eyre!("Download bad file path: {:?}", file_path))
})?;
let peer_urls: Vec<_> = {
let list = self.http_peer_list.read().await;
list.iter()
.filter_map(|(peer, port)| {
port.map(|port| {
//use parse to create an URL, no new method.
let mut url = reqwest::Url::parse("http://localhost").unwrap(); //unwrap always succeed
url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed
url.set_port(Some(port)).unwrap(); //unwrap always succeed
url.set_path(&uri); //unwrap always succeed
url
})
})
.collect()
};
tracing::debug!(
"asset manager download file from uri {uri} to {}, use peer list:{:?}",
file_path.as_path().to_str().unwrap().to_string(),
peer_urls
);

let mut resp = None;
for url in peer_urls {
if let Ok(val) = self.http_client.get(url).send().await {
resp = Some(val);
break;
}
}
match resp {
Some(resp) => resp,
_ => {
return Err(eyre!(
"Download no host found to download the file: {:?}",
file_path
));
}
}
}
};

if resp.status() == reqwest::StatusCode::OK {
let fd = tokio::fs::File::create(&file_path).await?;
//create a tmp file during download.
//this way the file won't be available for download from the other nodes
//until it is completely written.
let mut tmp_file_path = file_path.clone();
tmp_file_path.set_extension(".tmp");

let fd = tokio::fs::File::create(&tmp_file_path).await?;
let mut fd = tokio::io::BufWriter::new(fd);

while let Some(chunk) = resp.chunk().await? {
fd.write_all(&chunk).await?;
}

fd.flush().await?;
tracing::info!(
"downloaded file to {}",
file_path.as_path().to_str().unwrap().to_string()
);
//rename to original name
std::fs::rename(tmp_file_path, file_path)?;
} else {
tracing::error!(
"failed to download file from {}: response status: {}",
Expand Down
15 changes: 9 additions & 6 deletions crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ pub struct Config {
)]
pub node_key_file: PathBuf,

#[arg(
long,
long_help = "",
env = "GEVULOT_P2P_DISCOVERY_ADDR",
default_value = "34.88.251.176:9999"
)]
#[arg(long, long_help = "", env = "GEVULOT_P2P_DISCOVERY_ADDR")]
pub p2p_discovery_addrs: Vec<String>,

#[arg(
Expand All @@ -60,6 +55,14 @@ pub struct Config {
)]
pub p2p_listen_addr: SocketAddr,

#[arg(
long,
long_help = "Port open to download file between nodes. Use P2P interface to bind.",
env = "GEVULOT_HTTP_PORT",
default_value = "9995"
)]
pub http_download_port: u16,

#[arg(
long,
long_help = "P2P PSK passphrase",
Expand Down
25 changes: 20 additions & 5 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,17 @@ impl networking::p2p::TxHandler for P2PTxHandler {
async fn recv_tx(&self, tx: Transaction) -> Result<()> {
// The transaction was received from P2P network so we can consider it
// propagated at this point.
let tx_hash = tx.hash;
let mut tx = tx;
tx.propagated = true;

// Submit the tx to mempool.
self.mempool.write().await.add(tx).await
self.mempool.write().await.add(tx).await?;

//TODO copy paste of the asset manager handle_transaction method.
//added because when a tx arrive from the p2p asset are not added.
//should be done in a better way.
self.database.add_asset(&tx_hash).await
}
}

Expand All @@ -198,6 +204,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
"mempool-pubsub",
config.p2p_listen_addr,
&config.p2p_psk_passphrase,
Some(config.http_download_port),
)
.await,
);
Expand All @@ -212,6 +219,9 @@ async fn run(config: Arc<Config>) -> Result<()> {
)))
.await;

//start http download manager
let download_jh = networking::download_manager::serve_files(&config).await?;

// TODO(tuommaki): read total available resources from config / acquire system stats.
let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 };
let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new(
Expand All @@ -231,7 +241,11 @@ async fn run(config: Arc<Config>) -> Result<()> {
resource_manager.clone(),
);

let asset_mgr = Arc::new(AssetManager::new(config.clone(), database.clone()));
let asset_mgr = Arc::new(AssetManager::new(
config.clone(),
database.clone(),
p2p.as_ref().peer_http_port_list.clone(),
));

let node_key = read_node_key(&config.node_key_file)?;

Expand Down Expand Up @@ -295,10 +309,10 @@ async fn run(config: Arc<Config>) -> Result<()> {
)
.await?;

tracing::info!("gevulot node started");
loop {
sleep(Duration::from_secs(1));
if let Err(err) = download_jh.await {
tracing::info!("download_manager error:{err}");
}
Ok(())
}

/// p2p_beacon brings up P2P networking but nothing else. This function can be
Expand All @@ -311,6 +325,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> {
"gevulot-network",
config.p2p_listen_addr,
&config.p2p_psk_passphrase,
None,
)
.await,
);
Expand Down
102 changes: 87 additions & 15 deletions crates/node/src/networking/download_manager.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,93 @@
use std::sync::Arc;
use crate::cli::Config;
use eyre::Result;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{self, Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use std::path::Path;
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio_util::io::ReaderStream;

use crate::{cli::Config, storage};
//start the local server and serve the specified file path.
//Return the server task join handle.
pub async fn serve_files(config: &Config) -> Result<JoinHandle<()>> {
let mut bind_addr = config.p2p_listen_addr;
bind_addr.set_port(config.http_download_port);
let listener = TcpListener::bind(bind_addr).await?;

pub struct DownloadManager {
database: Arc<storage::Database>,
file_storage: Arc<storage::File>,
let jh = tokio::spawn({
let data_directory = config.data_directory.clone();
async move {
loop {
match listener.accept().await {
Ok((stream, _from)) => {
let io = TokioIo::new(stream);
tokio::task::spawn({
let data_directory = data_directory.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(|req| server_process_file(req, &data_directory)),
)
.await
{
tracing::error!("Error serving node connection: {err}. Wait for a new node connection.");
}
}
});
}
Err(err) => {
tracing::error!("Error during node connection to file http server:{err}");
}
}
}
}
});

Ok(jh)
}

impl DownloadManager {
pub fn new(
_config: Arc<Config>,
database: Arc<storage::Database>,
file_storage: Arc<storage::File>,
) -> Self {
DownloadManager {
database,
file_storage,
async fn server_process_file(
req: Request<body::Incoming>,
data_directory: &Path,
) -> std::result::Result<Response<BoxBody<Bytes, std::io::Error>>, hyper::Error> {
let file_digest = &req.uri().path()[1..];

let mut file_path = data_directory.join("images").join(file_digest);

let file = match File::open(&file_path).await {
Ok(file) => file,
Err(_) => {
//try to see if the file is currently being updated.
file_path.set_extension(".tmp");
let (status_code, message) = if file_path.as_path().exists() {
(
StatusCode::PARTIAL_CONTENT,
"Update in progess, retry later",
)
} else {
(StatusCode::NOT_FOUND, "File not found")
};
return Ok(Response::builder()
.status(status_code)
.body(Full::new(message.into()).map_err(|e| match e {}).boxed())
.unwrap());
}
}
};

let file_hash = file_digest.to_string();
let reader = ReaderStream::new(file);
let stream_body = StreamBody::new(reader.map_ok(Frame::data));

Ok(Response::builder()
.status(StatusCode::OK)
.body(BodyExt::boxed(stream_body))
.unwrap())
}
Loading

0 comments on commit 2cb71b6

Please sign in to comment.