From 94c9dc98ba4914c54c23393921a145426c6c208a Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Sat, 6 Jan 2024 18:35:56 +0530 Subject: [PATCH] refactor(client)!: refactor `Files` into `FilesUpload` --- sn_cli/src/subcommands/files/mod.rs | 16 +- sn_client/src/files/api.rs | 468 ---------------- sn_client/src/files/mod.rs | 733 +++++++++++++------------- sn_client/src/files/upload.rs | 441 ++++++++++++++++ sn_client/src/lib.rs | 4 +- sn_node/tests/data_with_churn.rs | 16 +- sn_node/tests/nodes_rewards.rs | 24 +- sn_node/tests/storage_payments.rs | 16 +- sn_node/tests/verify_data_location.rs | 8 +- 9 files changed, 843 insertions(+), 883 deletions(-) delete mode 100644 sn_client/src/files/api.rs create mode 100644 sn_client/src/files/upload.rs diff --git a/sn_cli/src/subcommands/files/mod.rs b/sn_cli/src/subcommands/files/mod.rs index a92641fe5f..89d742d6e0 100644 --- a/sn_cli/src/subcommands/files/mod.rs +++ b/sn_cli/src/subcommands/files/mod.rs @@ -20,8 +20,8 @@ use indicatif::{ProgressBar, ProgressStyle}; use rand::{seq::SliceRandom, thread_rng}; use serde::Deserialize; use sn_client::{ - Client, Error as ClientError, FileUploadEvent, Files, FilesApi, FilesDownload, - FilesDownloadEvent, BATCH_SIZE, MAX_UPLOAD_RETRIES, + Client, Error as ClientError, FileUploadEvent, FilesApi, FilesDownload, FilesDownloadEvent, + FilesUpload, BATCH_SIZE, MAX_UPLOAD_RETRIES, }; use sn_protocol::storage::{Chunk, ChunkAddress}; use sn_transfers::{Error as TransfersError, WalletError}; @@ -306,11 +306,11 @@ async fn upload_files( let chunks_to_upload_len = chunks_to_upload.len(); let progress_bar = get_progress_bar(chunks_to_upload.len() as u64)?; let total_existing_chunks = Arc::new(AtomicU64::new(0)); - let mut files = Files::new(files_api) + let mut files_upload = FilesUpload::new(files_api) .set_batch_size(batch_size) .set_verify_store(verify_store) .set_max_retries(max_retries); - let mut upload_event_rx = files.get_upload_events(); + let mut upload_event_rx = files_upload.get_upload_events(); // keep track of the progress in a separate task let progress_bar_clone = progress_bar.clone(); let total_existing_chunks_clone = total_existing_chunks.clone(); @@ -384,7 +384,7 @@ async fn upload_files( // upload the files println!("Uploading {chunks_to_upload_len} chunks",); let now = Instant::now(); - let upload_result = match files.upload_chunks(chunks_to_upload).await { + let upload_result = match files_upload.upload_chunks(chunks_to_upload).await { Ok(()) => {Ok(())} Err(ClientError::Transfers(WalletError::Transfer(TransfersError::NotEnoughBalance( available, @@ -405,9 +405,9 @@ async fn upload_files( let elapsed = format_elapsed_time(now.elapsed()); let total_existing_chunks = total_existing_chunks.load(Ordering::Relaxed); - let total_storage_cost = files.get_upload_storage_cost(); - let total_royalty_fees = files.get_upload_royalty_fees(); - let final_balance = files.get_upload_final_balance(); + let total_storage_cost = files_upload.get_upload_storage_cost(); + let total_royalty_fees = files_upload.get_upload_royalty_fees(); + let final_balance = files_upload.get_upload_final_balance(); let uploaded_chunks = chunks_to_upload_len - total_existing_chunks as usize; println!("Among {chunks_to_upload_len} chunks, found {total_existing_chunks} already existed in network, uploaded the leftover {uploaded_chunks} chunks in {elapsed}"); diff --git a/sn_client/src/files/api.rs b/sn_client/src/files/api.rs deleted file mode 100644 index 655c9fb9b0..0000000000 --- a/sn_client/src/files/api.rs +++ /dev/null @@ -1,468 
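For callers, the rename is mechanical: `Files::new` becomes `FilesUpload::new`, and the builder-style setters, the event receiver and the post-upload stat getters keep their names, as the `sn_cli` hunk above shows. A minimal sketch of the new call pattern (the `files_api` and `chunks` values are assumed to come from the surrounding CLI code, which builds a `FilesApi` from a client and wallet directory and chunks the files beforehand):

```rust
use sn_client::{FileUploadEvent, FilesApi, FilesUpload, BATCH_SIZE, MAX_UPLOAD_RETRIES};
use std::path::PathBuf;
use xor_name::XorName;

async fn upload_sketch(
    files_api: FilesApi,
    chunks: Vec<(XorName, PathBuf)>,
) -> Result<(), sn_client::Error> {
    // Same builder-style configuration as the old `Files`; only the type is renamed.
    let mut files_upload = FilesUpload::new(files_api)
        .set_batch_size(BATCH_SIZE)
        .set_verify_store(true)
        .set_max_retries(MAX_UPLOAD_RETRIES);

    // Optional: subscribe before uploading to drive progress output, as the CLI does.
    let mut upload_event_rx = files_upload.get_upload_events();
    let events = tokio::spawn(async move {
        while let Some(event) = upload_event_rx.recv().await {
            match event {
                FileUploadEvent::Uploaded(addr) => println!("uploaded {addr:?}"),
                FileUploadEvent::AlreadyExistsInNetwork(addr) => println!("already stored {addr:?}"),
                FileUploadEvent::FailedToUpload(addr) => println!("failed {addr:?}"),
                FileUploadEvent::PayedForChunks { storage_cost, .. } => println!("paid {storage_cost}"),
                FileUploadEvent::Error => println!("upload terminated with an error"),
            }
        }
    });

    files_upload.upload_chunks(chunks).await?;
    let _ = events.await;

    // The stat getters are unchanged apart from the variable rename at the call site.
    println!(
        "storage cost: {}, royalties: {}, final balance: {}",
        files_upload.get_upload_storage_cost(),
        files_upload.get_upload_royalty_fees(),
        files_upload.get_upload_final_balance()
    );
    Ok(())
}
```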
+0,0 @@ -// Copyright 2023 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -use crate::{ - chunks::{to_chunk, DataMapLevel, Error as ChunksError, SmallFile}, - error::Result, - Client, WalletClient, -}; -use bytes::Bytes; -use futures::{future::join_all, stream::FuturesOrdered, StreamExt}; -use itertools::Itertools; -use libp2p::PeerId; -use self_encryption::{self, ChunkInfo, DataMap, EncryptedChunk, MIN_ENCRYPTABLE_BYTES}; -use self_encryption::{decrypt_full_set, StreamSelfDecryptor}; -use sn_protocol::{ - storage::{Chunk, ChunkAddress}, - NetworkAddress, -}; -use sn_transfers::{LocalWallet, NanoTokens}; - -use std::{ - fs::{self, create_dir_all, File}, - io::{Read, Write}, - path::{Path, PathBuf}, - time::Instant, -}; -use tempfile::tempdir; -use tokio::task; -use tracing::trace; -use xor_name::XorName; - -/// File APIs. -#[derive(Clone)] -pub struct FilesApi { - pub(crate) client: Client, - pub(crate) wallet_dir: PathBuf, -} - -/// This is the (file xorname, datamap_data, filesize, and chunks) -/// If the DataMapChunk exists and is not stored on the network, then it will not be accessible at this address of ChunkAddress(XorName) . -type ChunkFileResult = Result<(ChunkAddress, Option, u64, Vec<(XorName, PathBuf)>)>; - -impl FilesApi { - /// Create file apis instance. - pub fn new(client: Client, wallet_dir: PathBuf) -> Self { - Self { client, wallet_dir } - } - - /// Return the client instance - pub fn client(&self) -> &Client { - &self.client - } - - /// Create a new WalletClient for a given root directory. - pub fn wallet(&self) -> Result { - let path = self.wallet_dir.as_path(); - let wallet = LocalWallet::load_from(path)?; - - Ok(WalletClient::new(self.client.clone(), wallet)) - } - - /// Reads a file from the network, whose contents are contained within one or more chunks. - /// Optionally we can pass a data_map_chunk if the data_map is local to this machine - pub async fn read_bytes( - &self, - address: ChunkAddress, - downloaded_file_path: Option, - data_map_chunk: Option, - show_holders: bool, - batch_size: usize, - ) -> Result> { - let chunk = if let Some(chunk) = data_map_chunk { - info!("Downloading via supplied local datamap"); - chunk - } else { - match self.client.get_chunk(address, show_holders).await { - Ok(chunk) => chunk, - Err(err) => { - error!("Failed to fetch head chunk {address:?}"); - return Err(err); - } - } - }; - - // first try to deserialize a LargeFile, if it works, we go and seek it - if let Ok(data_map) = self.unpack_chunk(chunk.clone(), batch_size).await { - self.read_all(data_map, downloaded_file_path, show_holders, batch_size) - .await - } else { - // if an error occurs, we assume it's a SmallFile - if let Some(path) = downloaded_file_path { - fs::write(path, chunk.value().clone())?; - Ok(None) - } else { - Ok(Some(chunk.value().clone())) - } - } - } - - /// Read bytes from the network. The contents are spread across - /// multiple chunks in the network. This function invokes the self-encryptor and returns - /// the data that was initially stored. 
- /// - /// Takes `position` and `length` arguments which specify the start position - /// and the length of bytes to be read. - /// Passing `0` to position reads the data from the beginning, - /// and the `length` is just an upper limit. - pub async fn read_from( - &self, - address: ChunkAddress, - position: usize, - length: usize, - batch_size: usize, - ) -> Result { - trace!("Reading {length} bytes at: {address:?}, starting from position: {position}"); - let chunk = self.client.get_chunk(address, false).await?; - - // First try to deserialize a LargeFile, if it works, we go and seek it. - // If an error occurs, we consider it to be a SmallFile. - if let Ok(data_map) = self.unpack_chunk(chunk.clone(), batch_size).await { - return self.seek(data_map, position, length).await; - } - - // The error above is ignored to avoid leaking the storage format detail of SmallFiles and LargeFiles. - // The basic idea is that we're trying to deserialize as one, and then the other. - // The cost of it is that some errors will not be seen without a refactor. - let mut bytes = chunk.value().clone(); - - let _ = bytes.split_to(position); - bytes.truncate(length); - - Ok(bytes) - } - - /// Tries to chunk the file, returning `(head_address, data_map_chunk, file_size, chunk_names)` - /// and writes encrypted chunks to disk. - pub fn chunk_file( - file_path: &Path, - chunk_dir: &Path, - include_data_map_in_chunks: bool, - ) -> ChunkFileResult { - let mut file = File::open(file_path)?; - let metadata = file.metadata()?; - let file_size = metadata.len(); - - let (head_address, data_map_chunk, mut chunks_paths) = - if file_size < MIN_ENCRYPTABLE_BYTES as u64 { - let mut bytes = Vec::new(); - let _ = file.read_to_end(&mut bytes)?; - let chunk = package_small(SmallFile::new(bytes.into())?)?; - - // Write the result to disk - let small_chunk_file_path = chunk_dir.join(hex::encode(*chunk.name())); - info!("Creating normal small chunk in {small_chunk_file_path:?}"); - let mut output_file = File::create(small_chunk_file_path.clone())?; - output_file.write_all(&chunk.value)?; - - ( - *chunk.name(), - None, - vec![(*chunk.name(), small_chunk_file_path)], - ) - } else { - let (data_map_chunk, chunks) = encrypt_large(file_path, chunk_dir)?; - (*data_map_chunk.name(), Some(data_map_chunk), chunks) - }; - - debug!("include_data_map_in_chunks {include_data_map_in_chunks:?}"); - - debug!( - "Is there a datamap for chuink?? {:?}", - data_map_chunk.is_some() - ); - // only write out the data_map if one exists for this file - if let Some(data_map_chunk) = &data_map_chunk { - if include_data_map_in_chunks { - info!("Data_map_chunk to be written!"); - let data_map_path = chunk_dir.join(hex::encode(*data_map_chunk.name())); - - trace!("Data_map_chunk being written to {data_map_path:?}"); - let mut output_file = File::create(data_map_path.clone())?; - output_file.write_all(&data_map_chunk.value)?; - - chunks_paths.push((*data_map_chunk.name(), data_map_path)) - } - } - - Ok(( - ChunkAddress::new(head_address), - data_map_chunk.map(|c| c.value), - file_size, - chunks_paths, - )) - } - - /// Directly writes Chunks to the network in the - /// form of immutable self encrypted chunks. 
- /// - pub async fn get_local_payment_and_upload_chunk( - &self, - chunk: Chunk, - payee: PeerId, - verify_store: bool, - ) -> Result<()> { - let chunk_addr = chunk.network_address(); - trace!("Client upload started for chunk: {chunk_addr:?} to {payee:?}"); - - let wallet_client = self.wallet()?; - let payment = wallet_client.get_payment_for_addr(&chunk_addr)?; - - debug!( - "{:?} payments for chunk: {chunk_addr:?}: {payment:?}", - payment - ); - - self.client - .store_chunk(chunk, payee, payment, verify_store) - .await?; - - trace!("Client upload completed for chunk: {chunk_addr:?}"); - Ok(()) - } - - /// Pay for a given set of chunks. - /// - /// Returns the cost and the resulting new balance of the local wallet. - pub async fn pay_for_chunks( - &self, - chunks: Vec, - ) -> Result<( - (NanoTokens, NanoTokens, NanoTokens), - (Vec<(XorName, PeerId)>, Vec), - )> { - let mut wallet_client = self.wallet()?; - info!("Paying for and uploading {:?} chunks", chunks.len()); - - let ((storage_cost, royalties_fees), (payee_map, skipped_chunks)) = - wallet_client - .pay_for_storage(chunks.iter().map(|name| { - sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name)) - })) - .await?; - - wallet_client.store_local_wallet()?; - let new_balance = wallet_client.balance(); - Ok(( - (storage_cost, royalties_fees, new_balance), - (payee_map, skipped_chunks), - )) - } - - // -------------------------------------------- - // ---------- Private helpers ----------------- - // -------------------------------------------- - - /// Used for testing - pub async fn upload_test_bytes(&self, bytes: Bytes, verify: bool) -> Result { - let temp_dir = tempdir()?; - let file_path = temp_dir.path().join("tempfile"); - let mut file = File::create(&file_path)?; - file.write_all(&bytes)?; - - let chunk_path = temp_dir.path().join("chunk_path"); - create_dir_all(chunk_path.clone())?; - - let (head_address, _data_map, _file_size, chunks_paths) = - Self::chunk_file(&file_path, &chunk_path, true)?; - - for (_chunk_name, chunk_path) in chunks_paths { - let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?)); - self.get_local_payment_and_upload_chunk(chunk, PeerId::random(), verify) - .await?; - } - - Ok(NetworkAddress::ChunkAddress(head_address)) - } - - // Gets and decrypts chunks from the network using nothing else but the data map. - // If a downloaded path is given, the decrypted file will be written to the given path, - // by the decryptor directly. - // Otherwise, will assume the fetched content is a small one and return as bytes. - async fn read_all( - &self, - data_map: DataMap, - decrypted_file_path: Option, - show_holders: bool, - batch_size: usize, - ) -> Result> { - let mut decryptor = if let Some(path) = decrypted_file_path { - StreamSelfDecryptor::decrypt_to_file(Box::new(path), &data_map)? - } else { - let encrypted_chunks = self.try_get_chunks(data_map.infos()).await?; - let bytes = decrypt_full_set(&data_map, &encrypted_chunks) - .map_err(ChunksError::SelfEncryption)?; - return Ok(Some(bytes)); - }; - - let expected_count = data_map.infos().len(); - // let mut missing_chunks = Vec::new(); - let mut ordered_read_futures = FuturesOrdered::new(); - let now = Instant::now(); - - let mut index = 0; - - for chunk_info in data_map.infos().iter() { - let dst_hash = chunk_info.dst_hash; - // The futures are executed concurrently, - // but the result is returned in the order in which they were inserted. 
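The `FuturesOrdered` used below is what lets the streaming decryptor receive chunks strictly in data-map order even though the fetches run concurrently: results come out in the order the futures were pushed, not the order they finish. A small stand-alone illustration of that property:

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut ordered = FuturesOrdered::new();
    for (i, delay_ms) in [(0u32, 300u64), (1, 10), (2, 100)] {
        ordered.push_back(async move {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            i
        });
    }
    // Prints 0, 1, 2 even though future 0 is the last one to finish.
    while let Some(i) = ordered.next().await {
        println!("{i}");
    }
}
```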
- ordered_read_futures.push_back(async move { - ( - dst_hash, - self.client - .get_chunk(ChunkAddress::new(dst_hash), show_holders) - .await, - ) - }); - - if ordered_read_futures.len() >= batch_size || index + batch_size > expected_count { - while let Some((dst_hash, result)) = ordered_read_futures.next().await { - let chunk = result.map_err(|error| { - error!("Chunk missing {dst_hash:?} with {error:?}"); - ChunksError::ChunkMissing(dst_hash) - })?; - let encrypted_chunk = EncryptedChunk { - index, - content: chunk.value().clone(), - }; - let _ = decryptor.next_encrypted(encrypted_chunk)?; - - index += 1; - info!("Client (read all) download progress {index:?}/{expected_count:?}"); - println!("Client (read all) download progress {index:?}/{expected_count:?}"); - } - } - } - - let elapsed = now.elapsed(); - println!("Client downloaded file in {elapsed:?}"); - - Ok(None) - } - - /// Extracts a file DataMapLevel from a chunk. - /// If the DataMapLevel is not the first level mapping directly to the user's contents, - /// the process repeats itself until it obtains the first level DataMapLevel. - pub async fn unpack_chunk(&self, mut chunk: Chunk, batch_size: usize) -> Result { - loop { - match rmp_serde::from_slice(chunk.value()).map_err(ChunksError::Deserialisation)? { - DataMapLevel::First(data_map) => { - return Ok(data_map); - } - DataMapLevel::Additional(data_map) => { - let serialized_chunk = self - .read_all(data_map, None, false, batch_size) - .await? - .expect("error encountered on reading additional datamap"); - chunk = rmp_serde::from_slice(&serialized_chunk) - .map_err(ChunksError::Deserialisation)?; - } - } - } - } - // Gets a subset of chunks from the network, decrypts and - // reads `len` bytes of the data starting at given `pos` of original file. - async fn seek(&self, data_map: DataMap, pos: usize, len: usize) -> Result { - let info = self_encryption::seek_info(data_map.file_size(), pos, len); - let range = &info.index_range; - let all_infos = data_map.infos(); - - let encrypted_chunks = self - .try_get_chunks( - (range.start..range.end + 1) - .clone() - .map(|i| all_infos[i].clone()) - .collect_vec(), - ) - .await?; - - let bytes = - self_encryption::decrypt_range(&data_map, &encrypted_chunks, info.relative_pos, len) - .map_err(ChunksError::SelfEncryption)?; - - Ok(bytes) - } - - async fn try_get_chunks(&self, chunks_info: Vec) -> Result> { - let expected_count = chunks_info.len(); - let mut retrieved_chunks = vec![]; - - let mut tasks = Vec::new(); - for chunk_info in chunks_info.clone().into_iter() { - let client = self.client.clone(); - let task = task::spawn(async move { - let chunk = client - .get_chunk(ChunkAddress::new(chunk_info.dst_hash), false) - .await - .map_err(|error| { - error!("Chunk missing {:?} with {error:?}", chunk_info.dst_hash); - ChunksError::ChunkMissing(chunk_info.dst_hash) - })?; - Ok::(EncryptedChunk { - index: chunk_info.index, - content: chunk.value().clone(), - }) - }); - tasks.push(task); - } - - // This swallowing of errors is basically a compaction into a single - // error saying "didn't get all chunks". 
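The "swallowing" described above happens on the next line, in the two `flatten()` calls: each spawned task yields a `Result` wrapped inside the `JoinHandle`'s own `Result`, and flattening twice keeps only the values that are `Ok` at both levels, so individual failures only surface later as a count mismatch (`NotEnoughChunksRetrieved`). The same collapse in miniature, with plain values standing in for chunks:

```rust
fn main() {
    // Stand-in for `join_all(tasks).await`, where every spawned task returns a Result.
    let joined: Vec<Result<Result<u32, &str>, &str>> = vec![
        Ok(Ok(1)),                // task ran, chunk fetched
        Ok(Err("chunk missing")), // task ran, fetch failed
        Err("task join error"),   // the spawned task itself failed
        Ok(Ok(4)),
    ];
    let retrieved: Vec<u32> = joined.into_iter().flatten().flatten().collect();
    assert_eq!(retrieved, vec![1, 4]); // both kinds of error are silently dropped
}
```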
- retrieved_chunks.extend(join_all(tasks).await.into_iter().flatten().flatten()); - - info!( - "Client download progress {:?}/{expected_count:?}", - retrieved_chunks.len() - ); - println!( - "Client download progress {:?}/{expected_count:?}", - retrieved_chunks.len() - ); - - if expected_count > retrieved_chunks.len() { - let missing_chunks: Vec = chunks_info - .iter() - .filter_map(|expected_info| { - if retrieved_chunks.iter().any(|retrieved_chunk| { - XorName::from_content(&retrieved_chunk.content) == expected_info.dst_hash - }) { - None - } else { - Some(expected_info.dst_hash) - } - }) - .collect(); - Err(ChunksError::NotEnoughChunksRetrieved { - expected: expected_count, - retrieved: retrieved_chunks.len(), - missing_chunks, - })? - } else { - Ok(retrieved_chunks) - } - } -} - -/// Encrypts a [`LargeFile`] and returns the resulting address and all chunk names. -/// Correspondent encrypted chunks are written in the specified output folder. -/// Does not store anything to the network. -/// -/// Returns data map as a chunk, and the resulting chunks -fn encrypt_large(file_path: &Path, output_dir: &Path) -> Result<(Chunk, Vec<(XorName, PathBuf)>)> { - Ok(crate::chunks::encrypt_large(file_path, output_dir)?) -} - -/// Packages a [`SmallFile`] and returns the resulting address and the chunk. -/// Does not store anything to the network. -fn package_small(file: SmallFile) -> Result { - let chunk = to_chunk(file.bytes()); - if chunk.value().len() >= self_encryption::MIN_ENCRYPTABLE_BYTES { - return Err(ChunksError::SmallFilePaddingNeeded(chunk.value().len()).into()); - } - Ok(chunk) -} diff --git a/sn_client/src/files/mod.rs b/sn_client/src/files/mod.rs index 3808cab454..9b57b8d7f2 100644 --- a/sn_client/src/files/mod.rs +++ b/sn_client/src/files/mod.rs @@ -6,23 +6,36 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -pub(crate) mod api; pub(crate) mod download; +pub(crate) mod upload; use crate::{ - error::{Error as ClientError, Result}, - FilesApi, + chunks::{to_chunk, DataMapLevel, Error as ChunksError, SmallFile}, + error::Result, + Client, WalletClient, }; use bytes::Bytes; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{future::join_all, stream::FuturesOrdered, StreamExt}; +use itertools::Itertools; use libp2p::PeerId; -use sn_protocol::storage::{Chunk, ChunkAddress}; -use sn_transfers::NanoTokens; -use std::{collections::HashSet, path::PathBuf}; -use tokio::{ - sync::mpsc::{self}, - task::JoinHandle, +use self_encryption::{ + self, decrypt_full_set, ChunkInfo, DataMap, EncryptedChunk, StreamSelfDecryptor, + MIN_ENCRYPTABLE_BYTES, }; +use sn_protocol::{ + storage::{Chunk, ChunkAddress}, + NetworkAddress, +}; +use sn_transfers::{LocalWallet, NanoTokens}; +use std::{ + fs::{self, create_dir_all, File}, + io::{Read, Write}, + path::{Path, PathBuf}, + time::Instant, +}; +use tempfile::tempdir; +use tokio::task; +use tracing::trace; use xor_name::XorName; /// `BATCH_SIZE` determines the number of chunks that are processed in parallel during the payment and upload process. @@ -31,421 +44,397 @@ pub const BATCH_SIZE: usize = 16; /// The maximum number of retries to perform on a failed chunk. pub const MAX_UPLOAD_RETRIES: usize = 3; -/// The maximum number of sequential payment failures before aborting the upload process. -const MAX_SEQUENTIAL_PAYMENT_FAILS: usize = 3; - -/// The events emitted from the upload process. 
-pub enum FileUploadEvent { - /// Uploaded a Chunk to the network - Uploaded(ChunkAddress), - /// The Chunk already exists in the network, skipping upload. - AlreadyExistsInNetwork(ChunkAddress), - /// Failed to upload a chunk to the network. This event can be emitted multiple times for a single ChunkAddress - /// if retries are enabled. - FailedToUpload(ChunkAddress), - /// Payment for a batch of chunk has been made. - PayedForChunks { - storage_cost: NanoTokens, - royalty_fees: NanoTokens, - new_balance: NanoTokens, - }, - /// The upload process has terminated with an error. - Error, -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct ChunkInfo { - name: XorName, - path: PathBuf, +/// File APIs. +#[derive(Clone)] +pub struct FilesApi { + pub(crate) client: Client, + pub(crate) wallet_dir: PathBuf, } -/// `Files` provides functionality for uploading and downloading chunks with support for retries and queuing. -/// This struct is not cloneable. To create a new instance with default configuration, use the `new` function. -/// To modify the configuration, use the provided setter methods (`set_...` functions). -pub struct Files { - // Configurations - batch_size: usize, - verify_store: bool, - show_holders: bool, - max_retries: usize, - // API - api: FilesApi, - // Uploads - failed_chunks: HashSet, - uploading_chunks: FuturesUnordered)>>, - // Upload stats - upload_storage_cost: NanoTokens, - upload_royalty_fees: NanoTokens, - upload_final_balance: NanoTokens, - // Events - event_sender: Option>, - logged_event_sender_absence: bool, -} +/// This is the (file xorname, datamap_data, filesize, and chunks) +/// If the DataMapChunk exists and is not stored on the network, then it will not be accessible at this address of ChunkAddress(XorName) . +type ChunkFileResult = Result<(ChunkAddress, Option, u64, Vec<(XorName, PathBuf)>)>; -impl Files { - /// Creates a new instance of `Files` with the default configuration. - /// To modify the configuration, use the provided setter methods (`set_...` functions). - pub fn new(files_api: FilesApi) -> Self { - Self { - batch_size: BATCH_SIZE, - verify_store: true, - show_holders: false, - max_retries: MAX_UPLOAD_RETRIES, - api: files_api, - failed_chunks: Default::default(), - uploading_chunks: Default::default(), - upload_storage_cost: NanoTokens::zero(), - upload_royalty_fees: NanoTokens::zero(), - upload_final_balance: NanoTokens::zero(), - event_sender: None, - logged_event_sender_absence: false, - } +impl FilesApi { + /// Create file apis instance. + pub fn new(client: Client, wallet_dir: PathBuf) -> Self { + Self { client, wallet_dir } } - /// Sets the default batch size that determines the number of chunks that are processed in parallel during the - /// payment and upload process. - /// - /// By default, this option is set to the constant `BATCH_SIZE: usize = 64`. - pub fn set_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; - self + /// Return the client instance + pub fn client(&self) -> &Client { + &self.client } - /// Sets the option to verify the chunks after they have been uploaded. - /// - /// By default, this option is set to true. - pub fn set_verify_store(mut self, verify_store: bool) -> Self { - self.verify_store = verify_store; - self - } + /// Create a new WalletClient for a given root directory. 
+ pub fn wallet(&self) -> Result { + let path = self.wallet_dir.as_path(); + let wallet = LocalWallet::load_from(path)?; - /// Sets the option to display the holders that are expected to be holding a chunk during verification. - /// - /// By default, this option is set to false. - pub fn set_show_holders(mut self, show_holders: bool) -> Self { - self.show_holders = show_holders; - self + Ok(WalletClient::new(self.client.clone(), wallet)) } - /// Sets the maximum number of retries to perform if a chunk fails to upload. + /// Read bytes from the network. The contents are spread across + /// multiple chunks in the network. This function invokes the self-encryptor and returns + /// the data that was initially stored. /// - /// By default, this option is set to the constant `MAX_UPLOAD_RETRIES: usize = 3`. - pub fn set_max_retries(mut self, max_retries: usize) -> Self { - self.max_retries = max_retries; - self - } - - /// Returns a receiver for file upload events. - /// This method is optional and the upload process can be performed without it. - pub fn get_upload_events(&mut self) -> mpsc::Receiver { - let (event_sender, event_receiver) = mpsc::channel(10); - // should we return error if an sender is already set? - self.event_sender = Some(event_sender); + /// Takes `position` and `length` arguments which specify the start position + /// and the length of bytes to be read. + /// Passing `0` to position reads the data from the beginning, + /// and the `length` is just an upper limit. + pub async fn read_from( + &self, + address: ChunkAddress, + position: usize, + length: usize, + batch_size: usize, + ) -> Result { + trace!("Reading {length} bytes at: {address:?}, starting from position: {position}"); + let chunk = self.client.get_chunk(address, false).await?; + + // First try to deserialize a LargeFile, if it works, we go and seek it. + // If an error occurs, we consider it to be a SmallFile. + if let Ok(data_map) = self.unpack_chunk(chunk.clone(), batch_size).await { + return self.seek(data_map, position, length).await; + } - event_receiver - } + // The error above is ignored to avoid leaking the storage format detail of SmallFiles and LargeFiles. + // The basic idea is that we're trying to deserialize as one, and then the other. + // The cost of it is that some errors will not be seen without a refactor. + let mut bytes = chunk.value().clone(); - /// Returns the total amount of fees paid for storage after the upload completes. - pub fn get_upload_storage_cost(&self) -> NanoTokens { - self.upload_storage_cost - } - /// Returns the total amount of royalties paid after the upload completes. - pub fn get_upload_royalty_fees(&self) -> NanoTokens { - self.upload_royalty_fees - } + let _ = bytes.split_to(position); + bytes.truncate(length); - /// Returns the final wallet balance after the upload completes. - pub fn get_upload_final_balance(&self) -> NanoTokens { - self.upload_final_balance + Ok(bytes) } - /// get the set of failed chunks that could not be uploaded - pub fn get_failed_chunks(&self) -> HashSet { - self.failed_chunks - .clone() - .into_iter() - .map(|chunk_info| chunk_info.name) - .collect() - } + /// Tries to chunk the file, returning `(head_address, data_map_chunk, file_size, chunk_names)` + /// and writes encrypted chunks to disk. 
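`read_from` above gives byte-range access without fetching the whole file: only the chunks covering the requested range are downloaded and decrypted. A minimal sketch of a ranged read, assuming an existing `FilesApi` and the `ChunkAddress` of a file's head chunk:

```rust
use sn_client::{FilesApi, BATCH_SIZE};
use sn_protocol::storage::ChunkAddress;

// Read up to 1 KiB starting at offset 4096; `length` is an upper bound, so a
// shorter file simply yields fewer bytes.
async fn read_slice(files_api: &FilesApi, head: ChunkAddress) -> Result<(), sn_client::Error> {
    let bytes = files_api.read_from(head, 4096, 1024, BATCH_SIZE).await?;
    println!("got {} bytes", bytes.len());
    Ok(())
}
```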
+ pub fn chunk_file( + file_path: &Path, + chunk_dir: &Path, + include_data_map_in_chunks: bool, + ) -> ChunkFileResult { + let mut file = File::open(file_path)?; + let metadata = file.metadata()?; + let file_size = metadata.len(); + + let (head_address, data_map_chunk, mut chunks_paths) = + if file_size < MIN_ENCRYPTABLE_BYTES as u64 { + let mut bytes = Vec::new(); + let _ = file.read_to_end(&mut bytes)?; + let chunk = package_small(SmallFile::new(bytes.into())?)?; + + // Write the result to disk + let small_chunk_file_path = chunk_dir.join(hex::encode(*chunk.name())); + info!("Creating normal small chunk in {small_chunk_file_path:?}"); + let mut output_file = File::create(small_chunk_file_path.clone())?; + output_file.write_all(&chunk.value)?; + + ( + *chunk.name(), + None, + vec![(*chunk.name(), small_chunk_file_path)], + ) + } else { + let (data_map_chunk, chunks) = encrypt_large(file_path, chunk_dir)?; + (*data_map_chunk.name(), Some(data_map_chunk), chunks) + }; - /// Uploads the provided chunks to the network. - /// If you want to track the upload progress, use the `get_upload_events` method. - pub async fn upload_chunks(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> { - // make sure we log that the event sender is absent atleast once - self.logged_event_sender_absence = false; + debug!("include_data_map_in_chunks {include_data_map_in_chunks:?}"); - // clean up the trackers/stats - self.failed_chunks = Default::default(); - self.uploading_chunks = Default::default(); - self.upload_storage_cost = NanoTokens::zero(); - self.upload_royalty_fees = NanoTokens::zero(); - self.upload_final_balance = NanoTokens::zero(); + debug!( + "Is there a datamap for chuink?? {:?}", + data_map_chunk.is_some() + ); + // only write out the data_map if one exists for this file + if let Some(data_map_chunk) = &data_map_chunk { + if include_data_map_in_chunks { + info!("Data_map_chunk to be written!"); + let data_map_path = chunk_dir.join(hex::encode(*data_map_chunk.name())); - let result = self.upload(chunks).await; + trace!("Data_map_chunk being written to {data_map_path:?}"); + let mut output_file = File::create(data_map_path.clone())?; + output_file.write_all(&data_map_chunk.value)?; - // send an event indicating that the upload process completed with an error - if result.is_err() { - self.send_event(FileUploadEvent::Error).await?; + chunks_paths.push((*data_map_chunk.name(), data_map_path)) + } } - // drop the sender to close the channel. - let sender = self.event_sender.take(); - drop(sender); - - result + Ok(( + ChunkAddress::new(head_address), + data_map_chunk.map(|c| c.value), + file_size, + chunks_paths, + )) } - async fn upload(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> { - let mut sequential_payment_fails = 0; + /// Directly writes Chunks to the network in the + /// form of immutable self encrypted chunks. 
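`chunk_file` does no network I/O: it encrypts (or, for very small files, just packages) the input into chunk files under `chunk_dir` and reports what ended up there, so the paying and uploading steps can be driven separately. A minimal sketch of chunking one local file, with the paths purely illustrative:

```rust
use sn_client::FilesApi;
use std::path::Path;

fn chunk_one_file() -> Result<(), sn_client::Error> {
    let file_path = Path::new("./my_video.mp4");    // hypothetical input file
    let chunk_dir = Path::new("./chunk_artifacts"); // existing directory for the chunk files
    // Passing `true` also writes the data map chunk to disk so it is uploaded with the rest.
    let (head_address, data_map, file_size, chunks) =
        FilesApi::chunk_file(file_path, chunk_dir, true)?;
    println!(
        "{file_size} bytes -> head {head_address:?}, {} chunk(s), data map present: {}",
        chunks.len(),
        data_map.is_some()
    );
    Ok(())
}
```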
+ /// + pub async fn get_local_payment_and_upload_chunk( + &self, + chunk: Chunk, + payee: PeerId, + verify_store: bool, + ) -> Result<()> { + let chunk_addr = chunk.network_address(); + trace!("Client upload started for chunk: {chunk_addr:?} to {payee:?}"); - let mut chunk_batches = Vec::with_capacity(chunks.len()); - chunk_batches.extend( - chunks - .into_iter() - .map(|(name, path)| ChunkInfo { name, path }), - ); - let n_batches = { - let total_elements = chunk_batches.len(); - // to get +1 if there is a remainder - (total_elements + self.batch_size - 1) / self.batch_size - }; - let mut batch = 1; - let chunk_batches = chunk_batches.chunks(self.batch_size); + let wallet_client = self.wallet()?; + let payment = wallet_client.get_payment_for_addr(&chunk_addr)?; - for chunks_batch in chunk_batches { - trace!("Uploading batch {batch}/{n_batches}"); - if sequential_payment_fails >= MAX_SEQUENTIAL_PAYMENT_FAILS { - return Err(ClientError::SequentialUploadPaymentError); - } - // if the payment fails, we can continue to the next batch - let res = self.handle_chunk_batch(chunks_batch, false).await; - batch += 1; - match res { - Ok(()) => { - trace!("Uploaded batch {batch}/{n_batches}"); - sequential_payment_fails = 0; - } - Err(err) => match err { - ClientError::CouldNotVerifyTransfer(err) => { - warn!( - "Failed to verify transfer validity in the network. Chunk batch will be retried... {err:?}" - ); - println!( - "Failed to verify transfer validity in the network. Chunk batch will be retried..." - ); - sequential_payment_fails += 1; - continue; - } - error => { - return Err(error); - } - }, - } - } - - // ensure we wait on any remaining uploading_chunks - self.progress_uploading_chunks(true).await?; - - let mut retry_count = 0; - let max_retries = self.max_retries; - let mut failed_chunks_to_upload = self.take_failed_chunks(); - while !failed_chunks_to_upload.is_empty() && retry_count < max_retries { - warn!( - "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...", - failed_chunks_to_upload.len() - ); - println!( - "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...", - failed_chunks_to_upload.len() - ); - retry_count += 1; - let batches = failed_chunks_to_upload.chunks(self.batch_size); - for chunks_batch in batches { - self.handle_chunk_batch(chunks_batch, true).await?; - } - // ensure we wait on any remaining uploading_chunks w/ drain_all - self.progress_uploading_chunks(true).await?; + debug!( + "{:?} payments for chunk: {chunk_addr:?}: {payment:?}", + payment + ); - // take the new failed chunks - failed_chunks_to_upload = self.take_failed_chunks(); - } + self.client + .store_chunk(chunk, payee, payment, verify_store) + .await?; + trace!("Client upload completed for chunk: {chunk_addr:?}"); Ok(()) } - /// Handles a batch of chunks for upload. This includes paying for the chunks, uploading them, - /// and handling any errors that occur during the process. + /// Pay for a given set of chunks. /// - /// If `failed_batch` is true, we emit FilesUploadEvent::Uploaded for the skipped_chunks. This is because, - /// the failed_batch was already paid for, but could not be verified on the first try. - async fn handle_chunk_batch( - &mut self, - chunks_batch: &[ChunkInfo], - failed_batch: bool, - ) -> Result<()> { - // while we don't have a full batch_size of ongoing uploading_chunks - // we can pay for the next batch and carry on - self.progress_uploading_chunks(false).await?; - - // pay for and verify payment... 
if we don't verify here, chunks uploads will surely fail - let (payee_map, skipped_chunks) = match self - .api - .pay_for_chunks(chunks_batch.iter().map(|info| info.name).collect()) - .await - { - Ok(((storage_cost, royalty_fees, new_balance), (payee_map, skipped_chunks))) => { - // store the stats and emit event too - self.upload_storage_cost = self - .upload_storage_cost - .checked_add(storage_cost) - .ok_or(ClientError::TotalPriceTooHigh)?; - self.upload_royalty_fees = self - .upload_royalty_fees - .checked_add(royalty_fees) - .ok_or(ClientError::TotalPriceTooHigh)?; - self.upload_final_balance = new_balance; - self.send_event(FileUploadEvent::PayedForChunks { - storage_cost, - royalty_fees, - new_balance, - }) + /// Returns the cost and the resulting new balance of the local wallet. + pub async fn pay_for_chunks( + &self, + chunks: Vec, + ) -> Result<( + (NanoTokens, NanoTokens, NanoTokens), + (Vec<(XorName, PeerId)>, Vec), + )> { + let mut wallet_client = self.wallet()?; + info!("Paying for and uploading {:?} chunks", chunks.len()); + + let ((storage_cost, royalties_fees), (payee_map, skipped_chunks)) = + wallet_client + .pay_for_storage(chunks.iter().map(|name| { + sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name)) + })) .await?; - (payee_map, skipped_chunks) - } - Err(err) => return Err(err), - }; - let mut chunks_to_upload = chunks_batch.to_vec(); - // don't reupload skipped chunks - chunks_to_upload.retain(|info| !skipped_chunks.contains(&info.name)); + wallet_client.store_local_wallet()?; + let new_balance = wallet_client.balance(); + Ok(( + (storage_cost, royalties_fees, new_balance), + (payee_map, skipped_chunks), + )) + } - // send update about the existing chunks - for chunk in skipped_chunks { - if failed_batch { - // the chunk was already paid for but might have not been verified on the first try. - self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new(chunk))) - .await?; - } else { - // if during the first try we skip the chunk, then it was already uploaded. 
- self.send_event(FileUploadEvent::AlreadyExistsInNetwork(ChunkAddress::new( - chunk, - ))) - .await?; - } - } + // -------------------------------------------- + // ---------- Private helpers ----------------- + // -------------------------------------------- - // upload paid chunks - for chunk_info in chunks_to_upload.into_iter() { - let files_api = self.api.clone(); - let verify_store = self.verify_store; + /// Used for testing + pub async fn upload_test_bytes(&self, bytes: Bytes, verify: bool) -> Result { + let temp_dir = tempdir()?; + let file_path = temp_dir.path().join("tempfile"); + let mut file = File::create(&file_path)?; + file.write_all(&bytes)?; - let payee = if let Some(payee) = payee_map - .iter() - .find(|itr| itr.0 == chunk_info.name) - .map(|result| result.1) - { - payee - } else { - error!( - "Cannot find payee of {:?} among the payee_map", - chunk_info.name - ); - continue; - }; - - // Spawn a task for each chunk to be uploaded - let handle = tokio::spawn(Self::upload_chunk( - files_api, - chunk_info, - payee, - verify_store, - )); + let chunk_path = temp_dir.path().join("chunk_path"); + create_dir_all(chunk_path.clone())?; - self.progress_uploading_chunks(false).await?; + let (head_address, _data_map, _file_size, chunks_paths) = + Self::chunk_file(&file_path, &chunk_path, true)?; - self.uploading_chunks.push(handle); + for (_chunk_name, chunk_path) in chunks_paths { + let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?)); + self.get_local_payment_and_upload_chunk(chunk, PeerId::random(), verify) + .await?; } - Ok(()) + Ok(NetworkAddress::ChunkAddress(head_address)) } - /// Progresses the uploading of chunks. If the number of ongoing uploading chunks is less than the batch size, - /// it pays for the next batch and continues. If an error occurs during the upload, it will be returned. - /// - /// If `drain_all` is true, will wait for all ongoing uploads to complete before returning. - async fn progress_uploading_chunks(&mut self, drain_all: bool) -> Result<()> { - while drain_all || self.uploading_chunks.len() >= self.batch_size { - if let Some(result) = self.uploading_chunks.next().await { - // bail if we've had any errors so far - match result? { - (chunk_info, Ok(())) => { - self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new( - chunk_info.name, - ))) - .await?; - } - (chunk_info, Err(err)) => { - warn!("Failed to upload a chunk: {err}"); - self.send_event(FileUploadEvent::FailedToUpload(ChunkAddress::new( - chunk_info.name, - ))) - .await?; - // store the failed chunk to be retried later - self.failed_chunks.insert(chunk_info); - } + // Gets and decrypts chunks from the network using nothing else but the data map. + // If a downloaded path is given, the decrypted file will be written to the given path, + // by the decryptor directly. + // Otherwise, will assume the fetched content is a small one and return as bytes. + async fn read_all( + &self, + data_map: DataMap, + decrypted_file_path: Option, + show_holders: bool, + batch_size: usize, + ) -> Result> { + let mut decryptor = if let Some(path) = decrypted_file_path { + StreamSelfDecryptor::decrypt_to_file(Box::new(path), &data_map)? 
+ } else { + let encrypted_chunks = self.try_get_chunks(data_map.infos()).await?; + let bytes = decrypt_full_set(&data_map, &encrypted_chunks) + .map_err(ChunksError::SelfEncryption)?; + return Ok(Some(bytes)); + }; + + let expected_count = data_map.infos().len(); + // let mut missing_chunks = Vec::new(); + let mut ordered_read_futures = FuturesOrdered::new(); + let now = Instant::now(); + + let mut index = 0; + + for chunk_info in data_map.infos().iter() { + let dst_hash = chunk_info.dst_hash; + // The futures are executed concurrently, + // but the result is returned in the order in which they were inserted. + ordered_read_futures.push_back(async move { + ( + dst_hash, + self.client + .get_chunk(ChunkAddress::new(dst_hash), show_holders) + .await, + ) + }); + + if ordered_read_futures.len() >= batch_size || index + batch_size > expected_count { + while let Some((dst_hash, result)) = ordered_read_futures.next().await { + let chunk = result.map_err(|error| { + error!("Chunk missing {dst_hash:?} with {error:?}"); + ChunksError::ChunkMissing(dst_hash) + })?; + let encrypted_chunk = EncryptedChunk { + index, + content: chunk.value().clone(), + }; + let _ = decryptor.next_encrypted(encrypted_chunk)?; + + index += 1; + info!("Client (read all) download progress {index:?}/{expected_count:?}"); + println!("Client (read all) download progress {index:?}/{expected_count:?}"); } - } else { - // we're finished - break; } } - Ok(()) + + let elapsed = now.elapsed(); + println!("Client downloaded file in {elapsed:?}"); + + Ok(None) } - /// Store chunks from chunk_paths (assuming payments have already been made and are in our local wallet). - /// If verify_store is true, we will attempt to fetch the chunks from the network to verify it is stored. - async fn upload_chunk( - files_api: FilesApi, - chunk_info: ChunkInfo, - payee: PeerId, - verify_store: bool, - ) -> (ChunkInfo, Result<()>) { - let chunk_address = ChunkAddress::new(chunk_info.name); - let bytes = match tokio::fs::read(chunk_info.path.clone()).await { - Ok(bytes) => Bytes::from(bytes), - Err(error) => { - warn!("Chunk {chunk_address:?} could not be read from the system from {:?}. - Normally this happens if it has been uploaded, but the cleanup process was interrupted. Ignoring error: {error}", chunk_info.path); - - return (chunk_info, Ok(())); + /// Extracts a file DataMapLevel from a chunk. + /// If the DataMapLevel is not the first level mapping directly to the user's contents, + /// the process repeats itself until it obtains the first level DataMapLevel. + pub async fn unpack_chunk(&self, mut chunk: Chunk, batch_size: usize) -> Result { + loop { + match rmp_serde::from_slice(chunk.value()).map_err(ChunksError::Deserialisation)? { + DataMapLevel::First(data_map) => { + return Ok(data_map); + } + DataMapLevel::Additional(data_map) => { + let serialized_chunk = self + .read_all(data_map, None, false, batch_size) + .await? + .expect("error encountered on reading additional datamap"); + chunk = rmp_serde::from_slice(&serialized_chunk) + .map_err(ChunksError::Deserialisation)?; + } } - }; - let chunk = Chunk::new(bytes); - match files_api - .get_local_payment_and_upload_chunk(chunk, payee, verify_store) - .await - { - Ok(()) => (chunk_info, Ok(())), - Err(err) => (chunk_info, Err(err)), } } - - fn take_failed_chunks(&mut self) -> Vec { - std::mem::take(&mut self.failed_chunks) - .into_iter() - .collect() + // Gets a subset of chunks from the network, decrypts and + // reads `len` bytes of the data starting at given `pos` of original file. 
+ async fn seek(&self, data_map: DataMap, pos: usize, len: usize) -> Result { + let info = self_encryption::seek_info(data_map.file_size(), pos, len); + let range = &info.index_range; + let all_infos = data_map.infos(); + + let encrypted_chunks = self + .try_get_chunks( + (range.start..range.end + 1) + .clone() + .map(|i| all_infos[i].clone()) + .collect_vec(), + ) + .await?; + + let bytes = + self_encryption::decrypt_range(&data_map, &encrypted_chunks, info.relative_pos, len) + .map_err(ChunksError::SelfEncryption)?; + + Ok(bytes) } - async fn send_event(&mut self, event: FileUploadEvent) -> Result<()> { - if let Some(sender) = self.event_sender.as_ref() { - sender.send(event).await.map_err(|err| { - error!("Could not send files event due to {err:?}"); - ClientError::CouldNotSendFilesEvent - })?; - } else if !self.logged_event_sender_absence { - info!("Files upload event sender is not set. Use get_upload_events() if you need to keep track of the progress"); - self.logged_event_sender_absence = true; + async fn try_get_chunks(&self, chunks_info: Vec) -> Result> { + let expected_count = chunks_info.len(); + let mut retrieved_chunks = vec![]; + + let mut tasks = Vec::new(); + for chunk_info in chunks_info.clone().into_iter() { + let client = self.client.clone(); + let task = task::spawn(async move { + let chunk = client + .get_chunk(ChunkAddress::new(chunk_info.dst_hash), false) + .await + .map_err(|error| { + error!("Chunk missing {:?} with {error:?}", chunk_info.dst_hash); + ChunksError::ChunkMissing(chunk_info.dst_hash) + })?; + Ok::(EncryptedChunk { + index: chunk_info.index, + content: chunk.value().clone(), + }) + }); + tasks.push(task); } - Ok(()) + + // This swallowing of errors is basically a compaction into a single + // error saying "didn't get all chunks". + retrieved_chunks.extend(join_all(tasks).await.into_iter().flatten().flatten()); + + info!( + "Client download progress {:?}/{expected_count:?}", + retrieved_chunks.len() + ); + println!( + "Client download progress {:?}/{expected_count:?}", + retrieved_chunks.len() + ); + + if expected_count > retrieved_chunks.len() { + let missing_chunks: Vec = chunks_info + .iter() + .filter_map(|expected_info| { + if retrieved_chunks.iter().any(|retrieved_chunk| { + XorName::from_content(&retrieved_chunk.content) == expected_info.dst_hash + }) { + None + } else { + Some(expected_info.dst_hash) + } + }) + .collect(); + Err(ChunksError::NotEnoughChunksRetrieved { + expected: expected_count, + retrieved: retrieved_chunks.len(), + missing_chunks, + })? + } else { + Ok(retrieved_chunks) + } + } +} + +/// Encrypts a [`LargeFile`] and returns the resulting address and all chunk names. +/// Correspondent encrypted chunks are written in the specified output folder. +/// Does not store anything to the network. +/// +/// Returns data map as a chunk, and the resulting chunks +fn encrypt_large(file_path: &Path, output_dir: &Path) -> Result<(Chunk, Vec<(XorName, PathBuf)>)> { + Ok(crate::chunks::encrypt_large(file_path, output_dir)?) +} + +/// Packages a [`SmallFile`] and returns the resulting address and the chunk. +/// Does not store anything to the network. 
+fn package_small(file: SmallFile) -> Result { + let chunk = to_chunk(file.bytes()); + if chunk.value().len() >= self_encryption::MIN_ENCRYPTABLE_BYTES { + return Err(ChunksError::SmallFilePaddingNeeded(chunk.value().len()).into()); } + Ok(chunk) } diff --git a/sn_client/src/files/upload.rs b/sn_client/src/files/upload.rs new file mode 100644 index 0000000000..1a2cbf619a --- /dev/null +++ b/sn_client/src/files/upload.rs @@ -0,0 +1,441 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::{ + error::{Error as ClientError, Result}, + FilesApi, BATCH_SIZE, MAX_UPLOAD_RETRIES, +}; +use bytes::Bytes; +use futures::{stream::FuturesUnordered, StreamExt}; +use libp2p::PeerId; +use sn_protocol::storage::{Chunk, ChunkAddress}; +use sn_transfers::NanoTokens; +use std::{collections::HashSet, path::PathBuf}; +use tokio::{ + sync::mpsc::{self}, + task::JoinHandle, +}; +use xor_name::XorName; + +/// The maximum number of sequential payment failures before aborting the upload process. +const MAX_SEQUENTIAL_PAYMENT_FAILS: usize = 3; + +/// The events emitted from the upload process. +pub enum FileUploadEvent { + /// Uploaded a Chunk to the network + Uploaded(ChunkAddress), + /// The Chunk already exists in the network, skipping upload. + AlreadyExistsInNetwork(ChunkAddress), + /// Failed to upload a chunk to the network. This event can be emitted multiple times for a single ChunkAddress + /// if retries are enabled. + FailedToUpload(ChunkAddress), + /// Payment for a batch of chunk has been made. + PayedForChunks { + storage_cost: NanoTokens, + royalty_fees: NanoTokens, + new_balance: NanoTokens, + }, + /// The upload process has terminated with an error. + Error, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct ChunkInfo { + name: XorName, + path: PathBuf, +} + +/// `FilesUpload` provides functionality for uploading chunks with support for retries and queuing. +/// This struct is not cloneable. To create a new instance with default configuration, use the `new` function. +/// To modify the configuration, use the provided setter methods (`set_...` functions). +pub struct FilesUpload { + // Configurations + batch_size: usize, + verify_store: bool, + show_holders: bool, + max_retries: usize, + // API + api: FilesApi, + // Uploads + failed_chunks: HashSet, + uploading_chunks: FuturesUnordered)>>, + // Upload stats + upload_storage_cost: NanoTokens, + upload_royalty_fees: NanoTokens, + upload_final_balance: NanoTokens, + // Events + event_sender: Option>, + logged_event_sender_absence: bool, +} + +impl FilesUpload { + /// Creates a new instance of `FilesUpload` with the default configuration. + /// To modify the configuration, use the provided setter methods (`set_...` functions). 
+ pub fn new(files_api: FilesApi) -> Self { + Self { + batch_size: BATCH_SIZE, + verify_store: true, + show_holders: false, + max_retries: MAX_UPLOAD_RETRIES, + api: files_api, + failed_chunks: Default::default(), + uploading_chunks: Default::default(), + upload_storage_cost: NanoTokens::zero(), + upload_royalty_fees: NanoTokens::zero(), + upload_final_balance: NanoTokens::zero(), + event_sender: None, + logged_event_sender_absence: false, + } + } + + /// Sets the default batch size that determines the number of chunks that are processed in parallel during the + /// payment and upload process. + /// + /// By default, this option is set to the constant `BATCH_SIZE: usize = 64`. + pub fn set_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Sets the option to verify the chunks after they have been uploaded. + /// + /// By default, this option is set to true. + pub fn set_verify_store(mut self, verify_store: bool) -> Self { + self.verify_store = verify_store; + self + } + + /// Sets the option to display the holders that are expected to be holding a chunk during verification. + /// + /// By default, this option is set to false. + pub fn set_show_holders(mut self, show_holders: bool) -> Self { + self.show_holders = show_holders; + self + } + + /// Sets the maximum number of retries to perform if a chunk fails to upload. + /// + /// By default, this option is set to the constant `MAX_UPLOAD_RETRIES: usize = 3`. + pub fn set_max_retries(mut self, max_retries: usize) -> Self { + self.max_retries = max_retries; + self + } + + /// Returns a receiver for file upload events. + /// This method is optional and the upload process can be performed without it. + pub fn get_upload_events(&mut self) -> mpsc::Receiver { + let (event_sender, event_receiver) = mpsc::channel(10); + // should we return error if an sender is already set? + self.event_sender = Some(event_sender); + + event_receiver + } + + /// Returns the total amount of fees paid for storage after the upload completes. + pub fn get_upload_storage_cost(&self) -> NanoTokens { + self.upload_storage_cost + } + /// Returns the total amount of royalties paid after the upload completes. + pub fn get_upload_royalty_fees(&self) -> NanoTokens { + self.upload_royalty_fees + } + + /// Returns the final wallet balance after the upload completes. + pub fn get_upload_final_balance(&self) -> NanoTokens { + self.upload_final_balance + } + + /// get the set of failed chunks that could not be uploaded + pub fn get_failed_chunks(&self) -> HashSet { + self.failed_chunks + .clone() + .into_iter() + .map(|chunk_info| chunk_info.name) + .collect() + } + + /// Uploads the provided chunks to the network. + /// If you want to track the upload progress, use the `get_upload_events` method. + pub async fn upload_chunks(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> { + // make sure we log that the event sender is absent atleast once + self.logged_event_sender_absence = false; + + // clean up the trackers/stats + self.failed_chunks = Default::default(); + self.uploading_chunks = Default::default(); + self.upload_storage_cost = NanoTokens::zero(); + self.upload_royalty_fees = NanoTokens::zero(); + self.upload_final_balance = NanoTokens::zero(); + + let result = self.upload(chunks).await; + + // send an event indicating that the upload process completed with an error + if result.is_err() { + self.send_event(FileUploadEvent::Error).await?; + } + + // drop the sender to close the channel. 
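Dropping the sender here is what ends the event stream: once the last `Sender` is gone, the receiver handed out by `get_upload_events` drains any buffered events and then yields `None`, so a listener loop like the CLI's exits on its own. A stand-alone sketch of that tokio mpsc behaviour:

```rust
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(10);
    tx.send(1).await.unwrap();
    drop(tx); // last sender gone -> channel is closed
    assert_eq!(rx.recv().await, Some(1)); // buffered messages are still delivered
    assert_eq!(rx.recv().await, None);    // then the receiver reports closure
}
```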
+ let sender = self.event_sender.take(); + drop(sender); + + result + } + + async fn upload(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> { + let mut sequential_payment_fails = 0; + + let mut chunk_batches = Vec::with_capacity(chunks.len()); + chunk_batches.extend( + chunks + .into_iter() + .map(|(name, path)| ChunkInfo { name, path }), + ); + let n_batches = { + let total_elements = chunk_batches.len(); + // to get +1 if there is a remainder + (total_elements + self.batch_size - 1) / self.batch_size + }; + let mut batch = 1; + let chunk_batches = chunk_batches.chunks(self.batch_size); + + for chunks_batch in chunk_batches { + trace!("Uploading batch {batch}/{n_batches}"); + if sequential_payment_fails >= MAX_SEQUENTIAL_PAYMENT_FAILS { + return Err(ClientError::SequentialUploadPaymentError); + } + // if the payment fails, we can continue to the next batch + let res = self.handle_chunk_batch(chunks_batch, false).await; + batch += 1; + match res { + Ok(()) => { + trace!("Uploaded batch {batch}/{n_batches}"); + } + Err(err) => match err { + ClientError::CouldNotVerifyTransfer(err) => { + warn!( + "Failed to verify transfer validity in the network. Chunk batch will be retried... {err:?}" + ); + println!( + "Failed to verify transfer validity in the network. Chunk batch will be retried..." + ); + sequential_payment_fails += 1; + continue; + } + error => { + return Err(error); + } + }, + } + } + + // ensure we wait on any remaining uploading_chunks + self.progress_uploading_chunks(true).await?; + + let mut retry_count = 0; + let max_retries = self.max_retries; + let mut failed_chunks_to_upload = self.take_failed_chunks(); + while !failed_chunks_to_upload.is_empty() && retry_count < max_retries { + warn!( + "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...", + failed_chunks_to_upload.len() + ); + println!( + "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...", + failed_chunks_to_upload.len() + ); + retry_count += 1; + let batches = failed_chunks_to_upload.chunks(self.batch_size); + for chunks_batch in batches { + self.handle_chunk_batch(chunks_batch, true).await?; + } + // ensure we wait on any remaining uploading_chunks w/ drain_all + self.progress_uploading_chunks(true).await?; + + // take the new failed chunks + failed_chunks_to_upload = self.take_failed_chunks(); + } + + Ok(()) + } + + /// Handles a batch of chunks for upload. This includes paying for the chunks, uploading them, + /// and handling any errors that occur during the process. + /// + /// If `failed_batch` is true, we emit FilesUploadEvent::Uploaded for the skipped_chunks. This is because, + /// the failed_batch was already paid for, but could not be verified on the first try. + async fn handle_chunk_batch( + &mut self, + chunks_batch: &[ChunkInfo], + failed_batch: bool, + ) -> Result<()> { + // while we don't have a full batch_size of ongoing uploading_chunks + // we can pay for the next batch and carry on + self.progress_uploading_chunks(false).await?; + + // pay for and verify payment... 
if we don't verify here, chunks uploads will surely fail + let (payee_map, skipped_chunks) = match self + .api + .pay_for_chunks(chunks_batch.iter().map(|info| info.name).collect()) + .await + { + Ok(((storage_cost, royalty_fees, new_balance), (payee_map, skipped_chunks))) => { + // store the stats and emit event too + self.upload_storage_cost = self + .upload_storage_cost + .checked_add(storage_cost) + .ok_or(ClientError::TotalPriceTooHigh)?; + self.upload_royalty_fees = self + .upload_royalty_fees + .checked_add(royalty_fees) + .ok_or(ClientError::TotalPriceTooHigh)?; + self.upload_final_balance = new_balance; + self.send_event(FileUploadEvent::PayedForChunks { + storage_cost, + royalty_fees, + new_balance, + }) + .await?; + (payee_map, skipped_chunks) + } + Err(err) => return Err(err), + }; + + let mut chunks_to_upload = chunks_batch.to_vec(); + // don't reupload skipped chunks + chunks_to_upload.retain(|info| !skipped_chunks.contains(&info.name)); + + // send update about the existing chunks + for chunk in skipped_chunks { + if failed_batch { + // the chunk was already paid for but might have not been verified on the first try. + self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new(chunk))) + .await?; + } else { + // if during the first try we skip the chunk, then it was already uploaded. + self.send_event(FileUploadEvent::AlreadyExistsInNetwork(ChunkAddress::new( + chunk, + ))) + .await?; + } + } + + // upload paid chunks + for chunk_info in chunks_to_upload.into_iter() { + let files_api = self.api.clone(); + let verify_store = self.verify_store; + + let payee = if let Some(payee) = payee_map + .iter() + .find(|itr| itr.0 == chunk_info.name) + .map(|result| result.1) + { + payee + } else { + error!( + "Cannot find payee of {:?} among the payee_map", + chunk_info.name + ); + continue; + }; + + // Spawn a task for each chunk to be uploaded + let handle = tokio::spawn(Self::upload_chunk( + files_api, + chunk_info, + payee, + verify_store, + )); + + self.progress_uploading_chunks(false).await?; + + self.uploading_chunks.push(handle); + } + + Ok(()) + } + + /// Progresses the uploading of chunks. If the number of ongoing uploading chunks is less than the batch size, + /// it pays for the next batch and continues. If an error occurs during the upload, it will be returned. + /// + /// If `drain_all` is true, will wait for all ongoing uploads to complete before returning. + async fn progress_uploading_chunks(&mut self, drain_all: bool) -> Result<()> { + while drain_all || self.uploading_chunks.len() >= self.batch_size { + if let Some(result) = self.uploading_chunks.next().await { + // bail if we've had any errors so far + match result? { + (chunk_info, Ok(())) => { + self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new( + chunk_info.name, + ))) + .await?; + } + (chunk_info, Err(err)) => { + warn!("Failed to upload a chunk: {err}"); + self.send_event(FileUploadEvent::FailedToUpload(ChunkAddress::new( + chunk_info.name, + ))) + .await?; + // store the failed chunk to be retried later + self.failed_chunks.insert(chunk_info); + } + } + } else { + // we're finished + break; + } + } + Ok(()) + } + + /// Store chunks from chunk_paths (assuming payments have already been made and are in our local wallet). + /// If verify_store is true, we will attempt to fetch the chunks from the network to verify it is stored. 
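`progress_uploading_chunks` above is what bounds concurrency: before another chunk task is queued, completed ones are drained whenever the `FuturesUnordered` set is at least `batch_size` long, and the `drain_all` calls at the end of each pass simply wait for the stragglers. The same windowing pattern in isolation (a sketch, assuming `batch_size >= 1`):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

async fn bounded_in_flight(batch_size: usize) {
    let mut in_flight = FuturesUnordered::new();
    for i in 0..32u32 {
        // Never allow more than `batch_size` tasks at once.
        while in_flight.len() >= batch_size {
            let _finished: Option<u32> = in_flight.next().await;
        }
        in_flight.push(async move { i });
    }
    // The `drain_all` case: wait for whatever is still running.
    while in_flight.next().await.is_some() {}
}
```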
+ async fn upload_chunk(
+ files_api: FilesApi,
+ chunk_info: ChunkInfo,
+ payee: PeerId,
+ verify_store: bool,
+ ) -> (ChunkInfo, Result<()>) {
+ let chunk_address = ChunkAddress::new(chunk_info.name);
+ let bytes = match tokio::fs::read(chunk_info.path.clone()).await {
+ Ok(bytes) => Bytes::from(bytes),
+ Err(error) => {
+ warn!("Chunk {chunk_address:?} could not be read from the system from {:?}.
+ Normally this happens if it has been uploaded, but the cleanup process was interrupted. Ignoring error: {error}", chunk_info.path);
+
+ return (chunk_info, Ok(()));
+ }
+ };
+ let chunk = Chunk::new(bytes);
+ match files_api
+ .get_local_payment_and_upload_chunk(chunk, payee, verify_store)
+ .await
+ {
+ Ok(()) => (chunk_info, Ok(())),
+ Err(err) => (chunk_info, Err(err)),
+ }
+ }
+
+ fn take_failed_chunks(&mut self) -> Vec<ChunkInfo> {
+ std::mem::take(&mut self.failed_chunks)
+ .into_iter()
+ .collect()
+ }
+
+ async fn send_event(&mut self, event: FileUploadEvent) -> Result<()> {
+ if let Some(sender) = self.event_sender.as_ref() {
+ sender.send(event).await.map_err(|err| {
+ error!("Could not send files event due to {err:?}");
+ ClientError::CouldNotSendFilesEvent
+ })?;
+ } else if !self.logged_event_sender_absence {
+ info!("FilesUpload upload event sender is not set. Use get_upload_events() if you need to keep track of the progress");
+ self.logged_event_sender_absence = true;
+ }
+ Ok(())
+ }
+}
diff --git a/sn_client/src/lib.rs b/sn_client/src/lib.rs
index 9ab99f20b1..3b32d208ae 100644
--- a/sn_client/src/lib.rs
+++ b/sn_client/src/lib.rs
@@ -26,9 +26,9 @@ pub use self::{
 event::{ClientEvent, ClientEventsReceiver},
 faucet::{get_tokens_from_faucet, load_faucet_wallet_from_genesis_wallet},
 files::{
- api::FilesApi,
 download::{FilesDownload, FilesDownloadEvent},
- FileUploadEvent, Files, BATCH_SIZE, MAX_UPLOAD_RETRIES,
+ upload::{FileUploadEvent, FilesUpload},
+ FilesApi, BATCH_SIZE, MAX_UPLOAD_RETRIES,
 },
 register::ClientRegister,
 wallet::{send, WalletClient},
diff --git a/sn_node/tests/data_with_churn.rs b/sn_node/tests/data_with_churn.rs
index 0dcf126595..42e5bdc9aa 100644
--- a/sn_node/tests/data_with_churn.rs
+++ b/sn_node/tests/data_with_churn.rs
@@ -18,7 +18,7 @@ use common::{
 };
 use eyre::{bail, eyre, Result};
 use rand::{rngs::OsRng, Rng};
-use sn_client::{Client, Error, Files, FilesApi, WalletClient, BATCH_SIZE};
+use sn_client::{Client, Error, FilesApi, FilesDownload, FilesUpload, WalletClient};
 use sn_logging::LogBuilder;
 use sn_protocol::{
 storage::{ChunkAddress, RegisterAddress, SpendAddress},
@@ -410,12 +410,12 @@ fn store_chunks_task(
 let chunks_len = chunks.len();
 let chunks_name = chunks.iter().map(|(name, _)| *name).collect::<Vec<_>>();
- let mut files = Files::new(files_api.clone()).set_show_holders(true);
- if let Err(err) = files.upload_chunks(chunks).await {
+ let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true);
+ if let Err(err) = files_upload.upload_chunks(chunks).await {
 bail!("Bailing w/ new Chunk ({addr:?}) due to error: {err:?}");
 }
- let royalties = files.get_upload_royalty_fees();
- let storage_cost = files.get_upload_storage_cost();
+ let royalties = files_upload.get_upload_royalty_fees();
+ let storage_cost = files_upload.get_upload_storage_cost();
 let cost = royalties
 .checked_add(storage_cost)
 .ok_or(eyre!("Total storage cost exceed possible token amount"))?;
@@ -625,9 +625,9 @@ async fn query_content(
 }
 NetworkAddress::ChunkAddress(addr) => {
 let files_api = FilesApi::new(client.clone(), wallet_dir.to_path_buf());
- let _ = files_api
- 
.read_bytes(*addr, None, None, false, BATCH_SIZE) - .await?; + let mut file_download = FilesDownload::new(files_api); + let _ = file_download.download_file(*addr, None).await?; + Ok(()) } _other => Ok(()), // we don't create/store any other type of content in this test yet diff --git a/sn_node/tests/nodes_rewards.rs b/sn_node/tests/nodes_rewards.rs index db927475db..5edc6f7a81 100644 --- a/sn_node/tests/nodes_rewards.rs +++ b/sn_node/tests/nodes_rewards.rs @@ -15,7 +15,7 @@ use crate::common::{ use assert_fs::TempDir; use bls::{PublicKey, SecretKey, PK_SIZE}; use eyre::{eyre, Result}; -use sn_client::{Client, ClientEvent, Files, WalletClient}; +use sn_client::{Client, ClientEvent, FilesUpload, WalletClient}; use sn_logging::LogBuilder; use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC}; use sn_protocol::safenode_proto::{ @@ -52,9 +52,9 @@ async fn nodes_rewards_for_storing_chunks() -> Result<()> { let prev_rewards_balance = current_rewards_balance()?; println!("With {prev_rewards_balance:?} current balance, paying for {} random addresses... {chunks:?}", chunks.len()); - let mut files = Files::new(files_api.clone()).set_show_holders(true); - files.upload_chunks(chunks).await?; - let storage_cost = files.get_upload_storage_cost(); + let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); + files_upload.upload_chunks(chunks).await?; + let storage_cost = files_upload.get_upload_storage_cost(); println!("Paid {storage_cost:?} total rewards for the chunks"); @@ -114,10 +114,10 @@ async fn nodes_rewards_for_chunks_notifs_over_gossipsub() -> Result<()> { tracing::info!("Paying for {num_of_chunks} random addresses..."); println!("Paying for {num_of_chunks} random addresses..."); - let mut files = Files::new(files_api.clone()).set_show_holders(true); - files.upload_chunks(chunks).await?; - let storage_cost = files.get_upload_storage_cost(); - let royalties_fees = files.get_upload_royalty_fees(); + let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); + files_upload.upload_chunks(chunks).await?; + let storage_cost = files_upload.get_upload_storage_cost(); + let royalties_fees = files_upload.get_upload_royalty_fees(); println!("Random chunks stored, paid {storage_cost}/{royalties_fees}"); @@ -209,10 +209,10 @@ async fn nodes_rewards_transfer_notifs_filter() -> Result<()> { let num_of_chunks = chunks.len(); println!("Paying for {num_of_chunks} chunks"); - let mut files = Files::new(files_api.clone()).set_show_holders(true); - files.upload_chunks(chunks).await?; - let storage_cost = files.get_upload_storage_cost(); - let royalties_fees = files.get_upload_royalty_fees(); + let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); + files_upload.upload_chunks(chunks).await?; + let storage_cost = files_upload.get_upload_storage_cost(); + let royalties_fees = files_upload.get_upload_royalty_fees(); println!("Random chunks stored, paid {storage_cost}/{royalties_fees}"); diff --git a/sn_node/tests/storage_payments.rs b/sn_node/tests/storage_payments.rs index 57c17b7832..59b7561bf6 100644 --- a/sn_node/tests/storage_payments.rs +++ b/sn_node/tests/storage_payments.rs @@ -12,7 +12,7 @@ use crate::common::{client::get_gossip_client_and_wallet, random_content}; use assert_fs::TempDir; use eyre::{eyre, Result}; use rand::Rng; -use sn_client::{Error as ClientError, Files, WalletClient, BATCH_SIZE}; +use sn_client::{Error as ClientError, FilesDownload, FilesUpload, WalletClient}; use sn_logging::LogBuilder; use sn_networking::{Error 
as NetworkError, GetRecordError}; use sn_protocol::{ @@ -199,12 +199,11 @@ async fn storage_payment_chunk_upload_succeeds() -> Result<()> { ) .await?; - let mut files = Files::new(files_api.clone()).set_show_holders(true); - files.upload_chunks(chunks).await?; + let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); + files_upload.upload_chunks(chunks).await?; - files_api - .read_bytes(file_addr, None, None, false, BATCH_SIZE) - .await?; + let mut files_download = FilesDownload::new(files_api); + let _ = files_download.download_file(file_addr, None).await?; Ok(()) } @@ -246,11 +245,10 @@ async fn storage_payment_chunk_upload_fails_if_no_tokens_sent() -> Result<()> { .await?; println!("Reading {content_addr:?} expected to fail"); + let mut files_download = FilesDownload::new(files_api); assert!( matches!( - files_api - .read_bytes(content_addr, None, None, false, BATCH_SIZE) - .await, + files_download.download_file(content_addr, None).await, Err(ClientError::Network(NetworkError::GetRecordError( GetRecordError::RecordNotFound ))) diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs index 5d3cf6dc16..533e79c2a3 100644 --- a/sn_node/tests/verify_data_location.rs +++ b/sn_node/tests/verify_data_location.rs @@ -23,14 +23,14 @@ use libp2p::{ PeerId, }; use rand::{rngs::OsRng, Rng}; -use sn_client::{Client, Files, FilesApi}; +use sn_client::{Client, FilesApi, FilesUpload}; use sn_logging::LogBuilder; use sn_networking::{sort_peers_by_key, CLOSE_GROUP_SIZE}; use sn_protocol::{ safenode_proto::{safe_node_client::SafeNodeClient, NodeInfoRequest, RecordAddressesRequest}, storage::ChunkAddress, + NetworkAddress, PrettyPrintRecordKey, }; -use sn_protocol::{NetworkAddress, PrettyPrintRecordKey}; use std::{ collections::{BTreeSet, HashMap, HashSet}, fs::File, @@ -340,10 +340,10 @@ async fn store_chunks(client: Client, chunk_count: usize, wallet_dir: PathBuf) - let key = PrettyPrintRecordKey::from(&RecordKey::new(&head_chunk_addr.xorname())).into_owned(); - let mut files = Files::new(files_api.clone()) + let mut file_upload = FilesUpload::new(files_api.clone()) .set_show_holders(true) .set_verify_store(false); - files.upload_chunks(chunks).await?; + file_upload.upload_chunks(chunks).await?; uploaded_chunks_count += 1; println!("Stored Chunk with {head_chunk_addr:?} / {key:?}");
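
Reviewer note (not part of the patch): below is a minimal sketch of how the new FilesUpload API introduced here can be driven end to end, based only on the calls visible in this diff (FilesUpload::new, the set_* builders, upload_chunks, get_upload_events and the cost getters). The wallet directory, the on-disk chunk list, and the assumption that get_upload_events() hands back a tokio mpsc receiver whose sender is dropped once upload_chunks returns are illustrative, not guaranteed by the patch.

use std::path::PathBuf;

use eyre::Result;
use sn_client::{Client, FileUploadEvent, FilesApi, FilesUpload};
use xor_name::XorName;

// Sketch only: `client`, `wallet_dir` and the already-chunked `chunks` are assumed to exist.
async fn upload_chunks_example(
    client: Client,
    wallet_dir: PathBuf,
    chunks: Vec<(XorName, PathBuf)>,
) -> Result<()> {
    let files_api = FilesApi::new(client, wallet_dir);
    let mut files_upload = FilesUpload::new(files_api)
        .set_show_holders(true)
        .set_verify_store(true);

    // Subscribe to progress events before starting; the event sender is dropped when
    // `upload_chunks` returns, which ends this loop.
    let mut upload_event_rx = files_upload.get_upload_events();
    let progress = tokio::spawn(async move {
        while let Some(event) = upload_event_rx.recv().await {
            if let FileUploadEvent::Uploaded(addr) = event {
                println!("Uploaded chunk at {addr:?}");
            }
        }
    });

    files_upload.upload_chunks(chunks).await?;
    progress.await?;

    println!(
        "Paid {} for storage and {} in royalties",
        files_upload.get_upload_storage_cost(),
        files_upload.get_upload_royalty_fees()
    );
    Ok(())
}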