From 995b2deb98b4691f420c8c5ef4f7ae17ad45c736 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 15 Nov 2023 13:55:49 +0800 Subject: [PATCH 01/12] Add storage plugin config to router config Signed-off-by: Yadunund --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 3fa61761..b35834f7 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -123,7 +123,7 @@ /// This option does not make LowLatency transport mandatory, the actual implementation of transport /// used will depend on Establish procedure and other party's settings /// - /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. + /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to /// enable 'lowlatency' you need to explicitly disable 'qos'. lowlatency: false, @@ -258,4 +258,28 @@ write: false, }, }, + + /// Plugins configurations + /// Plugins are only loaded if present in the configuration. When starting + /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. + plugins: { + + /// Configure the storage manager plugin + storage_manager: { + /// Configure the storages supported by the volumes + storages: { + ros2_lv: { + /// Storages always need to know what set of keys they must work with. These sets are defined by a key expression. + key_expr: "@ros2_lv/**", + /// Storages also need to know which volume will be used to actually store their key-value pairs. + /// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient. + volume: "memory", + /// A complete storage advertises itself as containing all the known keys matching the configured key expression. + /// If not configured, complete defaults to false. + complete: "true", + }, + }, + }, + }, + } From 5199c1d51fef66c2f9c7970511584421d2b83c27 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 15 Nov 2023 14:52:32 +0800 Subject: [PATCH 02/12] Fix non empty node namespace Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 25 ++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index a890f485..268fc774 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -44,8 +44,18 @@ static std::string generate_base_token( const std::string & name) { std::stringstream token_ss; - // TODO(Yadunund): Empty namespace will contain /. Fix non-empty namespace. - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_ << name; + token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; + // An empty namespace from rcl will contain "/"" but zenoh does not allow keys with "//". + // Hence we add an "_" to denote an empty namespace such that splitting the key + // will always result in 5 parts. + if (namespace_ == "/") { + token_ss << "_/"; + } + else { + token_ss << "/"; + } + // Finally append node name. + token_ss << name; return token_ss.str(); } @@ -274,25 +284,28 @@ static std::vector split_keyexpr(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { - // TODO(Yadunund): Validate data. std::vector parts = split_keyexpr(keyexpr); - if (parts.size() < 3) { + // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). + if (parts.size() < 5) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); return; } + // TODO(Yadunund): Validate individual parts. + // Get the entity, ie N, MP, MS, SS, SC. const std::string & entity = parts[2]; std::lock_guard lock(graph_mutex_); if (entity == "NN") { // Node RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str()); - const bool has_namespace = entity.size() == 5 ? true : false; + // Nodes with empty namespaces will contain an "_". + const bool has_namespace = parts[3] == "_" ? false : true; graph_[parts.back()] = YAML::Node(); // TODO(Yadunund): Implement enclave support. graph_[parts.back()]["enclave"] = ""; - graph_[parts.back()]["namespace"] = has_namespace ? parts.at(4) : "/"; + graph_[parts.back()]["namespace"] = has_namespace ? "/" + parts.at(3) : "/"; } else if (entity == "MP") { // Publisher } else if (entity == "MS") { From 59c28d12ce3eb44c116115674123c0046cf68778 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 15 Nov 2023 17:21:29 +0800 Subject: [PATCH 03/12] Rely on unordered_map instead of yaml for graph cache Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 179 +++++++++++++++++------ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 61 +++++--- 2 files changed, 175 insertions(+), 65 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 268fc774..44abbd3e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -50,8 +51,7 @@ static std::string generate_base_token( // will always result in 5 parts. if (namespace_ == "/") { token_ss << "_/"; - } - else { + } else { token_ss << "/"; } // Finally append node name. @@ -253,7 +253,9 @@ GraphCache::remove_subscription(uint64_t handle) } ///============================================================================= -static std::vector split_keyexpr(const std::string & keyexpr) +namespace +{ +std::vector split_keyexpr(const std::string & keyexpr) { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. @@ -280,32 +282,108 @@ static std::vector split_keyexpr(const std::string & keyexpr) result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } - ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +// Convert a liveliness token into a +std::optional> _parse_token(const std::string & keyexpr) { std::vector parts = split_keyexpr(keyexpr); // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). + // Basic validation. if (parts.size() < 5) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); - return; + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } } - // TODO(Yadunund): Validate individual parts. // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + std::string & entity = parts[2]; + if (entity != "NN" && + entity != "MP" && + entity != "MS" && + entity != "SS" && + entity != "SC") + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid entity [%s] in liveliness token", entity.c_str()); + return std::nullopt; + } + + + GraphNode node; + // Nodes with empty namespaces will contain a "_". + node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3); + node.name = std::move(parts[4]); + + if (entity != "NN") { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + GraphNode::PubSubData data; + data.topic = std::move(parts[5]); + data.type = std::move(parts[6]); + data.qos = std::move(parts[7]); + + if (entity == "MP") { + node.pubs.push_back(std::move(data)); + } else if (entity == "MS") { + node.subs.push_back(std::move(data)); + } else { + // TODO. + } + } + + return std::make_pair(std::move(entity), std::move(node)); +} +} // namespace anonymous + +///============================================================================= +void GraphCache::parse_put(const std::string & keyexpr) +{ + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. + return; + } + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); + if (entity == "NN") { // Node - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str()); - // Nodes with empty namespaces will contain an "_". - const bool has_namespace = parts[3] == "_" ? false : true; - graph_[parts.back()] = YAML::Node(); - // TODO(Yadunund): Implement enclave support. - graph_[parts.back()]["enclave"] = ""; - graph_[parts.back()]["namespace"] = has_namespace ? "/" + parts.at(3) : "/"; + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // New namespace. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.", + node->name.c_str()); + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node %s to the graph.", + node->name.c_str()); + return; + } else if (entity == "MP") { // Publisher } else if (entity == "MS") { @@ -325,25 +403,25 @@ void GraphCache::parse_put(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - // TODO(Yadunund): Validate data. - std::vector parts = split_keyexpr(keyexpr); - if (parts.size() < 3) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. return; } - // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); if (entity == "NN") { // Node + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + ns_it->second.erase(node->name); + } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removing node %s from the graph.", - parts.back().c_str() + "Removed node %s from the graph.", + node->name.c_str() ); - graph_.remove(entity.back()); } else if (entity == "MP") { // Publisher } else if (entity == "MS") { @@ -383,7 +461,14 @@ rmw_ret_t GraphCache::get_node_names( RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - size_t nodes_number = graph_.size(); + size_t nodes_number = 0; + for (auto it = graph_.begin(); it != graph_.end(); ++it) { + nodes_number += it->second.size(); + } + // TODO(Yadunund): Delete. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "nodes_number %ld", nodes_number); rcutils_ret_t rcutils_ret = rcutils_string_array_init(node_names, nodes_number, allocator); @@ -437,30 +522,32 @@ rmw_ret_t GraphCache::get_node_names( } // TODO(Yadunund): Remove this printout. - const std::string & graph_str = YAML::Dump(graph_); - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); + // const std::string & graph_str = YAML::Dump(graph_); + // RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - const auto & node_name = it->first.as(); - const auto & yaml_node = it->second; - node_names->data[j] = rcutils_strdup(node_name.c_str(), *allocator); - if (!node_names->data[j]) { - return RMW_RET_BAD_ALLOC; - } - node_namespaces->data[j] = rcutils_strdup( - yaml_node["namespace"].as().c_str(), *allocator); - if (!node_namespaces->data[j]) { - return RMW_RET_BAD_ALLOC; - } - if (enclaves) { - enclaves->data[j] = rcutils_strdup( - yaml_node["enclaves"].as().c_str(), *allocator); - if (!enclaves->data[j]) { + for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + const std::string & ns = ns_it->first; + for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { + const auto node = node_it->second; + node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } + node_namespaces->data[j] = rcutils_strdup( + ns.c_str(), *allocator); + if (!node_namespaces->data[j]) { + return RMW_RET_BAD_ALLOC; + } + if (enclaves) { + enclaves->data[j] = rcutils_strdup( + node->enclave.c_str(), *allocator); + if (!enclaves->data[j]) { + return RMW_RET_BAD_ALLOC; + } + } + ++j; } - ++j; } if (free_enclaves) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 0bb878b0..0e0c4c14 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -102,6 +102,27 @@ class SubscriptionData final char * type_name_{nullptr}; }; +///============================================================================= +// TODO(Yadunund): Expand to services and clients. +struct GraphNode +{ + + struct PubSubData + { + std::string topic; + std::string type; + std::string qos; + }; + + std::string ns; + std::string name; + // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? + std::string enclave; + std::vector pubs; + std::vector subs; +}; +using GraphNodePtr = std::shared_ptr; + ///============================================================================= class GraphCache final { @@ -141,26 +162,28 @@ class GraphCache final std::map> subscriptions_; /* - node_1: - enclave: - namespace: - publishers: [ - { - topic: - type: - qos: - } - ] - subscriptions: [ - { - topic: - type: - qos: - } - ] - node_n: + namespace_1: + node_1: + enclave: + publishers: [ + { + topic: + type: + qos: + } + ], + subscriptions: [ + { + topic: + type: + qos: + } + ], + namespace_2: + node_n: */ - YAML::Node graph_; + // Map namespace to a map of . + std::unordered_map> graph_; mutable std::mutex graph_mutex_; }; From 69d2eadac22d7c4683a755c11fa07c9d5a53c9eb Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 00:09:36 +0800 Subject: [PATCH 04/12] Update graph cache with publisher data Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 54 ++++++++++++++++++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 43 +++++++++++++++---- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 44abbd3e..99403ec8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -79,6 +79,7 @@ std::string GenerateToken::publisher( { std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); token += topic + "/" + type + "/" + qos; + printf("GenerateToken::Publisher: %s\n", token.c_str()); return token; } @@ -386,6 +387,26 @@ void GraphCache::parse_put(const std::string & keyexpr) } else if (entity == "MP") { // Publisher + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->pubs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->pubs.push_back(std::move(node->pubs[0])); + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added publisher to node %s in graph.", + node->name.c_str()); + return; } else if (entity == "MS") { // Subscription } else if (entity == "SS") { @@ -424,6 +445,39 @@ void GraphCache::parse_del(const std::string & keyexpr) ); } else if (entity == "MP") { // Publisher + if (node->pubs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of publishers and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->pubs.begin(); + for (; erase_it != found_node->pubs.end(); ++erase_it) { + const auto & pub = *erase_it; + if (pub.topic == node->pubs.at(0).topic && + pub.type == node->pubs.at(0).type && + pub.qos == node->pubs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->pubs.end()) { + found_node->pubs.erase(erase_it); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed publisher %s from node %s in the graph.", + node->pubs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "MS") { // Subscription } else if (entity == "SS") { diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5df96482..af7ca85e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -181,11 +181,11 @@ rmw_create_node( node->context = context; // Publish to the graph that a new node is in town - const bool result = PublishToken::put( + const bool pub_result = PublishToken::put( &node->context->impl->session, GenerateToken::node(context->actual_domain_id, namespace_, name) ); - if (!result) { + if (!pub_result) { return nullptr; } @@ -211,11 +211,11 @@ rmw_destroy_node(rmw_node_t * node) return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); // Publish to the graph that a node has ridden off into the sunset - const bool result = PublishToken::del( + const bool del_result = PublishToken::del( &node->context->impl->session, GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) ); - if (!result) { + if (!del_result) { return RMW_RET_ERROR; } @@ -518,6 +518,19 @@ rmw_create_publisher( // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. + const bool pub_result = PublishToken::put( + &node->context->impl->session, + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable") + ); + if (!pub_result) { + return nullptr; + } publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( rmw_publisher->topic_name, node->name, node->namespace_, @@ -558,15 +571,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a publisher has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted publisher. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(publisher->topic_name), allocator->state); - auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { + // Publish to the graph that a publisher has ridden off into the sunset + const bool del_result = PublishToken::del( + &node->context->impl->session, + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable" + ) + ); + if (!del_result) { + // TODO(Yadunund): Should this really return an error? + return RMW_RET_ERROR; + } node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); @@ -577,6 +601,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) } allocator->deallocate(publisher_data, allocator->state); } + allocator->deallocate(const_cast(publisher->topic_name), allocator->state); allocator->deallocate(publisher, allocator->state); return ret; From 5be71990418fc2eddda480f03543226f3df8b97a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 02:07:56 +0800 Subject: [PATCH 05/12] Implement basic publisher introspection Signed-off-by: Yadunund --- rmw_zenoh_cpp/package.xml | 1 - rmw_zenoh_cpp/src/detail/graph_cache.cpp | 139 ++++++++++++++++-- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 15 +- .../src/rmw_get_topic_names_and_types.cpp | 110 +++++++------- 4 files changed, 198 insertions(+), 67 deletions(-) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 259bbce6..02abec46 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,7 +19,6 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw - yaml_cpp_vendor ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 99403ec8..fde32033 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -256,14 +256,16 @@ GraphCache::remove_subscription(uint64_t handle) ///============================================================================= namespace { -std::vector split_keyexpr(const std::string & keyexpr) +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. delim_idx.push_back(-1); std::size_t idx = 0; for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == '/') { + if (*it == delim) { delim_idx.push_back(idx); } ++idx; @@ -319,7 +321,6 @@ std::optional> _parse_token(const std::string return std::nullopt; } - GraphNode node; // Nodes with empty namespaces will contain a "_". node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3); @@ -333,7 +334,7 @@ std::optional> _parse_token(const std::string return std::nullopt; } GraphNode::PubSubData data; - data.topic = std::move(parts[5]); + data.topic = "/" + std::move(parts[5]); data.type = std::move(parts[6]); data.qos = std::move(parts[7]); @@ -376,12 +377,12 @@ void GraphCache::parse_put(const std::string & keyexpr) auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); if (!insertion.second) { RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.", + "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", node->name.c_str()); } } RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node %s to the graph.", + "rmw_zenoh_cpp", "Added node /%s to the graph.", node->name.c_str()); return; @@ -400,11 +401,20 @@ void GraphCache::parse_put(const std::string & keyexpr) auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); if (!insertion.second && !node->pubs.empty()) { // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(std::move(node->pubs[0])); + insertion.first->second->pubs.push_back(node->pubs[0]); } } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), 1)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second; + } RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher to node %s in graph.", + "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", + node->pubs.at(0).topic.c_str(), node->name.c_str()); return; } else if (entity == "MS") { @@ -440,7 +450,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removed node %s from the graph.", + "Removed node /%s from the graph.", node->name.c_str() ); } else if (entity == "MP") { @@ -469,9 +479,22 @@ void GraphCache::parse_del(const std::string & keyexpr) } if (erase_it != found_node->pubs.end()) { found_node->pubs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second == 1) { + // The last publisher was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second; + } + } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removed publisher %s from node %s in the graph.", + "Removed publisher %s from node /%s in the graph.", node->pubs.at(0).topic.c_str(), node->name.c_str() ); @@ -575,9 +598,6 @@ rmw_ret_t GraphCache::get_node_names( std::move(free_enclaves_lambda)); } - // TODO(Yadunund): Remove this printout. - // const std::string & graph_str = YAML::Dump(graph_); - // RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { @@ -612,3 +632,96 @@ rmw_ret_t GraphCache::get_node_names( return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + static_cast(no_demangle); + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); + + std::lock_guard lock(graph_mutex_); + const size_t topic_number = graph_topics_.size(); + // TODO(Yadunund): Delete. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "topic_number %ld", topic_number); + + rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + auto cleanup_names_and_types = rcpputils::make_scope_exit( + [topic_names_and_types] { + rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + } + }); + + // topic_names_and_types->names is an rcutils_string_array_t, + // while topic_names_and_types->types is an rcutils_string_array_t * + + // rcutils_ret_t rcutils_ret = + // rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator); + // if (rcutils_ret != RCUTILS_RET_OK) { + // return RMW_RET_BAD_ALLOC; + // } + auto free_topic_names = rcpputils::make_scope_exit( + [names = &topic_names_and_types->names]() { + rcutils_ret_t ret = rcutils_string_array_fini(names); + if (ret != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }); + + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + return RMW_RET_BAD_ALLOC; + } + auto free_topic_types = rcpputils::make_scope_exit( + [types = topic_names_and_types->types]() { + rcutils_ret_t ret = rcutils_string_array_fini(types); + if (ret != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }); + + // Fill topic names and types. + std::size_t j = 0; + for (const auto & it : graph_topics_) { + // Split based on "?". + // TODO(Yadunund): Be more systematic about this. + std::vector parts = split_keyexpr(it.first, '?'); + if (parts.size() < 2) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid topic_key %s", it.first.c_str()); + return RMW_RET_INVALID_ARGUMENT; + } + topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator); + if (!topic_names_and_types->names.data[j]) { + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types->data[j] = rcutils_strdup( + parts.at(1).c_str(), *allocator); + if (!topic_names_and_types->types->data[j]) { + return RMW_RET_BAD_ALLOC; + } + ++j; + } + + cleanup_names_and_types.cancel(); + free_topic_names.cancel(); + free_topic_types.cancel(); + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 0e0c4c14..78336704 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -21,14 +21,15 @@ #include #include #include +#include #include #include "rcutils/allocator.h" #include "rcutils/types.h" #include "rmw/rmw.h" +#include "rmw/names_and_types.h" -#include "yaml-cpp/yaml.h" ///============================================================================= class GenerateToken @@ -152,6 +153,11 @@ class GraphCache final rcutils_string_array_t * enclaves, rcutils_allocator_t * allocator) const; + rmw_ret_t get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types); + private: std::mutex publishers_mutex_; uint64_t publishers_handle_id_{0}; @@ -183,7 +189,12 @@ class GraphCache final node_n: */ // Map namespace to a map of . - std::unordered_map> graph_; + std::unordered_map> graph_ = {}; + // Optimize published topic lookups by caching an unordered_map + // where key is topic_name+topic_type and the value the count. + // TODO(Yadunund): Use a single map for both published and subscribed topics. + // Consider changing value to pair for pub,sub resp. + std::unordered_map graph_topics_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index 7e4f2645..a70b7cb4 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "detail/rmw_data_types.hpp" + #include "rcutils/strdup.h" #include "rmw/error_handling.h" @@ -31,63 +33,69 @@ rmw_get_topic_names_and_types( bool no_demangle, rmw_names_and_types_t * topic_names_and_types) { - static_cast(node); - static_cast(no_demangle); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.get_topic_names_and_types( + allocator, no_demangle, topic_names_and_types); - rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); - if (ret != RMW_RET_OK) { - return ret; - } - auto cleanup_names_and_types = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); - } - }); + // rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); + // if (ret != RMW_RET_OK) { + // return ret; + // } + // auto cleanup_names_and_types = rcpputils::make_scope_exit( + // [topic_names_and_types] { + // rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + // if (fail_ret != RMW_RET_OK) { + // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + // } + // }); - // topic_names_and_types->names is an rcutils_string_array_t, - // while topic_names_and_types->types is an rcutils_string_array_t * + // // topic_names_and_types->names is an rcutils_string_array_t, + // // while topic_names_and_types->types is an rcutils_string_array_t * - topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); - if (topic_names_and_types->names.data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); - return RMW_RET_BAD_ALLOC; - } - auto free_names = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); - }); + // topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); + // if (topic_names_and_types->names.data[0] == nullptr) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); + // return RMW_RET_BAD_ALLOC; + // } + // auto free_names = rcpputils::make_scope_exit( + // [topic_names_and_types, allocator] { + // allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); + // }); - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - topic_names_and_types->types, 1, - allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); - return RMW_RET_BAD_ALLOC; - } - auto fini_string_array = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); - } - }); + // rcutils_ret_t rcutils_ret = rcutils_string_array_init( + // topic_names_and_types->types, 1, + // allocator); + // if (rcutils_ret != RCUTILS_RET_OK) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); + // return RMW_RET_BAD_ALLOC; + // } + // auto fini_string_array = rcpputils::make_scope_exit( + // [topic_names_and_types] { + // rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); + // if (fail_ret != RMW_RET_OK) { + // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); + // } + // }); - topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); - if (topic_names_and_types->types[0].data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); - return RMW_RET_BAD_ALLOC; - } - auto free_types = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); - }); + // topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); + // if (topic_names_and_types->types[0].data[0] == nullptr) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); + // return RMW_RET_BAD_ALLOC; + // } + // auto free_types = rcpputils::make_scope_exit( + // [topic_names_and_types, allocator] { + // allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); + // }); - free_types.cancel(); - fini_string_array.cancel(); - free_names.cancel(); - cleanup_names_and_types.cancel(); + // free_types.cancel(); + // fini_string_array.cancel(); + // free_names.cancel(); + // cleanup_names_and_types.cancel(); return RMW_RET_OK; } From aa831ef965386a97ef7441c6d60a94dcafaf1b98 Mon Sep 17 00:00:00 2001 From: Yadu Date: Thu, 16 Nov 2023 18:48:35 +0800 Subject: [PATCH 06/12] Switch to liveliness tokens (#67) * Switch to liveliness tokens Signed-off-by: Yadunund * Use zc APIs instead of macros to resolve liveliness api issues Signed-off-by: Yadunund --------- Signed-off-by: Yadunund --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 23 --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 15 +- rmw_zenoh_cpp/src/rmw_init.cpp | 33 +++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 135 +++++++++++++----- 4 files changed, 132 insertions(+), 74 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index b35834f7..506822d9 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -259,27 +259,4 @@ }, }, - /// Plugins configurations - /// Plugins are only loaded if present in the configuration. When starting - /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. - plugins: { - - /// Configure the storage manager plugin - storage_manager: { - /// Configure the storages supported by the volumes - storages: { - ros2_lv: { - /// Storages always need to know what set of keys they must work with. These sets are defined by a key expression. - key_expr: "@ros2_lv/**", - /// Storages also need to know which volume will be used to actually store their key-value pairs. - /// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient. - volume: "memory", - /// A complete storage advertises itself as containing all the known keys matching the configured key expression. - /// If not configured, complete defaults to false. - complete: "true", - }, - }, - }, - }, - } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 421932ac..d25fe777 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -58,11 +58,10 @@ struct rmw_context_impl_s ///============================================================================== struct rmw_node_data_t { - // TODO(yadunund): Add a GraphCache object. - - // Map topic name to topic types. - std::unordered_set> publishers; - std::unordered_set> subscriptions; + // TODO(Yadunund): Do we need a token at the node level? Right now I have one + // for cases where a node may spin up but does not have any publishers or subscriptions. + // Liveliness token for the node. + zc_owned_liveliness_token_t token; }; ///============================================================================== @@ -71,6 +70,9 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Liveliness token for the publisher. + zc_owned_liveliness_token_t token; + // Type support fields const void * type_support_impl; const char * typesupport_identifier; @@ -113,6 +115,9 @@ struct rmw_subscription_data_t { z_owned_subscriber_t sub; + // Liveliness token for the subscription. + zc_owned_liveliness_token_t token; + const void * type_support_impl; const char * typesupport_identifier; MessageTypeSupport * type_support; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 9b01b904..d045283c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -248,18 +248,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Setup liveliness subscriptions for discovery. const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); - // Query the router to get graph information before this session was started. - // TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available. + // Query the router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data from router...", + "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); z_owned_reply_channel_t channel = zc_reply_fifo_new(16); - z_get_options_t opts = z_get_options_default(); - z_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), - &opts); // here, the send is moved and will be dropped by zenoh when adequate + zc_liveliness_get( + z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // z_get_options_t opts = z_get_options_default(); + // z_get( + // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), + // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { if (z_reply_is_ok(&reply)) { @@ -284,14 +288,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) liveliness_str.c_str() ); - auto sub_options = z_subscriber_options_default(); - sub_options.reliability = Z_RELIABILITY_RELIABLE; + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // auto sub_options = z_subscriber_options_default(); + // sub_options.reliability = Z_RELIABILITY_RELIABLE; + // context->impl->graph_subscriber = z_declare_subscriber( + // z_loan(context->impl->session), + // z_keyexpr(liveliness_str.c_str()), + // z_move(callback), + // &sub_options); + auto sub_options = zc_liveliness_subscriber_options_null(); z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = z_declare_subscriber( + context->impl->graph_subscriber = zc_liveliness_declare_subscriber( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(callback), &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); auto undeclare_z_sub = rcpputils::make_scope_exit( [context]() { z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index af7ca85e..03dc7276 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -20,6 +20,8 @@ #include #include +#include + #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" @@ -180,12 +182,34 @@ rmw_create_node( node->implementation_identifier = rmw_zenoh_identifier; node->context = context; + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new node is in town - const bool pub_result = PublishToken::put( - &node->context->impl->session, - GenerateToken::node(context->actual_domain_id, namespace_, name) + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::node(context->actual_domain_id, namespace_, name) + // ); + // if (!pub_result) { + // return nullptr; + // } + // Initialize liveliness token for the node to advertise that a new node is in town. + rmw_node_data_t * node_data = static_cast(node->data); + node_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + NULL ); - if (!pub_result) { + auto free_token = rcpputils::make_scope_exit( + [node]() { + if (node->data != nullptr) { + rmw_node_data_t * node_data = static_cast(node->data); + z_drop(z_move(node_data->token)); + } + }); + if (!zc_liveliness_token_check(&node_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the node."); return nullptr; } @@ -193,6 +217,7 @@ rmw_create_node( free_namespace.cancel(); free_name.cancel(); free_node.cancel(); + free_token.cancel(); return node; } @@ -204,20 +229,27 @@ rmw_destroy_node(rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a node has ridden off into the sunset - const bool del_result = PublishToken::del( - &node->context->impl->session, - GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) - ); - if (!del_result) { - return RMW_RET_ERROR; - } + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // ); + // if (!del_result) { + // return RMW_RET_ERROR; + // } + + // Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset. + rmw_node_data_t * node_data = static_cast(node->data); + zc_liveliness_undeclare_token(z_move(node_data->token)); rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -516,19 +548,45 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. - const bool pub_result = PublishToken::put( - &node->context->impl->session, - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable") + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // rmw_publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable") + // ); + // if (!pub_result) { + // return nullptr; + // } + publisher_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr( + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable").c_str()), + NULL ); - if (!pub_result) { + auto free_token = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data != nullptr) { + z_drop(z_move(publisher_data->token)); + } + }); + if (!zc_liveliness_token_check(&publisher_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the publisher."); return nullptr; } @@ -540,6 +598,7 @@ rmw_create_publisher( node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); }); + free_token.cancel(); remove_from_graph_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); @@ -575,22 +634,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a publisher has ridden off into the sunset - const bool del_result = PublishToken::del( - &node->context->impl->session, - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable" - ) - ); - if (!del_result) { - // TODO(Yadunund): Should this really return an error? - return RMW_RET_ERROR; - } + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable" + // ) + // ); + // if (!del_result) { + // // TODO(Yadunund): Should this really return an error? + // return RMW_RET_ERROR; + // } + zc_liveliness_undeclare_token(z_move(publisher_data->token)); + node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); From a56eec2ed05fda70b4b33193a811e15a76c38fb3 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 18:51:04 +0800 Subject: [PATCH 07/12] Cleanup yaml-cpp Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index f403c364..d3f93e72 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,7 +21,6 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) -find_package(yaml_cpp_vendor REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -57,7 +56,6 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw - yaml-cpp zenohc::lib ) From c518431ef1e827d6d7c6b9f9a854efa14d50260c Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 18:58:52 +0800 Subject: [PATCH 08/12] Cleanup previous graph cache impl Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 109 -------------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 58 ----------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 4 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 22 ---- 4 files changed, 193 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index fde32033..692ddf63 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -144,115 +144,6 @@ bool PublishToken::del( return true; } -///============================================================================= -PublisherData::PublisherData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -: allocator_(allocator) -{ - // TODO(clalancette): Check for error - topic_name_ = rcutils_strdup(topic, *allocator); - - // TODO(clalancette): Check for error - node_name_ = rcutils_strdup(node, *allocator); - - // TODO(clalancette): Check for error - namespace_name_ = rcutils_strdup(namespace_, *allocator); - - // TODO(clalancette): Check for error - type_name_ = rcutils_strdup(type, *allocator); -} - -///============================================================================= -PublisherData::~PublisherData() -{ - allocator_->deallocate(topic_name_, allocator_->state); - allocator_->deallocate(node_name_, allocator_->state); - allocator_->deallocate(namespace_name_, allocator_->state); - allocator_->deallocate(type_name_, allocator_->state); -} - -///============================================================================= -SubscriptionData::SubscriptionData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -: allocator_(allocator) -{ - // TODO(clalancette): Check for error - topic_name_ = rcutils_strdup(topic, *allocator); - - // TODO(clalancette): Check for error - node_name_ = rcutils_strdup(node, *allocator); - - // TODO(clalancette): Check for error - namespace_name_ = rcutils_strdup(namespace_, *allocator); - - // TODO(clalancette): Check for error - type_name_ = rcutils_strdup(type, *allocator); -} - -///============================================================================= -SubscriptionData::~SubscriptionData() -{ - allocator_->deallocate(topic_name_, allocator_->state); - allocator_->deallocate(node_name_, allocator_->state); - allocator_->deallocate(namespace_name_, allocator_->state); - allocator_->deallocate(type_name_, allocator_->state); -} - -///============================================================================= -uint64_t -GraphCache::add_publisher( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -{ - std::lock_guard lck(publishers_mutex_); - uint64_t this_handle_id = publishers_handle_id_++; - publishers_.emplace( - std::make_pair( - this_handle_id, std::make_unique(topic, node, namespace_, type, allocator))); - return this_handle_id; -} - -///============================================================================= -void -GraphCache::remove_publisher(uint64_t handle) -{ - std::lock_guard lck(publishers_mutex_); - if (publishers_.count(handle) == 0) { - return; - } - - publishers_.erase(handle); -} - -///============================================================================= -uint64_t -GraphCache::add_subscription( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -{ - std::lock_guard lck(subscriptions_mutex_); - uint64_t this_handle_id = subscriptions_handle_id_++; - subscriptions_.emplace( - std::make_pair( - this_handle_id, - std::make_unique(topic, node, namespace_, type, allocator))); - return this_handle_id; -} - -///============================================================================= -void -GraphCache::remove_subscription(uint64_t handle) -{ - std::lock_guard lck(subscriptions_mutex_); - if (subscriptions_.count(handle) == 0) { - return; - } - - subscriptions_.erase(handle); -} - ///============================================================================= namespace { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 78336704..07aa1c84 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -67,42 +67,6 @@ class PublishToken const std::string & token); }; -///============================================================================= -class PublisherData final -{ -public: - PublisherData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - ~PublisherData(); - -private: - rcutils_allocator_t * allocator_; - char * topic_name_{nullptr}; - char * node_name_{nullptr}; - char * namespace_name_{nullptr}; - char * type_name_{nullptr}; -}; - -///============================================================================= -class SubscriptionData final -{ -public: - SubscriptionData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - ~SubscriptionData(); - -private: - rcutils_allocator_t * allocator_; - char * topic_name_{nullptr}; - char * node_name_{nullptr}; - char * namespace_name_{nullptr}; - char * type_name_{nullptr}; -}; - ///============================================================================= // TODO(Yadunund): Expand to services and clients. struct GraphNode @@ -128,20 +92,6 @@ using GraphNodePtr = std::shared_ptr; class GraphCache final { public: - uint64_t - add_publisher( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - void remove_publisher(uint64_t publisher_handle); - - uint64_t - add_subscription( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - void remove_subscription(uint64_t subscription_handle); - // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr); // Parse a DELETE message over a token's key-expression and update the graph. @@ -159,14 +109,6 @@ class GraphCache final rmw_names_and_types_t * topic_names_and_types); private: - std::mutex publishers_mutex_; - uint64_t publishers_handle_id_{0}; - std::map> publishers_; - - std::mutex subscriptions_mutex_; - uint64_t subscriptions_handle_id_{0}; - std::map> subscriptions_; - /* namespace_1: node_1: diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index d25fe777..d16e57ce 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -80,8 +80,6 @@ struct rmw_publisher_data_t // Context for memory allocation for messages. rmw_context_t * context; - - uint64_t graph_cache_handle; }; ///============================================================================== @@ -131,8 +129,6 @@ struct rmw_subscription_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; - - uint64_t graph_cache_handle; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 03dc7276..2c8ca5f1 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -590,16 +590,7 @@ rmw_create_publisher( return nullptr; } - publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( - rmw_publisher->topic_name, node->name, node->namespace_, - publisher_data->type_support->get_name(), allocator); - auto remove_from_graph_cache = rcpputils::make_scope_exit( - [node, publisher_data]() { - node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); - }); - free_token.cancel(); - remove_from_graph_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -654,8 +645,6 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // } zc_liveliness_undeclare_token(z_move(publisher_data->token)); - node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); - RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); if (z_undeclare_publisher(z_move(publisher_data->pub))) { @@ -1264,16 +1253,6 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town // TODO(Yadunund): Publish liveliness for the new subscription. - - sub_data->graph_cache_handle = node->context->impl->graph_cache.add_subscription( - rmw_subscription->topic_name, node->name, node->namespace_, - sub_data->type_support->get_name(), allocator); - auto remove_from_graph_cache = rcpputils::make_scope_exit( - [node, sub_data]() { - node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); - }); - - remove_from_graph_cache.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1314,7 +1293,6 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { - node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); From 559819c22ff9a19d159d549dad2ced7d7c9a776a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 19:14:21 +0800 Subject: [PATCH 09/12] Refactor topic cache Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 20 +++++++++++++++----- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 24 ++++++++++++++++-------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 692ddf63..af51eb8e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -224,7 +224,7 @@ std::optional> _parse_token(const std::string "Received invalid liveliness token"); return std::nullopt; } - GraphNode::PubSubData data; + GraphNode::TopicData data; data.topic = "/" + std::move(parts[5]); data.type = std::move(parts[6]); data.qos = std::move(parts[7]); @@ -242,6 +242,14 @@ std::optional> _parse_token(const std::string } } // namespace anonymous +///============================================================================= +GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +: pub_count_(pub_count), + sub_count_(sub_count) +{ + // Do nothing. +} + ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { @@ -298,10 +306,12 @@ void GraphCache::parse_put(const std::string & keyexpr) // Bookkeeping // TODO(Yadunund): Be more systematic about generating the key. std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), 1)); + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); if (!insertion.second) { // Such a topic already exists so we just increment its count. - ++insertion.first->second; + ++insertion.first->second->pub_count_; + } else { + insertion.first->second = std::make_unique(1, 0); } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", @@ -375,12 +385,12 @@ void GraphCache::parse_del(const std::string & keyexpr) std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; auto topic_it = graph_topics_.find(topic_key); if (topic_it != graph_topics_.end()) { - if (topic_it->second == 1) { + if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { // The last publisher was removed so we can delete this entry. graph_topics_.erase(topic_key); } else { // Else we just decrement the count. - --topic_it->second; + --topic_it->second->pub_count_; } } RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 07aa1c84..c6ac63a5 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -72,7 +72,7 @@ class PublishToken struct GraphNode { - struct PubSubData + struct TopicData { std::string topic; std::string type; @@ -83,8 +83,8 @@ struct GraphNode std::string name; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? std::string enclave; - std::vector pubs; - std::vector subs; + std::vector pubs; + std::vector subs; }; using GraphNodePtr = std::shared_ptr; @@ -132,11 +132,19 @@ class GraphCache final */ // Map namespace to a map of . std::unordered_map> graph_ = {}; - // Optimize published topic lookups by caching an unordered_map - // where key is topic_name+topic_type and the value the count. - // TODO(Yadunund): Use a single map for both published and subscribed topics. - // Consider changing value to pair for pub,sub resp. - std::unordered_map graph_topics_ = {}; + + // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. + struct TopicStats + { + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initialized counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); + }; + using TopicStatsPtr = std::unique_ptr; + std::unordered_map graph_topics_ = {}; + mutable std::mutex graph_mutex_; }; From 428cacec83e6f103ac4f699f9b01cd25e26d5304 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 19:35:01 +0800 Subject: [PATCH 10/12] Implement liveliness tokens for subscriptions and update graph Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 96 ++++++++++++++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 8 ++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 38 ++++++++-- 3 files changed, 135 insertions(+), 7 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index af51eb8e..7ccdeccb 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -83,6 +83,21 @@ std::string GenerateToken::publisher( return token; } +///============================================================================= +std::string GenerateToken::subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos) +{ + std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); + token += topic + "/" + type + "/" + qos; + printf("GenerateToken::Subscription: %s\n", token.c_str()); + return token; +} + ///============================================================================= bool PublishToken::put( z_owned_session_t * session, @@ -301,6 +316,8 @@ void GraphCache::parse_put(const std::string & keyexpr) if (!insertion.second && !node->pubs.empty()) { // Node already exists so just append the publisher. insertion.first->second->pubs.push_back(node->pubs[0]); + } else { + return; } } // Bookkeeping @@ -320,6 +337,39 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } else if (entity == "MS") { // Subscription + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->subs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->subs.push_back(node->subs[0]); + } else { + return; + } + } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second->sub_count_; + } else { + insertion.first->second = std::make_unique(0, 1); + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str()); + return; } else if (entity == "SS") { // Service } else if (entity == "SC") { @@ -404,6 +454,52 @@ void GraphCache::parse_del(const std::string & keyexpr) } } else if (entity == "MS") { // Subscription + if (node->subs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of subscriptions and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->subs.begin(); + for (; erase_it != found_node->subs.end(); ++erase_it) { + const auto & sub = *erase_it; + if (sub.topic == node->subs.at(0).topic && + sub.type == node->subs.at(0).type && + sub.qos == node->subs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->subs.end()) { + found_node->subs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { + // The last subscription was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second->sub_count_; + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed subscription %s from node /%s in the graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "SS") { // Service } else if (entity == "SC") { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index c6ac63a5..a67458b1 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -50,6 +50,14 @@ class GenerateToken const std::string & topic, const std::string & type, const std::string & qos); + + static std::string subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 2c8ca5f1..95c59d23 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -580,7 +580,7 @@ rmw_create_publisher( auto free_token = rcpputils::make_scope_exit( [publisher_data]() { if (publisher_data != nullptr) { - z_drop(z_move(publisher_data->token)); + zc_liveliness_undeclare_token(z_move(publisher_data->token)); } }); if (!zc_liveliness_token_check(&publisher_data->token)) { @@ -608,6 +608,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1251,8 +1252,32 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town - // TODO(Yadunund): Publish liveliness for the new subscription. + sub_data->token = zc_liveliness_declare_token( + z_loan(context_impl->session), + z_keyexpr( + GenerateToken::subscription( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_subscription->topic_name, + sub_data->type_support->get_name(), + "reliable").c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [sub_data]() { + if (sub_data != nullptr) { + zc_liveliness_undeclare_token(z_move(sub_data->token)); + } + }); + if (!zc_liveliness_token_check(&sub_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); + return nullptr; + } + free_token.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1284,16 +1309,14 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a subscription has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted subscription. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(subscription->topic_name), allocator->state); - auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { + // Publish to the graph that a subscription has ridden off into the sunset + zc_liveliness_undeclare_token(z_move(sub_data->token)); + RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); @@ -1305,6 +1328,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); } + allocator->deallocate(const_cast(subscription->topic_name), allocator->state); allocator->deallocate(subscription, allocator->state); return ret; From f7711c7d3e9d967d71ff3cd7e17919f550d3a378 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 17 Nov 2023 20:29:50 +0000 Subject: [PATCH 11/12] Fix types and type mangling. When playing around with this I found that the types weren't being properly filled in, and also weren't being demangled. Fix both of those things here, as well as do a lot of cleanup. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 146 +++++++++--------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 1 - .../src/rmw_get_topic_names_and_types.cpp | 57 ------- rmw_zenoh_cpp/src/rmw_init.cpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 +- 5 files changed, 80 insertions(+), 134 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 7ccdeccb..025b0047 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -20,6 +20,7 @@ #include #include +#include "rcpputils/find_and_replace.hpp" #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" @@ -38,6 +39,19 @@ std::string GenerateToken::liveliness(size_t domain_id) } ///============================================================================= +/** + * Generate a liveliness token for the particular entity. + * + * The liveliness tokens are in the form: + * + * @ros2_lv//// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + */ static std::string generate_base_token( const std::string & entity, size_t domain_id, @@ -46,7 +60,7 @@ static std::string generate_base_token( { std::stringstream token_ss; token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; - // An empty namespace from rcl will contain "/"" but zenoh does not allow keys with "//". + // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". // Hence we add an "_" to denote an empty namespace such that splitting the key // will always result in 5 parts. if (namespace_ == "/") { @@ -79,7 +93,6 @@ std::string GenerateToken::publisher( { std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); token += topic + "/" + type + "/" + qos; - printf("GenerateToken::Publisher: %s\n", token.c_str()); return token; } @@ -94,7 +107,6 @@ std::string GenerateToken::subscription( { std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); token += topic + "/" + type + "/" + qos; - printf("GenerateToken::Subscription: %s\n", token.c_str()); return token; } @@ -213,23 +225,12 @@ std::optional> _parse_token(const std::string } } - // Get the entity, ie N, MP, MS, SS, SC. + // Get the entity, ie NN, MP, MS, SS, SC. std::string & entity = parts[2]; - if (entity != "NN" && - entity != "MP" && - entity != "MS" && - entity != "SS" && - entity != "SC") - { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid entity [%s] in liveliness token", entity.c_str()); - return std::nullopt; - } GraphNode node; // Nodes with empty namespaces will contain a "_". - node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3); + node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; node.name = std::move(parts[4]); if (entity != "NN") { @@ -248,14 +249,21 @@ std::optional> _parse_token(const std::string node.pubs.push_back(std::move(data)); } else if (entity == "MS") { node.subs.push_back(std::move(data)); + } else if (entity == "SS") { + // TODO(yadunund): Service server + } else if (entity == "SC") { + // TODO(yadunund): Service client } else { - // TODO. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid entity [%s] in liveliness token", entity.c_str()); + return std::nullopt; } } return std::make_pair(std::move(entity), std::move(node)); } -} // namespace anonymous +} // namespace ///============================================================================= GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) @@ -282,11 +290,10 @@ void GraphCache::parse_put(const std::string & keyexpr) auto ns_it = graph_.find(node->ns); if (ns_it == graph_.end()) { // New namespace. - std::string ns = node->ns; std::unordered_map map = { {node->name, node} }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); + graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); } else { auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); if (!insertion.second) { @@ -371,9 +378,9 @@ void GraphCache::parse_put(const std::string & keyexpr) node->name.c_str()); return; } else if (entity == "SS") { - // Service + // TODO(yadunund): Service server } else if (entity == "SC") { - // Client + // TODO(yadunud): Service Client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -501,9 +508,9 @@ void GraphCache::parse_del(const std::string & keyexpr) } } } else if (entity == "SS") { - // Service + // TODO(yadunund): Service server } else if (entity == "SC") { - // Client + // TODO(yadunund): Service client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -539,10 +546,6 @@ rmw_ret_t GraphCache::get_node_names( for (auto it = graph_.begin(); it != graph_.end(); ++it) { nodes_number += it->second.size(); } - // TODO(Yadunund): Delete. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "nodes_number %ld", nodes_number); rcutils_ret_t rcutils_ret = rcutils_string_array_init(node_names, nodes_number, allocator); @@ -630,6 +633,33 @@ rmw_ret_t GraphCache::get_node_names( return RMW_RET_OK; } +namespace +{ +// Shamelessly copied from https://github.com/ros2/rmw_cyclonedds/blob/f7f67bdef82f59558366aa6ce94ef9af3c5ab569/rmw_cyclonedds_cpp/src/demangle.cpp#L67 +std::string +_demangle_if_ros_type(const std::string & dds_type_string) +{ + if (dds_type_string[dds_type_string.size() - 1] != '_') { + // not a ROS type + return dds_type_string; + } + + std::string substring = "dds_::"; + size_t substring_position = dds_type_string.find(substring); + if (substring_position == std::string::npos) { + // not a ROS type + return dds_type_string; + } + + std::string type_namespace = dds_type_string.substr(0, substring_position); + type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/"); + size_t start = substring_position + substring.size(); + std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start); + return type_namespace + type_name; +} + +} // namespace + ///============================================================================= rmw_ret_t GraphCache::get_topic_names_and_types( rcutils_allocator_t * allocator, @@ -642,10 +672,6 @@ rmw_ret_t GraphCache::get_topic_names_and_types( std::lock_guard lock(graph_mutex_); const size_t topic_number = graph_topics_.size(); - // TODO(Yadunund): Delete. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "topic_number %ld", topic_number); rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator); if (ret != RMW_RET_OK) { @@ -659,44 +685,13 @@ rmw_ret_t GraphCache::get_topic_names_and_types( } }); - // topic_names_and_types->names is an rcutils_string_array_t, - // while topic_names_and_types->types is an rcutils_string_array_t * - - // rcutils_ret_t rcutils_ret = - // rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator); - // if (rcutils_ret != RCUTILS_RET_OK) { - // return RMW_RET_BAD_ALLOC; - // } - auto free_topic_names = rcpputils::make_scope_exit( - [names = &topic_names_and_types->names]() { - rcutils_ret_t ret = rcutils_string_array_fini(names); - if (ret != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - } - }); - - rcutils_ret_t rcutils_ret = - rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - return RMW_RET_BAD_ALLOC; - } - auto free_topic_types = rcpputils::make_scope_exit( - [types = topic_names_and_types->types]() { - rcutils_ret_t ret = rcutils_string_array_fini(types); - if (ret != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - } - }); - // Fill topic names and types. std::size_t j = 0; for (const auto & it : graph_topics_) { // Split based on "?". // TODO(Yadunund): Be more systematic about this. + // TODO(clalancette): Rather than doing the splitting here, should we store + // it in graph_topics_ already split? std::vector parts = split_keyexpr(it.first, '?'); if (parts.size() < 2) { RCUTILS_LOG_ERROR_NAMED( @@ -704,21 +699,30 @@ rmw_ret_t GraphCache::get_topic_names_and_types( "Invalid topic_key %s", it.first.c_str()); return RMW_RET_INVALID_ARGUMENT; } - topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator); + + topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); if (!topic_names_and_types->names.data[j]) { return RMW_RET_BAD_ALLOC; } - topic_names_and_types->types->data[j] = rcutils_strdup( - parts.at(1).c_str(), *allocator); - if (!topic_names_and_types->types->data[j]) { + + // TODO(clalancette): This won't work if there are multiple types on the same topic + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[j], 1, allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); return RMW_RET_BAD_ALLOC; } + + topic_names_and_types->types[j].data[0] = rcutils_strdup( + _demangle_if_ros_type(parts[1]).c_str(), *allocator); + if (!topic_names_and_types->types[j].data[0]) { + return RMW_RET_BAD_ALLOC; + } + ++j; } cleanup_names_and_types.cancel(); - free_topic_names.cancel(); - free_topic_types.cancel(); return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index a67458b1..f2de0018 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -79,7 +79,6 @@ class PublishToken // TODO(Yadunund): Expand to services and clients. struct GraphNode { - struct TopicData { std::string topic; diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index a70b7cb4..a4c80b4e 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -41,62 +41,5 @@ rmw_get_topic_names_and_types( return node->context->impl->graph_cache.get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); - - // rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); - // if (ret != RMW_RET_OK) { - // return ret; - // } - // auto cleanup_names_and_types = rcpputils::make_scope_exit( - // [topic_names_and_types] { - // rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); - // if (fail_ret != RMW_RET_OK) { - // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); - // } - // }); - - // // topic_names_and_types->names is an rcutils_string_array_t, - // // while topic_names_and_types->types is an rcutils_string_array_t * - - // topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); - // if (topic_names_and_types->names.data[0] == nullptr) { - // RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); - // return RMW_RET_BAD_ALLOC; - // } - // auto free_names = rcpputils::make_scope_exit( - // [topic_names_and_types, allocator] { - // allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); - // }); - - // rcutils_ret_t rcutils_ret = rcutils_string_array_init( - // topic_names_and_types->types, 1, - // allocator); - // if (rcutils_ret != RCUTILS_RET_OK) { - // RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); - // return RMW_RET_BAD_ALLOC; - // } - // auto fini_string_array = rcpputils::make_scope_exit( - // [topic_names_and_types] { - // rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); - // if (fail_ret != RMW_RET_OK) { - // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); - // } - // }); - - // topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); - // if (topic_names_and_types->types[0].data[0] == nullptr) { - // RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); - // return RMW_RET_BAD_ALLOC; - // } - // auto free_types = rcpputils::make_scope_exit( - // [topic_names_and_types, allocator] { - // allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); - // }); - - // free_types.cancel(); - // fini_string_array.cancel(); - // free_names.cancel(); - // cleanup_names_and_types.cancel(); - - return RMW_RET_OK; } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index d045283c..51f22e3c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -248,7 +248,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Setup liveliness subscriptions for discovery. const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); - // Query the router/liveliness participants to get graph information before this session was started. + // Query router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Sending Query '%s' to fetch discovery data...", diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 95c59d23..8e374753 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -15,13 +15,13 @@ #include #include +#include + #include #include #include #include -#include - #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" @@ -247,7 +247,8 @@ rmw_destroy_node(rmw_node_t * node) // return RMW_RET_ERROR; // } - // Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset. + // Undeclare liveliness token for the node to advertise that the node has ridden + // off into the sunset. rmw_node_data_t * node_data = static_cast(node->data); zc_liveliness_undeclare_token(z_move(node_data->token)); @@ -1313,7 +1314,6 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { - // Publish to the graph that a subscription has ridden off into the sunset zc_liveliness_undeclare_token(z_move(sub_data->token)); From 2d8e44603a6373b1fc8c3dc1378693ec76329427 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 18 Nov 2023 09:52:22 +0800 Subject: [PATCH 12/12] Update README Signed-off-by: Yadunund --- README.md | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 7203c35c..28814060 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Build `rmw_zenoh_cpp` ```bash mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src -git clone git@github.com:ros2/rmw_zenoh.git +git clone https://github.com/ros2/rmw_zenoh.git cd ~/ws_rmw_zenoh source /opt/ros//setup.bash # replace with ROS 2 distro of choice colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release @@ -34,25 +34,42 @@ colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release ## Test -Source workspace +Make sure to source the built workspace using the commands below prior to running any other commands. ```bash cd ~/ws_rmw_zenoh source install/setup.bash ``` -In a terminal launch Zenoh router: +### Start the zenoh router +> Note: Manually launching zenoh router won't be necessary in the future. ```bash +# terminal 1 ros2 run rmw_zenoh_cpp init_rmw_zenoh_router ``` -> Note: Manually launching zenoh router won't be necessary in the future. -In a different terminal source install folder and execute: +> Note: Without the zenoh router, nodes will not be able to discover each other since multicast discovery is disabled by default in the node's session config. Instead, nodes will receive discovery information about other peers via the zenoh router's gossip functionality. See more information on the session configs [below](#config). + +### Run the `talker` +```bash +# terminal 2 +export RMW_IMPLEMENTATION=rmw_zenoh_cpp +ros2 run demo_nodes_cpp talker +``` +> Note: Ignore all the warning printouts. +### Run the `listener` ```bash +# terminal 2 export RMW_IMPLEMENTATION=rmw_zenoh_cpp -ros2 topic pub "/chatter" std_msgs/msg/String '{data: hello}' +ros2 run demo_nodes_cpp listener ``` +The listener node should start receiving messages over the `/chatter` topic. +> Note: Ignore all the warning printouts. + +### Graph introspection +Presently we only support limited `ros2cli` commands to introspect the ROS graph such as `ros2 node list` and `ros2 topic list`. + ## Config The [default configuration](rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5) sets up the zenoh sessions with the following main characteristics: @@ -69,7 +86,7 @@ A custom configuration may be provided by setting the `RMW_ZENOH_CONFIG_FILE` en ## TODO Features - [x] Publisher -- [ ] Subscription +- [x] Subscription - [ ] Client - [ ] Service - [ ] Graph introspection