Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl RleV1Encoder for integer #37

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 0 additions & 1 deletion src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl ByteRleEncoder {
self.tail_run_length = 1;
} else if let Some(run_value) = self.run_value {
// Run mode

if value == run_value {
// Continue buffering for Run sequence, flushing if reaching max length
self.num_literals += 1;
Expand Down
10 changes: 9 additions & 1 deletion src/encoding/integer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,15 @@ pub trait VarintSerde: PrimInt + CheckedShl + BitOrAssign + Signed {
/// Helps generalise the decoder efforts to be specific to supported integers.
/// (Instead of decoding to u64/i64 for all then downcasting).
pub trait NInt:
VarintSerde + ShlAssign<usize> + fmt::Debug + fmt::Display + fmt::Binary + Send + Sync + 'static
VarintSerde
+ ShlAssign<usize>
+ fmt::Debug
+ fmt::Display
+ fmt::Binary
+ Default
+ Send
+ Sync
+ 'static
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
{
type Bytes: AsRef<[u8]> + AsMut<[u8]> + Default + Clone + Copy + fmt::Debug;

Expand Down
193 changes: 175 additions & 18 deletions src/encoding/integer/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@

use std::{io::Read, marker::PhantomData};

use bytes::{BufMut, BytesMut};
use snafu::OptionExt;

use crate::{
encoding::{
rle::GenericRle,
util::{read_u8, try_read_u8},
PrimitiveValueEncoder,
},
error::{OutOfSpecSnafu, Result},
memory::EstimateMemory,
};

use super::{util::read_varint_zigzagged, EncodingSign, NInt};
use super::{
util::{read_varint_zigzagged, write_varint_zigzagged},
EncodingSign, NInt,
};

/// Fewest values that can be emitted as a Run sequence.
const MIN_RUN_LENGTH: usize = 3;
/// Most values a single Run sequence can hold: the Run header stores
/// `length - MIN_RUN_LENGTH`, which is capped at 127.
const MAX_RUN_LENGTH: usize = 127 + MIN_RUN_LENGTH;
/// Most values a single Literals sequence can hold.
const MAX_LITERAL_LENGTH: usize = 128;
/// Largest per-step delta a Run may use (the delta is written as one signed byte).
const MAX_DELTA: i64 = 127;
/// Smallest per-step delta a Run may use (the delta is written as one signed byte).
const MIN_DELTA: i64 = -128;
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EncodingType {
Expand All @@ -57,6 +67,16 @@ impl EncodingType {
};
Ok(opt_encoding)
}

/// Encode header to write
fn to_header(&self, writer: &mut BytesMut) {
if let Self::Run { length, delta } = *self {
writer.put_u8(length as u8 - 3);
writer.put_u8(delta as u8);
} else if let Self::Literals { length } = *self {
writer.put_u8(-(length as i8) as u8);
}
}
}

/// Decodes a stream of Integer Run Length Encoded version 1 bytes.
Expand Down Expand Up @@ -147,6 +167,139 @@ impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S>
}
}

pub struct RleV1Encoder<N: NInt, S: EncodingSign> {
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
writer: BytesMut,
/// Literal values to encode.
literals: [N; MAX_LITERAL_LENGTH],
/// Represents the number of elements currently in `literals` if Literals,
/// otherwise represents the length of the Run.
num_literals: usize,
/// Tracks if current Literal sequence will turn into a Run sequence due to
/// repeated values at the end of the value sequence.
tail_run_length: usize,
/// If in Run sequence or not, and keeps the corresponding value.
run_value: Option<N>,
/// The delta value keeped now
run_delta: i64,
sign: PhantomData<S>,
}

impl<N: NInt, S: EncodingSign> RleV1Encoder<N, S> {
    /// Returns `value - prev` when that difference fits the delta range a Run
    /// can store (`MIN_DELTA..=MAX_DELTA`), otherwise `None`.
    ///
    /// The subtraction is done on the `i64` views with overflow checking, so
    /// extreme inputs are reported as "no usable delta". Subtracting in `N`
    /// could instead panic or wrap around into a bogus small delta.
    fn run_delta_between(prev: N, value: N) -> Option<i64> {
        value
            .as_i64()
            .checked_sub(prev.as_i64())
            .filter(|delta| (MIN_DELTA..=MAX_DELTA).contains(delta))
    }

    /// Feeds one value into the encoder, buffering it as part of the current
    /// Literal or Run sequence and flushing whenever a sequence is complete.
    ///
    /// Algorithm adapted from:
    /// https://github.com/apache/orc/blob/main/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
    fn process_value(&mut self, value: N) {
        if self.num_literals == 0 {
            // Nothing buffered: start a new Literal sequence.
            self.literals[0] = value;
            self.num_literals = 1;
            self.tail_run_length = 1;
        } else if let Some(run_value) = self.run_value {
            // Run mode: the value continues the Run only if it equals
            // base + delta * length. The product is small (|delta| <= 128,
            // length < MAX_RUN_LENGTH) but the sum can exceed i64, in which
            // case the value cannot be part of the Run.
            let expected = run_value
                .as_i64()
                .checked_add(self.run_delta * self.num_literals as i64);
            if expected == Some(value.as_i64()) {
                self.num_literals += 1;
                if self.num_literals == MAX_RUN_LENGTH {
                    self.flush();
                }
            } else {
                // Run is broken: emit it and start a new Literal sequence.
                self.flush();
                self.literals[self.num_literals] = value;
                self.num_literals += 1;
                self.tail_run_length = 1;
            }
        } else {
            // Literal mode: track whether the tail is forming a Run.
            let prev = self.literals[self.num_literals - 1];
            let extends_tail = self.tail_run_length != 1
                && prev.as_i64().checked_add(self.run_delta) == Some(value.as_i64());
            if extends_tail {
                self.tail_run_length += 1;
            } else if let Some(delta) = Self::run_delta_between(prev, value) {
                // `prev` and `value` start a new candidate Run.
                self.run_delta = delta;
                self.tail_run_length = 2;
            } else {
                self.tail_run_length = 1;
            }

            if self.tail_run_length == MIN_RUN_LENGTH {
                if self.num_literals + 1 == MIN_RUN_LENGTH {
                    // Every buffered literal belongs to the Run.
                    self.run_value = Some(self.literals[0]);
                    self.num_literals += 1;
                } else {
                    // Emit the literals preceding the Run, then switch to Run
                    // mode with the already-seen tail as its first values.
                    self.num_literals -= MIN_RUN_LENGTH - 1;
                    let run_value = self.literals[self.num_literals];
                    self.flush();
                    self.run_value = Some(run_value);
                    self.num_literals = MIN_RUN_LENGTH;
                }
            } else {
                self.literals[self.num_literals] = value;
                self.num_literals += 1;
                if self.num_literals == MAX_LITERAL_LENGTH {
                    self.flush();
                }
            }
        }
    }

    /// Flushes buffered values to the writer and resets the state.
    fn flush(&mut self) {
        if self.num_literals > 0 {
            if let Some(run_value) = self.run_value {
                let header = EncodingType::Run {
                    length: self.num_literals,
                    // `run_delta` is within MIN_DELTA..=MAX_DELTA whenever a
                    // Run is active, so this cast does not truncate.
                    delta: self.run_delta as i8,
                };
                header.to_header(&mut self.writer);
                write_varint_zigzagged::<_, S>(&mut self.writer, run_value);
            } else {
                let header = EncodingType::Literals {
                    length: self.num_literals,
                };
                header.to_header(&mut self.writer);
                for &literal in &self.literals[..self.num_literals] {
                    write_varint_zigzagged::<_, S>(&mut self.writer, literal);
                }
            }
        }
        self.run_value = None;
        self.num_literals = 0;
        self.tail_run_length = 0;
    }
}

impl<N: NInt, S: EncodingSign> EstimateMemory for RleV1Encoder<N, S> {
    /// Returns the current length, in bytes, of the encoded output buffer.
    ///
    /// Only `writer` is measured; values still buffered in `literals` that
    /// have not been flushed yet are not included.
    fn estimate_memory_size(&self) -> usize {
        self.writer.len()
    }
}

impl<N: NInt, S: EncodingSign> PrimitiveValueEncoder<N> for RleV1Encoder<N, S> {
fn new() -> Self {
Self {
writer: BytesMut::new(),
sign: Default::default(),
literals: [N::default(); MAX_LITERAL_LENGTH],
num_literals: 0,
tail_run_length: 0,
run_value: None,
run_delta: 0,
}
}

fn write_one(&mut self, value: N) {
self.process_value(value);
}

fn take_inner(&mut self) -> bytes::Bytes {
self.flush();
std::mem::take(&mut self.writer).into()
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand All @@ -155,32 +308,36 @@ mod tests {

use super::*;

fn test_helper(data: &[u8], expected: &[i64]) {
let mut reader = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(data));
let mut actual = vec![0; expected.len()];
reader.decode(&mut actual).unwrap();
assert_eq!(actual, expected);
fn test_helper(original: &[i64], encoded: &[u8]) {
let mut encoder = RleV1Encoder::<i64, UnsignedEncoding>::new();
encoder.write_slice(original);
encoder.flush();
let actual_encoded = encoder.take_inner();
assert_eq!(actual_encoded, encoded);

let mut decoder = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(encoded));
let mut actual_decoded = vec![0; original.len()];
decoder.decode(&mut actual_decoded).unwrap();
assert_eq!(actual_decoded, original);
}

#[test]
fn test_run() -> Result<()> {
    // Constant run: 100 copies of 7 (delta 0).
    let original = [7; 100];
    let encoded = [0x61, 0x00, 0x07];
    test_helper(&original, &encoded);

    // Descending run: 100 down to 1 (delta -1).
    let original = (1..=100).rev().collect::<Vec<_>>();
    let encoded = [0x61, 0xff, 0x64];
    test_helper(&original, &encoded);
    Ok(())
}

#[test]
fn test_literal() -> Result<()> {
    // Five values with no usable run: emitted as one Literals sequence.
    let original = [2, 3, 6, 7, 11];
    let encoded = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
    test_helper(&original, &encoded);
    Ok(())
}
}