Skip to content

Commit

Permalink
[humble] Add BagSplitInfo service call on bag close (backport #1422) (#…
Browse files Browse the repository at this point in the history
…1637)

* Add BagSplitInfo service call on bag close (#1422)

- Note: The `BagSplitInfo::opened_file` will have empty string to
indicate that it was "bag close" and not bag split event.

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit ba199d0)

# Conflicts:
#	rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp

* Fix merge conflicts

- Ensure that writer_ is destructed before intercepted fake_metadata_

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
mergify[bot] and MichaelOrlov authored May 6, 2024
1 parent eafdaa0 commit e7f7f59
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
8 changes: 8 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ class EventCallbackManager
return false;
}

/**
* \brief Delete all callbacks
*/
void delete_all_callbacks()
{
callbacks_.clear();
}

/**
* \brief Execute all callbacks registered for the given event.
*
Expand Down
12 changes: 11 additions & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ SequentialWriter::SequentialWriter(

SequentialWriter::~SequentialWriter()
{
// Deleting all callbacks before calling close(). Calling callbacks from destructor is not safe.
// Callbacks likely was created after SequentialWriter object and may point to the already
// destructed objects.
callback_manager_.delete_all_callbacks();
close();
}

Expand Down Expand Up @@ -161,7 +165,13 @@ void SequentialWriter::close()
metadata_io_->write_metadata(base_folder_, metadata_);
}

storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
// bag file was closed before callback call.
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}
storage_factory_.reset();
}

Expand Down
63 changes: 62 additions & 1 deletion rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ class SequentialWriterTest : public Test
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
std::unique_ptr<MockMetadataIo> metadata_io_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;

rosbag2_storage::StorageOptions storage_options_;
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
rosbag2_storage::BagMetadata fake_metadata_;
// Ensure writer_ is destructed before intercepted fake_metadata_
std::unique_ptr<rosbag2_cpp::Writer> writer_;
std::string fake_storage_uri_;
};

Expand Down Expand Up @@ -566,6 +568,65 @@ TEST_F(SequentialWriterTest, split_event_calls_callback)
EXPECT_EQ(opened_file, fake_storage_uri_);
}

TEST_F(SequentialWriterTest, split_event_calls_on_writer_close)
{
const int message_count = 7;

ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;

bool callback_called = false;
std::string closed_file, opened_file;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_file = info.closed_file;
opened_file = info.opened_file;
callback_called = true;
};
writer_->add_event_callbacks(callbacks);

writer_->open(storage_options_, {"rmw_format", "rmw_format"});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

ASSERT_TRUE(callback_called);
auto expected_closed = rcpputils::fs::path(storage_options_.uri) / (storage_options_.uri + "_0");
EXPECT_EQ(closed_file, expected_closed.string());
EXPECT_TRUE(opened_file.empty());
}

class ManualSplitWriter : public rosbag2_cpp::writers::SequentialWriter
{
Expand Down

0 comments on commit e7f7f59

Please sign in to comment.