From 06ad64c5855235a5012197c4bfc26566e0bdd37e Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Thu, 14 Nov 2024 13:06:44 -0800 Subject: [PATCH] Switch to std::priority_queue Signed-off-by: Christophe Bedard --- rosbag2_transport/CMakeLists.txt | 10 - rosbag2_transport/package.xml | 1 - .../locked_priority_queue.hpp | 106 ++++++++ .../rosbag2_transport/meta_priority_queue.hpp | 220 --------------- .../src/rosbag2_transport/player.cpp | 255 +++++++++++------- .../test_meta_priority_queue.cpp | 124 --------- .../test/rosbag2_transport/test_play_seek.cpp | 8 +- 7 files changed, 265 insertions(+), 459 deletions(-) create mode 100644 rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp delete mode 100644 rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp delete mode 100644 rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 07d6749a5..a3fc776d7 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -115,7 +115,6 @@ if(BUILD_TESTING) add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources") find_package(ament_cmake_gmock REQUIRED) - find_package(ament_cmake_gtest REQUIRED) find_package(ament_index_cpp REQUIRED) find_package(ament_lint_auto REQUIRED) find_package(composition_interfaces REQUIRED) @@ -537,15 +536,6 @@ if(BUILD_TESTING) rcpputils::rcpputils rosbag2_test_common::rosbag2_test_common ) - - ament_add_gtest(test_meta_priority_queue - test/rosbag2_transport/test_meta_priority_queue.cpp) - target_include_directories(test_meta_priority_queue PRIVATE - $) - target_link_libraries(test_meta_priority_queue - rcpputils::rcpputils - shared_queues_vendor::singleproducerconsumer - ) endif() ament_package() diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index 08264d209..2009da93c 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -27,7 +27,6 @@ keyboard_handler ament_cmake_gmock - ament_cmake_gtest ament_index_cpp ament_lint_auto ament_lint_common diff --git a/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp b/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp new file mode 100644 index 000000000..5bebb14f8 --- /dev/null +++ b/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp @@ -0,0 +1,106 @@ +// Copyright 2024 Apex.AI, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ +#define ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ + +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" +#include "rcpputils/unique_lock.hpp" + +/// \brief `std::priority_queue` wrapper with locks. +/// \tparam T the element type +/// \tparam Container the underlying container type +/// \tparam Compare the comparator +/// \see std::priority_queue +template +class LockedPriorityQueue +{ +public: + /// \brief Constructor. + /// \param compare the comparator object + /// \param empty_element the element to return when trying to take with an empty queue + LockedPriorityQueue(const Compare & compare, const T & empty_element) + : queue_(compare), + empty_element_{empty_element} + {} + + LockedPriorityQueue() = delete; + LockedPriorityQueue(const LockedPriorityQueue &) = delete; + LockedPriorityQueue & operator=(const LockedPriorityQueue &) = delete; + LockedPriorityQueue(const LockedPriorityQueue &&) = delete; + LockedPriorityQueue & operator=(const LockedPriorityQueue &&) = delete; + + /// \brief Insert element and sort queue. + /// \param element the element + void push(const T & element) + { + rcpputils::unique_lock lk(queue_mutex_); + queue_.push(element); + } + + /// \brief Remove the top element. + void pop() + { + rcpputils::unique_lock lk(queue_mutex_); + queue_.pop(); + } + + /// \brief Check if the queue is empty. + /// \return whether the queue is empty + bool empty() const + { + rcpputils::unique_lock lk(queue_mutex_); + return queue_.empty(); + } + + /// \brief Get the number of elements. + /// \return the number of elements + std::size_t size() const + { + rcpputils::unique_lock lk(queue_mutex_); + return queue_.size(); + } + + /// Remove all elements from the queue. + void purge() + { + rcpputils::unique_lock lk(queue_mutex_); + while (!queue_.empty()) { + queue_.pop(); + } + } + + /// \brief Try to take the top element from the queue. + /// \return the top element, or `empty_element` if the queue is empty + T take() + { + rcpputils::unique_lock lk(queue_mutex_); + if (queue_.empty()) { + return empty_element_; + } + const T e = queue_.top(); + queue_.pop(); + return e; + } + +private: + mutable std::mutex queue_mutex_; + std::priority_queue queue_ RCPPUTILS_TSA_GUARDED_BY(queue_mutex_); + const T empty_element_; +}; + +#endif // ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp b/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp deleted file mode 100644 index 4155d8a43..000000000 --- a/rosbag2_transport/src/rosbag2_transport/meta_priority_queue.hpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2024 Apex.AI, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ -#define ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ - -#include -#include -#include -#include -#include - -#include "moodycamel/readerwriterqueue.h" - -#include "rcpputils/thread_safety_annotations.hpp" - -/// Priority queue of queues. -/** - * Contains N queues. - * Elements can be enqueued into a specific queue. - * Assumes that items are enqueued into the same queue in order of priority. - * The priority value of an element is computed from a provided function. - * Highest priority means a lower priority value. - * Returns the next element from the highest-priority queue, which is determined by the queue with - * the highest-priority first element, since the first element of each queue is the highest-priority - * element of that queue. - * - * This does impose some more restrictions on concurrency compared to the underlying queue - * implementation. However, if there is only 1 underlying queue, then there is no locking. - * - * \tparam T the element type - * \tparam P the priority value type - * \see moodycamel::ReaderWriterQueue for the underlying queue - */ -template -class MetaPriorityQueue -{ -public: - /** - * Constructor. - * - * \param num_queues the number of priority queues, greather than zero - * \param element_priority_value a function to extract the priority value from an element - */ - MetaPriorityQueue( - const std::size_t num_queues, - const std::function && element_priority_value) - : queues_(num_queues), - element_priority_value_(std::move(element_priority_value)), - queue_priorities_(num_queues), - // When all queues are empty, the index value doesn't matter - priority_queue_index_(0u) - { - assert(num_queues > 0u); - // Set all queue priority values to the lowest priority, i.e., highest value - for (std::size_t i = 0; i < queues_.size(); i++) { - queue_priorities_[i] = lowest_priority_value; - } - } - - /// Enqueue element into a given queue. - /** - * Should only be called by 1 producer thread. - * Elements must be enqueued into the same queue in order of priority for elements to be - * peeked/popped in order of priority. - * No bounds checking is performed for the queue index. - * - * \param element the element to enqueue - * \param queue_index the queue to enqueue it into - * \return `true` if the element was enqueued, `false` otherwise - * \see moodycamel::ReaderWriterQueue::enqueue - */ - bool enqueue(const T & element, const std::size_t queue_index) - { - // Trivial case: 1 underlying queue - if (1u == queues_.size()) { - return queues_[queue_index].enqueue(element); - } - std::lock_guard lk(mutex_priority_); - const bool enqueued = queues_[queue_index].enqueue(element); - // If this queue was empty, update the queue's priority value with the new element - // Otherwise, it's not necessary, since the new element is supposed to be lower-priority - if (enqueued && 1u == size_approx(queue_index)) { - update_priorities(&element, queue_index); - } - return enqueued; - } - - /// Peek at the next item from the highest-priority queue. - /** - * Should only be called by 1 consumer thread. - * - * \return the next item from the highest-priority queue, or `nullptr` if there is none - * \see moodycamel::ReaderWriterQueue::peek - */ - T * peek() - { - return queues_[priority_queue_index_.load()].peek(); - } - - /// Pop the next item from the highest-priority queue. - /** - * Should only be called by 1 consumer thread. - * - * \return `true` if there was an item, `false` otherwise - * \see moodycamel::ReaderWriterQueue::pop - */ - bool pop() - { - // Trivial case: 1 underlying queue - if (1u == queues_.size()) { - return queues_[0u].pop(); - } - std::lock_guard lk(mutex_priority_); - const std::size_t priority_index = priority_queue_index_.load(); - const bool popped = queues_[priority_index].pop(); - // If we pop()ed, update priority - if (popped) { - // peek() and pop() should only be called by the 1 consumer thread, so this is OK - const T * element = queues_[priority_index].peek(); - update_priorities(element, priority_index); - } - return popped; - } - - /// Get the approximate size of the given queue. - /** - * Can be called by both the 1 producer thread and the 1 consumer thread. - * No bounds checking is performed for the queue index. - * - * \param queue_index the index of the queue - * \return the approximate size - * \see moodycamel::ReaderWriterQueue::size_approx - */ - inline std::size_t size_approx(const std::size_t queue_index) const - { - return queues_[queue_index].size_approx(); - } - - /// Get the approximate total size of all queues. - /** - * Can be called by both the 1 producer thread and the 1 consumer thread. - * The size of the underlying queues may change during this call. - * - * \return the approximate total size - * \see ::size_approx - */ - inline std::size_t size_approx() const - { - std::size_t size = 0; - for (std::size_t i = 0; i < queues_.size(); i++) { - size += size_approx(i); - } - return size; - } - -private: - /// Update the queue priority values and the priority queue index. - /** - * Update when: - * 1. Enqueuing an element into a queue that was empty - * 2. Popping an element from a queue - * - * \param element the new element (newly-enqueued or newly-front of queue), or `nullptr` if the - * queue is now empty - * \param queue_index the index of the corresponding queue - */ - void update_priorities(const T * element, const std::size_t queue_index) - RCPPUTILS_TSA_REQUIRES(priority_queue_index_) - { - // Update priority value for the given queue - if (nullptr != element) { - const P priority_value = element_priority_value_(*element); - queue_priorities_[queue_index].store(priority_value); - } else { - // Reset priority value if the queue is now empty - queue_priorities_[queue_index].store(lowest_priority_value); - } - - // Find the index of the highest-priority queue, i.e., to be used next - // Notes: - // 1. If multiple queues have the same priority value, this does favor the first queue - // 2. If all queues are empty, this will return the index of the first queue, and - // popping/peeking will just return nothing - std::size_t priority_queue_index = 0; - for (std::size_t i = 1; i < queues_.size(); i++) { - if (queue_priorities_[i] < queue_priorities_[priority_queue_index].load()) { - priority_queue_index = i; - } - } - priority_queue_index_.store(priority_queue_index); - } - - // Underlying queues - std::vector> queues_; - // Element priority value extraction function - const std::function element_priority_value_; - // Priority values of each queue - std::vector> queue_priorities_; - // Index of the current highest-priority queue - std::atomic_size_t priority_queue_index_; - // Mutex for priority queue tracking - std::mutex mutex_priority_; - - /// Priority value corresponding to the lowest priority. - static constexpr P lowest_priority_value = std::numeric_limits

::max(); -}; - -#endif // ROSBAG2_TRANSPORT__META_PRIORITY_QUEUE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index a2e5b452a..a683309c1 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -42,7 +43,7 @@ #include "rosbag2_transport/reader_writer_factory.hpp" #include "logging.hpp" -#include "meta_priority_queue.hpp" +#include "locked_priority_queue.hpp" namespace { @@ -76,6 +77,17 @@ rclcpp::QoS publisher_qos_for_topic( topic.name, rosbag2_storage::from_rclcpp_qos_vector(topic.offered_qos_profiles)); } + +/// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. +struct BagMessageRecvTimestampComparator +{ + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->recv_timestamp > r->recv_timestamp; + } +}; } // namespace namespace rosbag2_transport @@ -280,10 +292,12 @@ class PlayerImpl std::unordered_map service_clients_; private: - rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); + rosbag2_storage::SerializedBagMessageSharedPtr take_next_message_from_queue(); void load_storage_content(); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(const size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void enqueue_up_to_boundary( + const size_t boundary, + const size_t message_queue_size) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; void play_messages_from_queue(); void prepare_publishers(); @@ -312,9 +326,14 @@ class PlayerImpl Player * owner_; rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; - MetaPriorityQueue< + using BagMessageComparator = std::function< + int( + const rosbag2_storage::SerializedBagMessageSharedPtr &, + const rosbag2_storage::SerializedBagMessageSharedPtr &)>; + LockedPriorityQueue< rosbag2_storage::SerializedBagMessageSharedPtr, - rcutils_time_point_value_t> message_queue_; + std::vector, + BagMessageComparator> message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; @@ -328,6 +347,15 @@ class PlayerImpl std::thread playback_thread_; std::condition_variable playback_finished_cv_; + // Request to play next + std::atomic_bool play_next_{false}; + // Whether we're done trying to play next + std::atomic_bool finished_play_next_{false}; + std::mutex finished_play_next_mutex_; + std::condition_variable finished_play_next_cv_; + // Whether we successfully played next + std::atomic_bool play_next_result_{false}; + rcutils_time_point_value_t starting_time_; // control services @@ -360,11 +388,8 @@ PlayerImpl::PlayerImpl( : input_bags_(std::move(input_bags)), owner_(owner), play_options_(play_options), - message_queue_( - input_bags_.size(), - [](const rosbag2_storage::SerializedBagMessageSharedPtr & msg) { - return msg->recv_timestamp; - }), + // Return empty shared_ptr when taking from an empty queue + message_queue_(BagMessageRecvTimestampComparator{}, nullptr), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { @@ -515,11 +540,12 @@ bool PlayerImpl::play() load_storage_content(); }); wait_for_filled_queue(); + play_messages_from_queue(); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} - while (message_queue_.pop()) {} // cleanup queue + message_queue_.purge(); { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -530,7 +556,7 @@ bool PlayerImpl::play() RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what()); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} - while (message_queue_.pop()) {} // cleanup queue + message_queue_.purge(); } { @@ -658,10 +684,10 @@ bool PlayerImpl::set_rate(double rate) return ok; } -rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_from_queue() +rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::take_next_message_from_queue() { - rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr_ptr = message_queue_.peek(); - while (!stop_playback_ && message_ptr_ptr == nullptr && + // Wait until there's a message in the queue, unless there are no new messages or we have to stop + while (!stop_playback_ && message_queue_.empty() && !is_storage_completely_loaded() && rclcpp::ok()) { RCLCPP_WARN_THROTTLE( @@ -670,21 +696,10 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro 1000, "Message queue starved. Messages will be delayed. Consider " "increasing the --read-ahead-queue-size option."); - std::this_thread::sleep_for(std::chrono::microseconds(100)); - message_ptr_ptr = message_queue_.peek(); - } - - // Workaround for race condition between peek and is_storage_completely_loaded() - // Don't sync with mutex for the sake of the performance - if (message_ptr_ptr == nullptr) { - message_ptr_ptr = message_queue_.peek(); } - - if (message_ptr_ptr != nullptr) { - return *message_ptr_ptr; - } - return nullptr; + // Note: this only returns nullptr if no more messages + return message_queue_.take(); } bool PlayerImpl::play_next() @@ -697,15 +712,6 @@ bool PlayerImpl::play_next() // Use RCLCPP_DEBUG_STREAM to avoid delays in the burst mode RCLCPP_DEBUG_STREAM(owner_->get_logger(), "Playing next message."); - // Temporary take over playback from play_messages_from_queue() - std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); - // Check one more time that we are in pause mode after waiting on mutex. Someone could call - // resume() or stop() from another thread while we were waiting on mutex. - if (!clock_->is_paused()) { - RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); - return false; - } - skip_message_in_main_play_loop_ = true; // Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to // be called if not yet and queue to be filled with messages. { @@ -713,19 +719,19 @@ bool PlayerImpl::play_next() ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); } - rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = peek_next_message_from_queue(); - - bool next_message_published = false; - while (rclcpp::ok() && !next_message_published && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) - { - next_message_published = publish_message(message_ptr); - clock_->jump(message_ptr->recv_timestamp); + // Request to play next + play_next_ = true; - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); - } - return next_message_published; + // Wait for play next to be done, and then return the result + std::unique_lock lk(finished_play_next_mutex_); + // Temporarily resume clock to force wakeup in clock_->sleep_until(time), + // then return in pause mode to preserve original state of the player + clock_->resume(); + clock_->pause(); + finished_play_next_cv_.wait(lk, [this] {return finished_play_next_.load();}); + finished_play_next_ = false; + play_next_ = false; + return play_next_result_.exchange(false); } size_t PlayerImpl::burst(const size_t num_messages) @@ -769,7 +775,7 @@ void PlayerImpl::seek(rcutils_time_point_value_t time_point) { std::lock_guard lk(reader_mutex_); // Purge current messages in queue. - while (message_queue_.pop()) {} + message_queue_.purge(); for (const auto & [reader, _] : input_bags_) { reader->seek(time_point); } @@ -902,17 +908,8 @@ Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle() void PlayerImpl::wait_for_filled_queue() const { - const auto read_ahead_queues_filled = - [this]() { - for (std::size_t i = 0; i < input_bags_.size(); i++) { - if (message_queue_.size_approx(i) < play_options_.read_ahead_queue_size) { - return false; - } - } - return true; - }; while ( - !read_ahead_queues_filled() && + message_queue_.size() < (input_bags_.size() * play_options_.read_ahead_queue_size) && !is_storage_completely_loaded() && rclcpp::ok() && !stop_playback_) { std::this_thread::sleep_for(queue_read_wait_period_); @@ -921,7 +918,8 @@ void PlayerImpl::wait_for_filled_queue() const void PlayerImpl::load_storage_content() { - // Multiply the boundary parameters by the number of input bags + // Multiply the boundary parameters by the number of input bags, since we want enough messages + // from each bags, not just enough messages overall auto queue_lower_boundary = static_cast( input_bags_.size() * play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = input_bags_.size() * play_options_.read_ahead_queue_size; @@ -935,8 +933,11 @@ void PlayerImpl::load_storage_content() if (no_messages) { break; } - if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(queue_upper_boundary); + + // The message queue size may get smaller after this, but that's OK + const size_t message_queue_size = message_queue_.size(); + if (message_queue_size < queue_lower_boundary) { + enqueue_up_to_boundary(queue_upper_boundary, message_queue_size); } else { lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -944,17 +945,15 @@ void PlayerImpl::load_storage_content() } } -void PlayerImpl::enqueue_up_to_boundary(const size_t boundary) +void PlayerImpl::enqueue_up_to_boundary(const size_t boundary, const size_t message_queue_size) { - rosbag2_storage::SerializedBagMessageSharedPtr message; // Read messages from input bags in a round robin way size_t input_bag_index = 0u; - for (size_t i = message_queue_.size_approx(); i < boundary; i++) { + for (size_t i = message_queue_size; i < boundary; i++) { const auto & reader = input_bags_[input_bag_index].first; // We are supposed to have at least one bag with messages to read if (reader->has_next()) { - message = reader->read_next(); - message_queue_.enqueue(message, input_bag_index); + message_queue_.push(reader->read_next()); } input_bag_index = (input_bag_index + 1) % input_bags_.size(); } @@ -962,44 +961,100 @@ void PlayerImpl::enqueue_up_to_boundary(const size_t boundary) void PlayerImpl::play_messages_from_queue() { - // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) - // to support play_next() API logic. - rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = peek_next_message_from_queue(); - { // Notify play_next() that we are ready for playback - // Note: We should do notification that we are ready for playback after peeking pointer to - // the next message. message_queue_.peek() is not allowed to be called from more than one - // thread concurrently. + { // Notify play_next()/seek() that we are ready for playback std::lock_guard lk(ready_to_play_from_queue_mutex_); - is_ready_to_play_from_queue_ = true; - ready_to_play_from_queue_cv_.notify_all(); + if (!is_ready_to_play_from_queue_) { + is_ready_to_play_from_queue_ = true; + ready_to_play_from_queue_cv_.notify_all(); + } } - while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) - { - // Do not move on until sleep_until returns true - // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message_ptr->recv_timestamp)) { - if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { - break; + rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = take_next_message_from_queue(); + + // Whether we should try playing the next message + bool do_play_next = false; + + // While we haven't stopped playing + while (rclcpp::ok() && !stop_playback_) { + // If we had run out of messages before but are starting to play next again, e.g., after a + // seek() + do_play_next |= play_next_.exchange(false); + if (do_play_next) { + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + skip_message_in_main_play_loop_ = false; + cancel_wait_for_next_message_ = false; + // Only take again if we need + if (nullptr == message_ptr) { + message_ptr = take_next_message_from_queue(); } } - std::lock_guard lk(skip_message_in_main_play_loop_mutex_); - if (rclcpp::ok()) { - if (skip_message_in_main_play_loop_) { - skip_message_in_main_play_loop_ = false; - cancel_wait_for_next_message_ = false; - message_ptr = peek_next_message_from_queue(); - continue; + + // While there's a message to play and we haven't reached the end timestamp yet + while (rclcpp::ok() && !stop_playback_ && + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) + { + // Sleep until the message's replay time, do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + // However, skip sleeping if we're trying to play the next message + while (rclcpp::ok() && !stop_playback_ && !(do_play_next || play_next_.load()) && + !clock_->sleep_until(message_ptr->recv_timestamp)) + { + // Stop sleeping if cancelled + if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { + break; + } } - publish_message(message_ptr); + + do_play_next |= play_next_.exchange(false); + + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + if (rclcpp::ok()) { + // This means that the queue's top() has changed, so we need to update our message + if (skip_message_in_main_play_loop_) { + skip_message_in_main_play_loop_ = false; + cancel_wait_for_next_message_ = false; + message_ptr = take_next_message_from_queue(); + continue; + } + + const bool message_published = publish_message(message_ptr); + // If we tried to publish because of play_next(), jump the clock + if (do_play_next) { + clock_->jump(message_ptr->recv_timestamp); + // If we successfully played next, notify that we're done, otherwise keep trying + if (message_published) { + do_play_next = false; + + std::lock_guard lk(finished_play_next_mutex_); + finished_play_next_ = true; + play_next_result_ = true; + finished_play_next_cv_.notify_all(); + } + } + } + message_ptr = take_next_message_from_queue(); + } + + // At this point, we're at the end of the queue, there are no more messages to play + // If we're still trying to play next or just got a request, we did not succeed + if (play_next_.exchange(false) || do_play_next) { + do_play_next = false; + + std::lock_guard lk(finished_play_next_mutex_); + finished_play_next_ = true; + play_next_result_ = false; + finished_play_next_cv_.notify_all(); + } + + // While we're in pause state, make sure we don't return + // However, start playing over if we're trying to play next + while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) { + clock_->sleep_until(clock_->now()); + } + // If we ran out of messages and are not in pause state, it means we're done playing, + // unless play_next() is resuming and pausing the clock in order to wake us up + if (!is_paused() && !play_next_.load()) { + break; } - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); - } - // while we're in pause state, make sure we don't return - // if we happen to be at the end of queue - while (!stop_playback_ && is_paused() && rclcpp::ok()) { - clock_->sleep_until(clock_->now()); } } diff --git a/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp b/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp deleted file mode 100644 index b6070c9b7..000000000 --- a/rosbag2_transport/test/rosbag2_transport/test_meta_priority_queue.cpp +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2024 Apex.AI, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include "meta_priority_queue.hpp" - -TEST(meta_priority_queue, test_queue_trivial) -{ - MetaPriorityQueue queue(1u, [](const int & v){return v;}); - - EXPECT_TRUE(queue.enqueue(1, 0u)); - EXPECT_TRUE(queue.enqueue(2, 0u)); - EXPECT_TRUE(queue.enqueue(3, 0u)); - EXPECT_TRUE(queue.enqueue(4, 0u)); - - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(2, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(3, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(4, *queue.peek()); - EXPECT_TRUE(queue.pop()); -} - -TEST(meta_priority_queue, test_queue_multi) -{ - MetaPriorityQueue queue(3u, [](const int & v){return v;}); - - // Empty - EXPECT_EQ(nullptr, queue.peek()); - EXPECT_FALSE(queue.pop()); - EXPECT_EQ(nullptr, queue.peek()); - EXPECT_EQ(0u, queue.size_approx()); - EXPECT_EQ(0u, queue.size_approx(0u)); - EXPECT_EQ(0u, queue.size_approx(1u)); - EXPECT_EQ(0u, queue.size_approx(2u)); - - // First new element - EXPECT_TRUE(queue.enqueue(2, 1u)); - EXPECT_EQ(2, *queue.peek()); - EXPECT_EQ(1u, queue.size_approx()); - EXPECT_EQ(0u, queue.size_approx(0u)); - EXPECT_EQ(1u, queue.size_approx(1u)); - EXPECT_EQ(0u, queue.size_approx(2u)); - EXPECT_EQ(2, *queue.peek()); - - // New element in different queue - EXPECT_TRUE(queue.enqueue(1, 2u)); - EXPECT_EQ(2u, queue.size_approx()); - EXPECT_EQ(0u, queue.size_approx(0u)); - EXPECT_EQ(1u, queue.size_approx(1u)); - EXPECT_EQ(1u, queue.size_approx(2u)); - EXPECT_EQ(1, *queue.peek()); - - // New element in same queue - EXPECT_TRUE(queue.enqueue(3, 2u)); - - // Pop - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(2, *queue.peek()); - EXPECT_TRUE(queue.pop()); - std::cout << "line 57" << std::endl; - EXPECT_EQ(3, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(nullptr, queue.peek()); - EXPECT_FALSE(queue.pop()); - EXPECT_EQ(0u, queue.size_approx()); - - // Enqueue then dequeue a bunch - EXPECT_TRUE(queue.enqueue(1, 0u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(4, 2u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(6, 2u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(2, 1u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(3, 0u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(5, 0u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(3, 1u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.enqueue(7, 1u)); - EXPECT_EQ(1, *queue.peek()); - EXPECT_EQ(8u, queue.size_approx()); - EXPECT_EQ(3u, queue.size_approx(0u)); - EXPECT_EQ(3u, queue.size_approx(1u)); - EXPECT_EQ(2u, queue.size_approx(2u)); - - EXPECT_EQ(1, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(2, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(3, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(3, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(4, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(5, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(6, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(7, *queue.peek()); - EXPECT_TRUE(queue.pop()); - EXPECT_EQ(nullptr, queue.peek()); - EXPECT_FALSE(queue.pop()); - EXPECT_EQ(0u, queue.size_approx()); -} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp index 65359904e..347a3e67c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp @@ -103,7 +103,7 @@ class RosBag2PlaySeekTestFixture }; TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time) { - const size_t expected_number_of_messages = num_msgs_in_bag_ + 2; + const size_t expected_number_of_messages = num_msgs_in_bag_ + 4; auto player = std::make_shared(std::move(reader_), storage_options_, play_options_); sub_ = std::make_shared(); @@ -212,7 +212,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); EXPECT_EQ(replayed_topic1[0]->int32_value, 1); EXPECT_EQ(replayed_topic1[1]->int32_value, 3); @@ -254,7 +254,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); for (size_t i = 0; i < num_msgs_in_bag_; i++) { EXPECT_EQ(replayed_topic1[i]->int32_value, static_cast(i + 1)) << "i=" << i; @@ -300,7 +300,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); for (size_t i = 0; i < replayed_topic1.size(); i++) { EXPECT_EQ(replayed_topic1[i]->int32_value, static_cast(i + 1)) << "i=" << i;