-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recycle serialization buffers on transmission #342
base: rolling
Are you sure you want to change the base?
Changes from all commits
6b7cb5c
4226283
70f3a7d
52e8009
2fc9f69
6143959
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -434,3 +434,20 @@ Thus, there is no direct implementation of actions in `rmw_zenoh_cpp`. | |||||||
## Security | ||||||||
|
||||||||
TBD | ||||||||
|
||||||||
## Environment variables | ||||||||
|
||||||||
### `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` | ||||||||
|
||||||||
The RMW recycles serialization buffers on transmission using a buffer pool with bounded memory | ||||||||
usage. These buffers are returned to the pool — without being deallocated — once they cross the | ||||||||
network boundary in host-to-host communication, or after transmission in inter-process | ||||||||
communication, or upon being consumed by subscriptions in intra-process communication, etc. | ||||||||
|
||||||||
When the total size of the allocated buffers within the pool exceeds | ||||||||
`RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`, serialization buffers are allocated using the system | ||||||||
allocator and moved to Zenoh; no recyling is performed in this case to prevent the buffer pool from | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
growing uncontrollably. | ||||||||
|
||||||||
The default value of `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` is roughly proportionate to the cache | ||||||||
size of a typical modern CPU. | ||||||||
Comment on lines
+452
to
+453
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
// Copyright 2024 Open Source Robotics Foundation, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include "buffer_pool.hpp" | ||
|
||
#include <cstddef> | ||
#include <cstdlib> | ||
#include <mutex> | ||
#include <vector> | ||
|
||
#include "rcutils/allocator.h" | ||
#include "rcutils/env.h" | ||
#include "logging_macros.hpp" | ||
|
||
namespace rmw_zenoh_cpp | ||
{ | ||
///============================================================================= | ||
BufferPool::BufferPool() | ||
: buffers_(), mutex_() | ||
{ | ||
const char * env_value; | ||
const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value); | ||
fuzzypixelz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (error_str != nullptr) { | ||
RMW_ZENOH_LOG_ERROR_NAMED( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is somewhat pedantic, but I think this should be a WARN since we are continuing on anyway. |
||
"rmw_zenoh_cpp", | ||
"Unable to read maximum buffer pool size, falling back to default."); | ||
max_size_ = DEFAULT_MAX_SIZE; | ||
} else if (strcmp(env_value, "") == 0) { | ||
max_size_ = DEFAULT_MAX_SIZE; | ||
} else { | ||
max_size_ = std::atoll(env_value); | ||
} | ||
size_ = 0; | ||
} | ||
|
||
///============================================================================= | ||
BufferPool::~BufferPool() | ||
{ | ||
rcutils_allocator_t allocator = rcutils_get_default_allocator(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to require some additional plumbing, but I think we should respect the allocator passed in via the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should be using the allocator passed in
We also allocate many I can make a subsequent pull request addressing this issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The thing with the allocators is that always using Memory allocation is a tricky subject here. The original goal of the @Yadunund What's your thinking here? Should we give up on the |
||
|
||
for (Buffer & buffer : buffers_) { | ||
allocator.deallocate(buffer.data, allocator.state); | ||
} | ||
} | ||
|
||
///============================================================================= | ||
BufferPool::Buffer BufferPool::allocate(size_t size) | ||
{ | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
|
||
rcutils_allocator_t allocator = rcutils_get_default_allocator(); | ||
|
||
if (buffers_.empty()) { | ||
if (size_ + size > max_size_) { | ||
return {}; | ||
} else { | ||
size_ += size; | ||
} | ||
uint8_t * data = static_cast<uint8_t *>(allocator.allocate(size, allocator.state)); | ||
if (data == nullptr) { | ||
return {}; | ||
} else { | ||
return Buffer {data, size}; | ||
} | ||
} else { | ||
Buffer buffer = buffers_.back(); | ||
buffers_.pop_back(); | ||
if (buffer.size < size) { | ||
size_t size_diff = size - buffer.size; | ||
if (size_ + size_diff > max_size_) { | ||
return {}; | ||
} | ||
uint8_t * data = static_cast<uint8_t *>(allocator.reallocate( | ||
buffer.data, size, allocator.state)); | ||
if (data == nullptr) { | ||
return {}; | ||
} | ||
size_ += size_diff; | ||
buffer.data = data; | ||
buffer.size = size; | ||
} | ||
return buffer; | ||
} | ||
} | ||
|
||
///============================================================================= | ||
void BufferPool::deallocate(BufferPool::Buffer buffer) | ||
{ | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
buffers_.push_back(buffer); | ||
} | ||
|
||
} // namespace rmw_zenoh_cpp |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
// Copyright 2024 Open Source Robotics Foundation, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#ifndef DETAIL__BUFFER_POOL_HPP_ | ||
#define DETAIL__BUFFER_POOL_HPP_ | ||
|
||
#include <cstddef> | ||
#include <cstdlib> | ||
#include <mutex> | ||
#include <vector> | ||
|
||
#include "rcutils/allocator.h" | ||
#include "rcutils/env.h" | ||
#include "logging_macros.hpp" | ||
|
||
namespace rmw_zenoh_cpp | ||
{ | ||
///============================================================================= | ||
class BufferPool | ||
{ | ||
public: | ||
struct Buffer | ||
{ | ||
uint8_t * data; | ||
size_t size; | ||
}; | ||
|
||
BufferPool(); | ||
|
||
~BufferPool(); | ||
|
||
Buffer allocate(size_t size); | ||
|
||
void deallocate(Buffer buffer); | ||
|
||
private: | ||
std::vector<Buffer> buffers_; | ||
std::mutex mutex_; | ||
size_t max_size_; | ||
size_t size_; | ||
// NOTE(fuzzypixelz): Pooled buffers are recycled with the expectation that they would reside in | ||
// cache, thus this this value should be comparable to the size of a modern CPU cache. The default | ||
// value (16 MiB) is relatively conservative as CPU cache sizes range from a few MiB to a few | ||
// hundred MiB. | ||
const size_t DEFAULT_MAX_SIZE = 16 * 1024 * 1024; | ||
}; | ||
} // namespace rmw_zenoh_cpp | ||
|
||
#endif // DETAIL__BUFFER_POOL_HPP_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.