From e45cc17d023fbcd49e9586c98e047b8a2cd61c30 Mon Sep 17 00:00:00 2001 From: detule Date: Tue, 28 May 2024 15:15:31 -0400 Subject: [PATCH] Feature/interruptible execution clean2 (#796) * odbc_result: re-org execution code-path * feature: interruptible execution * ci: mysql: a more up-to-date driver * back to exceptions from nanodbc; pretty printed after caught * code-review: feature controlled by global option odbc.interruptible * code-review: Note about mysql driver and NEWS entry * code-review: enable by default only in interactive sessions * fixup: unit tests are run with interruptible execution enabled --------- Co-authored-by: simonpcouch --- .github/odbc/install-mariadb-driver.sh | 11 ++++ .github/odbc/odbcinst.ini | 2 +- .github/workflows/db-windows.yml | 2 +- .github/workflows/db.yaml | 4 +- NEWS.md | 7 +++ R/RcppExports.R | 8 +-- R/dbi-driver.R | 10 +++- R/odbc-connection.R | 4 +- man/OdbcResult.Rd | 2 +- man/dbConnect-OdbcDriver-method.Rd | 9 ++- src/RcppExports.cpp | 22 ++----- src/connection.cpp | 6 +- src/nanodbc/nanodbc.cpp | 10 +--- src/nanodbc/nanodbc.h | 1 - src/odbc_connection.cpp | 11 ++-- src/odbc_connection.h | 4 +- src/odbc_result.cpp | 67 +++++++++++++-------- src/odbc_result.h | 12 +++- src/result.cpp | 3 - src/utils.cpp | 80 +++++++++++++++++++++++++- src/utils.h | 35 +++++++++++ tests/testthat/_snaps/utils.md | 5 +- 22 files changed, 236 insertions(+), 79 deletions(-) create mode 100755 .github/odbc/install-mariadb-driver.sh diff --git a/.github/odbc/install-mariadb-driver.sh b/.github/odbc/install-mariadb-driver.sh new file mode 100755 index 00000000..8b500302 --- /dev/null +++ b/.github/odbc/install-mariadb-driver.sh @@ -0,0 +1,11 @@ +# See #796 for some reasoning behind not using the canned mariadb-odbc-driver (v3.1.15), at time of writing +# and needing to install v3.1.17 from source +sudo apt-get install -y cmake +cd /tmp && git clone https://github.com/MariaDB/mariadb-connector-odbc.git connector +cd connector && git checkout 3.1.17 +mkdir build && cd build +cmake ../ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCONC_WITH_UNIT_TESTS=Off -DCMAKE_INSTALL_PREFIX=/usr/local -DWITH_SSL=OPENSSL +cmake --build . --config RelWithDebInfo +sudo make install +sudo ln -s /usr/local/lib/mariadb/libmariadb.so.3 /usr/local/lib/ +sudo ldconfig diff --git a/.github/odbc/odbcinst.ini b/.github/odbc/odbcinst.ini index 849d43d9..fd6a5498 100644 --- a/.github/odbc/odbcinst.ini +++ b/.github/odbc/odbcinst.ini @@ -7,7 +7,7 @@ FileUsage=1 Threading=2 [MySQL Driver] -Driver=/usr/lib/x86_64-linux-gnu/odbc/libmaodbc.so +Driver=/usr/local/lib/mariadb/libmaodbc.so UsageCount = 1 Threading=2 diff --git a/.github/workflows/db-windows.yml b/.github/workflows/db-windows.yml index 4a093777..0edd14a9 100644 --- a/.github/workflows/db-windows.yml +++ b/.github/workflows/db-windows.yml @@ -93,5 +93,5 @@ jobs: - name: Test run: | - testthat::test_local(reporter = testthat::ProgressReporter$new(max_failures = Inf, update_interval = Inf)) + options("odbc.interruptible"=TRUE);testthat::test_local(reporter = testthat::ProgressReporter$new(max_failures = Inf, update_interval = Inf)) shell: Rscript {0} diff --git a/.github/workflows/db.yaml b/.github/workflows/db.yaml index 39feaf8e..66a6b1f1 100644 --- a/.github/workflows/db.yaml +++ b/.github/workflows/db.yaml @@ -62,7 +62,7 @@ jobs: run: | sudo systemctl start mysql.service mysql -uroot -h127.0.0.1 -proot -e 'CREATE DATABASE `test`;' - sudo apt-get install -y odbc-mariadb + .github/odbc/install-mariadb-driver.sh echo "ODBC_CS_MYSQL=dsn=MySQL" >> $GITHUB_ENV - name: Install SQLite Driver @@ -118,5 +118,5 @@ jobs: - name: Test run: | - testthat::test_local(reporter = testthat::ProgressReporter$new(max_failures = Inf, update_interval = Inf)) + options("odbc.interruptible"=TRUE);testthat::test_local(reporter = testthat::ProgressReporter$new(max_failures = Inf, update_interval = Inf)) shell: Rscript {0} diff --git a/NEWS.md b/NEWS.md index 4cb77ebe..8d5366e7 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,12 @@ # odbc (development version) +* Long running queries can now be interrupted using Ctrl-C. This + feature is enabled by default in interactive sessions. It can be + controlled by the `interruptible` argument to `dbConnect`, or by the + global option `odbc.interruptible`. Should be considered experimental - + if you experience problems please file an issue on the package github + repository (#796). + * Raises "Cancelling previous query" warnings from R rather than from Rcpp when a connection has a current result to avoid possible incorrect resource unwinds with `options(warn = 2)` (#797). diff --git a/R/RcppExports.R b/R/RcppExports.R index ffa33bda..aa78a162 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -9,8 +9,8 @@ list_data_sources_ <- function() { .Call(`_odbc_list_data_sources_`) } -odbc_connect <- function(connection_string, timezone = "", timezone_out = "", encoding = "", bigint = 0L, timeout = 0L, r_attributes_ = NULL) { - .Call(`_odbc_odbc_connect`, connection_string, timezone, timezone_out, encoding, bigint, timeout, r_attributes_) +odbc_connect <- function(connection_string, timezone = "", timezone_out = "", encoding = "", bigint = 0L, timeout = 0L, r_attributes = NULL, interruptible_execution = TRUE) { + .Call(`_odbc_odbc_connect`, connection_string, timezone, timezone_out, encoding, bigint, timeout, r_attributes, interruptible_execution) } has_result <- function(p) { @@ -105,10 +105,6 @@ result_bind <- function(r, params, batch_rows) { invisible(.Call(`_odbc_result_bind`, r, params, batch_rows)) } -result_execute <- function(r) { - invisible(.Call(`_odbc_result_execute`, r)) -} - result_insert_dataframe <- function(r, df, batch_rows) { invisible(.Call(`_odbc_result_insert_dataframe`, r, df, batch_rows)) } diff --git a/R/dbi-driver.R b/R/dbi-driver.R index 00a81712..dc69df98 100644 --- a/R/dbi-driver.R +++ b/R/dbi-driver.R @@ -63,8 +63,13 @@ setMethod("show", "OdbcDriver", #' name for the OdbcConnection object returned from [dbConnect()]. However, if #' the driver does not return a valid value, it can be set manually with this #' parameter. -#' @param attributes An S4 object of connection attributes that are passed +#' @param attributes A list of connection attributes that are passed #' prior to the connection being established. See \link{ConnectionAttributes}. +#' @param interruptible Logical. If `TRUE` calls to `SQLExecute` and +#' `SQLExecuteDirect` can be interrupted when the user sends SIGINT ( ctrl-c ). +#' Otherwise, they block. Defaults to `TRUE` in interactive sessions, and +#' `FALSE` otherwise. It can be set explicitly either by manipulating this +#' argument, or by setting the global option `odbc.interruptible`. #' @param ... Additional ODBC keywords. These will be joined with the other #' arguments to form the final connection string. #' @@ -167,6 +172,7 @@ setMethod("dbConnect", "OdbcDriver", pwd = NULL, dbms.name = NULL, attributes = NULL, + interruptible = getOption("odbc.interruptible", interactive()), .connection_string = NULL) { check_string(dsn, allow_null = TRUE) check_string(timezone, allow_null = TRUE) @@ -180,6 +186,7 @@ setMethod("dbConnect", "OdbcDriver", check_string(uid, allow_null = TRUE) check_string(pwd, allow_null = TRUE) check_string(dbms.name, allow_null = TRUE) + check_bool(interruptible) con <- OdbcConnection( dsn = dsn, @@ -196,6 +203,7 @@ setMethod("dbConnect", "OdbcDriver", pwd = pwd, dbms.name = dbms.name, attributes = attributes, + interruptible = interruptible, .connection_string = .connection_string ) diff --git a/R/odbc-connection.R b/R/odbc-connection.R index 167e628e..1be6eb5b 100644 --- a/R/odbc-connection.R +++ b/R/odbc-connection.R @@ -14,6 +14,7 @@ OdbcConnection <- function( timeout = Inf, dbms.name = NULL, attributes = NULL, + interruptible = getOption("odbc.interruptible", interactive()), .connection_string = NULL, call = caller_env(2) ) { @@ -37,7 +38,8 @@ OdbcConnection <- function( encoding = encoding, bigint = bigint, timeout = timeout, - r_attributes_ = attributes + r_attributes = attributes, + interruptible_execution = interruptible ), error = function(cnd) { check_quoting(args) diff --git a/man/OdbcResult.Rd b/man/OdbcResult.Rd index d801d851..150005a9 100644 --- a/man/OdbcResult.Rd +++ b/man/OdbcResult.Rd @@ -54,7 +54,7 @@ with one column per query parameter.} \item{batch_rows}{The number of rows to retrieve. Defaults to \code{NA}, which is set dynamically to the minimum of 1024 and the size of the input. -Depending on the database, driver, dataset and free memory setting this +Depending on the database, driver, dataset and free memory, setting this to a lower value may improve performance.} } \description{ diff --git a/man/dbConnect-OdbcDriver-method.Rd b/man/dbConnect-OdbcDriver-method.Rd index a5f28c05..52e23f57 100644 --- a/man/dbConnect-OdbcDriver-method.Rd +++ b/man/dbConnect-OdbcDriver-method.Rd @@ -24,6 +24,7 @@ odbc() pwd = NULL, dbms.name = NULL, attributes = NULL, + interruptible = getOption("odbc.interruptible", interactive()), .connection_string = NULL ) } @@ -86,9 +87,15 @@ name for the OdbcConnection object returned from \code{\link[=dbConnect]{dbConne the driver does not return a valid value, it can be set manually with this parameter.} -\item{attributes}{An S4 object of connection attributes that are passed +\item{attributes}{A list of connection attributes that are passed prior to the connection being established. See \link{ConnectionAttributes}.} +\item{interruptible}{Logical. If \code{TRUE} calls to \code{SQLExecute} and +\code{SQLExecuteDirect} can be interrupted when the user sends SIGINT ( ctrl-c ). +Otherwise, they block. Defaults to \code{TRUE} in interactive sessions, and +\code{FALSE} otherwise. It can be set explicitly either by manipulating this +argument, or by setting the global option \code{odbc.interruptible}.} + \item{.connection_string}{A complete connection string, useful if you are copy pasting it from another source. If this argument is used, any additional arguments will be appended to this string.} diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 01ccae6b..9d18270a 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -32,8 +32,8 @@ BEGIN_RCPP END_RCPP } // odbc_connect -connection_ptr odbc_connect(std::string const& connection_string, std::string const& timezone, std::string const& timezone_out, std::string const& encoding, int bigint, long timeout, Rcpp::Nullable const& r_attributes_); -RcppExport SEXP _odbc_odbc_connect(SEXP connection_stringSEXP, SEXP timezoneSEXP, SEXP timezone_outSEXP, SEXP encodingSEXP, SEXP bigintSEXP, SEXP timeoutSEXP, SEXP r_attributes_SEXP) { +connection_ptr odbc_connect(std::string const& connection_string, std::string const& timezone, std::string const& timezone_out, std::string const& encoding, int bigint, long timeout, Rcpp::Nullable const& r_attributes, bool const& interruptible_execution); +RcppExport SEXP _odbc_odbc_connect(SEXP connection_stringSEXP, SEXP timezoneSEXP, SEXP timezone_outSEXP, SEXP encodingSEXP, SEXP bigintSEXP, SEXP timeoutSEXP, SEXP r_attributesSEXP, SEXP interruptible_executionSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; @@ -43,8 +43,9 @@ BEGIN_RCPP Rcpp::traits::input_parameter< std::string const& >::type encoding(encodingSEXP); Rcpp::traits::input_parameter< int >::type bigint(bigintSEXP); Rcpp::traits::input_parameter< long >::type timeout(timeoutSEXP); - Rcpp::traits::input_parameter< Rcpp::Nullable const& >::type r_attributes_(r_attributes_SEXP); - rcpp_result_gen = Rcpp::wrap(odbc_connect(connection_string, timezone, timezone_out, encoding, bigint, timeout, r_attributes_)); + Rcpp::traits::input_parameter< Rcpp::Nullable const& >::type r_attributes(r_attributesSEXP); + Rcpp::traits::input_parameter< bool const& >::type interruptible_execution(interruptible_executionSEXP); + rcpp_result_gen = Rcpp::wrap(odbc_connect(connection_string, timezone, timezone_out, encoding, bigint, timeout, r_attributes, interruptible_execution)); return rcpp_result_gen; END_RCPP } @@ -306,16 +307,6 @@ BEGIN_RCPP return R_NilValue; END_RCPP } -// result_execute -void result_execute(result_ptr const& r); -RcppExport SEXP _odbc_result_execute(SEXP rSEXP) { -BEGIN_RCPP - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< result_ptr const& >::type r(rSEXP); - result_execute(r); - return R_NilValue; -END_RCPP -} // result_insert_dataframe void result_insert_dataframe(result_ptr const& r, DataFrame const& df, size_t batch_rows); RcppExport SEXP _odbc_result_insert_dataframe(SEXP rSEXP, SEXP dfSEXP, SEXP batch_rowsSEXP) { @@ -375,7 +366,7 @@ END_RCPP static const R_CallMethodDef CallEntries[] = { {"_odbc_list_drivers_", (DL_FUNC) &_odbc_list_drivers_, 0}, {"_odbc_list_data_sources_", (DL_FUNC) &_odbc_list_data_sources_, 0}, - {"_odbc_odbc_connect", (DL_FUNC) &_odbc_odbc_connect, 7}, + {"_odbc_odbc_connect", (DL_FUNC) &_odbc_odbc_connect, 8}, {"_odbc_has_result", (DL_FUNC) &_odbc_has_result, 1}, {"_odbc_connection_info", (DL_FUNC) &_odbc_connection_info, 1}, {"_odbc_connection_quote", (DL_FUNC) &_odbc_connection_quote, 1}, @@ -399,7 +390,6 @@ static const R_CallMethodDef CallEntries[] = { {"_odbc_result_fetch", (DL_FUNC) &_odbc_result_fetch, 2}, {"_odbc_result_column_info", (DL_FUNC) &_odbc_result_column_info, 1}, {"_odbc_result_bind", (DL_FUNC) &_odbc_result_bind, 3}, - {"_odbc_result_execute", (DL_FUNC) &_odbc_result_execute, 1}, {"_odbc_result_insert_dataframe", (DL_FUNC) &_odbc_result_insert_dataframe, 3}, {"_odbc_result_describe_parameters", (DL_FUNC) &_odbc_result_describe_parameters, 2}, {"_odbc_result_rows_affected", (DL_FUNC) &_odbc_result_rows_affected, 1}, diff --git a/src/connection.cpp b/src/connection.cpp index 0eae2f53..1aeb58f2 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -53,7 +53,8 @@ connection_ptr odbc_connect( std::string const& encoding = "", int bigint = 0, long timeout = 0, - Rcpp::Nullable const& r_attributes_ = R_NilValue) { + Rcpp::Nullable const& r_attributes = R_NilValue, + bool const& interruptible_execution = true) { return connection_ptr( new std::shared_ptr(new odbc_connection( connection_string, @@ -62,7 +63,8 @@ connection_ptr odbc_connect( encoding, static_cast(bigint), timeout, - r_attributes_))); + r_attributes, + interruptible_execution))); } std::string get_info_or_empty(connection_ptr const& p, short type) { diff --git a/src/nanodbc/nanodbc.cpp b/src/nanodbc/nanodbc.cpp index f87e7c3a..10626602 100644 --- a/src/nanodbc/nanodbc.cpp +++ b/src/nanodbc/nanodbc.cpp @@ -505,19 +505,13 @@ const std::string database_error::state() const NANODBC_NOEXCEPT return sql_state; } -void database_error::rethrow() { - Rcpp::Environment pkg = Rcpp::Environment::namespace_env("odbc"); - Rcpp::Function rethrow_database_error = pkg["rethrow_database_error"]; - rethrow_database_error(message); -} - } // namespace nanodbc // Throwing exceptions using NANODBC_THROW_DATABASE_ERROR enables file name // and line numbers to be inserted into the error message. Useful for debugging. #define NANODBC_THROW_DATABASE_ERROR(handle, handle_type) \ - nanodbc::database_error( \ - handle, handle_type, __FILE__ ":" NANODBC_STRINGIZE(__LINE__) ": ").rethrow() /**/ + throw nanodbc::database_error( \ + handle, handle_type, __FILE__ ":" NANODBC_STRINGIZE(__LINE__) ": ") /**/ // clang-format off // 8888888b. 888 d8b 888 diff --git a/src/nanodbc/nanodbc.h b/src/nanodbc/nanodbc.h index 1be42129..e994952e 100644 --- a/src/nanodbc/nanodbc.h +++ b/src/nanodbc/nanodbc.h @@ -267,7 +267,6 @@ class database_error : public std::runtime_error const char* what() const NANODBC_NOEXCEPT; long native() const NANODBC_NOEXCEPT; const std::string state() const NANODBC_NOEXCEPT; - void rethrow(); private: long native_error; diff --git a/src/odbc_connection.cpp b/src/odbc_connection.cpp index 5dad3614..e17bb19f 100644 --- a/src/odbc_connection.cpp +++ b/src/odbc_connection.cpp @@ -33,11 +33,13 @@ odbc_connection::odbc_connection( std::string encoding, bigint_map_t bigint_mapping, long timeout, - Rcpp::Nullable const& r_attributes_) + Rcpp::Nullable const& r_attributes, + bool const& interruptible_execution) : current_result_(nullptr), timezone_out_str_(timezone_out), encoding_(encoding), - bigint_mapping_(bigint_mapping) { + bigint_mapping_(bigint_mapping), + interruptible_execution_(interruptible_execution) { if (!cctz::load_time_zone(timezone, &timezone_)) { Rcpp::stop("Error loading time zone (%s)", timezone); @@ -53,10 +55,11 @@ odbc_connection::odbc_connection( std::list< nanodbc::connection::attribute > attributes; std::list< std::shared_ptr< void > > buffer_context; utils::prepare_connection_attributes( - timeout, r_attributes_, attributes, buffer_context ); + timeout, r_attributes, attributes, buffer_context ); c_ = std::make_shared(connection_string, attributes); } catch (const nanodbc::database_error& e) { - throw Rcpp::exception(e.what(), FALSE); + Iconv encoder(this->encoding(), "UTF-8"); + utils::raise_error(odbc_error(e, "", encoder)); } } diff --git a/src/odbc_connection.h b/src/odbc_connection.h index e018614a..0a3bbbe5 100644 --- a/src/odbc_connection.h +++ b/src/odbc_connection.h @@ -26,7 +26,8 @@ class odbc_connection { std::string encoding = "", bigint_map_t bigint_mapping = i64_to_integer64, long timeout = 0, - Rcpp::Nullable const& r_attributes_ = R_NilValue); + Rcpp::Nullable const& r_attributes = R_NilValue, + bool const& interruptible_execution = true); std::shared_ptr connection() const; @@ -57,5 +58,6 @@ class odbc_connection { std::string timezone_out_str_; std::string encoding_; bigint_map_t bigint_mapping_; + bool interruptible_execution_; }; } // namespace odbc diff --git a/src/odbc_result.cpp b/src/odbc_result.cpp index 3c4270a9..fc81b513 100644 --- a/src/odbc_result.cpp +++ b/src/odbc_result.cpp @@ -1,11 +1,16 @@ #include "odbc_result.h" #include "integer64.h" #include "time_zone.h" +#include "utils.h" #include #include namespace odbc { +using odbc::utils::run_interruptible; +using odbc::utils::raise_message; +using odbc::utils::raise_warning; +using odbc::utils::raise_error; odbc_result::odbc_result( std::shared_ptr c, std::string sql, bool immediate) : c_(c), @@ -14,26 +19,25 @@ odbc_result::odbc_result( num_columns_(0), complete_(0), bound_(false), + immediate_(immediate), output_encoder_(Iconv(c_->encoding(), "UTF-8")) { c_->cancel_current_result(); - if (immediate) { - s_ = std::make_shared(); - bound_ = true; - r_ = std::make_shared( - s_->execute_direct(*c_->connection(), sql_)); - num_columns_ = r_->columns(); - c_->set_current_result(this); + if (c_->interruptible_execution_) { + auto exec_fn = std::mem_fn(&odbc_result::execute); + auto cleanup_fn = [this]() { + this->c_->set_current_result(nullptr); + this->s_->close(); + this->s_.reset(); + }; + run_interruptible(std::bind(exec_fn, this), cleanup_fn); } else { - prepare(); - c_->set_current_result(this); - if (s_->parameters() == 0) { - bound_ = true; - execute(); - } + this->execute(); } + return; } + std::shared_ptr odbc_result::connection() const { return std::shared_ptr(c_); } @@ -43,21 +47,34 @@ std::shared_ptr odbc_result::statement() const { std::shared_ptr odbc_result::result() const { return std::shared_ptr(r_); } -void odbc_result::prepare() { - s_ = std::make_shared(*c_->connection(), sql_); -} + void odbc_result::execute() { - if (!r_) { - try { - r_ = std::make_shared(s_->execute()); + try { + c_->set_current_result(this); + s_ = std::make_shared(); + if (!this->immediate_) s_->prepare(*c_->connection(), sql_); + if (this->immediate_ || (s_->parameters() == 0)) { + bound_ = true; + r_ = std::make_shared( + this->immediate_ ? s_->execute_direct(*c_->connection(), sql_) : + s_->execute()); num_columns_ = r_->columns(); - } catch (const nanodbc::database_error& e) { - c_->set_current_result(nullptr); + } + } catch (const nanodbc::database_error& e) { + c_->set_current_result(nullptr); + if (c_->interruptible_execution_) { + // Executing in a thread away from main. Signal + // that we have encountered an error using an exception. + // Main thread will raise the formatted [R] error. throw odbc_error(e, sql_, output_encoder_); - } catch (...) { - c_->set_current_result(nullptr); - throw; + } else { + // Executing on the main thread. Raise the + // formatted [R] errpr ourselves. + raise_error(odbc_error(e, sql_, output_encoder_)); } + } catch (...) { + c_->set_current_result(nullptr); + throw; } } @@ -194,7 +211,7 @@ void odbc_result::unbind_if_needed() { } } } catch (const nanodbc::database_error& e) { - Rcpp::warning("Was unable to unbind some nanodbc buffers"); + raise_warning("Was unable to unbind some nanodbc buffers"); }; } diff --git a/src/odbc_result.h b/src/odbc_result.h index 74be7925..baad43c6 100644 --- a/src/odbc_result.h +++ b/src/odbc_result.h @@ -24,7 +24,10 @@ class odbc_error : public Rcpp::exception { const std::string& sql, Iconv& output_encoder) : Rcpp::exception("", false) { - std::string m = std::string(e.what()) + "\n '" + sql + "'"; + std::string m = std::string(e.what()); + if (sql != "") { + m += "\n '" + sql + "'"; + } // #432: [R] expects UTF-8 encoded strings but both nanodbc and sql are // encoded in the database encoding, which may differ from UTF-8 message = Rf_translateChar( @@ -45,7 +48,6 @@ class odbc_result { std::shared_ptr statement() const; std::shared_ptr result() const; void prepare(); - void execute(); void describe_parameters(Rcpp::List const& x); void bind_list(Rcpp::List const& x, bool use_transaction, size_t batch_rows); Rcpp::DataFrame fetch(int n_max = -1); @@ -70,6 +72,7 @@ class odbc_result { int num_columns_; bool complete_; bool bound_; + bool immediate_; Iconv output_encoder_; std::map> strings_; @@ -82,6 +85,11 @@ class odbc_result { void clear_buffers(); void unbind_if_needed(); + // Private method - use only in constructor. + // It will allocate nanodbc resources ( statement, result ) + // and call execute. + void execute(); + void bind_columns( nanodbc::statement& statement, r_type type, diff --git a/src/result.cpp b/src/result.cpp index 9fbca8a7..21a6db47 100644 --- a/src/result.cpp +++ b/src/result.cpp @@ -49,9 +49,6 @@ void result_bind(result_ptr const& r, List const& params, size_t batch_rows) { r->bind_list(params, false, batch_rows); } -// [[Rcpp::export]] -void result_execute(result_ptr const& r) { r->execute(); } - // [[Rcpp::export]] void result_insert_dataframe( result_ptr const& r, DataFrame const& df, size_t batch_rows) { diff --git a/src/utils.cpp b/src/utils.cpp index 6a34bec8..412b075e 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -1,7 +1,10 @@ #include #include +#include #include "utils.h" - +#if !defined(_WIN32) && !defined(_WIN64) +#include +#endif namespace odbc { namespace utils { @@ -47,4 +50,79 @@ namespace utils { } } } + + void run_interruptible(const std::function& exec_fn, const std::function& cleanup_fn) + { + std::exception_ptr eptr; +#if !defined(_WIN32) && !defined(_WIN64) + sigset_t set, old_set; + sigemptyset(&set); + sigaddset(&set, SIGINT); + int rc = pthread_sigmask(SIG_BLOCK, &set, &old_set); + if ( rc != 0 ) + { + // Unable to properly mask SIGINT from execution thread + raise_warning("Unexpected behavior when creating execution thread. Signals to interrupt execution may not be caught."); + } +#endif + auto future = std::async(std::launch::async, [&exec_fn, &eptr]() { + try { + exec_fn(); + } catch (...) { + eptr = std::current_exception(); + } + return; + }); +#if !defined(_WIN32) && !defined(_WIN64) + pthread_sigmask(SIG_SETMASK, &old_set, NULL); +#endif + std::future_status status; + do { + status = future.wait_for(std::chrono::seconds(1)); + if (status != std::future_status::ready) { + try { Rcpp::checkUserInterrupt(); } + catch (const Rcpp::internal::InterruptedException& e) { + raise_message("Caught user interrupt, attempting a clean exit..."); + cleanup_fn(); + } catch (...) { throw; } + } + } while (status != std::future_status::ready); + if (eptr) { + // An exception was thrown in the thread + try { std::rethrow_exception(eptr); } + catch (const odbc_error& e) { raise_error(e); } + catch (...) { raise_message("Unknown exception while executing"); throw; }; + } + } + + void raise_message(const std::string& message) { + Rcpp::Environment pkg = Rcpp::Environment::namespace_env("cli"); + Rcpp::Function r_method = pkg["cli_inform"]; + Rcpp::CharacterVector argMessage = + Rcpp::CharacterVector::create(Rcpp::Named("i", message)); + r_method(argMessage); + } + + void raise_warning(const std::string& message) { + Rcpp::Environment pkg = Rcpp::Environment::namespace_env("cli"); + Rcpp::Function r_method = pkg["cli_warn"]; + Rcpp::CharacterVector argMessage = + Rcpp::CharacterVector::create(Rcpp::Named("!", message)); + r_method(argMessage); + } + + void raise_error(const std::string& message) { + Rcpp::Environment pkg = Rcpp::Environment::namespace_env("cli"); + Rcpp::Function r_method = pkg["cli_abort"]; + Rcpp::CharacterVector argMessage = + Rcpp::CharacterVector::create(Rcpp::Named("x", message)); + r_method(argMessage); + } + + void raise_error(const odbc_error& e) { + Rcpp::Environment pkg = Rcpp::Environment::namespace_env("odbc"); + Rcpp::Function r_method = pkg["rethrow_database_error"]; + r_method(e.what()); + } + }} diff --git a/src/utils.h b/src/utils.h index 23cd4d6e..2f2bee9d 100644 --- a/src/utils.h +++ b/src/utils.h @@ -7,6 +7,7 @@ #include #include "sql_types.h" +#include "odbc_result.h" #include "nanodbc.h" namespace odbc { @@ -44,5 +45,39 @@ namespace utils { /// \param token The authentication token. /// \return A shared pointer to the buffer containing the serialized structure. std::shared_ptr< void > serialize_azure_token( const std::string& token ); + + /// \brief Wrapper to allow for interruptible execution of argument function + /// + /// The execution function is relegated to a separate thread. + /// On the main thread, we wait for the execution to complete while + /// at the same time checking for user interrupts every one second. + /// + /// \param exec_fn Function executed on a separate thread. Exceptions + /// are caught and re-thrown on the main thread. + /// \param cleanup_fn Function executed on main thread in the event a + /// user interrupt is caught. + void run_interruptible(const std::function& exec_fn, const std::function& cleanup_fn); + + /// \brief Entry point for package::cli::cli_inform + /// + /// \param message Message to be shown. + void raise_message(const std::string& message); + + /// \brief Entry point for package::cli::cli_warn + /// + /// \param message Message to be shown. + void raise_warning(const std::string& message); + + /// \brief Entry point for package::cli::cli_abort + /// + /// \param message Message to be shown. + void raise_error(const std::string& message); + + /// \brief Entry point for package::odbc:::rethrow_database_error + /// + /// On the [R] side, the message ( e.what() ) is parsed and + /// displayed in a user friendly / multi-line format. + /// \param e (nanodbc::database_)Exception to be raised. + void raise_error(const odbc_error& e); }} #endif diff --git a/tests/testthat/_snaps/utils.md b/tests/testthat/_snaps/utils.md index 26dd646b..a906df6c 100644 --- a/tests/testthat/_snaps/utils.md +++ b/tests/testthat/_snaps/utils.md @@ -27,7 +27,7 @@ Error in `dbConnect()`: ! ODBC failed with error 00000 from [unixODBC][Driver Manager]. x Data source name not found and no default driver specified - i From 'nanodbc/nanodbc.cpp:1156'. + i From 'nanodbc/nanodbc.cpp:1150'. --- @@ -37,7 +37,8 @@ Error in `dbExecute()`: ! ODBC failed with error 00000 from [SQLite]. x no such table: boopbopbopbeep (1) - i From 'nanodbc/nanodbc.cpp:1728'. + * 'SELECT * FROM boopbopbopbeep' + i From 'nanodbc/nanodbc.cpp:1722'. # rethrow_database_error() errors well when parse_database_error() fails