From ebf43507f13a139759cb67ec5c9d0004c6decf09 Mon Sep 17 00:00:00 2001 From: Patrick Roncagliolo Date: Wed, 29 Nov 2023 17:14:48 +0100 Subject: [PATCH] Make `rosbag2_transport::Player::play()` run in a separate thread (#1503) * Play spawns a separate thread Signed-off-by: Patrick Roncagliolo * Address multiple race conditions in player and some renames after review Signed-off-by: Michael Orlov * Fixes in the rosbag2_transport tests - Fixed multiple race conditions in tests when wait_for_playback_to_end() calls in a separate thread before Player::play() method. Those race conditions would likely lead to a false negative or flaky tests. - Replaced player_future with wait_for_playback_to_end() to the direct call of the player_->wait_for_playback_to_end() in the most of the places. Signed-off-by: Michael Orlov * Rename wait_for_playback_to_end() to the wait_for_playback_to_finish() Signed-off-by: Michael Orlov * Use rcpputils::unique_lock for RCPPUTILS TSA guarded mutexes - Rationale: To support Clang Thread Safety Analysis - Removed locally defined TSAUniqueLock which is a duplicate of the rcpputils::unique_lock. - Also removed some unused includes in player.cpp and player.hpp Signed-off-by: Michael Orlov * Add timeout parameter to the wait_for_playback_to_finish(timeout) - Rationale: Make wait_for_playback_to_finish(..) to be optionally as a non-blocking call Signed-off-by: Michael Orlov --------- Signed-off-by: Patrick Roncagliolo Signed-off-by: Michael Orlov Co-authored-by: Michael Orlov (cherry picked from commit 1a52da8c06ec65838db6b90e3c85624197106f90) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp --- rosbag2_py/src/rosbag2_py/_transport.cpp | 7 +- .../include/rosbag2_transport/player.hpp | 11 +- .../src/rosbag2_transport/player.cpp | 559 +++++++++++++++++- .../rosbag2_play_duration_until_fixture.hpp | 1 + .../test/rosbag2_transport/test_burst.cpp | 25 +- .../test_keyboard_controls.cpp | 8 +- .../test/rosbag2_transport/test_play.cpp | 50 +- .../rosbag2_transport/test_play_callbacks.cpp | 17 +- .../rosbag2_transport/test_play_duration.cpp | 13 +- .../test/rosbag2_transport/test_play_loop.cpp | 6 +- .../test/rosbag2_transport/test_play_next.cpp | 20 +- .../test_play_publish_clock.cpp | 4 +- .../test/rosbag2_transport/test_play_seek.cpp | 20 +- .../rosbag2_transport/test_play_services.cpp | 15 +- .../rosbag2_transport/test_play_timing.cpp | 10 + .../test_play_topic_remap.cpp | 7 +- .../rosbag2_transport/test_play_until.cpp | 13 +- .../rosbag2_transport/test_player_stop.cpp | 18 +- 18 files changed, 653 insertions(+), 151 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 0237bf5cfa..7d08916b8d 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -155,6 +155,7 @@ class Player exec.spin(); }); player->play(); + player->wait_for_playback_to_finish(); exec.cancel(); spin_thread.join(); @@ -175,15 +176,11 @@ class Player [&exec]() { exec.spin(); }); - auto play_thread = std::thread( - [&player]() { - player->play(); - }); + player->play(); player->burst(num_messages); exec.cancel(); spin_thread.join(); - play_thread.join(); } }; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 97884e65f8..b2bb492b97 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -110,9 +109,19 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC virtual ~Player(); + /// \brief Start playback asynchronously in a separate thread + /// \return false if playback thread already running, otherwise true ROSBAG2_TRANSPORT_PUBLIC bool play(); + /// \brief Waits on the condition variable until the play thread finishes. + /// @param timeout Maximum time in the fraction of seconds to wait for player to finish. + /// If timeout is negative, the wait_for_playback_to_finish will be a blocking call. + /// @return true if playback finished during timeout, otherwise false. + ROSBAG2_TRANSPORT_PUBLIC + bool wait_for_playback_to_finish( + std::chrono::duration timeout = std::chrono::seconds(-1)); + /// \brief Unpause if in pause mode, stop playback and exit from play. ROSBAG2_TRANSPORT_PUBLIC void stop(); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index f767cab6ab..84147b5273 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -17,42 +17,30 @@ #include #include #include -#include #include #include #include #include +#include #include "rcl/graph.h" #include "rclcpp/rclcpp.hpp" - +#include "rcpputils/unique_lock.hpp" #include "rcutils/time.h" #include "rosbag2_cpp/clocks/time_controller_clock.hpp" #include "rosbag2_cpp/reader.hpp" -#include "rosbag2_cpp/typesupport_helpers.hpp" - #include "rosbag2_storage/storage_filter.hpp" +<<<<<<< HEAD #include "rosbag2_transport/qos.hpp" +======= +#include "rosbag2_storage/qos.hpp" +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) namespace { -/** - * Trivial std::unique_lock wrapper providing constructor that allows Clang Thread Safety Analysis. - * The std::unique_lock does not have these annotations. - */ -class RCPPUTILS_TSA_SCOPED_CAPABILITY TSAUniqueLock : public std::unique_lock -{ -public: - explicit TSAUniqueLock(std::mutex & mu) RCPPUTILS_TSA_ACQUIRE(mu) - : std::unique_lock(mu) - {} - - ~TSAUniqueLock() RCPPUTILS_TSA_RELEASE() {} -}; - /** * Determine which QoS to offer for a topic. * The priority of the profile selected is: @@ -126,7 +114,233 @@ Player::Player( node_name, node_options) {} +<<<<<<< HEAD Player::Player( +======= + bool play(); + + /// \brief Unpause if in pause mode, stop playback and exit from play. + void stop(); + + // Playback control interface + /// Pause the flow of time for playback. + virtual void pause(); + + /// Start the flow of time for playback. + virtual void resume(); + + /// Pause if time running, resume if paused. + void toggle_paused(); + + /// Return whether the playback is currently paused. + bool is_paused() const; + + /// Return current playback rate. + double get_rate() const; + + /// \brief Set the playback rate. + /// \return false if an invalid value was provided (<= 0). + virtual bool set_rate(double); + + /// \brief Playing next message from queue when in pause. + /// \details This is blocking call and it will wait until next available message will be + /// published or rclcpp context shut down. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return true if player in pause mode and successfully played next message, otherwise false. + virtual bool play_next(); + + /// \brief Burst the next \p num_messages messages from the queue when paused. + /// \param num_messages The number of messages to burst from the queue. Specifying zero means no + /// limit (i.e. burst the entire bag). + /// \details This call will play the next \p num_messages from the queue in burst mode. The + /// timing of the messages is ignored. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return The number of messages that was played. + virtual size_t burst(const size_t num_messages); + + /// \brief Advance player to the message with closest timestamp >= time_point. + /// \details This is blocking call and it will wait until current message will be published + /// and message queue will be refilled. + /// If time_point is before the beginning of the bag, then playback time will be set to the + /// beginning of the bag. + /// If time_point is after the end of the bag, playback time will be set to the end of the bag, + /// which will then end playback, or if loop is enabled then will start playing at the beginning + /// of the next loop. + /// \param time_point Time point in ROS playback timeline. + void seek(rcutils_time_point_value_t time_point); + + /// \brief Adding callable object as handler for pre-callback on play message. + /// \param callback Callable which will be called before next message will be published. + /// \note In case of registering multiple callbacks later-registered callbacks will be called + /// first. + /// \return Returns newly created callback handle if callback was successfully added, + /// otherwise returns invalid_callback_handle. + callback_handle_t add_on_play_message_pre_callback(const play_msg_callback_t & callback); + + /// \brief Adding callable object as handler for post-callback on play message. + /// \param callback Callable which will be called after next message will be published. + /// \note In case of registering multiple callbacks later-registered callbacks will be called + /// first. + /// \return Returns newly created callback handle if callback was successfully added, + /// otherwise returns invalid_callback_handle. + callback_handle_t add_on_play_message_post_callback(const play_msg_callback_t & callback); + + /// \brief Delete pre or post on play message callback from internal player lists. + /// \param handle Callback's handle returned from #add_on_play_message_pre_callback or + /// #add_on_play_message_post_callback + void delete_on_play_message_callback(const callback_handle_t & handle); + + /// \brief Getter for publishers corresponding to each topic + /// \return Hashtable representing topic to publisher map excluding inner clock_publisher + std::unordered_map> get_publishers(); + + /// \brief Getter for inner clock_publisher + /// \return Shared pointer to the inner clock_publisher + rclcpp::Publisher::SharedPtr get_clock_publisher(); + + /// \brief Blocks and wait on condition variable until first message will be taken from read + /// queue + void wait_for_playback_to_start(); + + /// \brief Waits on the condition variable until the play thread finishes. + /// @param timeout Maximum time in the fraction of seconds to wait for player to finish. + /// If timeout is negative, the wait_for_playback_to_finish will be a blocking call. + /// @return true if playback finished during timeout, otherwise false. + bool wait_for_playback_to_finish( + std::chrono::duration timeout = std::chrono::seconds(-1)); + + /// \brief Getter for the number of registered on_play_msg_pre_callbacks + /// \return Number of registered on_play_msg_pre_callbacks + size_t get_number_of_registered_on_play_msg_pre_callbacks(); + + /// \brief Getter for the number of registered on_play_msg_post_callbacks + /// \return Number of registered on_play_msg_post_callbacks + size_t get_number_of_registered_on_play_msg_post_callbacks(); + +protected: + struct play_msg_callback_data + { + callback_handle_t handle; + play_msg_callback_t callback; + }; + + std::mutex on_play_msg_callbacks_mutex_; + std::forward_list on_play_msg_pre_callbacks_; + std::forward_list on_play_msg_post_callbacks_; + + class PlayerPublisher final + { +public: + explicit PlayerPublisher( + std::shared_ptr pub, + bool disable_loan_message) + : publisher_(std::move(pub)) + { + using std::placeholders::_1; + if (disable_loan_message || !publisher_->can_loan_messages()) { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); + } + } + + ~PlayerPublisher() = default; + + void publish(const rclcpp::SerializedMessage & message) + { + publish_func_(message); + } + + std::shared_ptr generic_publisher() + { + return publisher_; + } + +private: + std::shared_ptr publisher_; + std::function publish_func_; + }; + bool is_ready_to_play_from_queue_{false}; + std::mutex ready_to_play_from_queue_mutex_; + std::condition_variable ready_to_play_from_queue_cv_; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::unordered_map> publishers_; + +private: + rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); + void load_storage_content(); + bool is_storage_completely_loaded() const; + void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void wait_for_filled_queue() const; + void play_messages_from_queue(); + void prepare_publishers(); + bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); + static callback_handle_t get_new_on_play_msg_callback_handle(); + void add_key_callback( + KeyboardHandler::KeyCode key, + const std::function & cb, + const std::string & op_name); + void add_keyboard_callbacks(); + void create_control_services(); + void configure_play_until_timestamp(); + bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + + static constexpr double read_ahead_lower_bound_percentage_ = 0.9; + static const std::chrono::milliseconds queue_read_wait_period_; + std::atomic_bool cancel_wait_for_next_message_{false}; + std::atomic_bool stop_playback_{false}; + + std::mutex reader_mutex_; + std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); + + void publish_clock_update(); + void publish_clock_update(const rclcpp::Time & time); + + Player * owner_; + rosbag2_storage::StorageOptions storage_options_; + rosbag2_transport::PlayOptions play_options_; + rcutils_time_point_value_t play_until_timestamp_ = -1; + moodycamel::ReaderWriterQueue message_queue_; + mutable std::future storage_loading_future_; + std::atomic_bool load_storage_content_{true}; + std::unordered_map topic_qos_profile_overrides_; + std::unique_ptr clock_; + std::shared_ptr clock_publish_timer_; + std::mutex skip_message_in_main_play_loop_mutex_; + bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( + skip_message_in_main_play_loop_mutex_) = false; + std::mutex is_in_playback_mutex_; + std::atomic_bool is_in_playback_{false} RCPPUTILS_TSA_GUARDED_BY(is_in_playback_mutex_); + std::thread playback_thread_; + std::condition_variable playback_finished_cv_; + + rcutils_time_point_value_t starting_time_; + + // control services + rclcpp::Service::SharedPtr srv_pause_; + rclcpp::Service::SharedPtr srv_resume_; + rclcpp::Service::SharedPtr srv_toggle_paused_; + rclcpp::Service::SharedPtr srv_is_paused_; + rclcpp::Service::SharedPtr srv_get_rate_; + rclcpp::Service::SharedPtr srv_set_rate_; + rclcpp::Service::SharedPtr srv_play_; + rclcpp::Service::SharedPtr srv_play_next_; + rclcpp::Service::SharedPtr srv_burst_; + rclcpp::Service::SharedPtr srv_seek_; + rclcpp::Service::SharedPtr srv_stop_; + + rclcpp::Publisher::SharedPtr split_event_pub_; + + // defaults + std::shared_ptr keyboard_handler_; + std::vector keyboard_callbacks_; +}; + +PlayerImpl::PlayerImpl( + Player * owner, +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) std::unique_ptr reader, std::shared_ptr keyboard_handler, const rosbag2_storage::StorageOptions & storage_options, @@ -176,6 +390,7 @@ Player::~Player() // Force to stop playback to avoid hangout in case of unexpected exception or when smart // pointer to the player object goes out of scope stop(); + // remove callbacks on key_codes to prevent race conditions // Note: keyboard_handler handles locks between removing & executing callbacks for (auto cb_handle : keyboard_callbacks_) { @@ -203,9 +418,20 @@ bool Player::is_storage_completely_loaded() const bool Player::play() { +<<<<<<< HEAD if (is_in_playback_.exchange(true)) { RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); return false; +======= + { + rcpputils::unique_lock is_in_playback_lk(is_in_playback_mutex_); + if (is_in_playback_.exchange(true)) { + RCLCPP_WARN_STREAM( + owner_->get_logger(), + "Trying to play() while in playback, dismissing request."); + return false; + } +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) } stop_playback_ = false; @@ -221,6 +447,7 @@ bool Player::play() RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp_); +<<<<<<< HEAD try { do { if (delay > rclcpp::Duration(0, 0)) { @@ -237,15 +464,56 @@ bool Player::play() storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); play_messages_from_queue(); +======= + // May need to join the previous thread if we are calling play() a second time + if (playback_thread_.joinable()) { + playback_thread_.join(); + } + playback_thread_ = std::thread( + [&, delay]() { + try { + do { + if (delay > rclcpp::Duration(0, 0)) { + RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns"); + std::chrono::nanoseconds delay_duration(delay.nanoseconds()); + std::this_thread::sleep_for(delay_duration); + } + { + std::lock_guard lk(reader_mutex_); + reader_->seek(starting_time_); + clock_->jump(starting_time_); + } + load_storage_content_ = true; + storage_loading_future_ = std::async( + std::launch::async, [this]() { + load_storage_content(); + }); + wait_for_filled_queue(); + play_messages_from_queue(); + + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue + { + std::lock_guard lk(ready_to_play_from_queue_mutex_); + is_ready_to_play_from_queue_ = false; + ready_to_play_from_queue_cv_.notify_all(); + } + } while (rclcpp::ok() && !stop_playback_ && play_options_.loop); + } catch (std::runtime_error & e) { + RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what()); + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue + } +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) - load_storage_content_ = false; - if (storage_loading_future_.valid()) {storage_loading_future_.get();} - while (message_queue_.pop()) {} // cleanup queue { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; ready_to_play_from_queue_cv_.notify_all(); } +<<<<<<< HEAD } while (rclcpp::ok() && !stop_playback_ && play_options_.loop); } catch (std::runtime_error & e) { RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); @@ -278,17 +546,63 @@ bool Player::play() "topic %s : %s", pub.first.c_str(), e.what()); +======= + + // Wait for all published messages to be acknowledged. + if (play_options_.wait_acked_timeout >= 0) { + std::chrono::milliseconds timeout(play_options_.wait_acked_timeout); + if (timeout == std::chrono::milliseconds(0)) { + timeout = std::chrono::milliseconds(-1); + } + for (const auto & pub : publishers_) { + try { + if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) { + RCLCPP_ERROR( + owner_->get_logger(), + "Timed out while waiting for all published messages to be acknowledged " + "for topic %s", pub.first.c_str()); + } + } catch (std::exception & e) { + RCLCPP_ERROR( + owner_->get_logger(), + "Exception occurred while waiting for all published messages to be acknowledged for " + "topic %s : %s", pub.first.c_str(), e.what()); + } + } +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) } - } - } - is_in_playback_ = false; + { + rcpputils::unique_lock is_in_playback_lk(is_in_playback_mutex_); + is_in_playback_ = false; + playback_finished_cv_.notify_all(); + } + }); return true; } +<<<<<<< HEAD void Player::stop() +======= +bool PlayerImpl::wait_for_playback_to_finish(std::chrono::duration timeout) +{ + rcpputils::unique_lock is_in_playback_lk(is_in_playback_mutex_); + if (timeout.count() < 0) { + playback_finished_cv_.wait(is_in_playback_lk, [this] {return !is_in_playback_.load();}); + return true; + } else { + return playback_finished_cv_.wait_for( + is_in_playback_lk, + timeout, [this] {return !is_in_playback_.load();}); + } +} + +void PlayerImpl::stop() +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) { + rcpputils::unique_lock is_in_playback_lk(is_in_playback_mutex_); if (!is_in_playback_) { +<<<<<<< HEAD return; } RCLCPP_INFO_STREAM(get_logger(), "Stopping playback."); @@ -301,13 +615,37 @@ void Player::stop() skip_message_in_main_play_loop_ = true; cancel_wait_for_next_message_ = true; } +======= + if (playback_thread_.joinable()) { + playback_thread_.join(); + } + } else { + RCLCPP_INFO_STREAM(owner_->get_logger(), "Stopping playback."); + stop_playback_ = true; + // Temporary stop playback in play_messages_from_queue() and block play_next() and seek() or + // wait until those operations will be finished with stop_playback_ = true; + { + std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); + // resume playback if it was in pause and waiting on clock in play_messages_from_queue() + skip_message_in_main_play_loop_ = true; + cancel_wait_for_next_message_ = true; + } +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) - if (clock_->is_paused()) { - clock_->resume(); // Temporary resume clock to force wakeup in clock_->sleep_until(time) - clock_->pause(); // Return in pause mode to preserve original state of the player + if (clock_->is_paused()) { + clock_->resume(); // Temporary resume clock to force wakeup in clock_->sleep_until(time) + clock_->pause(); // Return in pause mode to preserve original state of the player + } + // Note: Don't clean up message queue here. It will be cleaned up automatically in + // playback thread after finishing play_messages_from_queue(); + + // Wait for playback thread to finish. Make sure that we have unlocked + // is_in_playback_mutex_, otherwise playback_thread_ will wait forever at the end + is_in_playback_lk.unlock(); + if (playback_thread_.joinable()) { + playback_thread_.join(); + } } - // Note: Don't clean up message queue here. It will be cleaned up automatically in - // Player::play() after finishing play_messages_from_queue(); } void Player::pause() @@ -530,7 +868,7 @@ void Player::load_storage_content() auto queue_upper_boundary = play_options_.read_ahead_queue_size; while (rclcpp::ok() && load_storage_content_ && !stop_playback_) { - TSAUniqueLock lk(reader_mutex_); + rcpputils::unique_lock lk(reader_mutex_); if (!reader_->has_next()) {break;} if (message_queue_.size_approx() < queue_lower_boundary) { @@ -936,4 +1274,165 @@ void Player::publish_clock_update(const rclcpp::Time & time) } } +<<<<<<< HEAD +======= +/////////////////////////////// +// Player public interface + +Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options) +: rclcpp::Node(node_name, node_options) +{ + // TODO(karsten1987): Use this constructor later with parameter parsing. + // The reader, storage_options as well as play_options can be loaded via parameter. + // That way, the player can be used as a simple component in a component manager. + throw rclcpp::exceptions::UnimplementedError(); +} + +Player::Player( + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player(std::make_unique(), + storage_options, play_options, node_name, node_options) +{} + +Player::Player( + std::unique_ptr reader, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player(std::move(reader), + // only call KeyboardHandler when using default keyboard handler implementation +#ifndef _WIN32 + std::make_shared(false), +#else + // We don't have signal handler option in constructor for windows version + std::shared_ptr(new KeyboardHandler()), +#endif + storage_options, play_options, node_name, node_options) +{} + +Player::Player( + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), + pimpl_(std::make_unique( + this, std::move(reader), keyboard_handler, + storage_options, play_options)) +{} + +Player::~Player() = default; + + +bool Player::play() +{ + return pimpl_->play(); +} + +bool Player::wait_for_playback_to_finish(std::chrono::duration timeout) +{ + return pimpl_->wait_for_playback_to_finish(timeout); +} + +void Player::stop() +{ + pimpl_->stop(); +} + +void Player::pause() +{ + pimpl_->pause(); +} + +void Player::resume() +{ + pimpl_->resume(); +} + +void Player::toggle_paused() +{ + pimpl_->toggle_paused(); +} + +bool Player::is_paused() const +{ + return pimpl_->is_paused(); +} + +double Player::get_rate() const +{ + return pimpl_->get_rate(); +} + +bool Player::set_rate(double rate) +{ + return pimpl_->set_rate(rate); +} + +bool Player::play_next() +{ + return pimpl_->play_next(); +} + +size_t Player::burst(const size_t num_messages) +{ + return pimpl_->burst(num_messages); +} + +void Player::seek(rcutils_time_point_value_t time_point) +{ + pimpl_->seek(time_point); +} + +Player::callback_handle_t Player::add_on_play_message_pre_callback( + const play_msg_callback_t & callback) +{ + return pimpl_->add_on_play_message_pre_callback(callback); +} + +Player::callback_handle_t Player::add_on_play_message_post_callback( + const play_msg_callback_t & callback) +{ + return pimpl_->add_on_play_message_post_callback(callback); +} + +void Player::delete_on_play_message_callback(const Player::callback_handle_t & handle) +{ + pimpl_->delete_on_play_message_callback(handle); +} + +std::unordered_map> Player::get_publishers() +{ + return pimpl_->get_publishers(); +} + +rclcpp::Publisher::SharedPtr Player::get_clock_publisher() +{ + return pimpl_->get_clock_publisher(); +} + +void Player::wait_for_playback_to_start() +{ + pimpl_->wait_for_playback_to_start(); +} + +size_t Player::get_number_of_registered_on_play_msg_pre_callbacks() +{ + return pimpl_->get_number_of_registered_on_play_msg_pre_callbacks(); +} + +size_t Player::get_number_of_registered_on_play_msg_post_callbacks() +{ + return pimpl_->get_number_of_registered_on_play_msg_post_callbacks(); +} + +>>>>>>> 1a52da8 (Make `rosbag2_transport::Player::play()` run in a separate thread (#1503)) } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_duration_until_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_duration_until_fixture.hpp index 20b969da7f..2ac666aa1e 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_duration_until_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_duration_until_fixture.hpp @@ -177,6 +177,7 @@ class RosBag2PlayDurationAndUntilTestFixture : public RosBag2PlayTestFixture auto await_received_messages = test_fixture_->sub_->spin_subscriptions(); ASSERT_TRUE(test_fixture_->player_->play()); + test_fixture_->player_->wait_for_playback_to_finish(); await_received_messages.get(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp index 3e666b56ad..41628a004c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -83,7 +83,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursts_requested_messages_without_delays) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); player->wait_for_playback_to_start(); const size_t NUM_MESSAGES_TO_BURST = 4; @@ -97,7 +97,8 @@ TEST_F(RosBag2PlayTestFixture, burst_bursts_requested_messages_without_delays) { ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); + await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -136,7 +137,7 @@ TEST_F(RosBag2PlayTestFixture, burst_stops_at_end_of_file) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); player->wait_for_playback_to_start(); ASSERT_TRUE(player->is_paused()); @@ -144,7 +145,7 @@ TEST_F(RosBag2PlayTestFixture, burst_stops_at_end_of_file) { ASSERT_EQ(played_messages, messages.size()); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -183,7 +184,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_one_by_one_messages_with_the_same_ player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_EQ(player->burst(1), 1u); @@ -197,7 +198,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_one_by_one_messages_with_the_same_ ASSERT_EQ(played_messages, messages.size()); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -236,7 +237,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_burst) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); const size_t EXPECTED_BURST_COUNT = 2; ASSERT_TRUE(player->is_paused()); @@ -244,7 +245,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_burst) { ASSERT_TRUE(player->is_paused()); player->resume(); auto start = std::chrono::steady_clock::now(); - player_future.get(); + player->wait_for_playback_to_finish(); auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = @@ -290,13 +291,13 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_burst) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_EQ(player->burst(1), 1u); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -346,7 +347,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_topics) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); const size_t EXPECTED_BURST_COUNT = 3; @@ -354,7 +355,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_topics) { ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 8e91d01763..df7f20b948 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -160,8 +160,8 @@ TEST_F(RosBag2PlayTestFixture, test_keyboard_controls) keyboard_handler->simulate_key_press(play_options_.increase_rate_key); keyboard_handler->simulate_key_press(play_options_.decrease_rate_key); - // start play thread - std::thread player_thread = std::thread([player]() {player->play();}); + // start playback asynchronously in a separate thread + player->play(); // play next keyboard_handler->simulate_key_press(play_options_.play_next_key); @@ -170,9 +170,7 @@ TEST_F(RosBag2PlayTestFixture, test_keyboard_controls) keyboard_handler->simulate_key_press(play_options_.pause_resume_toggle_key); EXPECT_THAT(player->is_paused(), false); - if (player_thread.joinable()) { - player_thread.join(); - } + player->stop(); EXPECT_THAT(player->num_paused, 1); EXPECT_THAT(player->num_resumed, 1); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 7b33abbcdc..bab7f7989c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -77,10 +77,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( @@ -148,10 +147,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_with_ auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( @@ -220,10 +218,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -253,10 +250,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -286,10 +282,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -343,10 +338,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( @@ -373,10 +367,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( @@ -404,10 +397,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( @@ -439,12 +431,12 @@ TEST_F(RosBag2PlayTestFixture, player_gracefully_exit_by_rclcpp_shutdown_in_paus auto player = std::make_shared(std::move(reader), storage_options_, play_options_); player->pause(); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); player->wait_for_playback_to_start(); ASSERT_TRUE(player->is_paused()); rclcpp::shutdown(); - player_future.get(); + player->wait_for_playback_to_finish(); } class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture @@ -488,9 +480,9 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); player->play(); + player->wait_for_playback_to_finish(); const auto result = await_received_messages.wait_for(timeout); // Must EXPECT, can't ASSERT because transport needs to be shutdown if timed out if (expect_timeout) { @@ -682,7 +674,7 @@ TEST_F(RosBag2PlayTestFixture, read_split_callback_is_called) auto await_received_messages = sub_->spin_subscriptions(); player->play(); - + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_test_primitives = sub_->get_received_messages( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_callbacks.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_callbacks.cpp index 1be6eaf97e..77e6eefd74 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_callbacks.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_callbacks.cpp @@ -110,18 +110,15 @@ TEST_F(Rosbag2PlayCallbacksTestFixture, register_unregister_callbacks) { player.pause(); // Put player in pause mode before starting ASSERT_TRUE(player.is_paused()); - // Run play asynchronously in separate thread - std::future play_future_result = - std::async(std::launch::async, [&]() {player.play();}); + // Run playback asynchronously in a separate thread + player.play(); for (size_t i = 1; i < num_test_messages_; i++) { EXPECT_TRUE(player.play_next()); } player.resume(); // Resume playback for playing the last message ASSERT_FALSE(player.is_paused()); - - play_future_result.wait(); - play_future_result.get(); + player.wait_for_playback_to_finish(); EXPECT_FALSE(player.play_next()); } @@ -148,9 +145,7 @@ TEST_F(Rosbag2PlayCallbacksTestFixture, call_callbacks) { player.pause(); // Put player in pause mode before starting ASSERT_TRUE(player.is_paused()); - // Run play asynchronously in separate thread - std::future play_future_result = - std::async(std::launch::async, [&]() {player.play();}); + player.play(); for (size_t i = 1; i < num_test_messages_; i++) { EXPECT_TRUE(player.play_next()); @@ -158,8 +153,6 @@ TEST_F(Rosbag2PlayCallbacksTestFixture, call_callbacks) { player.resume(); // Resume playback for playing the last message ASSERT_FALSE(player.is_paused()); - - play_future_result.wait(); - play_future_result.get(); + player.wait_for_playback_to_finish(); EXPECT_FALSE(player.play_next()); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_duration.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_duration.cpp index 5cad8afdcb..ae63d1b6bc 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_duration.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_duration.cpp @@ -95,6 +95,7 @@ TEST_F(RosBag2PlayDurationTestFixture, play_for_none_are_played_due_to_duration) ASSERT_NE(post_callback_handle, Player::invalid_callback_handle); ASSERT_TRUE(player->play()); + player->wait_for_playback_to_finish(); } TEST_F(RosBag2PlayDurationTestFixture, play_for_less_than_the_total_duration) @@ -128,16 +129,18 @@ TEST_F(RosBag2PlayDurationTestFixture, play_for_less_than_the_total_duration) ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); auto await_received_messages = sub_->spin_subscriptions(); - ASSERT_TRUE(player_->play()); + player_->play(); + player_->wait_for_playback_to_finish(); + // Playing one more time with play_next to save time and count messages player_->pause(); - auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + player_->play(); ASSERT_TRUE(player_->play_next()); ASSERT_FALSE(player_->play_next()); player_->resume(); - player_future.get(); + player_->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(2)); @@ -198,13 +201,13 @@ TEST_F(RosBag2PlayDurationTestFixture, play_should_return_false_when_interrupted auto await_received_messages = sub_->spin_subscriptions(); player_->pause(); - auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->play(); player_->wait_for_playback_to_start(); ASSERT_TRUE(player_->is_paused()); ASSERT_FALSE(player_->play()); player_->resume(); - player_future.get(); + player_->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(1)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index c35ce0e1b6..786412cb01 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -78,8 +78,10 @@ TEST_F(RosBag2PlayTestFixture, play_bag_file_twice) { auto loop_thread = std::async( std::launch::async, [&player]() { player->play(); + player->wait_for_playback_to_finish(); // play again the same bag file player->play(); + player->wait_for_playback_to_finish(); }); await_received_messages.get(); @@ -136,11 +138,9 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto player = std::make_shared( std::move( reader), storage_options_, play_options); - std::thread loop_thread(&rosbag2_transport::Player::play, player); - + player->play(); await_received_messages.get(); rclcpp::shutdown(); - loop_thread.join(); auto replayed_test_primitives = sub_->get_received_messages( "/loop_test_topic"); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp index 8e47e2d05e..7fb30836bb 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -83,7 +83,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); player->wait_for_playback_to_start(); ASSERT_TRUE(player->is_paused()); @@ -98,7 +98,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2))); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -137,7 +137,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_TRUE(player->play_next()); @@ -151,7 +151,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa ASSERT_EQ(played_messages, messages.size()); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -190,7 +190,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_TRUE(player->play_next()); @@ -198,7 +198,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { ASSERT_TRUE(player->is_paused()); player->resume(); auto start = std::chrono::steady_clock::now(); - player_future.get(); + player->wait_for_playback_to_finish(); auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = @@ -244,13 +244,13 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_TRUE(player->play_next()); ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -300,7 +300,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); ASSERT_TRUE(player->is_paused()); ASSERT_TRUE(player->play_next()); @@ -313,7 +313,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) { ASSERT_TRUE(player->is_paused()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp index 4299d69137..662b45911f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp @@ -77,9 +77,9 @@ class ClockPublishFixture : public RosBag2PlayTestFixture auto await_received_messages = sub_->spin_subscriptions(); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); + player->wait_for_playback_to_finish(); - player_future.get(); await_received_messages.get(); exec.cancel(); spin_thread.join(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp index de3b94b893..a4a18d622a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp @@ -115,7 +115,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); EXPECT_TRUE(player->is_paused()); EXPECT_TRUE(player->play_next()); @@ -130,7 +130,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time) { EXPECT_TRUE(player->play_next()); EXPECT_TRUE(player->play_next()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -162,7 +162,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_messag player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); EXPECT_TRUE(player->is_paused()); EXPECT_TRUE(player->play_next()); @@ -173,7 +173,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_messag // shouldn't be able to keep playing since we're at end of bag EXPECT_FALSE(player->play_next()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -196,7 +196,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); EXPECT_TRUE(player->is_paused()); EXPECT_TRUE(player->play_next()); @@ -205,7 +205,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward) { player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); EXPECT_TRUE(player->play_next()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -234,7 +234,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); EXPECT_TRUE(player->is_paused()); // Play all messages from bag @@ -247,7 +247,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) { player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); EXPECT_TRUE(player->play_next()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -280,7 +280,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) { player->pause(); ASSERT_TRUE(player->is_paused()); - auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->play(); EXPECT_TRUE(player->is_paused()); // Play all messages from bag @@ -293,7 +293,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) { player->seek((start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1); EXPECT_FALSE(player->play_next()); player->resume(); - player_future.get(); + player->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp index 20c032782f..111a09e431 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp @@ -68,7 +68,6 @@ class PlaySrvsTest : public RosBag2PlayTestFixture exec_.cancel(); rclcpp::shutdown(); if (spin_thread_.joinable()) {spin_thread_.join();} - if (play_thread_.joinable()) {play_thread_.join();} } /// Use SetUp instead of ctor because we want to ASSERT some preconditions for the tests @@ -211,10 +210,7 @@ class PlaySrvsTest : public RosBag2PlayTestFixture std::move(reader), storage_options, play_options, player_name_); player_->pause(); // Start playing in pause mode. Require for play_next test. For all other // tests we will resume playback via explicit call to start_playback(). - play_thread_ = std::thread( - [this]() { - player_->play(); - }); + player_->play(); } void topic_callback(std::shared_ptr/* msg */) @@ -254,7 +250,6 @@ class PlaySrvsTest : public RosBag2PlayTestFixture // Orchestration std::thread spin_thread_; - std::thread play_thread_; rclcpp::executors::SingleThreadedExecutor exec_; std::shared_ptr player_; @@ -418,8 +413,8 @@ TEST_F(PlaySrvsTest, stop_in_pause) { // Make sure that player reached out main play loop player_->wait_for_playback_to_start(); service_call_stop(); - // play_thread_ shall successfully finish after "Stop" without rclcpp::shutdown() - if (play_thread_.joinable()) {play_thread_.join();} + // playback shall successfully finish after "Stop" without rclcpp::shutdown() + player_->wait_for_playback_to_finish(); expect_messages(false); } @@ -448,7 +443,7 @@ TEST_F(PlaySrvsTest, stop_in_active_play) { // Wait until first message is going to be published in active playback mode ASSERT_TRUE(cv.wait_for(lk, 2s, [&] {return calls == 1;})); service_call_stop(); - // play_thread_ shall successfully finish after "Stop" without rclcpp::shutdown() - if (play_thread_.joinable()) {play_thread_.join();} + // playback shall successfully finish after "Stop" without rclcpp::shutdown() + player_->wait_for_playback_to_finish(); ASSERT_EQ(calls, 1); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp index 9a8271a437..f388b4edd2 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp @@ -86,6 +86,7 @@ TEST_F(PlayerTestFixture, playing_respects_relative_timing_of_stored_messages) // messages auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); } @@ -98,6 +99,7 @@ TEST_F(PlayerTestFixture, playing_rate_2x) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference * 0.5)); @@ -112,6 +114,7 @@ TEST_F(PlayerTestFixture, playing_rate_1x) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); } @@ -124,6 +127,7 @@ TEST_F(PlayerTestFixture, playing_rate_halfx) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference * 2.0)); } @@ -137,6 +141,7 @@ TEST_F(PlayerTestFixture, playing_rate_zero) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); } @@ -150,6 +155,7 @@ TEST_F(PlayerTestFixture, playing_rate_negative) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); } @@ -167,6 +173,7 @@ TEST_F(PlayerTestFixture, playing_respects_delay) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; EXPECT_THAT(replay_time, Gt(lower_expected_duration)); @@ -185,6 +192,7 @@ TEST_F(PlayerTestFixture, play_ignores_invalid_delay) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_time = clock.now() - start; EXPECT_THAT(replay_time, Gt(lower_expected_duration)); @@ -206,6 +214,7 @@ TEST_F(PlayerTestFixture, play_respects_start_offset) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_duration = clock.now() - start; EXPECT_THAT(replay_duration, Gt(lower_expected_duration)); @@ -225,6 +234,7 @@ TEST_F(PlayerTestFixture, play_ignores_invalid_start_offset) auto start = clock.now(); player->play(); + player->wait_for_playback_to_finish(); auto replay_duration = clock.now() - start; EXPECT_THAT(replay_duration, Gt(lower_expected_duration)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp index c1cc64a523..e52f4dd2c1 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp @@ -64,9 +64,10 @@ TEST_F(RosBag2PlayTestFixture, recorded_message_is_played_on_remapped_topic) { auto await_received_messages = sub_->spin_subscriptions(); auto player = std::make_shared( - std::move( - reader), storage_options_, play_options_); - player->play(); + std::move(reader), storage_options_, play_options_); + + ASSERT_TRUE(player->play()); + player->wait_for_playback_to_finish(); await_received_messages.get(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 89aac9e6c4..edf3e90c50 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -102,6 +102,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) ASSERT_NE(post_callback_handle, Player::invalid_callback_handle); ASSERT_TRUE(player->play()); + player->wait_for_playback_to_finish(); } TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) @@ -136,15 +137,16 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); + player_->wait_for_playback_to_finish(); // Playing one more time with play_next() to save time and count messages. player_->pause(); - auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); - + player_->play(); ASSERT_TRUE(player_->play_next()); ASSERT_FALSE(player_->play_next()); player_->resume(); - player_future.get(); + player_->wait_for_playback_to_finish(); + await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(2)); @@ -206,13 +208,13 @@ TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) auto await_received_messages = sub_->spin_subscriptions(); player_->pause(); - auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->play(); player_->wait_for_playback_to_start(); ASSERT_TRUE(player_->is_paused()); ASSERT_FALSE(player_->play()); player_->resume(); - player_future.get(); + player_->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(1)); @@ -292,6 +294,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_is_equal_to_the_total_duration) auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); + player_->wait_for_playback_to_finish(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); diff --git a/rosbag2_transport/test/rosbag2_transport/test_player_stop.cpp b/rosbag2_transport/test/rosbag2_transport/test_player_stop.cpp index 19ba036423..eb127aa293 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_player_stop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_player_stop.cpp @@ -77,12 +77,10 @@ TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_in_pause_mode_explicit) { player.pause(); ASSERT_TRUE(player.is_paused()); - auto play_future_result = - std::async(std::launch::async, [&] {player.play();}); - + player.play(); EXPECT_TRUE(player.play_next()); player.stop(); - ASSERT_EQ(play_future_result.wait_for(1s), std::future_status::ready); + ASSERT_TRUE(player.wait_for_playback_to_finish(1s)); } TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_in_pause_mode_implicit) { @@ -92,9 +90,9 @@ TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_in_pause_mode_implicit) { player.pause(); ASSERT_TRUE(player.is_paused()); + player.play(); play_future_result = - std::async(std::launch::async, [&] {player.play();}); - + std::async(std::launch::async, [&] {player.wait_for_playback_to_finish();}); EXPECT_TRUE(player.play_next()); } ASSERT_EQ(play_future_result.wait_for(1s), std::future_status::ready); @@ -119,7 +117,7 @@ TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_explicit) { player.pause(); ASSERT_TRUE(player.is_paused()); - auto play_future_result = std::async(std::launch::async, [&] {player.play();}); + player.play(); player.wait_for_playback_to_start(); ASSERT_TRUE(player.is_paused()); { @@ -130,7 +128,7 @@ TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_explicit) { lk.unlock(); player.stop(); } - ASSERT_EQ(play_future_result.wait_for(1s), std::future_status::ready); + ASSERT_TRUE(player.wait_for_playback_to_finish(1s)); ASSERT_EQ(calls, 1); } @@ -155,8 +153,10 @@ TEST_F(Rosbag2PlayerStopTestFixture, stop_playback_implict) { player.pause(); ASSERT_TRUE(player.is_paused()); - play_future_result = std::async(std::launch::async, [&] {player.play();}); + player.play(); player.wait_for_playback_to_start(); + play_future_result = + std::async(std::launch::async, [&] {player.wait_for_playback_to_finish();}); ASSERT_TRUE(player.is_paused()); std::unique_lock lk{m};