diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 176e491aa..e8b78e97b 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -153,6 +153,98 @@ struct OptionsWrapper : public T typedef OptionsWrapper PlayOptions; typedef OptionsWrapper RecordOptions; +/// \brief Defers signals handling until class destruction or by an explicit call to the +/// process_deferred_signals(). +/// Can install an external signal handler, which will be called upon original signals arriving. +/// \note The install_signal_handlers(external_signal_handler) method shall be called to start +/// capturing deferred signals. +class DeferredSignalHandler +{ +public: + using SignalHandlerType = void (*)(int); + using SignalType = int; + + explicit DeferredSignalHandler(const std::initializer_list & signals) + { + if (sigemptyset(&DeferredSignalHandler::deferred_signals_set_) != 0) { + throw std::runtime_error(std::string("Error calling sigemptyset: ") + std::strerror(errno)); + } + for (auto signal : signals) { + signals_.push_back(signal); + } + } + + ~DeferredSignalHandler() noexcept + { + process_deferred_signals(); + uninstall_signal_handlers(); + } + + void install_signal_handlers(const SignalHandlerType & external_signal_handler) + { + external_signal_handler_ = external_signal_handler; + for (auto sig_num : signals_) { + old_signal_handlers_.emplace_back( + sig_num, std::signal(sig_num, &DeferredSignalHandler::on_signal)); + } + } + + void uninstall_signal_handlers() + { + external_signal_handler_ = SIG_ERR; + // Restore old signal handlers + for (auto [sig_num, old_signal_handler] : old_signal_handlers_) { + if (old_signal_handler != SIG_ERR) { + std::signal(sig_num, old_signal_handler); + old_signal_handler = SIG_ERR; + } + } + old_signal_handlers_.clear(); + } + + void process_deferred_signals() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, SignalType signal) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(signal); + } + }; + + for (auto [deferred_sig_num, old_signal_handler] : old_signal_handlers_) { + auto ret = sigismember(&deferred_signals_set_, deferred_sig_num); + if (ret == -1) { + throw std::runtime_error("Error. Invalid signal: " + std::to_string(deferred_sig_num)); + } else if (ret == 1 && old_signal_handler != SIG_ERR) { + call_signal_handler(old_signal_handler, deferred_sig_num); + sigdelset(&deferred_signals_set_, deferred_sig_num); + } + } + } + +protected: + static void on_signal(SignalType sig_num) + { + if (external_signal_handler_ != SIG_ERR) { + external_signal_handler_(sig_num); + } + if (sigaddset(&DeferredSignalHandler::deferred_signals_set_, sig_num) != 0) { + char error_msg[128]; + snprintf(error_msg, sizeof(error_msg), "Failed to add deferred signal = %d\n", sig_num); + // write is signal-safe + auto written = write(STDERR_FILENO, error_msg, sizeof(error_msg) - 1); + (void)written; + } + } + + static sigset_t deferred_signals_set_; + static SignalHandlerType external_signal_handler_; + std::vector> old_signal_handlers_; + std::vector signals_; +}; + +sigset_t DeferredSignalHandler::deferred_signals_set_{}; +DeferredSignalHandler::SignalHandlerType DeferredSignalHandler::external_signal_handler_{SIG_ERR}; + } // namespace namespace rosbag2_py @@ -161,8 +253,6 @@ namespace rosbag2_py class Player { public: - using SignalHandlerType = void (*)(int); - explicit Player(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -213,46 +303,6 @@ class Player } protected: - static void signal_handler(int sig_num) - { - if (sig_num == SIGINT || sig_num == SIGTERM) { - deferred_sig_number_ = sig_num; - rosbag2_py::Player::cancel(); - } - } - - static void install_signal_handlers() - { - deferred_sig_number_ = -1; - old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler); - old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler); - } - - static void uninstall_signal_handlers() - { - if (old_sigterm_handler_ != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler_); - } - if (old_sigint_handler_ != SIG_ERR) { - std::signal(SIGINT, old_sigint_handler_); - } - } - - static void process_deferred_signal() - { - auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { - if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { - signal_handler(sig_num); - } - }; - - if (deferred_sig_number_ == SIGINT) { - call_signal_handler(old_sigint_handler_, deferred_sig_number_); - } else if (deferred_sig_number_ == SIGTERM) { - call_signal_handler(old_sigterm_handler_, deferred_sig_number_); - } - } - void play_impl( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options, @@ -260,8 +310,14 @@ class Player size_t burst_num_messages = 0, bool enable_signal_handling = true) { + exit_.store(false); + DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM}); if (enable_signal_handling) { - install_signal_handlers(); + deferred_signal_handler.install_signal_handlers( + [](int /*signum*/) { + // Note: Using condition_variable from signal handler is not safe and shall not be used. + rosbag2_py::Player::exit_.store(true); + }); } try { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); @@ -289,8 +345,13 @@ class Player auto wait_for_exit_thread = std::thread( [&]() { - std::unique_lock lock(wait_for_exit_mutex_); - wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();}); + while (!exit_.load()) { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait_for( + lock, + std::chrono::milliseconds(30), + [] {return rosbag2_py::Player::exit_.load();}); + } player->stop(); }); { @@ -315,36 +376,29 @@ class Player exec.remove_node(player); } catch (...) { if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); + // Process deferred signals before re-throwing exception, because we may never return to + // the deferred_signal_handler destructor after exception. + deferred_signal_handler.process_deferred_signals(); + deferred_signal_handler.uninstall_signal_handlers(); } throw; } - if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); - } } static std::atomic_bool exit_; static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; - static SignalHandlerType old_sigint_handler_; - static SignalHandlerType old_sigterm_handler_; - static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; -Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; -Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; -volatile std::sig_atomic_t Player::deferred_sig_number_{-1}; std::atomic_bool Player::exit_{false}; +// exit_ shall be lock free to be able to use it from signal handler +static_assert(std::atomic_bool::is_always_lock_free); std::condition_variable Player::wait_for_exit_cv_{}; class Recorder { public: - using SignalHandlerType = void (*)(int); explicit Recorder(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -370,11 +424,16 @@ class Recorder std::string & node_name, bool enable_signal_handling) { + exit_.store(false); + DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM}); if (enable_signal_handling) { - install_signal_handlers(); + deferred_signal_handler.install_signal_handlers( + [](int /*signum*/) { + // Note: Using condition_variable from signal handler is not safe and shall not be used. + rosbag2_py::Recorder::exit_.store(true); + }); } try { - exit_ = false; auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); @@ -408,8 +467,13 @@ class Recorder // Release the GIL for long-running record, so that calling Python code // can use other threads py::gil_scoped_release release; - std::unique_lock lock(wait_for_exit_mutex_); - wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + while (!exit_.load()) { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait_for( + lock, + std::chrono::milliseconds(30), + [] {return rosbag2_py::Recorder::exit_.load();}); + } recorder->stop(); } exec->cancel(); @@ -419,15 +483,13 @@ class Recorder exec->remove_node(recorder); } catch (...) { if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); + // Process deferred signals before re-throwing exception, because we can never return to + // the deferred_signal_handler destructor after exception. + deferred_signal_handler.process_deferred_signals(); + deferred_signal_handler.uninstall_signal_handlers(); } throw; } - if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); - } } static void cancel() @@ -437,58 +499,12 @@ class Recorder } protected: - static void signal_handler(int sig_num) - { - if (sig_num == SIGINT || sig_num == SIGTERM) { - deferred_sig_number_ = sig_num; - rosbag2_py::Recorder::cancel(); - } - } - - static void install_signal_handlers() - { - deferred_sig_number_ = -1; - old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler); - old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler); - } - - static void uninstall_signal_handlers() - { - if (old_sigterm_handler_ != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler_); - } - if (old_sigint_handler_ != SIG_ERR) { - std::signal(SIGINT, old_sigint_handler_); - } - } - - static void process_deferred_signal() - { - auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { - if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { - signal_handler(sig_num); - } - }; - - if (deferred_sig_number_ == SIGINT) { - call_signal_handler(old_sigint_handler_, deferred_sig_number_); - } else if (deferred_sig_number_ == SIGTERM) { - call_signal_handler(old_sigterm_handler_, deferred_sig_number_); - } - } - static std::atomic_bool exit_; static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; - static SignalHandlerType old_sigint_handler_; - static SignalHandlerType old_sigterm_handler_; - static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; -Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; -Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; -volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{};