Skip to content

Commit

Permalink
Minor tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Dec 6, 2024
1 parent 84d1267 commit 08b6f53
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 98 deletions.
89 changes: 60 additions & 29 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,33 @@
#include "rmw/types.h"

#include "attachment_helpers.hpp"
#include "liveliness_utils.hpp"

namespace rmw_zenoh_cpp
{

attachment_data_t::attachment_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE);
}

attachment_data_t::attachment_data_t(attachment_data_t && data)
///=============================================================================
AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE])
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp)
{
sequence_number = std::move(data.sequence_number);
source_timestamp = std::move(data.source_timestamp);
memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE);
memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE);
gid_hash_ = hash_gid(source_gid_);
}

void attachment_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment)
///=============================================================================
AttachmentData::AttachmentData(AttachmentData && data)
: sequence_number_(std::move(data.sequence_number_)),
source_timestamp_(std::move(data.source_timestamp_)),
gid_hash_(std::move(data.gid_hash_))
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE);
}

attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
///=============================================================================
AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment)
{
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;
Expand All @@ -71,7 +63,7 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
throw std::runtime_error("sequence_number is not found in the attachment.");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) {
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

Expand All @@ -84,7 +76,7 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
throw std::runtime_error("source_timestamp is not found in the attachment");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) {
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

Expand All @@ -101,7 +93,46 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));
gid_hash_ = hash_gid(this->source_gid_);
}

///=============================================================================
int64_t AttachmentData::sequence_number() const
{
return sequence_number_;
}

///=============================================================================
int64_t AttachmentData::source_timestamp() const
{
return source_timestamp_;
}

///=============================================================================
void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE);
}

///=============================================================================
size_t AttachmentData::gid_hash() const
{
return gid_hash_;
}

///=============================================================================
void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment)
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
}
} // namespace rmw_zenoh_cpp
31 changes: 19 additions & 12 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@

namespace rmw_zenoh_cpp
{

class attachment_data_t final
///=============================================================================
class AttachmentData final
{
public:
explicit attachment_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);
explicit attachment_data_t(const z_loaned_bytes_t *);
explicit attachment_data_t(attachment_data_t && data);

int64_t sequence_number;
int64_t source_timestamp;
uint8_t source_gid[RMW_GID_STORAGE_SIZE];
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE]);
explicit AttachmentData(const z_loaned_bytes_t *);
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number() const;
int64_t source_timestamp() const;
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
size_t gid_hash() const;

void serialize_to_zbytes(z_owned_bytes_t *);

private:
int64_t sequence_number_;
int64_t source_timestamp_;
uint8_t source_gid_[RMW_GID_STORAGE_SIZE];
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp

Expand Down
44 changes: 31 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

z_owned_reply_t owned_reply;
z_reply_clone(&owned_reply, reply);
client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}
Expand Down Expand Up @@ -227,6 +225,7 @@ ClientData::ClientData(
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
{
// Do nothing.
Expand All @@ -235,8 +234,10 @@ ClientData::ClientData(
///=============================================================================
bool ClientData::init(const z_loaned_session_t * session)
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) {
if (z_keyexpr_from_str(
&this->keyexpr_,
this->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK)
{
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return false;
}
Expand All @@ -260,8 +261,15 @@ bool ClientData::init(const z_loaned_session_t * session)
"Unable to create liveliness token for the client.");
return false;
}
auto free_token = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->token_));
});

initialized_ = true;

free_ros_keyexpr.cancel();
free_token.cancel();

return true;
}
Expand All @@ -273,6 +281,16 @@ liveliness::TopicInfo ClientData::topic_info() const
return entity_->topic_info().value();
}

///=============================================================================
bool ClientData::liveliness_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
// The z_check function is now internal in zenoh-1.0.0 so we assume
// the liveliness token is still initialized as long as this entity has
// not been shutdown.
return !is_shutdown_;
}

///=============================================================================
void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
Expand Down Expand Up @@ -353,18 +371,18 @@ rmw_ret_t ClientData::take_response(
}

// Fill in the request_header
rmw_zenoh_cpp::attachment_data_t attachment(z_sample_attachment(sample));
request_header->request_id.sequence_number = attachment.sequence_number;
AttachmentData attachment(z_sample_attachment(sample));
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp;
request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE);
attachment.copy_gid(request_header->request_id.writer_guid);
request_header->received_timestamp = latest_reply->get_received_timestamp();

z_drop(z_move(payload));
Expand Down Expand Up @@ -426,7 +444,7 @@ rmw_ret_t ClientData::send_request(
z_owned_bytes_t attachment;
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
rmw_zenoh_cpp::create_map_and_set_sequence_num(
create_map_and_set_sequence_num(
&attachment, *sequence_id,
local_gid);
opts.attachment = z_move(attachment);
Expand All @@ -441,7 +459,6 @@ rmw_ret_t ClientData::send_request(
// which optimizes bandwidth. The default is "None", which imples replies may come in any order
// and any number.
opts.consolidation = z_query_consolidation_latest();

z_owned_bytes_t payload;
z_bytes_copy_from_buf(
&payload, reinterpret_cast<const uint8_t *>(request_bytes), data_length);
Expand Down Expand Up @@ -513,9 +530,10 @@ void ClientData::_shutdown()
}

// Unregister this node from the ROS graph.
z_liveliness_undeclare_token(z_move(token_));

z_drop(z_move(keyexpr_));
if (initialized_) {
z_liveliness_undeclare_token(z_move(token_));
z_drop(z_move(keyexpr_));
}

is_shutdown_ = true;
}
Expand Down
5 changes: 5 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Get a copy of the TopicInfo of this ClientData.
liveliness::TopicInfo topic_info() const;

// Returns true if liveliness token is still valid.
bool liveliness_is_valid() const;

// Copy the GID of this ClientData into an rmw_gid_t.
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

Expand Down Expand Up @@ -147,6 +150,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
size_t sequence_number_;
// Shutdown flag.
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,14 @@ class rmw_context_impl_s::Data final
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
auto undeclare_z_sub = rcpputils::make_scope_exit(
[this]() {
z_undeclare_subscriber(z_move(this->graph_subscriber_));
});

close_session.cancel();
free_shm_provider.cancel();
undeclare_z_sub.cancel();
}

// Shutdown the Zenoh session.
Expand Down Expand Up @@ -304,7 +309,7 @@ class rmw_context_impl_s::Data final
}

// Check that the Zenoh session is still valid.
if (!session_is_valid()) {
if (z_session_is_closed(z_loan(session_))) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
Expand Down
5 changes: 5 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ std::shared_ptr<NodeData> NodeData::make(
"Unable to create liveliness token for the node.");
return nullptr;
}
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});

free_token.cancel();
return std::shared_ptr<NodeData>(
new NodeData{
node,
Expand Down
19 changes: 15 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,14 @@ std::shared_ptr<PublisherData> PublisherData::make(
"Unable to create liveliness token for the publisher.");
return nullptr;
}
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});

undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();
free_token.cancel();

return std::shared_ptr<PublisherData>(
new PublisherData{
Expand Down Expand Up @@ -343,9 +348,7 @@ rmw_ret_t PublisherData::publish_serialized_message(

std::lock_guard<std::mutex> lock(mutex_);


const size_t data_length = ser.get_serialized_data_length();

// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
Expand All @@ -355,9 +358,7 @@ rmw_ret_t PublisherData::publish_serialized_message(
entity_->copy_gid(local_gid);
z_owned_bytes_t attachment;
create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid);

options.attachment = z_move(attachment);

z_owned_bytes_t payload;
z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length);

Expand Down Expand Up @@ -397,6 +398,16 @@ void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
entity_->copy_gid(out_gid);
}

///=============================================================================
bool PublisherData::liveliness_is_valid() const
{
std::lock_guard<std::mutex> lock(mutex_);
// The z_check function is now internal in zenoh-1.0.0 so we assume
// the liveliness token is still initialized as long as this entity has
// not been shutdown.
return !is_shutdown_;
}

///=============================================================================
std::shared_ptr<EventsManager> PublisherData::events_mgr() const
{
Expand Down
Loading

0 comments on commit 08b6f53

Please sign in to comment.