diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index d03560d6..dff88118 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include +#include #include #include @@ -52,6 +54,18 @@ rmw_zenoh_event_type_t zenoh_event_from_rmw_event(rmw_event_type_t rmw_event_typ return ZENOH_EVENT_INVALID; } +///============================================================================= +rmw_zenoh_event_status_t::rmw_zenoh_event_status_t() +: total_count(0), + total_count_change(0), + current_count(0), + current_count_change(0), + data(std::string()), + changed(false) +{ + // Do nothing. +} + ///============================================================================= void DataCallbackManager::set_callback( const void * user_data, rmw_event_callback_t callback) @@ -134,7 +148,7 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id) } ///============================================================================= -std::unique_ptr EventsManager::pop_next_event( +rmw_zenoh_event_status_t EventsManager::take_event_status( rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { @@ -142,27 +156,22 @@ std::unique_ptr EventsManager::pop_next_event( "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " "Report this bug.", event_id); - return nullptr; + throw std::runtime_error("Invalid event_type"); } std::lock_guard lock(event_mutex_); - - if (event_queues_[event_id].empty()) { - // This tells rcl that the check for a new events was done, but no events have come in yet. - return nullptr; - } - - std::unique_ptr event_status = - std::move(event_queues_[event_id].front()); - event_queues_[event_id].pop_front(); - - return event_status; + // Create a copy to return before resetting counters. + auto ret = event_statuses_[event_id]; + event_statuses_[event_id].current_count_change = 0; + event_statuses_[event_id].total_count_change = 0; + event_statuses_[event_id].changed = false; + return ret; } ///============================================================================= -void EventsManager::add_new_event( +void EventsManager::update_event_status( rmw_zenoh_event_type_t event_id, - std::unique_ptr event) + int32_t current_count_change) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -174,24 +183,15 @@ void EventsManager::add_new_event( { std::lock_guard lock(event_mutex_); - - std::deque> & event_queue = event_queues_[event_id]; - if (event_queue.size() >= event_queue_depth_) { - // Log warning if message is discarded due to hitting the queue depth - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "Event queue depth of %ld reached, discarding oldest message " - "for event type %d", - event_queue_depth_, - event_id); - - event_queue.pop_front(); - } - - event_queue.emplace_back(std::move(event)); + rmw_zenoh_event_status_t & status_to_update = event_statuses_[event_id]; + status_to_update.total_count += std::max(0, current_count_change); + status_to_update.total_count_change += std::max(0, current_count_change); + status_to_update.current_count += current_count_change; + status_to_update.current_count_change = current_count_change; + status_to_update.changed = true; } - // Since we added new data, trigger event callback and guard condition if they are available + // Since we updated data, trigger event callback and guard condition if they are available trigger_event_callback(event_id); notify_event(event_id); } @@ -211,7 +211,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not( std::lock_guard lock(event_condition_mutex_); - if (!event_queues_[event_id].empty()) { + if (event_statuses_[event_id].changed) { return true; } @@ -235,7 +235,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty wait_set_data_[event_id] = nullptr; - return event_queues_[event_id].empty(); + return !event_statuses_[event_id].changed; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index f6cdb1a2..580a4304 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -63,13 +63,11 @@ struct rmw_zenoh_event_status_t int32_t current_count_change; // The data field can be used to store serialized information for more complex statuses. std::string data; + // A boolean field to indicate if the status changed since the last take. + bool changed; - rmw_zenoh_event_status_t() - : total_count(0), - total_count_change(0), - current_count(0), - current_count_change(0) - {} + // Constructor. + rmw_zenoh_event_status_t(); }; ///============================================================================= @@ -110,16 +108,16 @@ class EventsManager rmw_event_callback_t callback, const void * user_data); - /// Pop the next event in the queue. - /// @param event_id the event id whose queue should be popped. - std::unique_ptr pop_next_event( - rmw_zenoh_event_type_t event_id); + /// @brief Get the status for an event. + /// @param event_id the id for the event whose status should be retrieved. + rmw_zenoh_event_status_t take_event_status(rmw_zenoh_event_type_t event_id); - /// Add an event status for an event. - /// @param event_id the event id queue to which the status should be added. - void add_new_event( + /// @brief Update the status for an event. + /// @param event_id the id for the event whose status should be changed. + /// @param current_count_change the change in the current count. + void update_event_status( rmw_zenoh_event_type_t event_id, - std::unique_ptr event); + int32_t current_count_change); /// @brief Attach the condition variable provided by rmw_wait. /// @param condition_variable to attach. @@ -148,9 +146,8 @@ class EventsManager rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0}; - // A dequeue of events for each type of event this RMW supports. - std::deque> event_queues_[ZENOH_EVENT_ID_MAX + 1] {}; - const std::size_t event_queue_depth_ = 10; + // Statuses for events supported. + rmw_zenoh_event_status_t event_statuses_[ZENOH_EVENT_ID_MAX + 1]; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index a1af2d46..31b2f5bd 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include @@ -207,8 +206,6 @@ void GraphCache::handle_matched_events_for_put( } const liveliness::TopicInfo topic_info = entity->topic_info().value(); const bool is_pub = is_entity_pub(*entity); - // Initialize a map that will be populated with any QoS events that may be detected. - EntityEventMap local_entities_with_events = {}; // The entity added may be local with callbacks registered but there // may be other local entities in the graph that are matched. int32_t match_count_for_entity = 0; @@ -229,18 +226,16 @@ void GraphCache::handle_matched_events_for_put( if (entity->topic_info()->topic_keyexpr_ == sub_entity->topic_info().value().topic_keyexpr_) { - update_event_counters(topic_info.name_, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1); if (is_entity_local(*sub_entity)) { - local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + update_event_counters(entity, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1); } } } - // Update event counters for the new entity-> - update_event_counters(topic_info.name_, - ZENOH_EVENT_PUBLICATION_MATCHED, - match_count_for_entity); + // Update event counters for the new entity. if (is_entity_local(*entity) && match_count_for_entity > 0) { - local_entities_with_events[entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + update_event_counters(entity, + ZENOH_EVENT_PUBLICATION_MATCHED, + match_count_for_entity); } } else { // Entity is a sub. @@ -259,23 +254,20 @@ void GraphCache::handle_matched_events_for_put( if (entity->topic_info()->topic_keyexpr_ == pub_entity->topic_info().value().topic_keyexpr_) { - update_event_counters(topic_info.name_, ZENOH_EVENT_PUBLICATION_MATCHED, 1); if (is_entity_local(*pub_entity)) { - local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + update_event_counters(pub_entity, ZENOH_EVENT_PUBLICATION_MATCHED, 1); } } } - // Update event counters for the new entity-> - update_event_counters( - topic_info.name_, - ZENOH_EVENT_SUBSCRIPTION_MATCHED, - match_count_for_entity); + // Update event counters for the new entity. if (is_entity_local(*entity) && match_count_for_entity > 0) { - local_entities_with_events[entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + update_event_counters( + entity, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + match_count_for_entity); } } } - take_entities_with_events(local_entities_with_events); } ///============================================================================= @@ -290,17 +282,15 @@ void GraphCache::handle_matched_events_for_del( return; } const liveliness::TopicInfo topic_info = entity->topic_info().value(); - EntityEventMap local_entities_with_events; if (is_entity_pub(*entity)) { // Notify any local subs of a matched event with change -1. for (const auto & [_, topic_data_ptr] : topic_qos_map) { for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) { - update_event_counters( - topic_info.name_, - ZENOH_EVENT_SUBSCRIPTION_MATCHED, - static_cast(-1)); if (is_entity_local(*sub_entity)) { - local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + update_event_counters( + sub_entity, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + static_cast(-1)); } } } @@ -308,34 +298,11 @@ void GraphCache::handle_matched_events_for_del( // Notify any local pubs of a matched event with change -1. for (const auto & [_, topic_data_ptr] : topic_qos_map) { for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) { - update_event_counters( - topic_info.name_, - ZENOH_EVENT_PUBLICATION_MATCHED, - static_cast(-1)); if (is_entity_local(*pub_entity)) { - local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); - } - } - } - } - take_entities_with_events(local_entities_with_events); -} - -///============================================================================= -void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events) -{ - for (const auto & [local_entity, event_set] : entities_with_events) { - // Trigger callback set for this entity for the event type. - GraphEventCallbackMap::const_iterator event_callbacks_it = - event_callbacks_.find(local_entity->keyexpr_hash()); - if (event_callbacks_it != event_callbacks_.end()) { - for (const rmw_zenoh_event_type_t & event_type : event_set) { - GraphEventCallbacks::const_iterator callback_it = - event_callbacks_it->second.find(event_type); - if (callback_it != event_callbacks_it->second.end()) { - std::unique_ptr taken_event = - take_event_status(local_entity->topic_info()->name_, event_type); - callback_it->second(std::move(taken_event)); + update_event_counters( + pub_entity, + ZENOH_EVENT_PUBLICATION_MATCHED, + static_cast(-1)); } } } @@ -1218,7 +1185,7 @@ void GraphCache::set_qos_event_callback( const rmw_zenoh_event_type_t & event_type, GraphCacheEventCallback callback) { - std::lock_guard lock(graph_mutex_); + std::lock_guard lock(events_mutex_); if (event_type > ZENOH_EVENT_ID_MAX) { RMW_ZENOH_LOG_WARN_NAMED( @@ -1239,7 +1206,7 @@ void GraphCache::set_qos_event_callback( ///============================================================================= void GraphCache::remove_qos_event_callbacks(std::size_t entity_keyexpr_hash) { - std::lock_guard lock(graph_mutex_); + std::lock_guard lock(events_mutex_); event_callbacks_.erase(entity_keyexpr_hash); } @@ -1265,7 +1232,7 @@ bool GraphCache::is_entity_pub(const liveliness::Entity & entity) ///============================================================================= void GraphCache::update_event_counters( - const std::string & topic_name, + liveliness::ConstEntityPtr entity, const rmw_zenoh_event_type_t event_id, int32_t change) { @@ -1275,42 +1242,16 @@ void GraphCache::update_event_counters( std::lock_guard lock(events_mutex_); - auto event_statuses_it = event_statuses_.find(topic_name); - if (event_statuses_it == event_statuses_.end()) { - // Initialize statuses. - std::array status_array {}; - event_statuses_[topic_name] = std::move(status_array); - } - - rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id]; - status_to_update.total_count += std::max(0, change); - status_to_update.total_count_change += std::max(0, change); - status_to_update.current_count += change; - status_to_update.current_count_change = change; -} - -///============================================================================= -std::unique_ptr GraphCache::take_event_status( - const std::string & topic_name, - const rmw_zenoh_event_type_t event_id) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - return nullptr; - } - - std::lock_guard lock(events_mutex_); - - auto event_statuses_it = event_statuses_.find(topic_name); - if (event_statuses_it == event_statuses_.end()) { - return nullptr; + // Trigger callback set for this entity for the event type. + GraphEventCallbackMap::const_iterator event_callbacks_it = + event_callbacks_.find(entity->keyexpr_hash()); + if (event_callbacks_it != event_callbacks_.end()) { + GraphEventCallbacks::const_iterator callback_it = + event_callbacks_it->second.find(event_id); + if (callback_it != event_callbacks_it->second.end()) { + callback_it->second(change); + } } - - rmw_zenoh_event_status_t & status_to_take = event_statuses_[topic_name][event_id]; - auto result = std::make_unique(status_to_take); - // Reset changes. - status_to_take.total_count_change = 0; - status_to_take.current_count_change = 0; - return result; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 6c4f6e54..0ec37153 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -103,7 +103,7 @@ class GraphCache final public: /// @brief Signature for a function that will be invoked by the GraphCache when a QoS /// event is detected. - using GraphCacheEventCallback = std::function)>; + using GraphCacheEventCallback = std::function; /// Callback to be triggered when a publication cache is detected in the ROS Graph. using QueryingSubscriberCallback = std::function; @@ -230,15 +230,10 @@ class GraphCache final bool is_entity_local(const liveliness::Entity & entity) const; void update_event_counters( - const std::string & topic_name, + liveliness::ConstEntityPtr entity, const rmw_zenoh_event_type_t event_id, int32_t change); - // Take status and reset change counters. - std::unique_ptr take_event_status( - const std::string & topic_name, - const rmw_zenoh_event_type_t event_id); - void handle_matched_events_for_put( liveliness::ConstEntityPtr entity, const GraphNode::TopicQoSMap & topic_qos_map); @@ -247,10 +242,6 @@ class GraphCache final liveliness::ConstEntityPtr entity, const GraphNode::TopicQoSMap & topic_qos_map); - using EntityEventMap = - std::unordered_map>; - void take_entities_with_events(const EntityEventMap & entities_with_events); - std::string zid_str_; /* namespace_1: @@ -299,9 +290,6 @@ class GraphCache final // Map key expressions to a map of sub keyexpr_hash and QueryingSubscriberCallback. std::unordered_map> querying_subs_cbs_; - // Counters to track changes to event statues for each topic. - std::unordered_map> event_statuses_; std::mutex events_mutex_; // Mutex to lock before modifying the members above. diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 279d3aa6..2a070ee8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -151,7 +151,6 @@ SubscriptionData::SubscriptionData( type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), - total_messages_lost_(0), wait_set_data_(nullptr), is_shutdown_(false), initialized_(false) @@ -624,13 +623,9 @@ void SubscriptionData::add_new_message( last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; - total_messages_lost_ += num_msg_lost; - auto event_status = std::make_unique(); - event_status->total_count_change = num_msg_lost; - event_status->total_count = total_messages_lost_; - events_mgr_->add_new_event( + events_mgr_->update_event_status( ZENOH_EVENT_MESSAGE_LOST, - std::move(event_status)); + num_msg_lost); } } // Always update the last known sequence number for the publisher. diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 37ab0dba..31130785 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -153,7 +153,6 @@ class SubscriptionData final : public std::enable_shared_from_this> message_queue_; // Map GID of a subscription to the sequence number of the message it published. std::unordered_map last_known_published_msg_; - size_t total_messages_lost_; // Wait set data. rmw_wait_set_data_t * wait_set_data_; // Callback managers. diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 68117032..b362a042 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -73,15 +73,15 @@ rmw_publisher_event_init( pub_data->keyexpr_hash(), zenoh_event_type, [data_wp, - zenoh_event_type](std::unique_ptr zenoh_event) + zenoh_event_type](int32_t change) { auto pub_data = data_wp.lock(); if (pub_data == nullptr) { return; } - pub_data->events_mgr()->add_new_event( + pub_data->events_mgr()->update_event_status( zenoh_event_type, - std::move(zenoh_event)); + change); } ); @@ -132,14 +132,14 @@ rmw_subscription_event_init( sub_data->keyexpr_hash(), zenoh_event_type, [sub_data, - zenoh_event_type](std::unique_ptr zenoh_event) + zenoh_event_type](int32_t change) { if (sub_data == nullptr) { return; } - sub_data->events_mgr()->add_new_event( + sub_data->events_mgr()->update_event_status( zenoh_event_type, - std::move(zenoh_event)); + change); } ); @@ -206,30 +206,33 @@ rmw_take_event( rmw_zenoh_cpp::EventsManager * event_data = static_cast(event_handle->data); RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr st = event_data->pop_next_event( + rmw_zenoh_cpp::rmw_zenoh_event_status_t st = event_data->take_event_status( zenoh_event_type); - if (st == nullptr) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "rmw_take_event called when event queue for event type [%d] is empty", - event_handle->event_type); - return RMW_RET_ERROR; - } // Now depending on the event, populate the rmw event status. switch (zenoh_event_type) { case rmw_zenoh_cpp::ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { auto ei = static_cast(event_info); RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); - ei->total_count = static_cast(st->total_count); - ei->total_count_change = static_cast(st->total_count_change); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); *taken = true; return RMW_RET_OK; } case rmw_zenoh_cpp::ZENOH_EVENT_MESSAGE_LOST: { auto ei = static_cast(event_info); RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); - ei->total_count = static_cast(st->total_count); - ei->total_count_change = static_cast(st->total_count_change); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); + *taken = true; + return RMW_RET_OK; + } + case rmw_zenoh_cpp::ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE: + case rmw_zenoh_cpp::ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE: { + auto ei = static_cast(event_info); + RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); *taken = true; return RMW_RET_OK; } @@ -237,18 +240,18 @@ rmw_take_event( case rmw_zenoh_cpp::ZENOH_EVENT_SUBSCRIPTION_MATCHED: { auto ei = static_cast(event_info); RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); - ei->total_count = st->total_count; - ei->total_count_change = st->total_count_change; - ei->current_count = st->current_count; - ei->current_count_change = st->current_count_change; + ei->total_count = st.total_count; + ei->total_count_change = st.total_count_change; + ei->current_count = st.current_count; + ei->current_count_change = st.current_count_change; *taken = true; return RMW_RET_OK; } case rmw_zenoh_cpp::ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { auto ei = static_cast(event_info); RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); - ei->total_count = static_cast(st->total_count); - ei->total_count_change = static_cast(st->total_count_change); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); *taken = true; return RMW_RET_OK; }