Skip to content

Commit

Permalink
Update graph cache with publisher data
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Nov 15, 2023
1 parent a6ce6e5 commit eb69560
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 9 deletions.
54 changes: 54 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ std::string GenerateToken::publisher(
{
std::string token = generate_base_token("MP", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Publisher: %s\n", token.c_str());
return token;
}

Expand Down Expand Up @@ -386,6 +387,26 @@ void GraphCache::parse_put(const std::string & keyexpr)

} else if (entity == "MP") {
// Publisher
auto ns_it = graph_.find(node->ns);
if (ns_it == graph_.end()) {
// Potential edge case where a liveliness update for a node creation was missed.
// So we add the node here.
std::string ns = node->ns;
std::unordered_map<std::string, GraphNodePtr> map = {
{node->name, node}
};
graph_.insert(std::make_pair(std::move(ns), std::move(map)));
} else {
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second && !node->pubs.empty()) {
// Node already exists so just append the publisher.
insertion.first->second->pubs.push_back(std::move(node->pubs[0]));
}
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added publisher to node %s in graph.",
node->name.c_str());
return;
} else if (entity == "MS") {
// Subscription
} else if (entity == "SS") {
Expand Down Expand Up @@ -424,6 +445,39 @@ void GraphCache::parse_del(const std::string & keyexpr)
);
} else if (entity == "MP") {
// Publisher
if (node->pubs.empty()) {
// This should never happen but we make sure _parse_token() has no error.
return;
}
auto ns_it = graph_.find(node->ns);
if (ns_it != graph_.end()) {
auto node_it = ns_it->second.find(node->name);
if (node_it != ns_it->second.end()) {
const auto found_node = node_it->second;
// Here we iterate throught the list of publishers and remove the one
// with matching name, type and qos.
// TODO(Yadunund): This can be more optimal than O(n) with some caching.
auto erase_it = found_node->pubs.begin();
for (; erase_it != found_node->pubs.end(); ++erase_it) {
const auto & pub = *erase_it;
if (pub.topic == node->pubs.at(0).topic &&
pub.type == node->pubs.at(0).type &&
pub.qos == node->pubs.at(0).qos)
{
break;
}
}
if (erase_it != found_node->pubs.end()) {
found_node->pubs.erase(erase_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed publisher %s from node %s in the graph.",
node->pubs.at(0).topic.c_str(),
node->name.c_str()
);
}
}
}
} else if (entity == "MS") {
// Subscription
} else if (entity == "SS") {
Expand Down
43 changes: 34 additions & 9 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ rmw_create_node(
node->context = context;

// Publish to the graph that a new node is in town
const bool result = PublishToken::put(
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::node(context->actual_domain_id, namespace_, name)
);
if (!result) {
if (!pub_result) {
return nullptr;
}

Expand All @@ -211,11 +211,11 @@ rmw_destroy_node(rmw_node_t * node)
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// Publish to the graph that a node has ridden off into the sunset
const bool result = PublishToken::del(
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
);
if (!result) {
if (!del_result) {
return RMW_RET_ERROR;
}

Expand Down Expand Up @@ -518,6 +518,19 @@ rmw_create_publisher(

// Publish to the graph that a new publisher is in town
// TODO(Yadunund): Publish liveliness for the new publisher.
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable")
);
if (!pub_result) {
return nullptr;
}

publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher(
rmw_publisher->topic_name, node->name, node->namespace_,
Expand Down Expand Up @@ -558,15 +571,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)

rmw_ret_t ret = RMW_RET_OK;

// Publish to the graph that a publisher has ridden off into the sunset
// TODO(Yadunund): Publish liveliness for the deleted publisher.

rcutils_allocator_t * allocator = &node->context->options.allocator;

allocator->deallocate(const_cast<char *>(publisher->topic_name), allocator->state);

auto publisher_data = static_cast<rmw_publisher_data_t *>(publisher->data);
if (publisher_data != nullptr) {
// Publish to the graph that a publisher has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable"
)
);
if (!del_result) {
// TODO(Yadunund): Should this really return an error?
return RMW_RET_ERROR;
}
node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);

RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
Expand All @@ -577,6 +601,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
}
allocator->deallocate(publisher_data, allocator->state);
}
allocator->deallocate(const_cast<char *>(publisher->topic_name), allocator->state);
allocator->deallocate(publisher, allocator->state);

return ret;
Expand Down

0 comments on commit eb69560

Please sign in to comment.