diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 83d06139..b98f0308 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp { ///============================================================================= SubscriptionData::Message::Message( - std::vector && p, + const zenoh::Bytes & p, uint64_t recv_ts, AttachmentData && attachment_) -: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } @@ -225,7 +224,7 @@ bool SubscriptionData::init() sub_data->add_new_message( std::make_unique( - sample.get_payload().as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); @@ -303,13 +302,12 @@ bool SubscriptionData::init() "Unable to obtain attachment") return; } - auto payload = sample.get_payload().clone(); auto attachment_value = attachment.value(); AttachmentData attachment_data(attachment_value); sub_data->add_new_message( std::make_unique( - payload.as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index a3fab3f9..37ab0dba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -25,7 +25,6 @@ #include #include #include -#include #include @@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this && p, + const zenoh::Bytes & bytes, uint64_t recv_ts, AttachmentData && attachment); ~Message(); - std::vector payload; + Payload payload; uint64_t recv_timestamp; AttachmentData attachment; }; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 061ceb0f..5770718b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -77,4 +77,55 @@ int64_t get_system_time_in_ns() return std::chrono::duration_cast(now).count(); } +///============================================================================= +Payload::Payload(const zenoh::Bytes & bytes) +{ + // NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of + // buffers contains exactly one element, it is not necessary to concatenate the list of buffers. + // In this case, we store a clone of the bytes object to maintain a non-zero reference-count on + // the buffer. This ensures that the slice into said buffer stays valid until we drop our copy + // of the bytes object (at the very least). This case corresponds to the `Contiguous` + // alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local" + // communication. + + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); + std::optional slice = slices.next(); + if (!slice.has_value()) { + bytes_ = nullptr; + } else { + if (!slices.next().has_value()) { + bytes_ = Contiguous {slice.value(), bytes.clone()}; + } else { + bytes_ = bytes.as_vector(); + } + } +} + +const uint8_t * Payload::data() const +{ + if (std::holds_alternative(bytes_)) { + return nullptr; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).data(); + } else { + return std::get(bytes_).slice.data; + } +} + +size_t Payload::size() const +{ + if (std::holds_alternative(bytes_)) { + return 0; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).size(); + } else { + return std::get(bytes_).slice.len; + } +} + +bool Payload::empty() const +{ + return std::holds_alternative(bytes_); +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index a88b132e..dd7cff72 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include "rmw/types.h" @@ -63,6 +66,32 @@ class ZenohQuery final }; int64_t get_system_time_in_ns(); + +class Payload +{ +public: + explicit Payload(const zenoh::Bytes & bytes); + + ~Payload() = default; + + const uint8_t * data() const; + + size_t size() const; + + bool empty() const; + +private: + struct Contiguous + { + zenoh::Slice slice; + zenoh::Bytes bytes; + }; + using NonContiguous = std::vector; + using Empty = std::nullptr_t; + // Is `std::vector` in case of a non-contiguous payload + // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. + std::variant bytes_; +}; } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_