Skip to content

Commit

Permalink
Merge pull request eclipse-zenoh#231 from eclipse-zenoh/encoding_seri…
Browse files Browse the repository at this point in the history
…alization_remove

use own encoding and timestamp serializatiion
  • Loading branch information
milyin authored Oct 9, 2024
2 parents c1bdb1a + 313fe99 commit e37c3b5
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 71 deletions.
92 changes: 27 additions & 65 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 57 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ use async_trait::async_trait;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
use tokio::sync::Mutex;
use tracing::{debug, error, trace, warn};
use uhlc::NTP64;
use zenoh::{
bytes::{Encoding, ZBytes},
internal::{bail, zenoh_home, zerror, Value},
internal::{bail, buffers::ZSlice, zenoh_home, zerror, Value},
key_expr::OwnedKeyExpr,
query::Parameters,
time::Timestamp,
time::{Timestamp, TimestampId, NTP64},
try_init_log_from_env, Error, Result as ZResult,
};
use zenoh_backend_traits::{
Expand Down Expand Up @@ -500,16 +499,68 @@ fn get_kv(db: &DB, key: Option<OwnedKeyExpr>) -> ZResult<Option<(Value, Timestam
}
}

struct DataInfo {
pub timestamp: Timestamp,
pub deleted: bool,
pub encoding: Encoding,
}

type DataInfoTuple = (u64, [u8; 16], bool, u16, Vec<u8>);

impl DataInfo {
pub fn as_tuple(&self) -> DataInfoTuple {
let timestamp_time = self.timestamp.get_time().as_u64();
let timestamp_id = self.timestamp.get_id().to_le_bytes();
let encoding_id = self.encoding.id();
let encoding_schema = self
.encoding
.schema()
.map(|s| s.to_vec())
.unwrap_or_default();
let deleted = self.deleted;
(
timestamp_time,
timestamp_id,
deleted,
encoding_id,
encoding_schema,
)
}
pub fn from_tuple(
(timestamp_time, timestamp_id, deleted, encoding_id, encoding_schema): DataInfoTuple,
) -> ZResult<Self> {
let timestamp_id = TimestampId::try_from(timestamp_id)?;
let timestamp = Timestamp::new(NTP64(timestamp_time), timestamp_id);
let encoding_schema = if encoding_schema.is_empty() {
None
} else {
Some(ZSlice::from(encoding_schema))
};
let encoding = Encoding::new(encoding_id, encoding_schema);
Ok(DataInfo {
timestamp,
deleted,
encoding,
})
}
}

fn encode_data_info(encoding: Encoding, timestamp: &Timestamp, deleted: bool) -> ZResult<Vec<u8>> {
let bytes = z_serialize(&(encoding, deleted, timestamp));
let data_info = DataInfo {
timestamp: *timestamp,
deleted,
encoding,
};
let bytes = z_serialize(&data_info.as_tuple());
Ok(bytes.to_bytes().into_owned())
}

fn decode_data_info(buf: &[u8]) -> ZResult<(Encoding, Timestamp, bool)> {
let bytes = ZBytes::from(buf);
let (encoding, deleted, timestamp) = z_deserialize(&bytes)
let tuple: DataInfoTuple = z_deserialize(&bytes)
.map_err(|_| zerror!("Failed to decode data-info (encoding, deleted, timestamp)"))?;
Ok((encoding, timestamp, deleted))
let data_info = DataInfo::from_tuple(tuple)?;
Ok((data_info.encoding, data_info.timestamp, data_info.deleted))
}

fn rocksdb_err_to_zerr(err: rocksdb::Error) -> Error {
Expand Down

0 comments on commit e37c3b5

Please sign in to comment.