Skip to content

Commit

Permalink
Implement basic publisher introspection
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Nov 15, 2023
1 parent eb69560 commit f92dd56
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 67 deletions.
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>yaml_cpp_vendor</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
139 changes: 126 additions & 13 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,16 @@ GraphCache::remove_subscription(uint64_t handle)
///=============================================================================
namespace
{
std::vector<std::string> split_keyexpr(const std::string & keyexpr)
std::vector<std::string> split_keyexpr(
const std::string & keyexpr,
const char delim = '/')
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) {
if (*it == '/') {
if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
Expand Down Expand Up @@ -319,7 +321,6 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
return std::nullopt;
}


GraphNode node;
// Nodes with empty namespaces will contain a "_".
node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3);
Expand All @@ -333,7 +334,7 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
return std::nullopt;
}
GraphNode::PubSubData data;
data.topic = std::move(parts[5]);
data.topic = "/" + std::move(parts[5]);
data.type = std::move(parts[6]);
data.qos = std::move(parts[7]);

Expand Down Expand Up @@ -376,12 +377,12 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second) {
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.",
"rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.",
node->name.c_str());
}
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node %s to the graph.",
"rmw_zenoh_cpp", "Added node /%s to the graph.",
node->name.c_str());
return;

Expand All @@ -400,11 +401,20 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second && !node->pubs.empty()) {
// Node already exists so just append the publisher.
insertion.first->second->pubs.push_back(std::move(node->pubs[0]));
insertion.first->second->pubs.push_back(node->pubs[0]);
}
}
// Bookkeeping
// TODO(Yadunund): Be more systematic about generating the key.
std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type;
auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), 1));
if (!insertion.second) {
// Such a topic already exists so we just increment its count.
++insertion.first->second;
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added publisher to node %s in graph.",
"rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.",
node->pubs.at(0).topic.c_str(),
node->name.c_str());
return;
} else if (entity == "MS") {
Expand Down Expand Up @@ -440,7 +450,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node %s from the graph.",
"Removed node /%s from the graph.",
node->name.c_str()
);
} else if (entity == "MP") {
Expand Down Expand Up @@ -469,9 +479,22 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
if (erase_it != found_node->pubs.end()) {
found_node->pubs.erase(erase_it);
// Bookkeeping
// TODO(Yadunund): Be more systematic about generating the key.
std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type;
auto topic_it = graph_topics_.find(topic_key);
if (topic_it != graph_topics_.end()) {
if (topic_it->second == 1) {
// The last publisher was removed so we can delete this entry.
graph_topics_.erase(topic_key);
} else {
// Else we just decrement the count.
--topic_it->second;
}
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed publisher %s from node %s in the graph.",
"Removed publisher %s from node /%s in the graph.",
node->pubs.at(0).topic.c_str(),
node->name.c_str()
);
Expand Down Expand Up @@ -575,9 +598,6 @@ rmw_ret_t GraphCache::get_node_names(
std::move(free_enclaves_lambda));
}

// TODO(Yadunund): Remove this printout.
// const std::string & graph_str = YAML::Dump(graph_);
// RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str());
// Fill node names, namespaces and enclaves.
std::size_t j = 0;
for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) {
Expand Down Expand Up @@ -612,3 +632,96 @@ rmw_ret_t GraphCache::get_node_names(

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::get_topic_names_and_types(
rcutils_allocator_t * allocator,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
static_cast<void>(no_demangle);
RCUTILS_CHECK_ALLOCATOR_WITH_MSG(
allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT);

std::lock_guard<std::mutex> lock(graph_mutex_);
const size_t topic_number = graph_topics_.size();
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"topic_number %ld", topic_number);

rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator);
if (ret != RMW_RET_OK) {
return ret;
}
auto cleanup_names_and_types = rcpputils::make_scope_exit(
[topic_names_and_types] {
rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
if (fail_ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
}
});

// topic_names_and_types->names is an rcutils_string_array_t,
// while topic_names_and_types->types is an rcutils_string_array_t *

// rcutils_ret_t rcutils_ret =
// rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// return RMW_RET_BAD_ALLOC;
// }
auto free_topic_names = rcpputils::make_scope_exit(
[names = &topic_names_and_types->names]() {
rcutils_ret_t ret = rcutils_string_array_fini(names);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
return RMW_RET_BAD_ALLOC;
}
auto free_topic_types = rcpputils::make_scope_exit(
[types = topic_names_and_types->types]() {
rcutils_ret_t ret = rcutils_string_array_fini(types);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

// Fill topic names and types.
std::size_t j = 0;
for (const auto & it : graph_topics_) {
// Split based on "?".
// TODO(Yadunund): Be more systematic about this.
std::vector<std::string> parts = split_keyexpr(it.first, '?');
if (parts.size() < 2) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid topic_key %s", it.first.c_str());
return RMW_RET_INVALID_ARGUMENT;
}
topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator);
if (!topic_names_and_types->names.data[j]) {
return RMW_RET_BAD_ALLOC;
}
topic_names_and_types->types->data[j] = rcutils_strdup(
parts.at(1).c_str(), *allocator);
if (!topic_names_and_types->types->data[j]) {
return RMW_RET_BAD_ALLOC;
}
++j;
}

cleanup_names_and_types.cancel();
free_topic_names.cancel();
free_topic_types.cancel();

return RMW_RET_OK;
}
15 changes: 13 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rcutils/allocator.h"
#include "rcutils/types.h"

#include "rmw/rmw.h"
#include "rmw/names_and_types.h"

#include "yaml-cpp/yaml.h"

///=============================================================================
class GenerateToken
Expand Down Expand Up @@ -152,6 +153,11 @@ class GraphCache final
rcutils_string_array_t * enclaves,
rcutils_allocator_t * allocator) const;

rmw_ret_t get_topic_names_and_types(
rcutils_allocator_t * allocator,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types);

private:
std::mutex publishers_mutex_;
uint64_t publishers_handle_id_{0};
Expand Down Expand Up @@ -183,7 +189,12 @@ class GraphCache final
node_n:
*/
// Map namespace to a map of <node_name, GraphNodePtr>.
std::unordered_map<std::string, std::unordered_map<std::string, GraphNodePtr>> graph_;
std::unordered_map<std::string, std::unordered_map<std::string, GraphNodePtr>> graph_ = {};
// Optimize published topic lookups by caching an unordered_map<string, size_t>
// where key is topic_name+topic_type and the value the count.
// TODO(Yadunund): Use a single map for both published and subscribed topics.
// Consider changing value to pair<size_t, size_t> for pub,sub resp.
std::unordered_map<std::string, std::size_t> graph_topics_ = {};
mutable std::mutex graph_mutex_;
};

Expand Down
110 changes: 59 additions & 51 deletions rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "detail/rmw_data_types.hpp"

#include "rcutils/strdup.h"

#include "rmw/error_handling.h"
Expand All @@ -31,63 +33,69 @@ rmw_get_topic_names_and_types(
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
static_cast<void>(node);
static_cast<void>(no_demangle);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT);

return node->context->impl->graph_cache.get_topic_names_and_types(
allocator, no_demangle, topic_names_and_types);

rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
if (ret != RMW_RET_OK) {
return ret;
}
auto cleanup_names_and_types = rcpputils::make_scope_exit(
[topic_names_and_types] {
rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
if (fail_ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
}
});
// rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
// if (ret != RMW_RET_OK) {
// return ret;
// }
// auto cleanup_names_and_types = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
// }
// });

// topic_names_and_types->names is an rcutils_string_array_t,
// while topic_names_and_types->types is an rcutils_string_array_t *
// // topic_names_and_types->names is an rcutils_string_array_t,
// // while topic_names_and_types->types is an rcutils_string_array_t *

topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
if (topic_names_and_types->names.data[0] == nullptr) {
RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
return RMW_RET_BAD_ALLOC;
}
auto free_names = rcpputils::make_scope_exit(
[topic_names_and_types, allocator] {
allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
});
// topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
// if (topic_names_and_types->names.data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_names = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
// });

rcutils_ret_t rcutils_ret = rcutils_string_array_init(
topic_names_and_types->types, 1,
allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
return RMW_RET_BAD_ALLOC;
}
auto fini_string_array = rcpputils::make_scope_exit(
[topic_names_and_types] {
rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
if (fail_ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
}
});
// rcutils_ret_t rcutils_ret = rcutils_string_array_init(
// topic_names_and_types->types, 1,
// allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
// return RMW_RET_BAD_ALLOC;
// }
// auto fini_string_array = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
// }
// });

topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
if (topic_names_and_types->types[0].data[0] == nullptr) {
RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
return RMW_RET_BAD_ALLOC;
}
auto free_types = rcpputils::make_scope_exit(
[topic_names_and_types, allocator] {
allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
});
// topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
// if (topic_names_and_types->types[0].data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_types = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
// });

free_types.cancel();
fini_string_array.cancel();
free_names.cancel();
cleanup_names_and_types.cancel();
// free_types.cancel();
// fini_string_array.cancel();
// free_names.cancel();
// cleanup_names_and_types.cancel();

return RMW_RET_OK;
}
Expand Down

0 comments on commit f92dd56

Please sign in to comment.