Skip to content

Commit

Permalink
Merge pull request #2 from freedelity/hash_algo
Browse files Browse the repository at this point in the history
Add option for choosing hash algorithm
  • Loading branch information
ndusart authored Feb 5, 2024
2 parents cf44570 + 38e6bfa commit c474802
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 43 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches:
- master
- ci

env:
RUSTFLAGS: -Dwarnings
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Unreleased

- New flag `--hash` which allow to specify the hash algorithm used for computing blocks checksums.

# 1.0.0 (2024-02-01)

Initial release
58 changes: 58 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ vendored-openssl = ["ssh2/vendored-openssl"]

[dependencies]
anyhow = "1.0"
clap = { version = "4.4", default-features = false, features = ["derive", "std", "help", "usage"] }
clap = { version = "4.4", default-features = false, features = ["derive", "std", "help", "usage", "error-context"] }
crc32c = "0.6"
daemonize = "0.5"
futures = { version = "0.3", default-features = false }
futures-core = { version = "0.3", default-features = false }
hex = { version = "0.4", default-features = false, features = ["alloc"] }
int-enum = "1.0"
md-5 = { version = "0.10", default-features = false }
num = { version = "0.4", default-features = false }
path-clean = "1.0"
Expand Down
56 changes: 35 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::hash_file::hash_file;
use crate::sync::Loan;
use crate::HashAlgorithm;
use crate::{check_status, write_string, ResumableReadString, ResumableWriteFileBlock};
use anyhow::anyhow;
use futures::future::OptionFuture;
Expand All @@ -11,51 +12,64 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_stream::StreamExt;

pub async fn new_process(
address: String,
secret: String,
src_path: PathBuf,
dest: PathBuf,
force_truncate: bool,
workers: u8,
block_size: usize,
) -> Result<(), anyhow::Error> {
let mut stream = TcpStream::connect(address).await?;
pub struct ClientProcessOptions {
pub address: String,
pub secret: String,
pub src_path: PathBuf,
pub dest: PathBuf,
pub force_truncate: bool,
pub workers: u8,
pub block_size: usize,
pub hash_algorithm: HashAlgorithm,
}

pub async fn new_process(options: ClientProcessOptions) -> Result<(), anyhow::Error> {
let mut stream = TcpStream::connect(options.address).await?;

// first send secret
stream.write_u8(secret.len().try_into()?).await?;
stream.write_all(secret.as_bytes()).await?;
stream.write_u8(options.secret.len().try_into()?).await?;
stream.write_all(options.secret.as_bytes()).await?;
check_status(&mut stream).await?;

// send dest path
write_string(
&mut stream,
dest.to_str()
options
.dest
.to_str()
.ok_or(anyhow!("Invalid characters in destination path"))?,
)
.await?;

// open source file
let mut src = File::open(&src_path)?;
let mut src = File::open(&options.src_path)?;
let src_size = src.seek(SeekFrom::End(0))?;
src.rewind()?;

// send block size, file size and force flag
stream.write_u64(block_size.try_into()?).await?;
// send block size, file size, force flag and hash algorithm
stream.write_u64(options.block_size.try_into()?).await?;
stream.write_u64(src_size).await?;
stream.write_u8(if force_truncate { 1 } else { 0 }).await?;
stream
.write_u8(if options.force_truncate { 1 } else { 0 })
.await?;
stream.write_u8(options.hash_algorithm.into()).await?;

check_status(&mut stream).await?;

// Start the file hashing
let mut hasher = hash_file(&src_path, block_size, workers)?;
let mut hasher = hash_file(
&options.src_path,
options.block_size,
options.workers,
options.hash_algorithm,
)?;
let mut processing_hash = None;
let mut hash_comparison_over = false;

// Event loop
let mut resumable_read_string = ResumableReadString::new();
let mut block_idx = 0usize;
let end_block_idx = (src_size as f32 / block_size as f32).ceil() as usize;
let end_block_idx = (src_size as f32 / options.block_size as f32).ceil() as usize;
let mut blocks_idx_to_send = VecDeque::new();
let mut resumable_write_block: Option<ResumableWriteFileBlock> = None;
let (mut stream_rx, mut stream_tx) = tokio::io::split(stream);
Expand Down Expand Up @@ -86,7 +100,7 @@ pub async fn new_process(
if hash_comparison_over && blocks_idx_to_send.is_empty() {
break;
} else {
prepare_next_write_block(block_size, &mut blocks_idx_to_send, &mut resumable_write_block).await?;
prepare_next_write_block(options.block_size, &mut blocks_idx_to_send, &mut resumable_write_block).await?;
}
},

Expand All @@ -100,7 +114,7 @@ pub async fn new_process(
Some((_, block_data)) = hasher.recv(), if processing_hash.is_some() && !hash_comparison_over => {
if block_data.hash != processing_hash.unwrap() {
blocks_idx_to_send.push_back((block_idx, block_data.data));
prepare_next_write_block(block_size, &mut blocks_idx_to_send, &mut resumable_write_block).await?;
prepare_next_write_block(options.block_size, &mut blocks_idx_to_send, &mut resumable_write_block).await?;
}
processing_hash = None;
block_idx += 1;
Expand Down
14 changes: 9 additions & 5 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::sync::Loan;
use anyhow::{anyhow, bail};
use int_enum::IntEnum;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[repr(u8)]
#[derive(Debug)]
#[derive(Debug, IntEnum)]
pub enum StatusCode {
Ack = 0,
InvalidSecret = 1,
FileSizeDiffers = 2,
PermissionDenied = 3,
ClientAlreadyConnected = 4,
UnknownHashAlgorithm = 5,
}

pub async fn write_string<T: AsyncWriteExt + std::marker::Unpin, S: Into<String>>(
Expand All @@ -35,23 +37,25 @@ pub async fn write_status<T: AsyncWriteExt + std::marker::Unpin>(
write: &mut T,
status: StatusCode,
) -> Result<(), anyhow::Error> {
write.write_u8(status as u8).await?;
write.write_u8(status.into()).await?;
Ok(())
}

pub async fn check_status<T: AsyncReadExt + std::marker::Unpin>(
read: &mut T,
) -> Result<(), anyhow::Error> {
let status: StatusCode = unsafe { std::mem::transmute(read.read_u8().await?) };
let status = StatusCode::try_from(read.read_u8().await?)
.map_err(|_| anyhow::anyhow!("Unknown status code"))?;

#[allow(unreachable_patterns)]
match status {
StatusCode::Ack => Ok(()),
StatusCode::InvalidSecret => Err(anyhow!("Invalid Secret")),
StatusCode::FileSizeDiffers => Err(anyhow!("File size differs")),
StatusCode::PermissionDenied => Err(anyhow!("Permission denied")),
StatusCode::ClientAlreadyConnected => Err(anyhow!("A client is already connected")),
_ => Err(anyhow!("Unexpected error")),
StatusCode::UnknownHashAlgorithm => {
Err(anyhow!("Hash algorithm is not supported on remote end"))
}
}
}

Expand Down
23 changes: 16 additions & 7 deletions src/hash_file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::sync::{loan, Loan};
use crate::sync::{ordered_channel, OrderedReceiver};
use crate::HashAlgorithm;
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::path::{Path, PathBuf};
Expand All @@ -15,10 +16,20 @@ pub struct HashStream {
join_handles: Vec<std::thread::JoinHandle<()>>,
}

fn hash_data(data: &[u8], algorithm: HashAlgorithm) -> String {
use md5::{Digest, Md5};

match algorithm {
HashAlgorithm::CRC32 => hex::encode(crc32c::crc32c(data).to_le_bytes()),
HashAlgorithm::MD5 => hex::encode(Md5::digest(data)),
}
}

pub fn hash_file(
path: impl AsRef<Path>,
block_size: usize,
workers: u8,
algorithm: HashAlgorithm,
) -> Result<HashStream, anyhow::Error> {
let path = {
let mut p = PathBuf::new();
Expand Down Expand Up @@ -55,7 +66,6 @@ pub fn hash_file(
let mut buffer = vec![0; block_size];

while offset < file_size {
use md5::{Digest, Md5};
use std::io::Read;

let hash = {
Expand All @@ -75,10 +85,7 @@ pub fn hash_file(
break;
}

{
let hash = Md5::digest(buf);
hex::encode(hash)
}
hash_data(buf, algorithm)
};

let (loaned, reclaimer) = loan(buffer);
Expand Down Expand Up @@ -140,7 +147,8 @@ mod tests {
async fn hashes() {
use super::*;

let mut hasher = hash_file("test/file_to_hash.txt", 10, 2).unwrap();
let mut hasher =
hash_file("test/file_to_hash.txt", 10, 2, crate::HashAlgorithm::MD5).unwrap();

let assert_hash = |file_block_data: FileBlockData, expected_hash| {
assert_eq!(file_block_data.hash, expected_hash);
Expand Down Expand Up @@ -189,7 +197,8 @@ mod tests {
async fn dropped() {
use super::*;

let mut hasher = hash_file("test/file_to_hash.txt", 10, 2).unwrap();
let mut hasher =
hash_file("test/file_to_hash.txt", 10, 2, crate::HashAlgorithm::CRC32).unwrap();

let _ = hasher.recv().await.unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
Expand Down
Loading

0 comments on commit c474802

Please sign in to comment.