Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection error/retry API and UI support #2902

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/python-tornado/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<perspective-viewer id="viewer"></perspective-viewer>

<script type="module">
import "/node_modules/@finos/perspective-viewer/dist/cdn/perspective-viewer.js";
import perspective from "/node_modules/@finos/perspective/dist/cdn/perspective.js";

/**
Expand Down
9 changes: 9 additions & 0 deletions examples/rust-axum/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
const socket = await perspective.websocket("/ws");
const table = await socket.open_table("my_data_source");
const viewer = document.getElementsByTagName("perspective-viewer")[0];

// socket.on_error(async (message, reconnect) => {
// console.warn(`${message}, reconnecting ...`);
// setTimeout(async () => {
// await reconnect();
// await viewer.invalidate();
// }, 5000);
// });

viewer.load(table);
viewer.restore({ settings: true, plugin_config: { edit_mode: "EDIT" } });
</script>
Expand Down
41 changes: 38 additions & 3 deletions rust/perspective-client/src/rust/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use async_lock::{Mutex, RwLock};
use futures::future::BoxFuture;
use futures::future::{join_all, BoxFuture, LocalBoxFuture};
use futures::Future;
use nanoid::*;
use prost::Message;
Expand Down Expand Up @@ -63,8 +63,11 @@ impl GetFeaturesResp {
}

type BoxFn<I, O> = Box<dyn Fn(I) -> O + Send + Sync + 'static>;
type Box2Fn<I, J, O> = Box<dyn Fn(I, J) -> O + Send + Sync + 'static>;

type Subscriptions<C> = Arc<RwLock<HashMap<u32, C>>>;
type OnErrorCallback =
Box2Fn<Option<String>, Option<ReconnectCallback>, BoxFuture<'static, Result<(), ClientError>>>;
type OnceCallback = Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>;
type SendCallback = Arc<
dyn for<'a> Fn(&'a Request) -> BoxFuture<'a, Result<(), Box<dyn Error + Send + Sync>>>
Expand All @@ -86,6 +89,7 @@ pub struct Client {
features: Arc<Mutex<Option<Features>>>,
send: SendCallback,
id_gen: Arc<AtomicU32>,
subscriptions_errors: Subscriptions<OnErrorCallback>,
subscriptions_once: Subscriptions<OnceCallback>,
subscriptions: Subscriptions<BoxFn<Response, BoxFuture<'static, Result<(), ClientError>>>>,
}
Expand All @@ -98,6 +102,15 @@ impl std::fmt::Debug for Client {
}
}

/// The type of the `reconnect` parameter passed to [`Client::handle_error`},
/// and to the callback closure of [`Client::on_error`].
///
/// Calling this function from a [`Client::on_error`] closure should run the
/// (implementation specific) client reconnect logic, e.g. rebindign a
/// websocket.
pub type ReconnectCallback =
Arc<dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync>;

impl Client {
/// Create a new client instance with a closure that handles message
/// dispatch. See [`Client::new`] for details.
Expand All @@ -119,9 +132,10 @@ impl Client {
Client {
features: Arc::default(),
id_gen: Arc::new(AtomicU32::new(1)),
subscriptions_once: Arc::default(),
subscriptions: Subscriptions::default(),
send,
subscriptions: Subscriptions::default(),
subscriptions_errors: Arc::default(),
subscriptions_once: Arc::default(),
}
}

Expand Down Expand Up @@ -160,6 +174,27 @@ impl Client {
Ok(false)
}

pub async fn handle_error(
&self,
message: Option<String>,
reconnect: Option<ReconnectCallback>,
) -> ClientResult<()> {
let subs = self.subscriptions_errors.read().await;
let reconnect: Option<ReconnectCallback> = reconnect.map(Arc::from);
let tasks = join_all(
subs.values()
.map(|callback| callback(message.clone(), reconnect.clone())),
);
tasks.await.into_iter().collect::<Result<(), _>>()?;
Ok(())
}

pub async fn on_error(&self, on_error: OnErrorCallback) -> ClientResult<u32> {
let id = self.gen_id();
self.subscriptions_errors.write().await.insert(id, on_error);
Ok(id)
}

pub async fn init(&self) -> ClientResult<()> {
let msg = Request {
msg_id: self.gen_id(),
Expand Down
2 changes: 1 addition & 1 deletion rust/perspective-client/src/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod config;
mod proto;
pub mod utils;

pub use crate::client::{Client, ClientHandler, Features, SystemInfo};
pub use crate::client::{Client, ClientHandler, Features, ReconnectCallback, SystemInfo};
pub use crate::proto::{ColumnType, SortOp, ViewOnUpdateResp};
pub use crate::session::{ProxySession, Session};
pub use crate::table::{
Expand Down
1 change: 1 addition & 0 deletions rust/perspective-js/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ base64 = "0.13.0"
chrono = "0.4"
extend = "1.1.2"
futures = "0.3.28"
derivative = "2.2.0"
getrandom = { version = "0.2", features = ["js"] }
js-intern = "0.3.1"
js-sys = "0.3.64"
Expand Down
157 changes: 149 additions & 8 deletions rust/perspective-js/src/rust/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use std::error::Error;
use std::future::Future;
use std::sync::Arc;

use derivative::Derivative;
use futures::channel::oneshot;
use futures::future::LocalBoxFuture;
use js_sys::{Function, Uint8Array};
use macro_rules_attribute::apply;
#[cfg(doc)]
use perspective_client::SystemInfo;
use perspective_client::{TableData, TableInitOptions};
use perspective_client::{ReconnectCallback, TableData, TableInitOptions};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::{future_to_promise, JsFuture};

pub use crate::table::*;
use crate::utils::{inherit_docs, ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
use crate::utils::{inherit_docs, ApiError, ApiResult, JsValueSerdeExt};

#[wasm_bindgen(typescript_custom_section)]
const TS_APPEND_CONTENT: &'static str = r#"
Expand All @@ -40,20 +48,66 @@ pub struct Client {
pub(crate) client: perspective_client::Client,
}

/// A wrapper around [`js_sys::Function`] to ease async integration for the
/// `reconnect` argument of [`Client::on_error`] callback.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);

unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}

impl<I> JsReconnect<I> {
fn run(&self, args: I) -> js_sys::Promise {
self.0(args)
}

fn run_all(
&self,
args: I,
) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + Sync {
let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
let p = self.0(args);
let _ = future_to_promise(async move {
let result = JsFuture::from(p)
.await
.map(|_| ())
.map_err(|x| format!("{:?}", x).into());

sender.send(result).unwrap();
Ok(JsValue::UNDEFINED)
});

async move { receiver.await.unwrap() }
}
}

impl<F, I> From<F> for JsReconnect<I>
where
F: Fn(I) -> js_sys::Promise + 'static,
{
fn from(value: F) -> Self {
JsReconnect(Arc::new(value))
}
}

#[wasm_bindgen]
impl Client {
#[wasm_bindgen(constructor)]
pub fn new(send_request: Function, close: Option<Function>) -> Self {
let send1 = send_request.clone();
let send_loop = LocalPollLoop::new(move |mut buff: Vec<u8>| {
let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(buff.as_mut_ptr(), buff.len()) };
send1.call1(&JsValue::UNDEFINED, &buff2)
let send_request = JsReconnect::from(move |buff2: Uint8Array| {
send_request
.call1(&JsValue::UNDEFINED, &buff2)
.unwrap()
.unchecked_into::<js_sys::Promise>()
});

let client = perspective_client::Client::new_with_callback(move |msg| {
let task = send_loop.poll(msg.to_vec());
let send_request = send_request.clone();
Box::pin(async move {
task.await;
let mut v = msg.to_vec();
let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
send_request.run_all(buff2).await?;
Ok(())
})
});
Expand All @@ -76,6 +130,93 @@ impl Client {
Ok(())
}

#[doc(hidden)]
#[wasm_bindgen]
pub async fn handle_error(
&self,
error: Option<String>,
reconnect: Option<Function>,
) -> ApiResult<()> {
self.client
.handle_error(
error,
reconnect.map(|reconnect| {
let reconnect =
JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
Ok(x) => x.unchecked_into::<js_sys::Promise>(),
Err(e) => {
// This error may occur when _invoking_ the function
tracing::warn!("{:?}", e);
js_sys::Promise::reject(&format!("C {:?}", e).into())
},
});

Arc::new(move || {
let fut = JsFuture::from(reconnect.run(()));
Box::pin(async move {
// This error may occur when _awaiting_ the promise returned by the
// function
if let Err(e) = fut.await {
if let Some(e) = e.dyn_ref::<js_sys::Object>() {
Err(e.to_string().as_string().unwrap().into())
} else {
Err(e.as_string().unwrap().into())
}
} else {
Ok(())
}
})
as LocalBoxFuture<'static, Result<(), Box<dyn Error>>>
}) as Arc<(dyn Fn() -> _ + Send + Sync)>
}),
)
.await?;

Ok(())
}

#[doc(hidden)]
#[wasm_bindgen]
pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
let callback = JsReconnect::from(
move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
let reconnect = reconnect.clone();
future_to_promise(async move {
if let Some(f) = reconnect {
f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
}

Ok(JsValue::UNDEFINED)
})
});

if let Err(e) = callback.call2(
&JsValue::UNDEFINED,
&JsValue::from(message),
&cl.into_js_value(),
) {
tracing::warn!("D {:?}", e);
}

js_sys::Promise::resolve(&JsValue::UNDEFINED)
},
);

let id = self
.client
.on_error(Box::new(move |message, reconnect| {
let callback = callback.clone();
Box::pin(async move {
let _promise = callback.run((message, reconnect));
Ok(())
})
}))
.await?;

Ok(id)
}

#[apply(inherit_docs)]
#[inherit_doc = "client/table.md"]
#[wasm_bindgen]
Expand Down
8 changes: 8 additions & 0 deletions rust/perspective-js/src/rust/utils/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ impl ApiError {
}
}

impl Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::error::Error for ApiError {}

/// A common Rust error handling idion (see e.g. `anyhow::Result`)
pub type ApiResult<T> = Result<T, ApiError>;

Expand Down
19 changes: 19 additions & 0 deletions rust/perspective-js/src/rust/utils/local_poll_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use wasm_bindgen_futures::spawn_local;

/// A useful abstraction for connecting `!Sync + !Send` callbacks (like
/// `js_sys::Function`) to `Send + Sync` contexts (like the client loop).
#[derive(Clone)]
pub struct LocalPollLoop<R: Send + Sync + Clone + 'static>(UnboundedSender<R>);

impl<R: Send + Sync + Clone + 'static> LocalPollLoop<R> {
Expand All @@ -36,6 +37,24 @@ impl<R: Send + Sync + Clone + 'static> LocalPollLoop<R> {
Self(emit)
}

/// Create a new loop which accepts a `R: Send + Sync` intermediate state
/// argument and calls the `!Send + !Sync` callback.
pub fn new_async<F: Fn(R) -> FUT + 'static, FUT: Future<Output = Result<JsValue, JsValue>>>(
send: F,
) -> Self {
let (emit, mut receive) = unbounded::<R>();
spawn_local(async move {
while let Some(resp) = receive.next().await {
let resp = send(resp).await;
if let Err(err) = resp {
web_sys::console::error_2(&"Failed to serialize".into(), &err);
}
}
});

Self(emit)
}

/// Send a new `R` to the poll loop.
pub fn poll(&self, msg: R) -> impl Future<Output = ()> + Send + Sync + 'static {
let mut emit = self.0.clone();
Expand Down
Loading
Loading