Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHM: optimize metadata #1714

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 10 additions & 57 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo,
};

use crate::{RCodec, WCodec, Zenoh080};

impl<W> WCodec<&Descriptor, &mut W> for Zenoh080
impl<W> WCodec<&MetadataDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Descriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index_and_bitpos)?;
Ok(())
}
}

impl<W> WCodec<&HeaderDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &HeaderDescriptor) -> Self::Output {
fn write(self, writer: &mut W, x: &MetadataDescriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index)?;
Ok(())
Expand Down Expand Up @@ -84,52 +70,29 @@ where

fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output {
let ShmBufInfo {
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
metadata,
generation,
} = x;

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, metadata)?;
self.write(&mut *writer, generation)?;
Ok(())
}
}

impl<R> RCodec<Descriptor, &mut R> for Zenoh080
impl<R> RCodec<MetadataDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Descriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index_and_bitpos = self.read(&mut *reader)?;

Ok(Descriptor {
id,
index_and_bitpos,
})
}
}

impl<R> RCodec<HeaderDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<HeaderDescriptor, Self::Error> {
fn read(self, reader: &mut R) -> Result<MetadataDescriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index = self.read(&mut *reader)?;

Ok(HeaderDescriptor { id, index })
Ok(MetadataDescriptor { id, index })
}
}

Expand Down Expand Up @@ -172,21 +135,11 @@ where
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<ShmBufInfo, Self::Error> {
let data_descriptor = self.read(&mut *reader)?;
let shm_protocol = self.read(&mut *reader)?;
let data_len = self.read(&mut *reader)?;
let watchdog_descriptor = self.read(&mut *reader)?;
let header_descriptor = self.read(&mut *reader)?;
let metadata = self.read(&mut *reader)?;
let generation = self.read(&mut *reader)?;

let shm_info = ShmBufInfo::new(
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
generation,
);
let shm_info = ShmBufInfo::new(data_len, metadata, generation);
Ok(shm_info)
}
}
13 changes: 2 additions & 11 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,13 @@ fn codec_encoding() {
#[cfg(feature = "shared-memory")]
#[test]
fn codec_shm_info() {
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
};
use zenoh_shm::{metadata::descriptor::MetadataDescriptor, ShmBufInfo};

run!(ShmBufInfo, {
let mut rng = rand::thread_rng();
ShmBufInfo::new(
ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
rng.gen(),
Descriptor {
id: rng.gen(),
index_and_bitpos: rng.gen(),
},
HeaderDescriptor {
MetadataDescriptor {
id: rng.gen(),
index: rng.gen(),
},
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/provider/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID};
/// Uniquely identifies the particular chunk within particular segment
#[zenoh_macros::unstable_doc]
#[derive(Clone, Debug, PartialEq, Eq)]
#[stabby::stabby]
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
Expand Down
133 changes: 52 additions & 81 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ use super::{
};
use crate::{
api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID},
header::{
allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor,
storage::GLOBAL_HEADER_STORAGE,
metadata::{
allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
storage::GLOBAL_METADATA_STORAGE,
},
watchdog::{
allocated_watchdog::AllocatedWatchdog,
confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
descriptor::Descriptor,
storage::GLOBAL_STORAGE,
validator::GLOBAL_VALIDATOR,
},
ShmBufInfo, ShmBufInner,
Expand All @@ -53,20 +50,14 @@ use crate::{
#[derive(Debug)]
struct BusyChunk {
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
_watchdog: AllocatedWatchdog,
metadata: AllocatedMetadataDescriptor,
}

impl BusyChunk {
fn new(
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
watchdog: AllocatedWatchdog,
) -> Self {
fn new(descriptor: ChunkDescriptor, metadata: AllocatedMetadataDescriptor) -> Self {
Self {
descriptor,
header,
_watchdog: watchdog,
metadata,
}
}
}
Expand Down Expand Up @@ -822,16 +813,10 @@ where
let len = len.try_into()?;

// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// wrap everything to ShmBufInner
let wrapped = self.wrap(
chunk,
len,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

Expand All @@ -840,7 +825,7 @@ where
#[zenoh_macros::unstable_doc]
pub fn garbage_collect(&self) -> usize {
fn is_free_chunk(chunk: &BusyChunk) -> bool {
let header = chunk.header.descriptor.header();
let header = chunk.metadata.header();
if header.refcount.load(Ordering::SeqCst) != 0 {
return header.watchdog_invalidated.load(Ordering::SeqCst);
}
Expand Down Expand Up @@ -891,7 +876,7 @@ where
Policy: AllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -902,82 +887,74 @@ where
let chunk = Policy::alloc(layout, self)?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

fn alloc_resources() -> ZResult<(
AllocatedHeaderDescriptor,
AllocatedWatchdog,
ConfirmedDescriptor,
)> {
// allocate shared header
let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?;

// allocate watchdog
let allocated_watchdog = GLOBAL_STORAGE.read().allocate_watchdog()?;
fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> {
// allocate metadata
let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;

// add watchdog to confirmator
let confirmed_watchdog = GLOBAL_CONFIRMATOR
.read()
.add_owned(&allocated_watchdog.descriptor)?;
let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());

Ok((allocated_header, allocated_watchdog, confirmed_watchdog))
Ok((allocated_metadata, confirmed_metadata))
}

fn wrap(
&self,
chunk: AllocatedChunk,
len: NonZeroUsize,
allocated_header: AllocatedHeaderDescriptor,
allocated_watchdog: AllocatedWatchdog,
confirmed_watchdog: ConfirmedDescriptor,
allocated_metadata: AllocatedMetadataDescriptor,
confirmed_metadata: ConfirmedDescriptor,
) -> ShmBufInner {
let header = allocated_header.descriptor.clone();
let descriptor = Descriptor::from(&allocated_watchdog.descriptor);
// write additional metadata
// chunk descriptor
allocated_metadata
.header()
.segment
.store(chunk.descriptor.segment, Ordering::Relaxed);
allocated_metadata
.header()
.chunk
.store(chunk.descriptor.chunk, Ordering::Relaxed);
allocated_metadata
.header()
.len
.store(chunk.descriptor.len.into(), Ordering::Relaxed);
// protocol
allocated_metadata
.header()
.protocol
.store(self.id.id(), Ordering::Relaxed);

// add watchdog to validator
let c_header = header.clone();
GLOBAL_VALIDATOR.read().add(
allocated_watchdog.descriptor.clone(),
Box::new(move || {
c_header
.header()
.watchdog_invalidated
.store(true, Ordering::SeqCst);
}),
);
GLOBAL_VALIDATOR
.read()
.add(confirmed_metadata.owned.clone());

// Create buffer's info
let info = ShmBufInfo::new(
chunk.descriptor.clone(),
self.id.id(),
len,
descriptor,
HeaderDescriptor::from(&header),
header.header().generation.load(Ordering::SeqCst),
MetadataDescriptor::from(&confirmed_metadata.owned),
allocated_metadata
.header()
.generation
.load(Ordering::SeqCst),
);

// Create buffer
let shmb = ShmBufInner {
header,
metadata: Arc::new(confirmed_metadata),
buf: chunk.data,
info,
watchdog: Arc::new(confirmed_watchdog),
};

// Create and store busy chunk
self.busy_list.lock().unwrap().push_back(BusyChunk::new(
chunk.descriptor,
allocated_header,
allocated_watchdog,
));
self.busy_list
.lock()
.unwrap()
.push_back(BusyChunk::new(chunk.descriptor, allocated_metadata));

shmb
}
Expand All @@ -998,7 +975,7 @@ where
Policy: AsyncAllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -1009,13 +986,7 @@ where
let chunk = Policy::alloc_async(backend_layout, self).await?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}
}
10 changes: 9 additions & 1 deletion commons/zenoh-shm/src/header/chunk_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};

// Chunk header
#[stabby::stabby]
Expand All @@ -25,4 +25,12 @@ pub struct ChunkHeaderType {
pub refcount: AtomicU32,
pub watchdog_invalidated: AtomicBool,
pub generation: AtomicU32,

/// Protocol identifier for particular SHM implementation
pub protocol: AtomicU32,

/// The data chunk descriptor
pub segment: AtomicU32,
pub chunk: AtomicU32,
pub len: AtomicUsize,
}
Loading
Loading