diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.cpp b/rmw_zenoh_cpp/src/detail/buffer_pool.cpp new file mode 100644 index 00000000..c95a9865 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.cpp @@ -0,0 +1,104 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "buffer_pool.hpp" + +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/env.h" +#include "logging_macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +BufferPool::BufferPool() +: buffers_(), mutex_() +{ + const char * env_value; + const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value); + if (error_str != nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to read maximum buffer pool size, falling back to default."); + max_size_ = DEFAULT_MAX_SIZE; + } else { + max_size_ = std::atoll(env_value); + } + size_ = 0; +} + +///============================================================================= +BufferPool::~BufferPool() +{ + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + for (Buffer & buffer : buffers_) { + allocator.deallocate(buffer.data, allocator.state); + } +} + +///============================================================================= +BufferPool::Buffer BufferPool::allocate(size_t size) +{ + std::lock_guard guard(mutex_); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + if (buffers_.empty()) { + if (size_ + size > max_size_) { + return {}; + } else { + size_ += size; + } + uint8_t * data = static_cast(allocator.allocate(size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + return Buffer {data, size}; + } + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + size_t size_diff = size - buffer.size; + if (size_ + size_diff > max_size_) { + return {}; + } else { + size_ += size_diff; + } + uint8_t * data = static_cast(allocator.reallocate( + buffer.data, size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + buffer.data = data; + buffer.size = size; + } + } + return buffer; + } +} + +///============================================================================= +void BufferPool::deallocate(BufferPool::Buffer buffer) +{ + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); +} + +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..b586ac8b --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,125 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/env.h" +#include "logging_macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +class BufferPool +{ +public: + struct Buffer + { + uint8_t * data; + size_t size; + }; + + BufferPool() + : buffers_(), mutex_() + { + const char * env_value; + const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value); + if (error_str != nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to read maximum buffer pool size, falling back to default."); + max_size_ = DEFAULT_MAX_SIZE; + } else { + max_size_ = std::atoll(env_value); + } + size_ = 0; + } + + ~BufferPool() + { + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + for (Buffer & buffer : buffers_) { + allocator.deallocate(buffer.data, allocator.state); + } + } + + Buffer allocate(size_t size) + { + std::lock_guard guard(mutex_); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + if (buffers_.empty()) { + if (size_ + size > max_size_) { + return {}; + } else { + size_ += size; + } + uint8_t * data = static_cast(allocator.allocate(size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + return Buffer {data, size}; + } + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + size_t size_diff = size - buffer.size; + if (size_ + size_diff > max_size_) { + return {}; + } else { + size_ += size_diff; + } + uint8_t * data = static_cast(allocator.reallocate( + buffer.data, size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + buffer.data = data; + buffer.size = size; + } + } + return buffer; + } + } + + void + deallocate(Buffer buffer) + { + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); + } + +private: + std::vector buffers_; + std::mutex mutex_; + size_t max_size_; + size_t size_; + // NOTE(fuzzypixelz): Pooled buffers are recycled with the expectation that they would reside in + // cache, thus this this value should be comparable to the size of a modern CPU cache. The default + // value (16 MiB) is relatively conservative as CPU cache sizes range from a few MiB to a few + // hundred MiB. + const size_t DEFAULT_MAX_SIZE = 16 * 1024 * 1024; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a2fdaf5e..17932867 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f960b879..32e4ea78 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -216,24 +216,26 @@ rmw_ret_t PublisherData::publish( type_support_impl_); // To store serialized message byte array. - char * msg_bytes = nullptr; - - rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; - - auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator]() { - if (msg_bytes) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); - - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); + uint8_t * msg_bytes = nullptr; + + rmw_context_impl_s *context_impl = static_cast(rmw_node_->data); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // Try to get memory from the serialization buffer pool. + BufferPool::Buffer serialization_buffer = + context_impl->serialization_buffer_pool.allocate(max_data_length); + if (serialization_buffer.data == nullptr) { + void * data = allocator.allocate(max_data_length, allocator.state); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "failed to allocate serialization buffer", return RMW_RET_BAD_ALLOC); + msg_bytes = static_cast(data); + } else { + msg_bytes = serialization_buffer.data; + } // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); @@ -258,10 +260,19 @@ rmw_ret_t PublisherData::publish( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); // TODO(ahcorde): shmbuf - std::vector raw_data( - reinterpret_cast(msg_bytes), - reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(std::move(raw_data)); + zenoh::Bytes payload; + if (serialization_buffer.data == nullptr) { + std::vector raw_data( + reinterpret_cast(msg_bytes), + reinterpret_cast(msg_bytes) + data_length); + payload = zenoh::Bytes(std::move(raw_data)); + } else { + auto delete_bytes = [buffer_pool = &context_impl->serialization_buffer_pool, + buffer = serialization_buffer](uint8_t *){ + buffer_pool->deallocate(buffer); + }; + payload = zenoh::Bytes(msg_bytes, data_length, delete_bytes); + } TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp);