diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 4fc96805..da2571ce 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -23,41 +23,33 @@ #include "rmw/types.h" #include "attachment_helpers.hpp" +#include "liveliness_utils.hpp" namespace rmw_zenoh_cpp { - -attachment_data_t::attachment_data_t( - const int64_t _sequence_number, - const int64_t _source_timestamp, - const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]) -{ - sequence_number = _sequence_number; - source_timestamp = _source_timestamp; - memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE); -} - -attachment_data_t::attachment_data_t(attachment_data_t && data) +///============================================================================= +AttachmentData::AttachmentData( + const int64_t sequence_number, + const int64_t source_timestamp, + const uint8_t source_gid[RMW_GID_STORAGE_SIZE]) +: sequence_number_(sequence_number), + source_timestamp_(source_timestamp) { - sequence_number = std::move(data.sequence_number); - source_timestamp = std::move(data.source_timestamp); - memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE); + memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE); + gid_hash_ = hash_gid(source_gid_); } -void attachment_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) +///============================================================================= +AttachmentData::AttachmentData(AttachmentData && data) +: sequence_number_(std::move(data.sequence_number_)), + source_timestamp_(std::move(data.source_timestamp_)), + gid_hash_(std::move(data.gid_hash_)) { - ze_owned_serializer_t serializer; - ze_serializer_empty(&serializer); - ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); - ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE); - ze_serializer_finish(z_move(serializer), attachment); + memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE); } -attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) +///============================================================================= +AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment) { ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); z_owned_string_t key; @@ -71,7 +63,7 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) throw std::runtime_error("sequence_number is not found in the attachment."); } z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) { + if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) { throw std::runtime_error("Failed to deserialize the sequence_number."); } @@ -84,7 +76,7 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) throw std::runtime_error("source_timestamp is not found in the attachment"); } z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) { + if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) { throw std::runtime_error("Failed to deserialize the source_timestamp."); } @@ -101,7 +93,46 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) { throw std::runtime_error("The length of source_gid mismatched."); } - memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); + memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); z_drop(z_move(slice)); + gid_hash_ = hash_gid(this->source_gid_); +} + +///============================================================================= +int64_t AttachmentData::sequence_number() const +{ + return sequence_number_; +} + +///============================================================================= +int64_t AttachmentData::source_timestamp() const +{ + return source_timestamp_; +} + +///============================================================================= +void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +{ + memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE); +} + +///============================================================================= +size_t AttachmentData::gid_hash() const +{ + return gid_hash_; +} + +///============================================================================= +void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment) +{ + ze_owned_serializer_t serializer; + ze_serializer_empty(&serializer); + ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); + ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE); + ze_serializer_finish(z_move(serializer), attachment); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 7d7af015..ec460620 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -21,22 +21,29 @@ namespace rmw_zenoh_cpp { - -class attachment_data_t final +///============================================================================= +class AttachmentData final { public: - explicit attachment_data_t( - const int64_t _sequence_number, - const int64_t _source_timestamp, - const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); - explicit attachment_data_t(const z_loaned_bytes_t *); - explicit attachment_data_t(attachment_data_t && data); - - int64_t sequence_number; - int64_t source_timestamp; - uint8_t source_gid[RMW_GID_STORAGE_SIZE]; + AttachmentData( + const int64_t sequence_number, + const int64_t source_timestamp, + const uint8_t source_gid[RMW_GID_STORAGE_SIZE]); + explicit AttachmentData(const z_loaned_bytes_t *); + explicit AttachmentData(AttachmentData && data); + + int64_t sequence_number() const; + int64_t source_timestamp() const; + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + size_t gid_hash() const; void serialize_to_zbytes(z_owned_bytes_t *); + +private: + int64_t sequence_number_; + int64_t source_timestamp_; + uint8_t source_gid_[RMW_GID_STORAGE_SIZE]; + size_t gid_hash_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 1966a3d1..a95c98a2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -80,8 +80,6 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - z_owned_reply_t owned_reply; - z_reply_clone(&owned_reply, reply); client_data->add_new_reply( std::make_unique(reply, received_timestamp)); } @@ -227,6 +225,7 @@ ClientData::ClientData( wait_set_data_(nullptr), sequence_number_(1), is_shutdown_(false), + initialized_(false), num_in_flight_(0) { // Do nothing. @@ -235,8 +234,10 @@ ClientData::ClientData( ///============================================================================= bool ClientData::init(const z_loaned_session_t * session) { - std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; - if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) { + if (z_keyexpr_from_str( + &this->keyexpr_, + this->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) + { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } @@ -260,8 +261,15 @@ bool ClientData::init(const z_loaned_session_t * session) "Unable to create liveliness token for the client."); return false; } + auto free_token = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->token_)); + }); + + initialized_ = true; free_ros_keyexpr.cancel(); + free_token.cancel(); return true; } @@ -273,6 +281,16 @@ liveliness::TopicInfo ClientData::topic_info() const return entity_->topic_info().value(); } +///============================================================================= +bool ClientData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. + return !is_shutdown_; +} + ///============================================================================= void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { @@ -353,18 +371,18 @@ rmw_ret_t ClientData::take_response( } // Fill in the request_header - rmw_zenoh_cpp::attachment_data_t attachment(z_sample_attachment(sample)); - request_header->request_id.sequence_number = attachment.sequence_number; + AttachmentData attachment(z_sample_attachment(sample)); + request_header->request_id.sequence_number = attachment.sequence_number(); if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - request_header->source_timestamp = attachment.source_timestamp; + request_header->source_timestamp = attachment.source_timestamp(); if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); + attachment.copy_gid(request_header->request_id.writer_guid); request_header->received_timestamp = latest_reply->get_received_timestamp(); z_drop(z_move(payload)); @@ -426,7 +444,7 @@ rmw_ret_t ClientData::send_request( z_owned_bytes_t attachment; uint8_t local_gid[RMW_GID_STORAGE_SIZE]; entity_->copy_gid(local_gid); - rmw_zenoh_cpp::create_map_and_set_sequence_num( + create_map_and_set_sequence_num( &attachment, *sequence_id, local_gid); opts.attachment = z_move(attachment); @@ -441,7 +459,6 @@ rmw_ret_t ClientData::send_request( // which optimizes bandwidth. The default is "None", which imples replies may come in any order // and any number. opts.consolidation = z_query_consolidation_latest(); - z_owned_bytes_t payload; z_bytes_copy_from_buf( &payload, reinterpret_cast(request_bytes), data_length); @@ -513,9 +530,10 @@ void ClientData::_shutdown() } // Unregister this node from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); - - z_drop(z_move(keyexpr_)); + if (initialized_) { + z_liveliness_undeclare_token(z_move(token_)); + z_drop(z_move(keyexpr_)); + } is_shutdown_ = true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index b2cbbbdf..bc737435 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -60,6 +60,9 @@ class ClientData final : public std::enable_shared_from_this // Get a copy of the TopicInfo of this ClientData. liveliness::TopicInfo topic_info() const; + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + // Copy the GID of this ClientData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; @@ -147,6 +150,8 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed3bd9a9..9458eb45 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -210,9 +210,14 @@ class rmw_context_impl_s::Data final RMW_SET_ERROR_MSG("unable to create zenoh subscription"); throw std::runtime_error("Unable to subscribe to ROS graph updates."); } + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); close_session.cancel(); free_shm_provider.cancel(); + undeclare_z_sub.cancel(); } // Shutdown the Zenoh session. @@ -304,7 +309,7 @@ class rmw_context_impl_s::Data final } // Check that the Zenoh session is still valid. - if (!session_is_valid()) { + if (z_session_is_closed(z_loan(session_))) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create NodeData as Zenoh session is invalid."); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 8a8e0440..cb4817a5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -67,7 +67,12 @@ std::shared_ptr NodeData::make( "Unable to create liveliness token for the node."); return nullptr; } + auto free_token = rcpputils::make_scope_exit( + [&token]() { + z_drop(z_move(token)); + }); + free_token.cancel(); return std::shared_ptr( new NodeData{ node, diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 622de2f9..d1449b63 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -181,9 +181,14 @@ std::shared_ptr PublisherData::make( "Unable to create liveliness token for the publisher."); return nullptr; } + auto free_token = rcpputils::make_scope_exit( + [&token]() { + z_drop(z_move(token)); + }); undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); + free_token.cancel(); return std::shared_ptr( new PublisherData{ @@ -343,9 +348,7 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); - const size_t data_length = ser.get_serialized_data_length(); - // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. @@ -355,9 +358,7 @@ rmw_ret_t PublisherData::publish_serialized_message( entity_->copy_gid(local_gid); z_owned_bytes_t attachment; create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - z_owned_bytes_t payload; z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); @@ -397,6 +398,16 @@ void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const entity_->copy_gid(out_gid); } +///============================================================================= +bool PublisherData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. + return !is_shutdown_; +} + ///============================================================================= std::shared_ptr PublisherData::events_mgr() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 21bbe620..bde0c13b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -70,6 +70,9 @@ class PublisherData final // Copy the GID of this PublisherData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + // Get the events manager of this PublisherData. std::shared_ptr events_mgr() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index a31cdf2b..c7be982c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -158,21 +158,29 @@ std::shared_ptr ServiceData::make( // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_query_t callback; - z_closure(&callback, service_data_handler, nullptr, service_data.get()); - z_view_keyexpr_t service_ke; - service_data->keyexpr_ = service_data->entity_->topic_info()->topic_keyexpr_; - if (z_view_keyexpr_from_str(&service_ke, service_data->keyexpr_.c_str()) != Z_OK) { + if (z_keyexpr_from_str( + &service_data->keyexpr_, + service_data->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) + { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [service_data]() { + if (service_data != nullptr) { + z_drop(z_move(service_data->keyexpr_)); + } + }); + + z_owned_closure_query_t callback; + z_closure(&callback, service_data_handler, nullptr, service_data.get()); // Configure the queryable to process complete queries. z_queryable_options_t qable_options; z_queryable_options_default(&qable_options); qable_options.complete = true; if (z_declare_queryable( - session, &service_data->qable_, z_loan(service_ke), + session, &service_data->qable_, z_loan(service_data->keyexpr_), z_move(callback), &qable_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); @@ -180,7 +188,9 @@ std::shared_ptr ServiceData::make( } auto undeclare_z_queryable = rcpputils::make_scope_exit( [service_data]() { - z_undeclare_queryable(z_move(service_data->qable_)); + if (service_data != nullptr) { + z_undeclare_queryable(z_move(service_data->qable_)); + } }); std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); @@ -205,6 +215,9 @@ std::shared_ptr ServiceData::make( return nullptr; } + service_data->initialized_ = true; + + free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); free_token.cancel(); @@ -226,7 +239,8 @@ ServiceData::ServiceData( request_type_support_(std::move(request_type_support)), response_type_support_(std::move(response_type_support)), wait_set_data_(nullptr), - is_shutdown_(false) + is_shutdown_(false), + initialized_(false) { // Do nothing. } @@ -238,6 +252,16 @@ liveliness::TopicInfo ServiceData::topic_info() const return entity_->topic_info().value(); } +///============================================================================= +bool ServiceData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. + return !is_shutdown_; +} + ///============================================================================= void ServiceData::add_new_query(std::unique_ptr query) { @@ -255,12 +279,15 @@ void ServiceData::add_new_query(std::unique_ptr query) query_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth + z_view_string_t keystr; + z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " - "for service for %s", + "for service for %.*s", adapted_qos_profile.depth, - keyexpr_.c_str()); + static_cast(z_string_len(z_loan(keystr))), + z_string_data(z_loan(keystr))); query_queue_.pop_front(); } query_queue_.emplace_back(std::move(query)); @@ -313,17 +340,17 @@ rmw_ret_t ServiceData::take_request( // Fill in the request header. // Get the sequence_number out of the attachment - rmw_zenoh_cpp::attachment_data_t attachment(z_query_attachment(loaned_query)); + AttachmentData attachment(z_query_attachment(loaned_query)); - request_header->request_id.sequence_number = attachment.sequence_number; + request_header->request_id.sequence_number = attachment.sequence_number(); if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); + attachment.copy_gid(request_header->request_id.writer_guid); - request_header->source_timestamp = attachment.source_timestamp; + request_header->source_timestamp = attachment.source_timestamp(); if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; @@ -433,10 +460,8 @@ rmw_ret_t ServiceData::send_response( z_owned_bytes_t payload; z_bytes_copy_from_buf( &payload, reinterpret_cast(response_bytes), data_length); - z_view_keyexpr_t service_ke; - z_view_keyexpr_from_str(&service_ke, keyexpr_.c_str()); z_query_reply( - loaned_query, z_loan(service_ke), z_move(payload), &options); + loaned_query, z_loan(keyexpr_), z_move(payload), &options); return RMW_RET_OK; } @@ -495,8 +520,11 @@ rmw_ret_t ServiceData::shutdown() } // Unregister this node from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); - z_undeclare_queryable(z_move(qable_)); + if (initialized_) { + z_drop(z_move(keyexpr_)); + z_liveliness_undeclare_token(z_move(token_)); + z_undeclare_queryable(z_move(qable_)); + } is_shutdown_ = true; return RMW_RET_OK; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index ba36e6e5..75467494 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -59,6 +59,9 @@ class ServiceData final // Get a copy of the TopicInfo of this ServiceData. liveliness::TopicInfo topic_info() const; + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + // Add a new ZenohQuery to the queue. void add_new_query(std::unique_ptr query); @@ -107,8 +110,8 @@ class ServiceData final const rmw_node_t * rmw_node_; // The Entity generated for the service. std::shared_ptr entity_; - // The keyexpr string. - std::string keyexpr_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. @@ -129,6 +132,8 @@ class ServiceData final DataCallbackManager data_callback_mgr_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; }; using ServiceDataPtr = std::shared_ptr; using ServiceDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 41e9bf2d..f0036606 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -60,7 +60,7 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) return; } - attachment_data_t attachment(z_sample_attachment(sample)); + AttachmentData attachment(z_sample_attachment(sample)); const z_loaned_bytes_t * payload = z_sample_payload(sample); z_owned_slice_t slice; @@ -81,7 +81,7 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) SubscriptionData::Message::Message( z_owned_slice_t p, uint64_t recv_ts, - attachment_data_t && attachment_) + AttachmentData && attachment_) : payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } @@ -355,6 +355,16 @@ liveliness::TopicInfo SubscriptionData::topic_info() const return entity_->topic_info().value(); } +///============================================================================= +bool SubscriptionData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. + return !is_shutdown_; +} + ///============================================================================= std::shared_ptr SubscriptionData::events_mgr() const { @@ -483,13 +493,13 @@ rmw_ret_t SubscriptionData::take_one_message( } if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number(); // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); + msg_data->attachment.copy_gid(message_info->publisher_gid.data); message_info->from_intra_process = false; } @@ -530,13 +540,13 @@ rmw_ret_t SubscriptionData::take_serialized_message( *taken = true; if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number(); // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); + msg_data->attachment.copy_gid(message_info->publisher_gid.data); message_info->from_intra_process = false; } @@ -574,11 +584,11 @@ void SubscriptionData::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = hash_gid(msg->attachment.source_gid); + const size_t gid_hash = msg->attachment.gid_hash(); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( - msg->attachment.sequence_number - + msg->attachment.sequence_number() - last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; @@ -592,7 +602,7 @@ void SubscriptionData::add_new_message( } } // Always update the last known sequence number for the publisher. - last_known_published_msg_[gid_hash] = msg->attachment.sequence_number; + last_known_published_msg_[gid_hash] = msg->attachment.sequence_number(); message_queue_.emplace_back(std::move(msg)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 3d1ccede..84d9a488 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -51,13 +51,13 @@ class SubscriptionData final : public std::enable_shared_from_this events_mgr() const; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 2537466b..f295befc 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -55,7 +55,6 @@ rmw_ret_t _get_z_config( "rmw_zenoh_cpp", "Envar %s cannot be read.", envar_name); return RMW_RET_ERROR; } - // If the environment variable is set, try to read the configuration from the file, // if the environment variable is not set use internal configuration configured_uri = envar_uri[0] != '\0' ? envar_uri : default_uri; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index d1ab4651..77bb4dc2 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -32,7 +32,7 @@ void create_map_and_set_sequence_num( auto now_ns = std::chrono::duration_cast(now); int64_t source_timestamp = now_ns.count(); - rmw_zenoh_cpp::attachment_data_t data(sequence_number, source_timestamp, gid); + AttachmentData data(sequence_number, source_timestamp, gid); data.serialize_to_zbytes(out_bytes); } @@ -52,7 +52,10 @@ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const } ///============================================================================= -ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));} +ZenohQuery::~ZenohQuery() +{ + z_drop(z_move(query_)); +} ///============================================================================= const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} @@ -67,7 +70,10 @@ ZenohReply::ZenohReply( } ///============================================================================= -ZenohReply::~ZenohReply() {z_drop(z_move(reply_));} +ZenohReply::~ZenohReply() +{ + z_drop(z_move(reply_)); +} ///============================================================================= std::optional ZenohReply::get_sample() const diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index bab759a5..1e13a710 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include + #include #include @@ -750,7 +751,7 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (pub_data->is_shutdown()) { + if (!pub_data->liveliness_is_valid()) { return RMW_RET_ERROR; }