Skip to content

Commit

Permalink
chore: apply the QoS mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Jan 22, 2025
1 parent 066fce3 commit 9f72ee8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
16 changes: 8 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,25 +107,25 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default();
using AdvancedPublisherOptions = zenoh::ext::SessionExt::AdvancedPublisherOptions;
auto adv_pub_opts = AdvancedPublisherOptions::create_default();

// Create a Publication Cache if durability is transient_local.
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
// Retransmission can only be done if history is enabled on subscriber side.
// Allow this publisher to be detected through liveliness.
adv_pub_opts.publisher_detection = true;
}

if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
|| adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
adv_pub_opts.cache =
zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default();
adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default();
adv_pub_opts.cache->max_samples = adapted_qos_profile.depth;
}

if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
// Allow this publisher to be detected through liveliness.
adv_pub_opts.sample_miss_detection = true;
// Heartbeat
// Allow matching Subscribers to detect lost samples and ask for retransimission.
adv_pub_opts.sample_miss_detection = AdvancedPublisherOptions::SampleMissDetectionOptions::create_default();
// The period of publisher heartbeats in ms, used by ``AdvancedSubscriber`` with heartbeat-based recovery enabled for missed sample retransimission.
adv_pub_opts.sample_miss_detection->heartbeat_period_ms = 1000;
}

zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
Expand Down
16 changes: 12 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ bool SubscriptionData::init()

sess_ = context_impl->session();

auto adv_sub_opts = zenoh::ext::SessionExt::AdvancedSubscriberOptions::create_default();
using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions;
auto adv_sub_opts = AdvancedSubscriberOptions::create_default();

// Instantiate the subscription with suitable options depending on the
// adapted_qos_profile.
Expand All @@ -179,11 +180,18 @@ bool SubscriptionData::init()
adv_sub_opts.subscriber_detection = true;
adv_sub_opts.query_timeout_ms = std::numeric_limits<uint64_t>::max();
// History can only be retransmitted by Publishers that enable caching.
adv_sub_opts.history =
zenoh::ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions::create_default();
adv_sub_opts.history = AdvancedSubscriberOptions::HistoryOptions::create_default();
// Enable detection of late joiner publishers and query for their historical data.
adv_sub_opts.history->detect_late_publishers = true;
adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth;
adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{};

}

if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
// Retransmission of detected lost Samples. This requires publishers to enable caching and sample_miss_detection.
adv_sub_opts.recovery = AdvancedSubscriberOptions::RecoveryOptions::create_default();
// Heartbeat-based last sample detection.
adv_sub_opts.recovery->last_sample_miss_detection = AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
}

std::weak_ptr<SubscriptionData> data_wp = shared_from_this();
Expand Down

0 comments on commit 9f72ee8

Please sign in to comment.