Skip to content

Commit

Permalink
Move common signal handling functionality to a separate class
Browse files Browse the repository at this point in the history
- Add DeferredSignalHandler class
- Note: Windows support will be added in a follow-up PR

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Aug 17, 2024
1 parent b43820e commit f626764
Showing 1 changed file with 130 additions and 114 deletions.
244 changes: 130 additions & 114 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,98 @@ struct OptionsWrapper : public T
typedef OptionsWrapper<rosbag2_transport::PlayOptions> PlayOptions;
typedef OptionsWrapper<rosbag2_transport::RecordOptions> RecordOptions;

/// \brief Defers signals handling until class destruction or by an explicit call to the
/// process_deferred_signals().
/// Can install an external signal handler, which will be called upon original signals arriving.
/// \note The install_signal_handlers(external_signal_handler) method shall be called to start
/// capturing deferred signals.
class DeferredSignalHandler
{
public:
using SignalHandlerType = void (*)(int);
using SignalType = int;

explicit DeferredSignalHandler(const std::initializer_list<SignalType> & signals)
{
if (sigemptyset(&DeferredSignalHandler::deferred_signals_set_) != 0) {
throw std::runtime_error(std::string("Error calling sigemptyset: ") + std::strerror(errno));
}
for (auto signal : signals) {
signals_.push_back(signal);
}
}

~DeferredSignalHandler() noexcept
{
process_deferred_signals();
uninstall_signal_handlers();
}

void install_signal_handlers(const SignalHandlerType & external_signal_handler)
{
external_signal_handler_ = external_signal_handler;
for (auto sig_num : signals_) {
old_signal_handlers_.emplace_back(
sig_num, std::signal(sig_num, &DeferredSignalHandler::on_signal));
}
}

void uninstall_signal_handlers()
{
external_signal_handler_ = SIG_ERR;
// Restore old signal handlers
for (auto [sig_num, old_signal_handler] : old_signal_handlers_) {
if (old_signal_handler != SIG_ERR) {
std::signal(sig_num, old_signal_handler);
old_signal_handler = SIG_ERR;
}
}
old_signal_handlers_.clear();
}

void process_deferred_signals()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, SignalType signal) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(signal);
}
};

for (auto [deferred_sig_num, old_signal_handler] : old_signal_handlers_) {
auto ret = sigismember(&deferred_signals_set_, deferred_sig_num);
if (ret == -1) {
throw std::runtime_error("Error. Invalid signal: " + std::to_string(deferred_sig_num));
} else if (ret == 1 && old_signal_handler != SIG_ERR) {
call_signal_handler(old_signal_handler, deferred_sig_num);
sigdelset(&deferred_signals_set_, deferred_sig_num);
}
}
}

protected:
static void on_signal(SignalType sig_num)
{
if (external_signal_handler_ != SIG_ERR) {
external_signal_handler_(sig_num);
}
if (sigaddset(&DeferredSignalHandler::deferred_signals_set_, sig_num) != 0) {
char error_msg[128];
snprintf(error_msg, sizeof(error_msg), "Failed to add deferred signal = %d\n", sig_num);
// write is signal-safe
auto written = write(STDERR_FILENO, error_msg, sizeof(error_msg) - 1);
(void)written;
}
}

static sigset_t deferred_signals_set_;
static SignalHandlerType external_signal_handler_;
std::vector<std::pair<SignalType, SignalHandlerType>> old_signal_handlers_;
std::vector<SignalType> signals_;
};

sigset_t DeferredSignalHandler::deferred_signals_set_{};
DeferredSignalHandler::SignalHandlerType DeferredSignalHandler::external_signal_handler_{SIG_ERR};

} // namespace

namespace rosbag2_py
Expand All @@ -161,8 +253,6 @@ namespace rosbag2_py
class Player
{
public:
using SignalHandlerType = void (*)(int);

explicit Player(const std::string & log_level = "info")
{
Arguments arguments({"--ros-args", "--log-level", log_level});
Expand Down Expand Up @@ -213,55 +303,21 @@ class Player
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Player::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
}
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

void play_impl(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
bool burst = false,
size_t burst_num_messages = 0,
bool enable_signal_handling = true)
{
exit_.store(false);
DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM});
if (enable_signal_handling) {
install_signal_handlers();
deferred_signal_handler.install_signal_handlers(
[](int /*signum*/) {
// Note: Using condition_variable from signal handler is not safe and shall not be used.
rosbag2_py::Player::exit_.store(true);
});
}
try {
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
Expand Down Expand Up @@ -289,8 +345,13 @@ class Player

auto wait_for_exit_thread = std::thread(
[&]() {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();});
while (!exit_.load()) {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait_for(
lock,
std::chrono::milliseconds(30),
[] {return rosbag2_py::Player::exit_.load();});
}
player->stop();
});
{
Expand All @@ -315,36 +376,29 @@ class Player
exec.remove_node(player);
} catch (...) {
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
// Process deferred signals before re-throwing exception, because we may never return to
// the deferred_signal_handler destructor after exception.
deferred_signal_handler.process_deferred_signals();
deferred_signal_handler.uninstall_signal_handlers();
}
throw;
}
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
}

static std::atomic_bool exit_;
static_assert(std::atomic_bool::is_always_lock_free);
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static volatile std::sig_atomic_t deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR};
Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR};
volatile std::sig_atomic_t Player::deferred_sig_number_{-1};
std::atomic_bool Player::exit_{false};
// exit_ shall be lock free to be able to use it from signal handler
static_assert(std::atomic_bool::is_always_lock_free);
std::condition_variable Player::wait_for_exit_cv_{};

class Recorder
{
public:
using SignalHandlerType = void (*)(int);
explicit Recorder(const std::string & log_level = "info")
{
Arguments arguments({"--ros-args", "--log-level", log_level});
Expand All @@ -370,11 +424,16 @@ class Recorder
std::string & node_name,
bool enable_signal_handling)
{
exit_.store(false);
DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM});
if (enable_signal_handling) {
install_signal_handlers();
deferred_signal_handler.install_signal_handlers(
[](int /*signum*/) {
// Note: Using condition_variable from signal handler is not safe and shall not be used.
rosbag2_py::Recorder::exit_.store(true);
});
}
try {
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
Expand Down Expand Up @@ -408,8 +467,13 @@ class Recorder
// Release the GIL for long-running record, so that calling Python code
// can use other threads
py::gil_scoped_release release;
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
while (!exit_.load()) {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait_for(
lock,
std::chrono::milliseconds(30),
[] {return rosbag2_py::Recorder::exit_.load();});
}
recorder->stop();
}
exec->cancel();
Expand All @@ -419,15 +483,13 @@ class Recorder
exec->remove_node(recorder);
} catch (...) {
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
// Process deferred signals before re-throwing exception, because we can never return to
// the deferred_signal_handler destructor after exception.
deferred_signal_handler.process_deferred_signals();
deferred_signal_handler.uninstall_signal_handlers();
}
throw;
}
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
}

static void cancel()
Expand All @@ -437,58 +499,12 @@ class Recorder
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Recorder::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
}
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

static std::atomic_bool exit_;
static_assert(std::atomic_bool::is_always_lock_free);
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static volatile std::sig_atomic_t deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR};
Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR};
volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1};
std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

Expand Down

0 comments on commit f626764

Please sign in to comment.