From 89b95eeb6883d47654709b135545e73fafc61e6f Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Thu, 5 Dec 2024 13:34:18 -0800 Subject: [PATCH] fix: impl timeout layer. --- Cargo.lock | 1 + crates/provider/Cargo.toml | 1 + crates/provider/src/alloy/metrics.rs | 5 + crates/provider/src/alloy/mod.rs | 10 +- crates/provider/src/alloy/provider_timeout.rs | 137 ++++++++++++++++++ crates/types/src/task/status_code.rs | 1 + 6 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 crates/provider/src/alloy/provider_timeout.rs diff --git a/Cargo.lock b/Cargo.lock index 78ddb0282..7b7991c3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4933,6 +4933,7 @@ dependencies = [ "const-hex", "futures-util", "mockall", + "pin-project", "reqwest", "reth-tasks", "rundler-bindings-fastlz", diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 6f6319ee1..a3b521057 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -34,6 +34,7 @@ async-trait.workspace = true auto_impl.workspace = true const-hex.workspace = true futures-util.workspace = true +pin-project.workspace = true reqwest.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/provider/src/alloy/metrics.rs b/crates/provider/src/alloy/metrics.rs index 18650c353..5c293021e 100644 --- a/crates/provider/src/alloy/metrics.rs +++ b/crates/provider/src/alloy/metrics.rs @@ -126,6 +126,11 @@ where method_logger.record_http(HttpCode::TwoHundreds); method_logger.record_rpc(RpcCode::Success); } + // for timeout error + alloy_json_rpc::RpcError::LocalUsageError(_) => { + method_logger.record_http(HttpCode::FourHundreds); + method_logger.record_rpc(RpcCode::ClientSideTimeout); + } _ => {} } } diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 49f08fac7..97d114662 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -11,6 +11,8 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. +use std::time::Duration; + use alloy_provider::{Provider as AlloyProvider, ProviderBuilder}; use alloy_rpc_client::ClientBuilder; use alloy_transport::layers::RetryBackoffService; @@ -18,6 +20,7 @@ use alloy_transport_http::Http; use anyhow::Context; use evm::AlloyEvmProvider; use metrics::{AlloyMetricLayer, AlloyMetricMiddleware}; +use provider_timeout::{ProviderTimeout, ProviderTimeoutLayer}; use reqwest::Client; use url::Url; @@ -28,6 +31,7 @@ pub use da::new_alloy_da_gas_oracle; pub(crate) mod entry_point; pub(crate) mod evm; pub(crate) mod metrics; +mod provider_timeout; /// Create a new alloy evm provider from a given RPC URL pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result { @@ -39,15 +43,19 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result anyhow::Result< - impl AlloyProvider>>> + Clone, + impl AlloyProvider>>>> + + Clone, > { let url = Url::parse(rpc_url).context("invalid rpc url")?; let metric_layer = AlloyMetricLayer::default(); // TODO: make this configurable: use a large number for CUPS for now let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000); + // add a timeout layer here. + let timeout_layer = ProviderTimeoutLayer::new(Duration::from_secs(10)); let client = ClientBuilder::default() .layer(retry_layer) .layer(metric_layer) + .layer(timeout_layer) .http(url); let provider = ProviderBuilder::new().on_client(client); Ok(provider) diff --git a/crates/provider/src/alloy/provider_timeout.rs b/crates/provider/src/alloy/provider_timeout.rs new file mode 100644 index 000000000..2d9db2e3e --- /dev/null +++ b/crates/provider/src/alloy/provider_timeout.rs @@ -0,0 +1,137 @@ +//! Middleware that applies a timeout to requests. +//! +//! If the response does not complete within the specified timeout, the response +//! will be aborted. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::TransportError; +use pin_project::pin_project; +use tokio::time::Sleep; +use tower::{Layer, Service}; + +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug, Clone)] +pub(crate) struct ProviderTimeoutLayer { + timeout: Duration, +} + +impl ProviderTimeoutLayer { + /// Create a timeout from a duration + pub(crate) fn new(timeout: Duration) -> Self { + ProviderTimeoutLayer { timeout } + } +} + +impl Layer for ProviderTimeoutLayer +where + S: Service + Sync, +{ + type Service = ProviderTimeout; + + fn layer(&self, service: S) -> Self::Service { + ProviderTimeout::new(service, self.timeout) + } +} + +/// Applies a timeout to requests. +#[derive(Debug)] +pub struct ProviderTimeout { + service: S, + timeout: Duration, +} + +// ===== impl Timeout ===== + +impl ProviderTimeout +where + S: Service + Sync, +{ + /// Creates a new [`Timeout`] + pub const fn new(service: S, timeout: Duration) -> Self { + ProviderTimeout { service, timeout } + } +} + +impl Clone for ProviderTimeout +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + timeout: self.timeout, + } + } +} +impl Service for ProviderTimeout +where + S: Service + + Sync + + Send + + Clone + + 'static, + S::Future: Send, +{ + type Response = S::Response; + type Error = TransportError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.service.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + } + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let response = self.service.call(request); + let sleep = tokio::time::sleep(self.timeout); + ResponseFuture::new(response, sleep) + } +} + +#[pin_project] +#[derive(Debug)] +pub struct ResponseFuture { + #[pin] + response: T, + #[pin] + sleep: Sleep, +} + +impl ResponseFuture { + pub(crate) fn new(response: T, sleep: Sleep) -> Self { + ResponseFuture { response, sleep } + } +} + +impl Future for ResponseFuture +where + F: Future>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // First, try polling the future + match this.response.poll(cx) { + Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)), + Poll::Pending => {} + } + // Now check the sleep + match this.sleep.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(Err(TransportError::local_usage_str( + "provider request timeout from client side", + ))), + } + } +} diff --git a/crates/types/src/task/status_code.rs b/crates/types/src/task/status_code.rs index 610e0ccdd..fe9471764 100644 --- a/crates/types/src/task/status_code.rs +++ b/crates/types/src/task/status_code.rs @@ -24,6 +24,7 @@ pub enum RpcCode { Other, InvalidParams, DeadlineExceed, + ClientSideTimeout, MethodNotFound, AlreadyExist, PermissionDenied,