diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 368648f6a..c552befd1 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -482,6 +482,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com } } +TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression) +{ + tmp_dir_storage_options_.max_bagfile_size = 0; + tmp_dir_storage_options_.max_cache_size = 200; + tmp_dir_storage_options_.snapshot_mode = true; + + initializeFakeFileStorage(); + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(1); + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 1, + 1, + kDefaultCompressionQueueThreadsPriority + }; + initializeWriter(compression_options); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length); + + writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (size_t i = 0; i < 100; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + writer_->close(); + + EXPECT_THAT(closed_files.size(), 2); + EXPECT_THAT(opened_files.size(), 2); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 2); + ASSERT_EQ(closed_files.size(), 2); + + for (size_t i = 0; i < 2; i++) { + auto expected_closed = fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor); + auto expected_opened = (i == 1) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i + 1)); + ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str()); + } +} + INSTANTIATE_TEST_SUITE_P( SequentialCompressionWriterTestQueueSizes, SequentialCompressionWriterTest, diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 812e21704..4dcb0cea7 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -81,6 +81,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type bool Writer::take_snapshot() { + std::lock_guard writer_lock(writer_mutex_); return writer_impl_->take_snapshot(); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index cdbcc5b77..85f5c51a0 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -76,11 +76,13 @@ void SequentialWriter::init_metadata() metadata_.storage_identifier = storage_->get_storage_identifier(); metadata_.starting_time = std::chrono::time_point( std::chrono::nanoseconds::max()); + metadata_.duration = std::chrono::nanoseconds(0); metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())}; rosbag2_storage::FileInformation file_info{}; file_info.path = strip_parent_path(storage_->get_relative_file_path()); file_info.starting_time = std::chrono::time_point( std::chrono::nanoseconds::max()); + file_info.duration = std::chrono::nanoseconds(0); file_info.message_count = 0; metadata_.files = {file_info}; } @@ -353,7 +355,7 @@ void SequentialWriter::write(std::shared_ptrnotify_data_ready(); + split_bagfile(); return true; } @@ -454,6 +459,17 @@ void SequentialWriter::write_messages( return; } storage_->write(messages); + if (storage_options_.snapshot_mode) { + // Update FileInformation about the last file in metadata in case of snapshot mode + const auto first_msg_timestamp = std::chrono::time_point( + std::chrono::nanoseconds(messages.front()->recv_timestamp)); + const auto last_msg_timestamp = std::chrono::time_point( + std::chrono::nanoseconds(messages.back()->recv_timestamp)); + metadata_.files.back().starting_time = first_msg_timestamp; + metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp; + metadata_.files.back().message_count = messages.size(); + } + metadata_.message_count += messages.size(); std::lock_guard lock(topics_info_mutex_); for (const auto & msg : messages) { if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) { diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 6d64b1fd3..51f5da10d 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -511,6 +511,206 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } +TEST_F(SequentialWriterTest, snapshot_writes_to_new_file_with_bag_split) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + const rcutils_time_point_value_t first_msg_timestamp = 100; + const size_t num_msgs_to_write = 100; + const std::string topic_name = "test_topic"; + std::string msg_content = "Hello"; + const size_t serialized_msg_buffer_length = msg_content.length(); + const size_t num_expected_msgs = storage_options_.max_cache_size / serialized_msg_buffer_length; + const size_t expected_start_time = first_msg_timestamp + (num_msgs_to_write - num_expected_msgs); + const auto expected_last_msg_timestamp = (first_msg_timestamp + num_msgs_to_write - 1); + const size_t expected_duration = expected_last_msg_timestamp - expected_start_time; + // Prepare vector of messages + std::vector messages; + for (size_t i = 0; i < num_msgs_to_write; i++) { + auto message = std::make_shared(); + message->recv_timestamp = first_msg_timestamp + static_cast(i); + message->send_timestamp = first_msg_timestamp + static_cast(i); + message->topic_name = topic_name; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length()); + messages.push_back(message); + } + + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(1); + + ON_CALL( + *storage_, + write(An>())).WillByDefault( + [this](std::shared_ptr) { + fake_storage_size_ += 1; + }); + + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_.load(); + }); + + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + fake_metadata_ = metadata; + }); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (const auto & message : messages) { + writer_->write(message); + } + writer_->take_snapshot(); + + EXPECT_THAT(closed_files.size(), 1); + EXPECT_THAT(opened_files.size(), 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 1); + ASSERT_EQ(closed_files.size(), 1); + + const auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); + const auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1"); + ASSERT_STREQ(closed_files[0].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[0].c_str(), expected_opened.generic_string().c_str()); + + // Check metadata + ASSERT_EQ(v_intercepted_update_metadata_.size(), 3u); + // The v_intercepted_update_metadata_[0] is the very first metadata saved from the writer's + // constructor. We don't update it during the snapshot, and it doesn't make sense checking it. + // The v_intercepted_update_metadata_[1] is the metadata written right before closing the file + // with the new snapshot. + // The v_intercepted_update_metadata_[2] is the metadata written when we are opening a new file + // after switching to a new storage. + EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, num_expected_msgs); + EXPECT_EQ(v_intercepted_update_metadata_[2].message_count, num_expected_msgs); + EXPECT_EQ( + std::chrono::time_point_cast( + v_intercepted_update_metadata_[1].starting_time).time_since_epoch().count(), + first_msg_timestamp); + + ASSERT_FALSE(v_intercepted_update_metadata_[1].files.empty()); + const auto & first_file_info = v_intercepted_update_metadata_[1].files[0]; + EXPECT_STREQ(first_file_info.path.c_str(), std::string(bag_base_dir_ + "_0").c_str()); + EXPECT_EQ(first_file_info.message_count, num_expected_msgs); + EXPECT_EQ( + std::chrono::time_point_cast( + first_file_info.starting_time).time_since_epoch().count(), + expected_start_time); + EXPECT_EQ(first_file_info.duration.count(), expected_duration); +} + +TEST_F(SequentialWriterTest, snapshot_can_be_called_twice) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + const size_t num_msgs_to_write = 100; + + // Expect to call write method twice. Once per each snapshot. + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(2); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + std::string msg_content = "Hello"; + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length()); + + for (size_t i = 0; i < num_msgs_to_write / 2; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + + for (size_t i = num_msgs_to_write / 2; i < num_msgs_to_write; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + + EXPECT_THAT(closed_files.size(), 2); + EXPECT_THAT(opened_files.size(), 2); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 2); + ASSERT_EQ(closed_files.size(), 2); + + for (size_t i = 0; i < opened_files.size(); i++) { + const auto expected_closed = fs::path(storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i)); + const auto expected_opened = fs::path(storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i + 1)); + ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str()); + } +} + TEST_F(SequentialWriterTest, split_event_calls_callback) { const uint64_t max_bagfile_size = 3;