Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: impl timeout layer. #926

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,4 @@ tower-http = { version = "0.6.2", features = ["cors"] }
tracing = "0.1.40"
strum = { version = "0.26.3", features = ["derive"] }
url = "2.5.2"
tiny_http = "0.12.0"
3 changes: 3 additions & 0 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ impl BuilderArgs {
let da_gas_tracking_enabled =
super::lint_da_gas_tracking(common.da_gas_tracking_enabled, &chain_spec);

let provider_client_timeout_seconds = common.provider_client_timeout_seconds;

Ok(BuilderTaskArgs {
entry_points,
chain_spec,
Expand All @@ -358,6 +360,7 @@ impl BuilderArgs {
max_replacement_underpriced_blocks: self.max_replacement_underpriced_blocks,
remote_address,
da_gas_tracking_enabled,
provider_client_timeout_seconds,
})
}

Expand Down
9 changes: 9 additions & 0 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ pub struct CommonArgs {
default_value = "false"
)]
pub da_gas_tracking_enabled: bool,

#[arg(
long = "provider_client_timeout_seconds",
name = "provider_client_timeout_seconds",
env = "PROVIDER_CLIENT_TIMEOUT_SECONDS",
default_value = "10"
)]
pub provider_client_timeout_seconds: u64,
}

const SIMULATION_GAS_OVERHEAD: u64 = 100_000;
Expand Down Expand Up @@ -594,6 +602,7 @@ pub fn construct_providers(
) -> anyhow::Result<impl Providers> {
let provider = Arc::new(rundler_provider::new_alloy_provider(
args.node_http.as_ref().context("must provide node_http")?,
args.provider_client_timeout_seconds,
)?);
let (da_gas_oracle, da_gas_oracle_sync) =
rundler_provider::new_alloy_da_gas_oracle(chain_spec, provider.clone());
Expand Down
9 changes: 7 additions & 2 deletions crates/builder/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,17 @@ impl TransactionSenderArgs {
self,
rpc_url: &str,
signer: S,
provider_client_timeout_seconds: u64,
) -> std::result::Result<TransactionSenderEnum<impl EvmProvider, S>, SenderConstructorErrors>
{
let provider = rundler_provider::new_alloy_evm_provider(rpc_url)?;
let provider =
rundler_provider::new_alloy_evm_provider(rpc_url, provider_client_timeout_seconds)?;
let sender = match self {
Self::Raw(args) => {
let submitter = rundler_provider::new_alloy_evm_provider(&args.submit_url)?;
let submitter = rundler_provider::new_alloy_evm_provider(
&args.submit_url,
provider_client_timeout_seconds,
)?;

if args.use_submit_for_status {
TransactionSenderEnum::Raw(RawTransactionSender::new(
Expand Down
12 changes: 7 additions & 5 deletions crates/builder/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct Args {
pub entry_points: Vec<EntryPointBuilderSettings>,
/// Enable DA tracking
pub da_gas_tracking_enabled: bool,
/// Provider client timeout
pub provider_client_timeout_seconds: u64,
}

/// Builder settings for an entrypoint
Expand Down Expand Up @@ -355,11 +357,11 @@ where
da_gas_tracking_enabled: self.args.da_gas_tracking_enabled,
};

let transaction_sender = self
.args
.sender_args
.clone()
.into_sender(&self.args.rpc_url, signer)?;
let transaction_sender = self.args.sender_args.clone().into_sender(
&self.args.rpc_url,
signer,
self.args.provider_client_timeout_seconds,
)?;

let tracker_settings = transaction_tracker::Settings {
replacement_fee_percent_increase: self.args.replacement_fee_percent_increase,
Expand Down
2 changes: 2 additions & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async-trait.workspace = true
auto_impl.workspace = true
const-hex.workspace = true
futures-util.workspace = true
pin-project.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand All @@ -51,4 +52,5 @@ alloy-node-bindings = "0.4.2"
alloy-provider = { workspace = true, features = ["debug-api", "anvil-node"] }
alloy-sol-macro.workspace = true
rundler-provider = { workspace = true, features = ["test-utils"] }
tiny_http.workspace = true
tokio.workspace = true
5 changes: 5 additions & 0 deletions crates/provider/src/alloy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ where
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
}
// for timeout error
alloy_json_rpc::RpcError::LocalUsageError(_) => {
method_logger.record_http(HttpCode::FourHundreds);
andysim3d marked this conversation as resolved.
Show resolved Hide resolved
method_logger.record_rpc(RpcCode::ClientSideTimeout);
}
_ => {}
}
}
Expand Down
61 changes: 58 additions & 3 deletions crates/provider/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::time::Duration;

use alloy_provider::{Provider as AlloyProvider, ProviderBuilder};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffService;
use alloy_transport_http::Http;
use anyhow::Context;
use evm::AlloyEvmProvider;
use metrics::{AlloyMetricLayer, AlloyMetricMiddleware};
use provider_timeout::{ProviderTimeout, ProviderTimeoutLayer};
use reqwest::Client;
use url::Url;

Expand All @@ -28,27 +31,79 @@ pub use da::new_alloy_da_gas_oracle;
pub(crate) mod entry_point;
pub(crate) mod evm;
pub(crate) mod metrics;
mod provider_timeout;

/// Create a new alloy evm provider from a given RPC URL
pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider + Clone> {
let provider = new_alloy_provider(rpc_url)?;
pub fn new_alloy_evm_provider(
rpc_url: &str,
provider_client_timeout_seconds: u64,
) -> anyhow::Result<impl EvmProvider + Clone> {
let provider = new_alloy_provider(rpc_url, provider_client_timeout_seconds)?;
Ok(AlloyEvmProvider::new(provider))
}

/// Create a new alloy provider from a given RPC URL
pub fn new_alloy_provider(
rpc_url: &str,
provider_client_timeout_seconds: u64,
) -> anyhow::Result<
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<Http<Client>>>> + Clone,
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<ProviderTimeout<Http<Client>>>>>
+ Clone,
> {
let url = Url::parse(rpc_url).context("invalid rpc url")?;
let metric_layer = AlloyMetricLayer::default();
// TODO: make this configurable: use a large number for CUPS for now
let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000);
// add a timeout layer here.
let timeout_layer =
ProviderTimeoutLayer::new(Duration::from_secs(provider_client_timeout_seconds));
let client = ClientBuilder::default()
.layer(retry_layer)
.layer(metric_layer)
.layer(timeout_layer)
.http(url);
let provider = ProviderBuilder::new().on_client(client);
Ok(provider)
}

#[cfg(test)]
mod tests {
use std::{
thread::{self, sleep},
time::Duration,
};

use alloy_provider::Provider;
use tiny_http::{Response, Server};

use crate::new_alloy_provider;
fn setup() {
let server = Server::http("0.0.0.0:8000").unwrap();
for request in server.incoming_requests() {
sleep(Duration::from_secs(10));
let _ = request.respond(Response::from_string(
"{\"jsonrpc\": \"2.0\", \"id\": 1, \"result\": \"0x146b6d7\"}",
));
}
}
#[tokio::test]
async fn test_timeout() {
thread::spawn(move || {
setup();
});
{
// Wait 11 seconds and get result
let provider = new_alloy_provider("http://localhost:8000", 15)
.expect("can not initialize provider");
let x = provider.get_block_number().await;
assert!(x.is_ok());
}
{
// Wait 9 seconds and timeout form client side
let provider = new_alloy_provider("http://localhost:8000", 5)
.expect("can not initialize provider");
let x = provider.get_block_number().await;
assert!(x.is_err());
}
}
}
Loading
Loading