Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start_with_method_variants #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,43 @@ impl Methods {
}
}

#[derive(Default, Debug, Clone)]
/// Wraps an arbitrary number of [`Methods`] instances, pointing to the current [`Methods`] instance
pub struct MethodsPicker {
inner: Arc<Vec<Methods>>,
current: Methods,
}

impl From<Methods> for MethodsPicker {
fn from(m: Methods) -> Self {
Self { inner: Arc::new(vec![m.clone()]), current: m }
}
}

impl From<Vec<Methods>> for MethodsPicker {
fn from(v: Vec<Methods>) -> Self {
let current = if v.is_empty() { Methods::default() } else { v[0].clone() };
Self { inner: Arc::new(v), current }
}
}

impl MethodsPicker {
/// Instruct the picker which [`Methods`] instance to use for the current request
pub fn pick<F>(&mut self, f: F)
where
F: FnOnce(&[Methods]) -> &Methods,
{
let current = f(&self.inner);
self.current = current.clone();
}

/// Points to the currently picked [`Methods`] set.
/// Returns [`Methods::default()`] if the internal collection is empty.
pub fn current(&self) -> Methods {
self.current.clone()
}
}

impl<Context> Deref for RpcModule<Context> {
type Target = Methods;

Expand Down
112 changes: 112 additions & 0 deletions examples/examples/method_router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! This example sets a custom tower service middleware which picks a variant
//! of rpc methods depending on the uri path.
//!
//! It works with both `WebSocket` and `HTTP` which is done in the example.

use jsonrpsee::rpc_params;
use std::net::SocketAddr;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::server::{logger::Logger, RpcModule, ServerBuilder, TowerService};
use jsonrpsee::ws_client::WsClientBuilder;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let addr = run_server().await?;

// HTTP.
{
let client = HttpClientBuilder::default().build(format!("http://{}/v1", addr))?;
let response: String = client.request("say_hello", rpc_params![]).await?;
println!("[main]: http response: {:?}", response);
}
{
let client = HttpClientBuilder::default().build(format!("http://{}/v2", addr))?;
let response: String = client.request("say_hello", rpc_params![]).await?;
println!("[main]: http response: {:?}", response);
}
{
let client = HttpClientBuilder::default().build(format!("http://{}", addr))?;
let response = client.request::<String, _>("say_hello", rpc_params![]).await.expect_err("404");
println!("[main]: http response: {:}", response);
}

// WebSocket.
{
let client = WsClientBuilder::default().build(format!("ws://{}/v1", addr)).await?;
let response: String = client.request("say_hello", rpc_params![]).await?;
println!("[main]: ws response: {:?}", response);
}
{
let client = WsClientBuilder::default().build(format!("ws://{}/v2", addr)).await?;
let response: String = client.request("say_hello", rpc_params![]).await?;
println!("[main]: ws response: {:?}", response);
}
{
let error = WsClientBuilder::default().build(format!("ws://{}", addr)).await.expect_err("404");
println!("[main]: ws response: {:}", error);
}

Ok(())
}

/// Wraps the ultimate core service of the jsonrpsee server in order to access its RPC method picker.
struct MethodRouter<L: Logger>(TowerService<L>);

impl<L> tower::Service<hyper::Request<hyper::Body>> for MethodRouter<L>
where
L: Logger,
{
type Response = <TowerService<L> as hyper::service::Service<hyper::Request<hyper::Body>>>::Response;
type Error = <TowerService<L> as hyper::service::Service<hyper::Request<hyper::Body>>>::Error;
type Future = <TowerService<L> as hyper::service::Service<hyper::Request<hyper::Body>>>::Future;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
let idx = match req.uri().path() {
"/v1" => 0,
"/v2" => 1,
_ => return Box::pin(std::future::ready(Ok(jsonrpsee::server::http_response::not_found()))),
};

self.0.inner.methods.pick(|all_methods| &all_methods[idx]);
self.0.call(req)
}
}

struct MethodRouterLayer;

impl<L: Logger> tower::Layer<TowerService<L>> for MethodRouterLayer {
type Service = MethodRouter<L>;

fn layer(&self, inner: TowerService<L>) -> Self::Service {
MethodRouter(inner)
}
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let service_builder = tower::ServiceBuilder::new().layer(MethodRouterLayer);

let server =
ServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

let mut module_v1 = RpcModule::new(());
module_v1.register_method("say_hello", |_, _| Ok("lo v1")).unwrap();
let mut module_v2 = RpcModule::new(());
module_v2.register_method("say_hello", |_, _| Ok("lo v2")).unwrap();

// Serve different apis on different paths
let handle = server.start_with_methods_variants([module_v1.into(), module_v2.into()])?;

// In this example we don't care about doing shutdown so let's it run forever.
// You may use the `ServerHandle` to shut it down or manage it yourself.
tokio::spawn(handle.stopped());

Ok(addr)
}
3 changes: 3 additions & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ pub use jsonrpsee_core::{id_providers::*, traits::IdProvider};
pub use jsonrpsee_types as types;
pub use server::{Builder as ServerBuilder, Server};
pub use tracing;

pub use server::{ServiceData, TowerService};
pub use transport::http::response as http_response;
45 changes: 38 additions & 7 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
use jsonrpsee_core::server::helpers::MethodResponse;
use jsonrpsee_core::server::host_filtering::AllowHosts;
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::Methods;
use jsonrpsee_core::server::rpc_module::{Methods, MethodsPicker};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{http_helpers, Error, TEN_MB_SIZE_BYTES};

Expand Down Expand Up @@ -106,7 +106,7 @@ where
///
/// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is dropped.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let methods = methods.into().initialize_resources(&self.resources)?;
let methods = methods.into().initialize_resources(&self.resources)?.into();
let (stop_tx, stop_rx) = watch::channel(());

let stop_handle = StopHandle::new(stop_rx);
Expand All @@ -119,7 +119,36 @@ where
Ok(ServerHandle::new(stop_tx))
}

async fn start_inner(self, methods: Methods, stop_handle: StopHandle) {
/// Start responding to connections requests.
///
/// By default utilizes the first item in `methods` unless instructed otherwise via middleware.
/// See `examples/method_router` for more details.
///
/// Replies with [`Error::MethodNotFound`] if the `methods` collection is empty.
///
/// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is dropped.
pub fn start_with_methods_variants(
mut self,
methods: impl IntoIterator<Item = Methods>,
) -> Result<ServerHandle, Error> {
let methods = methods
.into_iter()
.map(|methods| methods.initialize_resources(&self.resources))
.collect::<Result<Vec<Methods>, Error>>()?
.into();
let (stop_tx, stop_rx) = watch::channel(());

let stop_handle = StopHandle::new(stop_rx);

match self.cfg.tokio_runtime.take() {
Some(rt) => rt.spawn(self.start_inner(methods, stop_handle)),
None => tokio::spawn(self.start_inner(methods, stop_handle)),
};

Ok(ServerHandle::new(stop_tx))
}

async fn start_inner(self, methods: MethodsPicker, stop_handle: StopHandle) {
let max_request_body_size = self.cfg.max_request_body_size;
let max_response_body_size = self.cfg.max_response_body_size;
let max_log_length = self.cfg.max_log_length;
Expand Down Expand Up @@ -535,11 +564,12 @@ impl MethodResult {

/// Data required by the server to handle requests.
#[derive(Debug, Clone)]
pub(crate) struct ServiceData<L: Logger> {
pub struct ServiceData<L: Logger> {
/// Remote server address.
pub(crate) remote_addr: SocketAddr,
/// Registered server methods.
pub(crate) methods: Methods,
/// FIXME making it public for the example only
pub methods: MethodsPicker,
/// Access control.
pub(crate) allow_hosts: AllowHosts,
/// Tracker for currently used resources on the server.
Expand Down Expand Up @@ -576,7 +606,8 @@ pub(crate) struct ServiceData<L: Logger> {
/// This is similar to [`hyper::service::service_fn`].
#[derive(Debug)]
pub struct TowerService<L: Logger> {
inner: ServiceData<L>,
/// FIXME making it public for the example only
pub inner: ServiceData<L>,
}

impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerService<L> {
Expand Down Expand Up @@ -647,7 +678,7 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
} else {
// The request wasn't an upgrade request; let's treat it as a standard HTTP request:
let data = http::HandleRequest {
methods: self.inner.methods.clone(),
methods: self.inner.methods.current(),
resources: self.inner.resources.clone(),
max_request_body_size: self.inner.max_request_body_size,
max_response_body_size: self.inner.max_response_body_size,
Expand Down
8 changes: 7 additions & 1 deletion server/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ pub(crate) async fn handle_request<L: Logger>(
res
}

pub(crate) mod response {
/// FIXME making it public for the example only
pub mod response {
use jsonrpsee_types::error::reject_too_big_request;
use jsonrpsee_types::error::{ErrorCode, ErrorResponse};
use jsonrpsee_types::Id;
Expand Down Expand Up @@ -420,4 +421,9 @@ pub(crate) mod response {
TEXT,
)
}

/// 404
pub fn not_found() -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::NOT_FOUND, hyper::StatusCode::NOT_FOUND.to_string(), TEXT)
}
}
2 changes: 2 additions & 0 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ pub(crate) async fn background_task<L: Logger>(
..
} = svc;

let methods = methods.current();

let (tx, rx) = mpsc::unbounded::<String>();
let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection);
let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length);
Expand Down