Skip to content

Commit

Permalink
fix(decrypt): prevent extra clones while decrypting chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Dec 19, 2023
1 parent 377c072 commit f3a1b5e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 48 deletions.
43 changes: 11 additions & 32 deletions src/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,31 @@ use bytes::Bytes;
use itertools::Itertools;
use rayon::prelude::*;
use std::io::Cursor;
use std::sync::Arc;
use xor_name::XorName;

pub fn decrypt(src_hashes: Vec<XorName>, encrypted_chunks: Vec<EncryptedChunk>) -> Result<Bytes> {
let src_hashes = Arc::new(src_hashes);
pub fn decrypt(src_hashes: Vec<XorName>, encrypted_chunks: &[&EncryptedChunk]) -> Result<Bytes> {
let num_chunks = encrypted_chunks.len();
let cpus = num_cpus::get();
let batch_size = usize::max(1, (num_chunks as f64 / cpus as f64).ceil() as usize);

let raw_chunks: Vec<(usize, Bytes)> = encrypted_chunks
.chunks(batch_size)
.par_bridge()
.map(|batch| DecryptionBatch {
jobs: batch
.iter()
.map(|c| DecryptionJob {
index: c.index,
encrypted_content: c.content.clone(),
src_hashes: src_hashes.clone(),
})
.collect_vec(),
})
.map(|batch| {
batch
.jobs
let mut decrypted_batch = Vec::with_capacity(batch.len());
let iter = batch
.par_iter()
.map(|c| {
Ok::<(usize, Bytes), Error>((
c.index,
decrypt_chunk(c.index, c.encrypted_content.clone(), c.src_hashes.as_ref())?,
))
// we can pass &src_hashes since Rayon uses scopes under the hood which guarantees that threads are
// joined before src_hashes goes out of scope
let bytes = decrypt_chunk(c.index, &c.content, &src_hashes)?;
Ok::<(usize, Bytes), Error>((c.index, bytes))
})
.collect::<Vec<_>>()
.flatten();
decrypted_batch.par_extend(iter);
decrypted_batch
})
.flatten()
.flatten()
.collect();

if num_chunks > raw_chunks.len() {
Expand All @@ -66,19 +55,9 @@ pub fn decrypt(src_hashes: Vec<XorName>, encrypted_chunks: Vec<EncryptedChunk>)
Ok(raw_data)
}

struct DecryptionBatch {
jobs: Vec<DecryptionJob>,
}

struct DecryptionJob {
index: usize,
encrypted_content: Bytes,
src_hashes: Arc<Vec<XorName>>,
}

pub(crate) fn decrypt_chunk(
chunk_number: usize,
content: Bytes,
content: &Bytes,
chunk_hashes: &[XorName],
) -> Result<Bytes> {
let (pad, key, iv) = get_pad_key_and_iv(chunk_number, chunk_hashes);
Expand Down
2 changes: 1 addition & 1 deletion src/encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,5 @@ pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes
)
.map_err(|_| Error::Compression)?;
let encrypted = encryption::encrypt(Bytes::from(compressed), &key, &iv)?;
Ok(xor(encrypted, &pad))
Ok(xor(&encrypted, &pad))
}
25 changes: 10 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl StreamSelfDecryptor {
pub fn next_encrypted(&mut self, encrypted_chunk: EncryptedChunk) -> Result<bool> {
if encrypted_chunk.index == self.chunk_index {
let decrypted_content =
decrypt_chunk(self.chunk_index, encrypted_chunk.content, &self.src_hashes)?;
decrypt_chunk(self.chunk_index, &encrypted_chunk.content, &self.src_hashes)?;
self.append_to_file(&decrypted_content)?;

self.chunk_index += 1;
Expand Down Expand Up @@ -344,7 +344,7 @@ impl StreamSelfDecryptor {
let _ = chunk_file.read_to_end(&mut chunk_data)?;

let decrypted_content =
decrypt_chunk(self.chunk_index, chunk_data.into(), &self.src_hashes)?;
decrypt_chunk(self.chunk_index, &chunk_data.into(), &self.src_hashes)?;
self.append_to_file(&decrypted_content)?;

self.chunk_index += 1;
Expand Down Expand Up @@ -426,12 +426,9 @@ pub fn encrypt(bytes: Bytes) -> Result<(DataMap, Vec<EncryptedChunk>)> {
/// Decrypts what is expected to be the full set of chunks covered by the data map.
pub fn decrypt_full_set(data_map: &DataMap, chunks: &[EncryptedChunk]) -> Result<Bytes> {
let src_hashes = extract_hashes(data_map);
let sorted_chunks = chunks
.iter()
.sorted_by_key(|c| c.index)
.cloned() // should not be needed, something is wrong here, the docs for sorted_by_key says it will return owned items...!
.collect_vec();
decrypt::decrypt(src_hashes, sorted_chunks)
let mut sorted_chunks = Vec::with_capacity(chunks.len());
sorted_chunks.extend(chunks.iter().sorted_by_key(|c| c.index));
decrypt::decrypt(src_hashes, &sorted_chunks)
}

/// Decrypts a range, used when seeking.
Expand All @@ -444,12 +441,10 @@ pub fn decrypt_range(
len: usize,
) -> Result<Bytes> {
let src_hashes = extract_hashes(data_map);
let encrypted_chunks = chunks
.iter()
.sorted_by_key(|c| c.index)
.cloned()
.collect_vec();
let mut bytes = decrypt::decrypt(src_hashes, encrypted_chunks)?;
let mut sorted_chunks = Vec::with_capacity(chunks.len());
sorted_chunks.extend(chunks.iter().sorted_by_key(|c| c.index));

let mut bytes = decrypt::decrypt(src_hashes, &sorted_chunks)?;

if relative_pos >= bytes.len() {
return Ok(Bytes::new());
Expand All @@ -463,7 +458,7 @@ pub fn decrypt_range(
}

/// Helper function to XOR a data with a pad (pad will be rotated to fill the length)
pub(crate) fn xor(data: Bytes, &Pad(pad): &Pad) -> Bytes {
pub(crate) fn xor(data: &Bytes, &Pad(pad): &Pad) -> Bytes {
let vec: Vec<_> = data
.iter()
.zip(pad.iter().cycle())
Expand Down

0 comments on commit f3a1b5e

Please sign in to comment.