diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 1640788f..5a1937c8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -107,25 +107,25 @@ std::shared_ptr PublisherData::make( return nullptr; } - auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default(); + using AdvancedPublisherOptions = zenoh::ext::SessionExt::AdvancedPublisherOptions; + auto adv_pub_opts = AdvancedPublisherOptions::create_default(); - // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - // Retransmission can only be done if history is enabled on subscriber side. + // Allow this publisher to be detected through liveliness. adv_pub_opts.publisher_detection = true; } if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL || adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - adv_pub_opts.cache = - zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); + adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default(); adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; } if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - // Allow this publisher to be detected through liveliness. - adv_pub_opts.sample_miss_detection = true; - // Heartbeat + // Allow matching Subscribers to detect lost samples and ask for retransimission. + adv_pub_opts.sample_miss_detection = AdvancedPublisherOptions::SampleMissDetectionOptions::create_default(); + // The period of publisher heartbeats in ms, used by ``AdvancedSubscriber`` with heartbeat-based recovery enabled for missed sample retransimission. + adv_pub_opts.sample_miss_detection->heartbeat_period_ms = 1000; } zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 79a9eea1..cdf0d36c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -168,7 +168,8 @@ bool SubscriptionData::init() sess_ = context_impl->session(); - auto adv_sub_opts = zenoh::ext::SessionExt::AdvancedSubscriberOptions::create_default(); + using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions; + auto adv_sub_opts = AdvancedSubscriberOptions::create_default(); // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. @@ -179,11 +180,18 @@ bool SubscriptionData::init() adv_sub_opts.subscriber_detection = true; adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); // History can only be retransmitted by Publishers that enable caching. - adv_sub_opts.history = - zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default(); + adv_sub_opts.history = AdvancedSubscriberOptions::HistoryOptions::create_default(); + // Enable detection of late joiner publishers and query for their historical data. adv_sub_opts.history->detect_late_publishers = true; adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; - adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{}; + + } + + if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + // Retransmission of detected lost Samples. This requires publishers to enable caching and sample_miss_detection. + adv_sub_opts.recovery = AdvancedSubscriberOptions::RecoveryOptions::create_default(); + // Heartbeat-based last sample detection. + adv_sub_opts.recovery->last_sample_miss_detection = AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{}; } std::weak_ptr data_wp = shared_from_this();