Skip to content

Commit

Permalink
Create querying subscriber if qos is transient local
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 11, 2024
1 parent 8281049 commit b7e329f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 36 deletions.
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(rmw_dds_common REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

Expand Down Expand Up @@ -57,6 +58,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
rmw_dds_common::rmw_dds_common_library
zenohc::lib
)

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>rmw_dds_common</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
4 changes: 3 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <variant>
#include <vector>

#include "rcutils/allocator.h"
Expand Down Expand Up @@ -110,7 +111,8 @@ struct saved_msg_data
///==============================================================================
struct rmw_subscription_data_t
{
z_owned_subscriber_t sub;
// An owned subscriber or querying_subscriber depending on the QoS settings.
std::variant<z_owned_subscriber_t, ze_owned_querying_subscriber_t> sub;

// Liveliness token for the subscription.
zc_owned_liveliness_token_t token;
Expand Down
106 changes: 71 additions & 35 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
#include "rmw/validate_namespace.h"
#include "rmw/validate_node_name.h"

#include "rmw_dds_common/qos.hpp"

namespace
{

Expand Down Expand Up @@ -1136,20 +1138,15 @@ rmw_create_subscription(
}
RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr);
// Adapt any 'best available' QoS options
// rmw_qos_profile_t adapted_qos_profile = *qos_profile;
// rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription(
// node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic);
// if (RMW_RET_OK != ret) {
// return nullptr;
// }
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription(
node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic);
if (RMW_RET_OK != ret) {
RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile.");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);

// Check RMW QoS
// if (!is_valid_qos(*qos_profile)) {
// RMW_SET_ERROR_MSG("create_subscription() called with invalid QoS");
// return nullptr;
// }

const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports);
if (type_support == nullptr) {
// error was already set by find_message_type_support
Expand Down Expand Up @@ -1215,15 +1212,6 @@ rmw_create_subscription(
rmw_subscription_data_t);
});

// Set the reliability of the subscription options based on qos_profile.
// The default options will be initialized with Best Effort reliability.
auto sub_options = z_subscriber_options_default();
sub_data->reliable = false;
if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
}

sub_data->typesupport_identifier = type_support->typesupport_identifier;
sub_data->type_support_impl = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
Expand Down Expand Up @@ -1290,19 +1278,54 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return nullptr;
}
sub_data->sub = z_declare_subscriber(
z_loan(context_impl->session),
z_loan(keyexpr),
z_move(callback),
&sub_options
);
if (!z_check(sub_data->sub)) {
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
// Instantiate the subscription with suitable options depending on the
// adapted_qos_profile.
// TODO(Yadunund): Rely on a separate function to return the sub
// as we start supporting more qos settings.
sub_data->reliable = false;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default();
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE;
}
sub_data->sub = ze_declare_querying_subscriber(
z_loan(context_impl->session),
z_loan(keyexpr),
z_move(callback),
&sub_options
);
if (!z_check(std::get<ze_owned_querying_subscriber_t>(sub_data->sub))) {
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
}
// Create a regular subscriber for all other durability settings.
else {
z_subscriber_options_t sub_options = z_subscriber_options_default();
if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
}
sub_data->sub = z_declare_subscriber(
z_loan(context_impl->session),
z_loan(keyexpr),
z_move(callback),
&sub_options
);
if (!z_check(std::get<z_owned_subscriber_t>(sub_data->sub))) {
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
}

auto undeclare_z_sub = rcpputils::make_scope_exit(
[sub_data]() {
z_undeclare_subscriber(z_move(sub_data->sub));
// TODO(Yadunund): Check if this is okay or if it is better
// to cast into explicit types and call appropriate undeclare method.
// See rmw_destroy_subscription()
z_drop(z_move(sub_data->sub));
});

// Publish to the graph that a new subscription is in town
Expand Down Expand Up @@ -1379,11 +1402,22 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
allocator->deallocate(sub_data->type_support, allocator->state);

if (z_undeclare_subscriber(z_move(sub_data->sub))) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
ret = RMW_RET_ERROR;
z_owned_subscriber_t * sub = std::get_if<z_owned_subscriber_t>(&sub_data->sub);
if (sub != nullptr) {
if (z_undeclare_subscriber(sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
ret = RMW_RET_ERROR;
}
} else {
ze_owned_querying_subscriber_t * querying_sub =
std::get_if<ze_owned_querying_subscriber_t>(&sub_data->sub);
if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
ret = RMW_RET_ERROR;
}
}


RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
allocator->deallocate(sub_data, allocator->state);
}
Expand Down Expand Up @@ -1428,6 +1462,8 @@ rmw_subscription_get_actual_qos(

qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE :
RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
qos->durability = std::holds_alternative<ze_owned_querying_subscriber_t>(sub_data->sub) ?
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE;
return RMW_RET_OK;
}

Expand Down Expand Up @@ -2030,7 +2066,7 @@ rmw_send_request(

// Send request
z_get_options_t opts = z_get_options_default();
opts.target = Z_QUERY_TARGET_ALL;
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data);
Expand Down

0 comments on commit b7e329f

Please sign in to comment.