Skip to content

Commit

Permalink
Resolve merge conflicts after fixes to services
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 19, 2024
2 parents 594bb97 + 0e7a380 commit 804e915
Show file tree
Hide file tree
Showing 17 changed files with 469 additions and 316 deletions.
18 changes: 0 additions & 18 deletions rmw_zenoh_config.json5

This file was deleted.

42 changes: 5 additions & 37 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ void GraphCache::parse_put(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients are equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
Expand Down Expand Up @@ -173,14 +173,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
topic_data_insertion.first->second->stats_.sub_count_ += sub_count;
}
}

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added %s on topic %s with type %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
graph_node.name_.c_str());
};

// Helper lambda to convert an Entity into a GraphNode.
Expand Down Expand Up @@ -213,10 +205,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
NodeMap node_map = {
{entity.node_name(), make_graph_node(entity, *this)}};
graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
return;
}

Expand All @@ -238,14 +226,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
} else {
if (insertion_it == ns_it->second.end()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an "
Expand Down Expand Up @@ -369,8 +350,8 @@ void GraphCache::parse_del(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients are equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
Expand Down Expand Up @@ -409,14 +390,6 @@ void GraphCache::parse_del(const std::string & keyexpr)

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Removed %s on topic %s with type %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
graph_node.name_.c_str());
};

// Lock the graph mutex before accessing the graph.
Expand Down Expand Up @@ -483,11 +456,6 @@ void GraphCache::parse_del(const std::string & keyexpr)
remove_topics(graph_node->clients_, EntityType::Client, *this);
}
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
entity.node_name().c_str()
);
return;
}

Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct TopicData
using TopicDataPtr = std::shared_ptr<TopicData>;

///=============================================================================
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
Expand All @@ -82,7 +81,6 @@ struct GraphNode
// Entires for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ bool PublishToken::put(
RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str());

z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) {
Expand Down Expand Up @@ -465,7 +465,7 @@ bool PublishToken::del(
RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str());

const z_delete_options_t options = z_delete_options_default();
if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) {
RCUTILS_SET_ERROR_MSG("failed to delete liveliness key");
Expand Down
58 changes: 48 additions & 10 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include <zenoh.h>

#include <cstring>
#include <mutex>
#include <optional>
#include <utility>

#include "rcpputils/scope_exit.hpp"
Expand Down Expand Up @@ -63,12 +65,12 @@ void sub_data_handler(
sub_data->adapted_qos_profile.depth,
z_loan(keystr));

std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.back());
std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.front());
z_drop(&old->payload);
sub_data->message_queue.pop_back();
sub_data->message_queue.pop_front();
}

sub_data->message_queue.emplace_front(
sub_data->message_queue.emplace_back(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id));
Expand All @@ -81,13 +83,24 @@ void sub_data_handler(
}
}

ZenohQuery::ZenohQuery(const z_query_t * query)
{
query_ = z_query_clone(query);
}

ZenohQuery::~ZenohQuery()
{
z_drop(z_move(query_));
}

const z_query_t ZenohQuery::get_query() const
{
return z_query_loan(&query_);
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
Expand All @@ -108,7 +121,7 @@ void service_data_handler(const z_query_t * query, void * data)
// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.push_back(z_query_clone(query));
service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
}
{
// Since we added new data, trigger the guard condition if it is available
Expand All @@ -119,6 +132,31 @@ void service_data_handler(const z_query_t * query, void * data)
}
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
{
reply_ = *reply;
}

ZenohReply::~ZenohReply()
{
z_reply_drop(z_move(reply_));
}

std::optional<z_sample_t> ZenohReply::get_sample() const
{
if (z_reply_is_ok(&reply_)) {
return z_reply_ok(&reply_);
}

return std::nullopt;
}

size_t rmw_client_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex);
return sequence_number++;
}

//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
Expand All @@ -145,9 +183,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
*reply = z_reply_null();
}
{
Expand Down
41 changes: 35 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -147,10 +148,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data);

///==============================================================================

struct rmw_service_data_t
class ZenohQuery final
{
std::size_t get_new_uid();
public:
ZenohQuery(const z_query_t * query);

~ZenohQuery();

const z_query_t get_query() const;

private:
z_owned_query_t query_;
};

struct rmw_service_data_t
{
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

Expand All @@ -170,11 +182,11 @@ struct rmw_service_data_t
rmw_context_t * context;

// Deque to store the queries in the order they arrive.
std::deque<z_owned_query_t> query_queue;
std::deque<std::unique_ptr<ZenohQuery>> query_queue;
std::mutex query_queue_mutex;

// Map to store the sequence_number -> query_id
std::map<int64_t, z_owned_query_t> sequence_to_query_map;
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;

std::mutex internal_mutex;
Expand All @@ -183,6 +195,19 @@ struct rmw_service_data_t

///==============================================================================

class ZenohReply final
{
public:
ZenohReply(const z_owned_reply_t * reply);

~ZenohReply();

std::optional<z_sample_t> get_sample() const;

private:
z_owned_reply_t reply_;
};

struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;
Expand All @@ -195,8 +220,8 @@ struct rmw_client_data_t
// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;
std::mutex replies_mutex;
std::deque<std::unique_ptr<ZenohReply>> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
Expand All @@ -209,6 +234,10 @@ struct rmw_client_data_t
std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

uint8_t client_guid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();
std::mutex sequence_number_mutex;
size_t sequence_number{1};
};

Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config)
if (zenoh_config_path[0] != '\0') {
// If the environment variable is set, try to read the configuration from the file.
*config = zc_config_from_file(zenoh_config_path);
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar,
zenoh_config_path);
} else {
// If the environment variable is not set use internal configuration
static const std::string path_to_config_folder =
ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/";
const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName;
*config = zc_config_from_file(default_zconfig_path.c_str());
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using default zenoh configuration file at '%s'", default_zconfig_path.c_str());
}

// Verify that the configuration is valid.
Expand Down
10 changes: 1 addition & 9 deletions rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session)
// Define callback
auto callback = [](const struct z_id_t * id, void * ctx) {
const std::string id_str = zid_to_str(*id);
RCUTILS_LOG_INFO_NAMED(
"ZenohRouterCheck",
"A Zenoh router connected to the session with id '%s'", id_str.c_str());
// Note: Callback is guaranteed to never be called
// concurrently according to z_info_routers_zid docstring
(*(static_cast<int *>(ctx)))++;
};

rmw_ret_t ret;
rmw_ret_t ret = RMW_RET_OK;
z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context);
if (z_info_routers_zid(session, z_move(router_callback))) {
RCUTILS_LOG_ERROR_NAMED(
Expand All @@ -77,11 +74,6 @@ rmw_ret_t zenoh_router_check(z_session_t session)
"ZenohRouterCheck",
"No Zenoh router connected to the session");
ret = RMW_RET_ERROR;
} else {
RCUTILS_LOG_INFO_NAMED(
"ZenohRouterCheck",
"There are %d Zenoh routers connected to the session", context);
ret = RMW_RET_OK;
}
}

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "rmw/error_handling.h"
#include "rmw/event.h"
#include "rmw/types.h"

#include "detail/identifier.hpp"

Expand Down
Loading

0 comments on commit 804e915

Please sign in to comment.