Skip to content

Commit

Permalink
Merge commit '2429bf27c798af7be0b759bea1d770152b0f5042'
Browse files Browse the repository at this point in the history
Conflicts:
	rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
  • Loading branch information
yellowhatter committed Jan 6, 2025
2 parents 4d9f122 + 2429bf2 commit 4fd8bce
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 195 deletions.
8 changes: 8 additions & 0 deletions rmw_zenoh_cpp/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Changelog for package rmw_zenoh_cpp
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

0.3.0 (2025-01-02)
------------------
* An alternative middleware for ROS 2 based on Zenoh.
* Contributors: Alejandro Hernández Cordero, Alex Day, Bernd Pfrommer, ChenYing Kuo (CY), Chris Lalancette, Christophe Bedard, CihatAltiparmak, Esteve Fernandez, Franco Cipollone, Geoffrey Biggs, Hans-Martin, James Mount, Julien Enoch, Morgan Quigley, Nate Koenig, Shivang Vijay, Yadunund, Yuyuan Yuan, methylDragon
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>rmw_zenoh_cpp</name>
<version>0.0.1</version>
<version>0.3.0</version>
<description>A ROS 2 middleware implementation using zenoh-cpp</description>
<maintainer email="[email protected]">Yadunund</maintainer>

Expand Down
66 changes: 33 additions & 33 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -52,6 +54,18 @@ rmw_zenoh_event_type_t zenoh_event_from_rmw_event(rmw_event_type_t rmw_event_typ
return ZENOH_EVENT_INVALID;
}

///=============================================================================
rmw_zenoh_event_status_t::rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0),
data(std::string()),
changed(false)
{
// Do nothing.
}

///=============================================================================
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
Expand Down Expand Up @@ -134,35 +148,30 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_status_t EventsManager::take_event_status(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
throw std::runtime_error("Invalid event_type");
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
// Create a copy to return before resetting counters.
auto ret = event_statuses_[event_id];
event_statuses_[event_id].current_count_change = 0;
event_statuses_[event_id].total_count_change = 0;
event_statuses_[event_id].changed = false;
return ret;
}

///=============================================================================
void EventsManager::add_new_event(
void EventsManager::update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
int32_t current_count_change)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -174,24 +183,15 @@ void EventsManager::add_new_event(

{
std::lock_guard<std::mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));
rmw_zenoh_event_status_t & status_to_update = event_statuses_[event_id];
status_to_update.total_count += std::max(0, current_count_change);
status_to_update.total_count_change += std::max(0, current_count_change);
status_to_update.current_count += current_count_change;
status_to_update.current_count_change = current_count_change;
status_to_update.changed = true;
}

// Since we added new data, trigger event callback and guard condition if they are available
// Since we updated data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}
Expand All @@ -211,7 +211,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not(

std::lock_guard<std::mutex> lock(event_condition_mutex_);

if (!event_queues_[event_id].empty()) {
if (event_statuses_[event_id].changed) {
return true;
}

Expand All @@ -235,7 +235,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty

wait_set_data_[event_id] = nullptr;

return event_queues_[event_id].empty();
return !event_statuses_[event_id].changed;
}

///=============================================================================
Expand Down
31 changes: 14 additions & 17 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ struct rmw_zenoh_event_status_t
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;
// A boolean field to indicate if the status changed since the last take.
bool changed;

rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0)
{}
// Constructor.
rmw_zenoh_event_status_t();
};

///=============================================================================
Expand Down Expand Up @@ -110,16 +108,16 @@ class EventsManager
rmw_event_callback_t callback,
const void * user_data);

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);
/// @brief Get the status for an event.
/// @param event_id the id for the event whose status should be retrieved.
rmw_zenoh_event_status_t take_event_status(rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
/// @brief Update the status for an event.
/// @param event_id the id for the event whose status should be changed.
/// @param current_count_change the change in the current count.
void update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);
int32_t current_count_change);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
Expand Down Expand Up @@ -148,9 +146,8 @@ class EventsManager
rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
// Statuses for events supported.
rmw_zenoh_event_status_t event_statuses_[ZENOH_EVENT_ID_MAX + 1];
};
} // namespace rmw_zenoh_cpp

Expand Down
Loading

0 comments on commit 4fd8bce

Please sign in to comment.