From b43ca8c31a3d535dad7ba816e580f0a860023586 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 8 Nov 2024 16:17:15 -0800 Subject: [PATCH] Address more minor feedback Signed-off-by: Christophe Bedard --- rosbag2_py/src/rosbag2_py/_transport.cpp | 7 +-- .../src/rosbag2_transport/player.cpp | 47 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 42aabab52..b50f6a47f 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -254,9 +254,10 @@ class Player { install_signal_handlers(); try { - std::vector readers{}; + std::vector readers_with_options{}; for (const auto & options : storage_options) { - readers.emplace_back(rosbag2_transport::ReaderWriterFactory::make_reader(options), options); + readers_with_options.emplace_back( + rosbag2_transport::ReaderWriterFactory::make_reader(options), options); } std::shared_ptr keyboard_handler; if (!play_options.disable_keyboard_controls) { @@ -270,7 +271,7 @@ class Player #endif } auto player = std::make_shared( - std::move(readers), std::move(keyboard_handler), play_options); + std::move(readers_with_options), std::move(keyboard_handler), play_options); rclcpp::executors::SingleThreadedExecutor exec; exec.add_node(player); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 9b613c207..a2e5b452a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -283,9 +283,7 @@ class PlayerImpl rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); void load_storage_content(); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary( - const size_t boundary, - const size_t bag_index) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void enqueue_up_to_boundary(const size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; void play_messages_from_queue(); void prepare_publishers(); @@ -904,7 +902,8 @@ Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle() void PlayerImpl::wait_for_filled_queue() const { - const auto read_ahead_queues_filled = [this]() -> bool { + const auto read_ahead_queues_filled = + [this]() { for (std::size_t i = 0; i < input_bags_.size(); i++) { if (message_queue_.size_approx(i) < play_options_.read_ahead_queue_size) { return false; @@ -922,9 +921,10 @@ void PlayerImpl::wait_for_filled_queue() const void PlayerImpl::load_storage_content() { - auto queue_lower_boundary = - static_cast(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); - auto queue_upper_boundary = play_options_.read_ahead_queue_size; + // Multiply the boundary parameters by the number of input bags + auto queue_lower_boundary = static_cast( + input_bags_.size() * play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); + auto queue_upper_boundary = input_bags_.size() * play_options_.read_ahead_queue_size; while (rclcpp::ok() && load_storage_content_ && !stop_playback_) { rcpputils::unique_lock lk(reader_mutex_); @@ -935,30 +935,28 @@ void PlayerImpl::load_storage_content() if (no_messages) { break; } - for (size_t bag_index = 0; bag_index < input_bags_.size(); bag_index++) { - if (!input_bags_[bag_index].first->has_next()) { - continue; - } - - if (message_queue_.size_approx(bag_index) < queue_lower_boundary) { - enqueue_up_to_boundary(queue_upper_boundary, bag_index); - } else { - lk.unlock(); - } + if (message_queue_.size_approx() < queue_lower_boundary) { + enqueue_up_to_boundary(queue_upper_boundary); + } else { + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } -void PlayerImpl::enqueue_up_to_boundary(const size_t boundary, const size_t bag_index) +void PlayerImpl::enqueue_up_to_boundary(const size_t boundary) { rosbag2_storage::SerializedBagMessageSharedPtr message; - for (size_t i = message_queue_.size_approx(bag_index); i < boundary; i++) { - if (!input_bags_[bag_index].first->has_next()) { - break; + // Read messages from input bags in a round robin way + size_t input_bag_index = 0u; + for (size_t i = message_queue_.size_approx(); i < boundary; i++) { + const auto & reader = input_bags_[input_bag_index].first; + // We are supposed to have at least one bag with messages to read + if (reader->has_next()) { + message = reader->read_next(); + message_queue_.enqueue(message, input_bag_index); } - message = input_bags_[bag_index].first->read_next(); - message_queue_.enqueue(message, bag_index); + input_bag_index = (input_bag_index + 1) % input_bags_.size(); } } @@ -1132,6 +1130,7 @@ void PlayerImpl::prepare_publishers() } // Create topic publishers + // We could have duplicate topic names here, but we correctly handle it when creating publishers std::vector topics{}; for (const auto & [reader, _] : input_bags_) { auto bag_topics = reader->get_all_topics_and_types();