From 89b95eeb6883d47654709b135545e73fafc61e6f Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Thu, 5 Dec 2024 13:34:18 -0800 Subject: [PATCH 1/3] fix: impl timeout layer. --- Cargo.lock | 1 + crates/provider/Cargo.toml | 1 + crates/provider/src/alloy/metrics.rs | 5 + crates/provider/src/alloy/mod.rs | 10 +- crates/provider/src/alloy/provider_timeout.rs | 137 ++++++++++++++++++ crates/types/src/task/status_code.rs | 1 + 6 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 crates/provider/src/alloy/provider_timeout.rs diff --git a/Cargo.lock b/Cargo.lock index 78ddb0282..7b7991c3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4933,6 +4933,7 @@ dependencies = [ "const-hex", "futures-util", "mockall", + "pin-project", "reqwest", "reth-tasks", "rundler-bindings-fastlz", diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 6f6319ee1..a3b521057 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -34,6 +34,7 @@ async-trait.workspace = true auto_impl.workspace = true const-hex.workspace = true futures-util.workspace = true +pin-project.workspace = true reqwest.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/provider/src/alloy/metrics.rs b/crates/provider/src/alloy/metrics.rs index 18650c353..5c293021e 100644 --- a/crates/provider/src/alloy/metrics.rs +++ b/crates/provider/src/alloy/metrics.rs @@ -126,6 +126,11 @@ where method_logger.record_http(HttpCode::TwoHundreds); method_logger.record_rpc(RpcCode::Success); } + // for timeout error + alloy_json_rpc::RpcError::LocalUsageError(_) => { + method_logger.record_http(HttpCode::FourHundreds); + method_logger.record_rpc(RpcCode::ClientSideTimeout); + } _ => {} } } diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 49f08fac7..97d114662 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -11,6 +11,8 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. +use std::time::Duration; + use alloy_provider::{Provider as AlloyProvider, ProviderBuilder}; use alloy_rpc_client::ClientBuilder; use alloy_transport::layers::RetryBackoffService; @@ -18,6 +20,7 @@ use alloy_transport_http::Http; use anyhow::Context; use evm::AlloyEvmProvider; use metrics::{AlloyMetricLayer, AlloyMetricMiddleware}; +use provider_timeout::{ProviderTimeout, ProviderTimeoutLayer}; use reqwest::Client; use url::Url; @@ -28,6 +31,7 @@ pub use da::new_alloy_da_gas_oracle; pub(crate) mod entry_point; pub(crate) mod evm; pub(crate) mod metrics; +mod provider_timeout; /// Create a new alloy evm provider from a given RPC URL pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result { @@ -39,15 +43,19 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result anyhow::Result< - impl AlloyProvider>>> + Clone, + impl AlloyProvider>>>> + + Clone, > { let url = Url::parse(rpc_url).context("invalid rpc url")?; let metric_layer = AlloyMetricLayer::default(); // TODO: make this configurable: use a large number for CUPS for now let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000); + // add a timeout layer here. + let timeout_layer = ProviderTimeoutLayer::new(Duration::from_secs(10)); let client = ClientBuilder::default() .layer(retry_layer) .layer(metric_layer) + .layer(timeout_layer) .http(url); let provider = ProviderBuilder::new().on_client(client); Ok(provider) diff --git a/crates/provider/src/alloy/provider_timeout.rs b/crates/provider/src/alloy/provider_timeout.rs new file mode 100644 index 000000000..2d9db2e3e --- /dev/null +++ b/crates/provider/src/alloy/provider_timeout.rs @@ -0,0 +1,137 @@ +//! Middleware that applies a timeout to requests. +//! +//! If the response does not complete within the specified timeout, the response +//! will be aborted. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::TransportError; +use pin_project::pin_project; +use tokio::time::Sleep; +use tower::{Layer, Service}; + +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug, Clone)] +pub(crate) struct ProviderTimeoutLayer { + timeout: Duration, +} + +impl ProviderTimeoutLayer { + /// Create a timeout from a duration + pub(crate) fn new(timeout: Duration) -> Self { + ProviderTimeoutLayer { timeout } + } +} + +impl Layer for ProviderTimeoutLayer +where + S: Service + Sync, +{ + type Service = ProviderTimeout; + + fn layer(&self, service: S) -> Self::Service { + ProviderTimeout::new(service, self.timeout) + } +} + +/// Applies a timeout to requests. +#[derive(Debug)] +pub struct ProviderTimeout { + service: S, + timeout: Duration, +} + +// ===== impl Timeout ===== + +impl ProviderTimeout +where + S: Service + Sync, +{ + /// Creates a new [`Timeout`] + pub const fn new(service: S, timeout: Duration) -> Self { + ProviderTimeout { service, timeout } + } +} + +impl Clone for ProviderTimeout +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + timeout: self.timeout, + } + } +} +impl Service for ProviderTimeout +where + S: Service + + Sync + + Send + + Clone + + 'static, + S::Future: Send, +{ + type Response = S::Response; + type Error = TransportError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.service.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + } + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let response = self.service.call(request); + let sleep = tokio::time::sleep(self.timeout); + ResponseFuture::new(response, sleep) + } +} + +#[pin_project] +#[derive(Debug)] +pub struct ResponseFuture { + #[pin] + response: T, + #[pin] + sleep: Sleep, +} + +impl ResponseFuture { + pub(crate) fn new(response: T, sleep: Sleep) -> Self { + ResponseFuture { response, sleep } + } +} + +impl Future for ResponseFuture +where + F: Future>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // First, try polling the future + match this.response.poll(cx) { + Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)), + Poll::Pending => {} + } + // Now check the sleep + match this.sleep.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(Err(TransportError::local_usage_str( + "provider request timeout from client side", + ))), + } + } +} diff --git a/crates/types/src/task/status_code.rs b/crates/types/src/task/status_code.rs index 610e0ccdd..fe9471764 100644 --- a/crates/types/src/task/status_code.rs +++ b/crates/types/src/task/status_code.rs @@ -24,6 +24,7 @@ pub enum RpcCode { Other, InvalidParams, DeadlineExceed, + ClientSideTimeout, MethodNotFound, AlreadyExist, PermissionDenied, From 6054f88e30d3f78bc05dd8d1c39772cf37fab610 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Thu, 19 Dec 2024 14:41:29 -0500 Subject: [PATCH 2/3] feat: pass a flag to contorl timeout seconds --- Cargo.lock | 26 ++++++++++++++++ Cargo.toml | 1 + bin/rundler/Cargo.toml | 1 + bin/rundler/src/cli/builder.rs | 3 ++ bin/rundler/src/cli/mod.rs | 9 ++++++ crates/builder/src/sender/mod.rs | 9 ++++-- crates/builder/src/task.rs | 12 +++++--- crates/provider/Cargo.toml | 1 + crates/provider/src/alloy/mod.rs | 53 ++++++++++++++++++++++++++++++-- 9 files changed, 105 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b7991c3d..4c0ebb1f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -873,6 +873,12 @@ dependencies = [ "serde", ] +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "async-channel" version = "1.9.0" @@ -1727,6 +1733,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "clang-sys" version = "1.8.1" @@ -4791,6 +4803,7 @@ dependencies = [ "serde", "serde_json", "sscanf", + "tiny_http", "tokio", "tokio-metrics", "tokio-rustls 0.26.1", @@ -4942,6 +4955,7 @@ dependencies = [ "rundler-types", "rundler-utils", "thiserror 1.0.69", + "tiny_http", "tokio", "tower 0.4.13", "tracing", @@ -5940,6 +5954,18 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinystr" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index d9ecdda1e..7ca038099 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,3 +94,4 @@ tower-http = { version = "0.6.2", features = ["cors"] } tracing = "0.1.40" strum = { version = "0.26.3", features = ["derive"] } url = "2.5.2" +tiny_http = "0.12.0" diff --git a/bin/rundler/Cargo.toml b/bin/rundler/Cargo.toml index f8665bc15..ac2adf83a 100644 --- a/bin/rundler/Cargo.toml +++ b/bin/rundler/Cargo.toml @@ -41,6 +41,7 @@ reth-tasks.workspace = true serde.workspace = true serde_json.workspace = true sscanf = "0.4.2" +tiny_http.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync"] } tokio-metrics = "0.3.1" tokio-rustls = "0.26.0" diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 6376361ed..0e5f485a1 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -336,6 +336,8 @@ impl BuilderArgs { let da_gas_tracking_enabled = super::lint_da_gas_tracking(common.da_gas_tracking_enabled, &chain_spec); + let provider_client_timeout_seconds = common.provider_client_timeout_seconds; + Ok(BuilderTaskArgs { entry_points, chain_spec, @@ -358,6 +360,7 @@ impl BuilderArgs { max_replacement_underpriced_blocks: self.max_replacement_underpriced_blocks, remote_address, da_gas_tracking_enabled, + provider_client_timeout_seconds, }) } diff --git a/bin/rundler/src/cli/mod.rs b/bin/rundler/src/cli/mod.rs index 577ee8ad7..959c37773 100644 --- a/bin/rundler/src/cli/mod.rs +++ b/bin/rundler/src/cli/mod.rs @@ -346,6 +346,14 @@ pub struct CommonArgs { default_value = "false" )] pub da_gas_tracking_enabled: bool, + + #[arg( + long = "provider_client_timeout_seconds", + name = "provider_client_timeout_seconds", + env = "PROVIDER_CLIENT_TIMEOUT_SECONDS", + default_value = "10" + )] + pub provider_client_timeout_seconds: u64, } const SIMULATION_GAS_OVERHEAD: u64 = 100_000; @@ -594,6 +602,7 @@ pub fn construct_providers( ) -> anyhow::Result { let provider = Arc::new(rundler_provider::new_alloy_provider( args.node_http.as_ref().context("must provide node_http")?, + args.provider_client_timeout_seconds, )?); let (da_gas_oracle, da_gas_oracle_sync) = rundler_provider::new_alloy_da_gas_oracle(chain_spec, provider.clone()); diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index 9c94c8961..60a97a3e5 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -171,12 +171,17 @@ impl TransactionSenderArgs { self, rpc_url: &str, signer: S, + provider_client_timeout_seconds: u64, ) -> std::result::Result, SenderConstructorErrors> { - let provider = rundler_provider::new_alloy_evm_provider(rpc_url)?; + let provider = + rundler_provider::new_alloy_evm_provider(rpc_url, provider_client_timeout_seconds)?; let sender = match self { Self::Raw(args) => { - let submitter = rundler_provider::new_alloy_evm_provider(&args.submit_url)?; + let submitter = rundler_provider::new_alloy_evm_provider( + &args.submit_url, + provider_client_timeout_seconds, + )?; if args.use_submit_for_status { TransactionSenderEnum::Raw(RawTransactionSender::new( diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index b0d6e5230..8f8c22ce2 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -89,6 +89,8 @@ pub struct Args { pub entry_points: Vec, /// Enable DA tracking pub da_gas_tracking_enabled: bool, + /// Provider client timeout + pub provider_client_timeout_seconds: u64, } /// Builder settings for an entrypoint @@ -355,11 +357,11 @@ where da_gas_tracking_enabled: self.args.da_gas_tracking_enabled, }; - let transaction_sender = self - .args - .sender_args - .clone() - .into_sender(&self.args.rpc_url, signer)?; + let transaction_sender = self.args.sender_args.clone().into_sender( + &self.args.rpc_url, + signer, + self.args.provider_client_timeout_seconds, + )?; let tracker_settings = transaction_tracker::Settings { replacement_fee_percent_increase: self.args.replacement_fee_percent_increase, diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index a3b521057..be07752dc 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -52,4 +52,5 @@ alloy-node-bindings = "0.4.2" alloy-provider = { workspace = true, features = ["debug-api", "anvil-node"] } alloy-sol-macro.workspace = true rundler-provider = { workspace = true, features = ["test-utils"] } +tiny_http.workspace = true tokio.workspace = true diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 97d114662..673e9eb91 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -34,14 +34,18 @@ pub(crate) mod metrics; mod provider_timeout; /// Create a new alloy evm provider from a given RPC URL -pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result { - let provider = new_alloy_provider(rpc_url)?; +pub fn new_alloy_evm_provider( + rpc_url: &str, + provider_client_timeout_seconds: u64, +) -> anyhow::Result { + let provider = new_alloy_provider(rpc_url, provider_client_timeout_seconds)?; Ok(AlloyEvmProvider::new(provider)) } /// Create a new alloy provider from a given RPC URL pub fn new_alloy_provider( rpc_url: &str, + provider_client_timeout_seconds: u64, ) -> anyhow::Result< impl AlloyProvider>>>> + Clone, @@ -51,7 +55,8 @@ pub fn new_alloy_provider( // TODO: make this configurable: use a large number for CUPS for now let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000); // add a timeout layer here. - let timeout_layer = ProviderTimeoutLayer::new(Duration::from_secs(10)); + let timeout_layer = + ProviderTimeoutLayer::new(Duration::from_secs(provider_client_timeout_seconds)); let client = ClientBuilder::default() .layer(retry_layer) .layer(metric_layer) @@ -60,3 +65,45 @@ pub fn new_alloy_provider( let provider = ProviderBuilder::new().on_client(client); Ok(provider) } + +#[cfg(test)] +mod tests { + use std::{ + thread::{self, sleep}, + time::Duration, + }; + + use alloy_provider::Provider; + use tiny_http::{Response, Server}; + + use crate::new_alloy_provider; + fn setup() { + let server = Server::http("0.0.0.0:8000").unwrap(); + for request in server.incoming_requests() { + sleep(Duration::from_secs(10)); + let _ = request.respond(Response::from_string( + "{\"jsonrpc\": \"2.0\", \"id\": 1, \"result\": \"0x146b6d7\"}", + )); + } + } + #[tokio::test] + async fn test_timeout() { + thread::spawn(move || { + setup(); + }); + { + // Wait 11 seconds and get result + let provider = new_alloy_provider("http://localhost:8000", 11) + .expect("can not initialize provider"); + let x = provider.get_block_number().await; + assert!(x.is_ok()); + } + { + // Wait 9 seconds and timeout form client side + let provider = new_alloy_provider("http://localhost:8000", 9) + .expect("can not initialize provider"); + let x = provider.get_block_number().await; + assert!(x.is_err()); + } + } +} From 6eda480f2d9d363a0f3ed25972facea9515ab785 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Thu, 19 Dec 2024 16:19:19 -0500 Subject: [PATCH 3/3] fix: make the unit test less flaky --- Cargo.lock | 3 +-- bin/rundler/Cargo.toml | 1 - crates/provider/src/alloy/mod.rs | 4 ++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c0ebb1f5..6ec2f367f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -4803,7 +4803,6 @@ dependencies = [ "serde", "serde_json", "sscanf", - "tiny_http", "tokio", "tokio-metrics", "tokio-rustls 0.26.1", diff --git a/bin/rundler/Cargo.toml b/bin/rundler/Cargo.toml index ac2adf83a..f8665bc15 100644 --- a/bin/rundler/Cargo.toml +++ b/bin/rundler/Cargo.toml @@ -41,7 +41,6 @@ reth-tasks.workspace = true serde.workspace = true serde_json.workspace = true sscanf = "0.4.2" -tiny_http.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync"] } tokio-metrics = "0.3.1" tokio-rustls = "0.26.0" diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 673e9eb91..1256b9cca 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -93,14 +93,14 @@ mod tests { }); { // Wait 11 seconds and get result - let provider = new_alloy_provider("http://localhost:8000", 11) + let provider = new_alloy_provider("http://localhost:8000", 15) .expect("can not initialize provider"); let x = provider.get_block_number().await; assert!(x.is_ok()); } { // Wait 9 seconds and timeout form client side - let provider = new_alloy_provider("http://localhost:8000", 9) + let provider = new_alloy_provider("http://localhost:8000", 5) .expect("can not initialize provider"); let x = provider.get_block_number().await; assert!(x.is_err());