From eb7b78287221105cc3f61a9c4217fb0461c04ce8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 Nov 2023 12:33:57 +0100 Subject: [PATCH 1/4] Make `PlatformRef::connect_*` instantaneously return --- full-node/src/network_service/tasks.rs | 28 +- lib/src/libp2p/with_buffers.rs | 212 ++++++++------- light-base/src/network_service.rs | 22 +- light-base/src/network_service/tasks.rs | 47 +--- light-base/src/platform.rs | 23 +- light-base/src/platform/default.rs | 45 ++-- wasm-node/javascript/src/internals/client.ts | 71 +++-- .../src/internals/local-instance.ts | 40 ++- .../src/internals/remote-instance.ts | 18 +- .../src/no-auto-bytecode-browser.ts | 123 ++------- .../javascript/src/no-auto-bytecode-deno.ts | 4 +- .../javascript/src/no-auto-bytecode-nodejs.ts | 7 +- wasm-node/rust/src/bindings.rs | 68 ++--- wasm-node/rust/src/platform.rs | 254 ++++++++---------- 14 files changed, 396 insertions(+), 566 deletions(-) diff --git a/full-node/src/network_service/tasks.rs b/full-node/src/network_service/tasks.rs index 883d254619..96c740e7f1 100644 --- a/full-node/src/network_service/tasks.rs +++ b/full-node/src/network_service/tasks.rs @@ -52,32 +52,8 @@ pub(super) async fn connection_task( mut coordinator_to_connection: channel::Receiver, connection_to_coordinator: channel::Sender, ) { - // Finishing ongoing connection process. - let socket = match socket.await.map_err(|_| ()) { - Ok(s) => s, - Err(_err) => { - // TODO: log - connection_task.reset(); - loop { - let (task_update, opaque_message) = connection_task.pull_message_to_coordinator(); - let _ = connection_to_coordinator - .send(super::ToBackground::FromConnectionTask { - connection_id, - opaque_message, - connection_now_dead: true, - }) - .await; - if let Some(task_update) = task_update { - connection_task = task_update; - } else { - return; - } - } - } - }; - - // The socket is wrapped around an object containing a read buffer and a write buffer and - // allowing easier usage. + // The socket future is wrapped around an object containing a read buffer and a write buffer + // and allowing easier usage. let mut socket = pin::pin!(with_buffers::WithBuffers::new(socket)); // Future that sends a message to the coordinator. Only one message is sent to the coordinator diff --git a/lib/src/libp2p/with_buffers.rs b/lib/src/libp2p/with_buffers.rs index 7b3c762800..3d48c78890 100644 --- a/lib/src/libp2p/with_buffers.rs +++ b/lib/src/libp2p/with_buffers.rs @@ -38,10 +38,10 @@ use std::io; /// Holds an implementation of `AsyncRead` and `AsyncWrite`, alongside with a read buffer and a /// write buffer. #[pin_project::pin_project] -pub struct WithBuffers { +pub struct WithBuffers { /// Actual socket to read from/write to. #[pin] - socket: TSocket, + socket: Socket, /// Error that has happened on the socket, if any. error: Option, /// Storage for data read from the socket. The first [`WithBuffers::read_buffer_valid`] bytes @@ -73,18 +73,22 @@ pub struct WithBuffers { read_write_wake_up_after: Option, } -impl WithBuffers +#[pin_project::pin_project(project = SocketProj)] +enum Socket { + Pending(#[pin] TSocketFut), + Resolved(#[pin] TSocket), +} + +impl WithBuffers where TNow: Clone + Ord, { - /// Initializes a new [`WithBuffers`] with the given socket. - /// - /// The socket must still be open in both directions. - pub fn new(socket: TSocket) -> Self { + /// Initializes a new [`WithBuffers`] with the given socket-yielding future. + pub fn new(socket: TSocketFut) -> Self { let read_buffer_reasonable_capacity = 65536; // TODO: make configurable? WithBuffers { - socket, + socket: Socket::Pending(socket), error: None, read_buffer: Vec::with_capacity(read_buffer_reasonable_capacity), read_buffer_valid: 0, @@ -123,6 +127,8 @@ where this.read_buffer.truncate(*this.read_buffer_valid); + let is_resolved = matches!(*this.socket, Socket::Resolved(_)); + let write_bytes_queued = this.write_buffers.iter().map(Vec::len).sum(); Ok(ReadWriteAccess { @@ -135,7 +141,9 @@ where read_bytes: 0, write_bytes_queued, write_buffers: mem::take(this.write_buffers), - write_bytes_queueable: if !*this.write_closed { + write_bytes_queueable: if !is_resolved { + Some(0) + } else if !*this.write_closed { // Limit outgoing buffer size to 128kiB. // TODO: make configurable? Some((128 * 1024usize).saturating_sub(write_bytes_queued)) @@ -155,9 +163,10 @@ where } } -impl WithBuffers +impl WithBuffers where TSocket: AsyncRead + AsyncWrite, + TSocketFut: future::Future>, TNow: Clone + Ord, { /// Waits until [`WithBuffers::read_write_access`] should be called again. @@ -213,100 +222,115 @@ where } } - if !*this.read_closed { - let read_result = AsyncRead::poll_read( - this.socket.as_mut(), - cx, - &mut this.read_buffer[*this.read_buffer_valid..], - ); - - match read_result { + match this.socket.as_mut().project() { + SocketProj::Pending(future) => match future::Future::poll(future, cx) { Poll::Pending => {} - Poll::Ready(Ok(0)) => { - *this.read_closed = true; - pending = false; - } - Poll::Ready(Ok(n)) => { - *this.read_buffer_valid += n; - // TODO: consider waking up only if the expected bytes of the consumer are exceeded + Poll::Ready(Ok(socket)) => { + this.socket.set(Socket::Resolved(socket)); pending = false; } Poll::Ready(Err(err)) => { *this.error = Some(err); return Poll::Ready(()); } - }; - } - - loop { - if this.write_buffers.iter().any(|b| !b.is_empty()) { - let write_result = { - let buffers = this - .write_buffers - .iter() - .map(|buf| io::IoSlice::new(buf)) - .collect::>(); - AsyncWrite::poll_write_vectored(this.socket.as_mut(), cx, &buffers) - }; - - match write_result { - Poll::Ready(Ok(0)) => { - // It is not legal for `poll_write` to return 0 bytes written. - unreachable!(); - } - Poll::Ready(Ok(mut n)) => { - *this.flush_pending = true; - while n > 0 { - let first_buf = this.write_buffers.first_mut().unwrap(); - if first_buf.len() <= n { - n -= first_buf.len(); - this.write_buffers.remove(0); - } else { - // TODO: consider keeping the buffer as is but starting the next write at a later offset - first_buf.copy_within(n.., 0); - first_buf.truncate(first_buf.len() - n); - break; - } + }, + SocketProj::Resolved(mut socket) => { + if !*this.read_closed { + let read_result = AsyncRead::poll_read( + socket.as_mut(), + cx, + &mut this.read_buffer[*this.read_buffer_valid..], + ); + + match read_result { + Poll::Pending => {} + Poll::Ready(Ok(0)) => { + *this.read_closed = true; + pending = false; } - // Wake up if the write buffers switch from non-empty to empty. - if this.write_buffers.is_empty() { + Poll::Ready(Ok(n)) => { + *this.read_buffer_valid += n; + // TODO: consider waking up only if the expected bytes of the consumer are exceeded pending = false; } - } - Poll::Ready(Err(err)) => { - *this.error = Some(err); - return Poll::Ready(()); - } - Poll::Pending => break, - }; - } else if *this.flush_pending { - match AsyncWrite::poll_flush(this.socket.as_mut(), cx) { - Poll::Ready(Ok(())) => { - *this.flush_pending = false; - } - Poll::Ready(Err(err)) => { - *this.error = Some(err); - return Poll::Ready(()); - } - Poll::Pending => break, + Poll::Ready(Err(err)) => { + *this.error = Some(err); + return Poll::Ready(()); + } + }; } - } else if *this.close_pending { - match AsyncWrite::poll_close(this.socket.as_mut(), cx) { - Poll::Ready(Ok(())) => { - *this.close_pending = false; - pending = false; + + loop { + if this.write_buffers.iter().any(|b| !b.is_empty()) { + let write_result = { + let buffers = this + .write_buffers + .iter() + .map(|buf| io::IoSlice::new(buf)) + .collect::>(); + AsyncWrite::poll_write_vectored(socket.as_mut(), cx, &buffers) + }; + + match write_result { + Poll::Ready(Ok(0)) => { + // It is not legal for `poll_write` to return 0 bytes written. + unreachable!(); + } + Poll::Ready(Ok(mut n)) => { + *this.flush_pending = true; + while n > 0 { + let first_buf = this.write_buffers.first_mut().unwrap(); + if first_buf.len() <= n { + n -= first_buf.len(); + this.write_buffers.remove(0); + } else { + // TODO: consider keeping the buffer as is but starting the next write at a later offset + first_buf.copy_within(n.., 0); + first_buf.truncate(first_buf.len() - n); + break; + } + } + // Wake up if the write buffers switch from non-empty to empty. + if this.write_buffers.is_empty() { + pending = false; + } + } + Poll::Ready(Err(err)) => { + *this.error = Some(err); + return Poll::Ready(()); + } + Poll::Pending => break, + }; + } else if *this.flush_pending { + match AsyncWrite::poll_flush(socket.as_mut(), cx) { + Poll::Ready(Ok(())) => { + *this.flush_pending = false; + } + Poll::Ready(Err(err)) => { + *this.error = Some(err); + return Poll::Ready(()); + } + Poll::Pending => break, + } + } else if *this.close_pending { + match AsyncWrite::poll_close(socket.as_mut(), cx) { + Poll::Ready(Ok(())) => { + *this.close_pending = false; + pending = false; + break; + } + Poll::Ready(Err(err)) => { + *this.error = Some(err); + return Poll::Ready(()); + } + Poll::Pending => break, + } + } else { break; } - Poll::Ready(Err(err)) => { - *this.error = Some(err); - return Poll::Ready(()); - } - Poll::Pending => break, } - } else { - break; } - } + }; if !pending { Poll::Ready(()) @@ -318,9 +342,15 @@ where } } -impl fmt::Debug for WithBuffers { +impl fmt::Debug for WithBuffers { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("WithBuffers").field(&self.socket).finish() + let mut t = f.debug_tuple("WithBuffers"); + if let Socket::Resolved(socket) = &self.socket { + t.field(socket); + } else { + t.field(&""); + } + t.finish() } } diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 870dadc73a..61f4b97031 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -1845,7 +1845,11 @@ async fn background_task(mut task: BackgroundTask) { let task_name = format!("connection-{}-{}", peer_id, multiaddr); let connection_id = match address { - address_parse::AddressOrMultiStreamAddress::Address(_) => { + address_parse::AddressOrMultiStreamAddress::Address(address) => { + // As documented in the `PlatformRef` trait, `connect_stream` must + // return as soon as possible. + let connection = task.platform.connect_stream(address).await; + let (connection_id, connection_task) = task.network.add_single_stream_connection( task.platform.now(), @@ -1860,7 +1864,8 @@ async fn background_task(mut task: BackgroundTask) { task.platform.spawn_task( task_name.into(), tasks::single_stream_connection_task::( - multiaddr, + connection, + multiaddr.to_string(), task.platform.clone(), connection_id, connection_task, @@ -1878,12 +1883,10 @@ async fn background_task(mut task: BackgroundTask) { remote_certificate_sha256, }, ) => { - // TODO: we unfortunately need to know the local TLS certificate in order to - // insert the connection, and this local TLS certificate can only be given - // to us by the platform implementation, leading to this `await` here which - // really shouldn't exist. For the moment it's fine because the only implementations - // of multistream connections returns very quickly, but in theory this `await` - // could block for a long time. + // We need to know the local TLS certificate in order to insert the + // connection, and as such we need to call `connect_multistream` here. + // As documented in the `PlatformRef` trait, `connect_multistream` must + // return as soon as possible. let connection = task .platform .connect_multistream(platform::MultiStreamAddress::WebRtc { @@ -1891,8 +1894,7 @@ async fn background_task(mut task: BackgroundTask) { port, remote_certificate_sha256, }) - .await - .unwrap_or_else(|_| unreachable!()); // TODO: don't unwrap, again we know that the only implementation that exists never unwraps here, but in theory it's possible + .await; // Convert the SHA256 hashes into multihashes. let local_tls_certificate_multihash = [12u8, 32] diff --git a/light-base/src/network_service/tasks.rs b/light-base/src/network_service/tasks.rs index 63041a3194..aaa25eea80 100644 --- a/light-base/src/network_service/tasks.rs +++ b/light-base/src/network_service/tasks.rs @@ -16,61 +16,28 @@ // along with this program. If not, see . use super::ToBackground; -use crate::platform::{address_parse, PlatformRef, SubstreamDirection}; +use crate::platform::{PlatformRef, SubstreamDirection}; -use alloc::{ - boxed::Box, - string::{String, ToString as _}, -}; +use alloc::{boxed::Box, string::String}; use core::{pin, time::Duration}; use futures_lite::FutureExt as _; use futures_util::{future, stream::FuturesUnordered, StreamExt as _}; -use smoldot::{ - libp2p::{collection::SubstreamFate, Multiaddr}, - network::service, -}; +use smoldot::{libp2p::collection::SubstreamFate, network::service}; /// Asynchronous task managing a specific single-stream connection. pub(super) async fn single_stream_connection_task( - address: Multiaddr, + mut connection: TPlat::Stream, + address_string: String, platform: TPlat, connection_id: service::ConnectionId, mut connection_task: service::SingleStreamConnectionTask, coordinator_to_connection: async_channel::Receiver, connection_to_coordinator: async_channel::Sender, ) { - let address_string = address.to_string(); - let Ok(address_parse::AddressOrMultiStreamAddress::Address(address)) = - address_parse::multiaddr_to_address(&address) - else { - unreachable!() - }; // We need to pin the receiver, as the type doesn't implement `Unpin`. let mut coordinator_to_connection = pin::pin!(coordinator_to_connection); - - let mut socket = pin::pin!(match platform.connect_stream(address).await { - Ok(s) => s, - Err(err) => { - log::trace!(target: "connections", "Connection({address_string}) => Reset({:?})", err.message); - connection_task.reset(); - loop { - let (task_update, message) = connection_task.pull_message_to_coordinator(); - if let Some(message) = message { - let _ = connection_to_coordinator - .send(super::ToBackground::ConnectionMessage { - connection_id, - message, - }) - .await; - } - if let Some(task_update) = task_update { - connection_task = task_update; - } else { - return; - } - } - } - }); + // We also need to pin the socket, as we don't know whether it implements `Unpin`. + let mut socket = pin::pin!(connection); // Future that sends a message to the coordinator. Only one message is sent to the coordinator // at a time. `None` if no message is being sent. diff --git a/light-base/src/platform.rs b/light-base/src/platform.rs index 009d250b1e..96f7f0dd7a 100644 --- a/light-base/src/platform.rs +++ b/light-base/src/platform.rs @@ -15,7 +15,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use alloc::{borrow::Cow, string::String}; +use alloc::borrow::Cow; use core::{fmt, future::Future, ops, pin::Pin, str, time::Duration}; use futures_util::future; @@ -59,8 +59,8 @@ pub trait PlatformRef: Clone + Send + Sync + 'static { /// should be abruptly dropped (i.e. RST) as well, unless its reading and writing sides /// have been gracefully closed in the past. type Stream: Send + 'static; - type StreamConnectFuture: Future> + Send + 'static; - type MultiStreamConnectFuture: Future, ConnectError>> + type StreamConnectFuture: Future + Send + 'static; + type MultiStreamConnectFuture: Future> + Send + 'static; type ReadWriteAccess<'a>: ops::DerefMut> + 'a; @@ -126,6 +126,11 @@ pub trait PlatformRef: Clone + Send + Sync + 'static { /// Starts a connection attempt to the given multiaddress. /// + /// This function returns a `Future`. This `Future` **must** return as soon as possible, and + /// must **not** wait for the connection to be established. + /// In the most scenarios, the `Future` returned by this function should immediately produce + /// an output. + /// /// # Panic /// /// The function implementation panics if [`Address`] is of a type for which @@ -135,6 +140,11 @@ pub trait PlatformRef: Clone + Send + Sync + 'static { /// Starts a connection attempt to the given multiaddress. /// + /// This function returns a `Future`. This `Future` **must** return as soon as possible, and + /// must **not** wait for the connection to be established. + /// In the most scenarios, the `Future` returned by this function should immediately produce + /// an output. + /// /// # Panic /// /// The function implementation panics if [`MultiStreamAddress`] is of a type for which @@ -383,10 +393,3 @@ pub enum IpAddr { V4([u8; 4]), V6([u8; 16]), } - -/// Error potentially returned by [`PlatformRef::connect_stream`] or -/// [`PlatformRef::connect_multistream`]. -pub struct ConnectError { - /// Human-readable error message. - pub message: String, -} diff --git a/light-base/src/platform/default.rs b/light-base/src/platform/default.rs index f94937e7d3..a9f8787833 100644 --- a/light-base/src/platform/default.rs +++ b/light-base/src/platform/default.rs @@ -19,7 +19,7 @@ #![cfg_attr(docsrs, doc(cfg(feature = "std")))] use super::{ - with_buffers, Address, ConnectError, ConnectionType, IpAddr, MultiStreamAddress, + with_buffers, Address, ConnectionType, IpAddr, MultiStreamAddress, MultiStreamWebRtcConnection, PlatformRef, SubstreamDirection, }; @@ -85,11 +85,8 @@ impl PlatformRef for Arc { type Instant = Instant; type MultiStream = std::convert::Infallible; // TODO: replace with `!` once stable: https://github.com/rust-lang/rust/issues/35121 type Stream = Stream; - type StreamConnectFuture = future::BoxFuture<'static, Result>; - type MultiStreamConnectFuture = future::BoxFuture< - 'static, - Result, ConnectError>, - >; + type StreamConnectFuture = future::Ready; + type MultiStreamConnectFuture = future::Pending>; type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>; type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>; type StreamErrorRef<'a> = &'a io::Error; @@ -189,7 +186,7 @@ impl PlatformRef for Arc { _ => unreachable!(), }; - Box::pin(async move { + let socket_future = async { let tcp_socket = match tcp_socket_addr { either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await, either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await, @@ -199,28 +196,25 @@ impl PlatformRef for Arc { let _ = tcp_socket.set_nodelay(true); } - let socket: TcpOrWs = match (tcp_socket, host_if_websocket) { - (Ok(tcp_socket), Some(host)) => future::Either::Right( + match (tcp_socket, host_if_websocket) { + (Ok(tcp_socket), Some(host)) => { websocket::websocket_client_handshake(websocket::Config { tcp_socket, host: &host, url: "/", }) .await - .map_err(|err| ConnectError { - message: format!("Failed to negotiate WebSocket: {err}"), - })?, - ), - (Ok(tcp_socket), None) => future::Either::Left(tcp_socket), - (Err(err), _) => { - return Err(ConnectError { - message: format!("Failed to reach peer: {err}"), - }) + .map(TcpOrWs::Right) } - }; - Ok(Stream(with_buffers::WithBuffers::new(socket))) - }) + (Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)), + (Err(err), _) => Err(err), + } + }; + + future::ready(Stream(with_buffers::WithBuffers::new(Box::pin( + socket_future, + )))) } fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture { @@ -266,6 +260,13 @@ impl Drop for DefaultPlatform { /// Implementation detail of [`DefaultPlatform`]. #[pin_project::pin_project] -pub struct Stream(#[pin] with_buffers::WithBuffers); +pub struct Stream( + #[pin] + with_buffers::WithBuffers< + future::BoxFuture<'static, Result>, + TcpOrWs, + Instant, + >, +); type TcpOrWs = future::Either>; diff --git a/wasm-node/javascript/src/internals/client.ts b/wasm-node/javascript/src/internals/client.ts index 4775d5a43b..40c9d643e2 100644 --- a/wasm-node/javascript/src/internals/client.ts +++ b/wasm-node/javascript/src/internals/client.ts @@ -25,6 +25,9 @@ import * as remote from './remote-instance.js'; export interface PlatformBindings { /** * Tries to open a new connection using the given configuration. + * + * In case of a multistream connection, `onMultistreamHandshakeInfo` should be called as soon + * as possible. * * @see Connection */ @@ -44,19 +47,14 @@ export interface PlatformBindings { /** * Connection to a remote node. * - * At any time, a connection can be in one of the three following states: + * At any time, a connection can be in one of the following states: * - * - `Opening` (initial state) - * - `Open` + * - `Open` (initial state) * - `Reset` * - * When in the `Opening` or `Open` state, the connection can transition to the `Reset` state - * if the remote closes the connection or refuses the connection altogether. When that - * happens, `config.onReset` is called. Once in the `Reset` state, the connection cannot - * transition back to another state. - * - * Initially in the `Opening` state, the connection can transition to the `Open` state if the - * remote accepts the connection. When that happens, `config.onOpen` is called. + * When in the `Open` state, the connection can transition to the `Reset` state if the remote + * closes the connection or refuses the connection altogether. When that happens, `config.onReset` + * is called. Once in the `Reset` state, the connection cannot transition back to `Open`. * * When in the `Open` state, the connection can receive messages. When a message is received, * `config.onMessage` is called. @@ -139,17 +137,16 @@ export interface ConnectionConfig { address: instance.ParsedMultiaddr, /** - * Callback called when the connection transitions from the `Opening` to the `Open` state. + * Callback called when a multistream connection knows information about its handshake. Should + * be called as soon as possible. + * + * Can only happen while the connection is in the `Open` state. * * Must only be called once per connection. */ - onOpen: (info: - { - type: 'single-stream', handshake: 'multistream-select-noise-yamux', - initialWritableBytes: number - } | + onMultistreamHandshakeInfo: (info: { - type: 'multi-stream', handshake: 'webrtc', + handshake: 'webrtc', localTlsCertificateSha256: Uint8Array, remoteTlsCertificateSha256: Uint8Array, } @@ -167,7 +164,7 @@ export interface ConnectionConfig { * * This function must only be called for connections of type "multi-stream". */ - onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound', initialWritableBytes: number) => void; + onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound') => void; /** * Callback called when a stream transitions to the `Reset` state. @@ -183,14 +180,14 @@ export interface ConnectionConfig { * written on the stream, meaning that some buffer space is now free. * * Can only happen while the connection is in the `Open` state. - * * This callback must not be called after `closeSend` has been called. + * + * The total of writable bytes must not go beyond reasonable values (e.g. a few megabytes). It + * is not legal to provide a dummy implementation that simply passes an exceedingly large + * value. * * The `streamId` parameter must be provided if and only if the connection is of type * "multi-stream". - * - * Only a number of bytes equal to the size of the data provided to {@link Connection.send} - * must be reported. In other words, the `initialWritableBytes` must never be exceeded. */ onWritableBytes: (numExtra: number, streamId?: number) => void; @@ -264,7 +261,7 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom chainIds: new WeakMap(), connections: new Map(), addChainResults: [], - onExecutorShutdownOrWasmPanic: () => {}, + onExecutorShutdownOrWasmPanic: () => { }, chains: new Map(), }; @@ -302,13 +299,13 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom state.chains.clear(); const cb = state.onExecutorShutdownOrWasmPanic; - state.onExecutorShutdownOrWasmPanic = () => {}; + state.onExecutorShutdownOrWasmPanic = () => { }; cb(); break } case "executor-shutdown": { const cb = state.onExecutorShutdownOrWasmPanic; - state.onExecutorShutdownOrWasmPanic = () => {}; + state.onExecutorShutdownOrWasmPanic = () => { }; cb(); break; } @@ -343,15 +340,15 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom throw new Error(); state.instance.instance.streamMessage(connectionId, message, streamId); }, - onStreamOpened(streamId, direction, initialWritableBytes) { + onStreamOpened(streamId, direction) { if (state.instance.status !== "ready") throw new Error(); - state.instance.instance.streamOpened(connectionId, streamId, direction, initialWritableBytes); + state.instance.instance.streamOpened(connectionId, streamId, direction); }, - onOpen(info) { + onMultistreamHandshakeInfo(info) { if (state.instance.status !== "ready") throw new Error(); - state.instance.instance.connectionOpened(connectionId, info); + state.instance.instance.connectionMultiStreamSetHandshakeInfo(connectionId, info); }, onWritableBytes(numExtra, streamId) { if (state.instance.status !== "ready") @@ -441,14 +438,14 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom portToServer: portToWorker, eventCallback }).then((instance) => { - // The Wasm instance might have been crashed before this callback is called. - if (state.instance.status === "destroyed") - return; - state.instance = { - status: "ready", - instance, - }; - }) + // The Wasm instance might have been crashed before this callback is called. + if (state.instance.status === "destroyed") + return; + state.instance = { + status: "ready", + instance, + }; + }) }; } diff --git a/wasm-node/javascript/src/internals/local-instance.ts b/wasm-node/javascript/src/internals/local-instance.ts index cffdf707a6..94a675ed34 100644 --- a/wasm-node/javascript/src/internals/local-instance.ts +++ b/wasm-node/javascript/src/internals/local-instance.ts @@ -91,11 +91,11 @@ export interface Instance { * all connections. */ shutdownExecutor: () => void, - connectionOpened: (connectionId: number, info: { type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: number } | { type: 'multi-stream', handshake: 'webrtc', localTlsCertificateSha256: Uint8Array, remoteTlsCertificateSha256: Uint8Array }) => void, + connectionMultiStreamSetHandshakeInfo: (connectionId: number, info: { handshake: 'webrtc', localTlsCertificateSha256: Uint8Array, remoteTlsCertificateSha256: Uint8Array }) => void, connectionReset: (connectionId: number, message: string) => void, streamWritableBytes: (connectionId: number, numExtra: number, streamId?: number) => void, streamMessage: (connectionId: number, message: Uint8Array, streamId?: number) => void, - streamOpened: (connectionId: number, streamId: number, direction: 'inbound' | 'outbound', initialWritableBytes: number) => void, + streamOpened: (connectionId: number, streamId: number, direction: 'inbound' | 'outbound') => void, streamReset: (connectionId: number, streamId: number) => void, } @@ -546,25 +546,17 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly cb(); }, - connectionOpened: (connectionId: number, info: { type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: number } | { type: 'multi-stream', handshake: 'webrtc', localTlsCertificateSha256: Uint8Array, remoteTlsCertificateSha256: Uint8Array }) => { + connectionMultiStreamSetHandshakeInfo: (connectionId: number, info: { handshake: 'webrtc', localTlsCertificateSha256: Uint8Array, remoteTlsCertificateSha256: Uint8Array }) => { if (!state.instance) return; - switch (info.type) { - case 'single-stream': { - state.instance.exports.connection_open_single_stream(connectionId, info.initialWritableBytes); - break - } - case 'multi-stream': { - const handshakeTy = new Uint8Array(1 + info.localTlsCertificateSha256.length + info.remoteTlsCertificateSha256.length); - buffer.writeUInt8(handshakeTy, 0, 0); - handshakeTy.set(info.localTlsCertificateSha256, 1) - handshakeTy.set(info.remoteTlsCertificateSha256, 1 + info.localTlsCertificateSha256.length) - state.bufferIndices[0] = handshakeTy; - state.instance.exports.connection_open_multi_stream(connectionId, 0); - delete state.bufferIndices[0] - break - } - } + + const handshakeTy = new Uint8Array(1 + info.localTlsCertificateSha256.length + info.remoteTlsCertificateSha256.length); + buffer.writeUInt8(handshakeTy, 0, 0); + handshakeTy.set(info.localTlsCertificateSha256, 1) + handshakeTy.set(info.remoteTlsCertificateSha256, 1 + info.localTlsCertificateSha256.length) + state.bufferIndices[0] = handshakeTy; + state.instance.exports.connection_multi_stream_set_handshake_info(connectionId, 0); + delete state.bufferIndices[0] }, connectionReset: (connectionId: number, message: string) => { @@ -593,14 +585,13 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly delete state.bufferIndices[0] }, - streamOpened: (connectionId: number, streamId: number, direction: 'inbound' | 'outbound', initialWritableBytes: number) => { + streamOpened: (connectionId: number, streamId: number, direction: 'inbound' | 'outbound') => { if (!state.instance) return; state.instance.exports.connection_stream_opened( connectionId, streamId, - direction === 'outbound' ? 1 : 0, - initialWritableBytes + direction === 'outbound' ? 1 : 0 ); }, @@ -631,11 +622,10 @@ interface SmoldotWasmExports extends WebAssembly.Exports { json_rpc_responses_peek: (chainId: number) => number, json_rpc_responses_pop: (chainId: number) => void, timer_finished: () => void, - connection_open_single_stream: (connectionId: number, initialWritableBytes: number) => void, - connection_open_multi_stream: (connectionId: number, handshakeTyBufferIndex: number) => void, + connection_multi_stream_set_handshake_info: (connectionId: number, handshakeTyBufferIndex: number) => void, stream_writable_bytes: (connectionId: number, streamId: number, numBytes: number) => void, stream_message: (connectionId: number, streamId: number, bufferIndex: number) => void, - connection_stream_opened: (connectionId: number, streamId: number, outbound: number, initialWritableBytes: number) => void, + connection_stream_opened: (connectionId: number, streamId: number, outbound: number) => void, connection_reset: (connectionId: number, bufferIndex: number) => void, stream_reset: (connectionId: number, streamId: number) => void, } diff --git a/wasm-node/javascript/src/internals/remote-instance.ts b/wasm-node/javascript/src/internals/remote-instance.ts index 11586c4fd8..1d0c0b8ffd 100644 --- a/wasm-node/javascript/src/internals/remote-instance.ts +++ b/wasm-node/javascript/src/internals/remote-instance.ts @@ -195,8 +195,8 @@ export async function connectToInstanceServer(config: ConnectConfig): Promise { - config.onOpen({ - type: 'single-stream', handshake: 'multistream-select-noise-yamux', - initialWritableBytes: 1024 * 1024 - }); + config.onWritableBytes(1024 * 1024); }; connection.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); @@ -191,10 +188,6 @@ function connect(config: ConnectionConfig): Connection { let pc: RTCPeerConnection | null | undefined = undefined; // Contains the data channels that are open and have been reported to smoldot. const dataChannels = new Map(); - // For various reasons explained below, we open a data channel in advance without reporting it - // to smoldot. This data channel is stored in this variable. Once it is reported to smoldot, - // it is inserted in `dataChannels`. - let handshakeDataChannel: RTCDataChannel | undefined; // SHA256 hash of the DTLS certificate of the local node. Unknown as long as it hasn't been // generated. // TODO: could be merged with `pc` in one variable, and maybe even the other fields as well @@ -207,7 +200,7 @@ function connect(config: ConnectionConfig): Connection { // The `RTCPeerConnection` is created pretty quickly. It is however still possible for // smoldot to cancel the opening, in which case `pc` will still be undefined. if (!pc) { - console.assert(dataChannels.size === 0 && !handshakeDataChannel, "substreams exist while pc is undef") + console.assert(dataChannels.size === 0, "substreams exist while pc is undef") pc = null; return } @@ -224,72 +217,31 @@ function connect(config: ConnectionConfig): Connection { channel.channel.onmessage = null; } dataChannels.clear(); - if (handshakeDataChannel) { - handshakeDataChannel.onopen = null; - handshakeDataChannel.onerror = null; - handshakeDataChannel.onclose = null; - handshakeDataChannel.onbufferedamountlow = null; - handshakeDataChannel.onmessage = null; - } - handshakeDataChannel = undefined; pc!.close(); // Not necessarily necessary, but it doesn't hurt to do so. }; // Function that configures a newly-opened channel and adds it to the map. Used for both // inbound and outbound substreams. - const addChannel = (dataChannel: RTCDataChannel, direction: 'first-outbound' | 'inbound' | 'outbound') => { + const addChannel = (dataChannel: RTCDataChannel, direction: 'inbound' | 'outbound') => { const dataChannelId = dataChannel.id!; dataChannel.binaryType = 'arraybuffer'; - let isOpen = false; + let isOpen = { value: false }; dataChannel.onopen = () => { - console.assert(!isOpen, "substream opened twice") - isOpen = true; - - if (direction === 'first-outbound') { - console.assert(dataChannels.size === 0, "dataChannels not empty when opening"); - console.assert(handshakeDataChannel === dataChannel, "handshake substream mismatch"); - config.onOpen({ - type: 'multi-stream', - handshake: 'webrtc', - // `addChannel` can never be called before the local certificate is generated, so this - // value is always defined. - localTlsCertificateSha256: localTlsCertificateSha256!, - remoteTlsCertificateSha256, - }); - } else { - console.assert(direction !== 'outbound' || !handshakeDataChannel, "handshakeDataChannel still defined"); - config.onStreamOpened(dataChannelId, direction, 65536); - } + console.assert(!isOpen.value, "substream opened twice") + isOpen.value = true; + config.onStreamOpened(dataChannelId, direction); + config.onWritableBytes(65536, dataChannelId); }; dataChannel.onerror = dataChannel.onclose = (_error) => { - // A couple of different things could be happening here. - if (handshakeDataChannel === dataChannel && !isOpen) { - // The handshake data channel that we have opened ahead of time failed to open. As this - // happens before we have reported the WebRTC connection as a whole as being open, we - // need to report that the connection has failed to open. - killAllJs(); - // Note that the event doesn't give any additional reason for the failure. - config.onConnectionReset("handshake data channel failed to open"); - } else if (handshakeDataChannel === dataChannel) { - // The handshake data channel has been closed before we reported it to smoldot. This - // isn't really a problem. We just update the state and continue running. If smoldot - // requests a substream, another one will be opened. It could be a valid implementation - // to also just kill the entire connection, however doing so is a bit too intrusive and - // punches through abstraction layers. - handshakeDataChannel.onopen = null; - handshakeDataChannel.onerror = null; - handshakeDataChannel.onclose = null; - handshakeDataChannel.onbufferedamountlow = null; - handshakeDataChannel.onmessage = null; - handshakeDataChannel = undefined; - } else if (!isOpen) { - // Substream wasn't opened yet and thus has failed to open. The API has no mechanism to - // report substream openings failures. We could try opening it again, but given that - // it's unlikely to succeed, we simply opt to kill the entire connection. + if (!isOpen.value) { + // Substream wasn't opened yet and thus has failed to open. The API has no + // mechanism to report substream openings failures. We could try opening it + // again, but given that it's unlikely to succeed, we simply opt to kill the + // entire connection. killAllJs(); // Note that the event doesn't give any additional reason for the failure. config.onConnectionReset("data channel failed to open"); @@ -311,10 +263,7 @@ function connect(config: ConnectionConfig): Connection { config.onMessage(new Uint8Array(m.data), dataChannelId); } - if (direction !== 'first-outbound') - dataChannels.set(dataChannelId, { channel: dataChannel, bufferedBytes: 0 }); - else - handshakeDataChannel = dataChannel + dataChannels.set(dataChannelId, { channel: dataChannel, bufferedBytes: 0 }); } // It is possible for the browser to use multiple different certificates. @@ -472,16 +421,11 @@ function connect(config: ConnectionConfig): Connection { addChannel(channel, 'inbound') }; - // Creating a `RTCPeerConnection` doesn't actually do anything before `createDataChannel` is - // called. Smoldot's API, however, requires you to treat entire connections as open or - // closed. We know, according to the libp2p WebRTC specification, that every connection - // always starts with a substream where a handshake is performed. After we've reported that - // the connection is open, smoldot will open a substream in order to perform the handshake. - // Instead of following this API, we open this substream in advance, and will notify smoldot - // that the connection is open when the substream is open. - // Note that the label passed to `createDataChannel` is required to be empty as per the - // libp2p WebRTC specification. - addChannel(pc!.createDataChannel("", { id: 0, negotiated: true }), 'first-outbound') + config.onMultistreamHandshakeInfo({ + handshake: 'webrtc', + localTlsCertificateSha256, + remoteTlsCertificateSha256, + }); }); return { @@ -511,26 +455,11 @@ function connect(config: ConnectionConfig): Connection { closeSend: (): void => { throw new Error('Wrong connection type') }, openOutSubstream: () => { - // `openOutSubstream` can only be called after we have called `config.onOpen`, therefore - // `pc` is guaranteed to be non-null. - // As explained above, we open a data channel ahead of time. If this data channel is still - // there, we report it. - if (handshakeDataChannel) { - // Do this asynchronously because calling callbacks within callbacks is error-prone. - (async () => { - // We need to check again if `handshakeDataChannel` is still defined, as the - // connection might have been closed. - if (handshakeDataChannel) { - config.onStreamOpened(handshakeDataChannel.id!, 'outbound', 1024 * 1024) - dataChannels.set(handshakeDataChannel.id!, { channel: handshakeDataChannel, bufferedBytes: 0 }) - handshakeDataChannel = undefined - } - })() - } else { - // Note that the label passed to `createDataChannel` is required to be empty as per the - // libp2p WebRTC specification. - addChannel(pc!.createDataChannel(""), 'outbound') - } + // `openOutSubstream` can only be called after we have called `config.onOpen`, + // therefore `pc` is guaranteed to be non-null. + // Note that the label passed to `createDataChannel` is required to be empty as + // per the libp2p WebRTC specification. + addChannel(pc!.createDataChannel(""), 'outbound') } }; diff --git a/wasm-node/javascript/src/no-auto-bytecode-deno.ts b/wasm-node/javascript/src/no-auto-bytecode-deno.ts index 7135866626..ea66afbbce 100644 --- a/wasm-node/javascript/src/no-auto-bytecode-deno.ts +++ b/wasm-node/javascript/src/no-auto-bytecode-deno.ts @@ -94,7 +94,7 @@ function connect(config: ConnectionConfig): Connection { }; socket.onopen = () => { - config.onOpen({ type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: 1024 * 1024 }); + config.onWritableBytes(1024 * 1024); }; socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); @@ -151,7 +151,7 @@ function connect(config: ConnectionConfig): Connection { return established; established?.setNoDelay(); - config.onOpen({ type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: 1024 * 1024 }); + config.onWritableBytes(1024 * 1024); // Spawns an asynchronous task that continuously reads from the socket. // Every time data is read, the task re-executes itself in order to continue reading. diff --git a/wasm-node/javascript/src/no-auto-bytecode-nodejs.ts b/wasm-node/javascript/src/no-auto-bytecode-nodejs.ts index 90fa69ecbe..681cb8c8d8 100644 --- a/wasm-node/javascript/src/no-auto-bytecode-nodejs.ts +++ b/wasm-node/javascript/src/no-auto-bytecode-nodejs.ts @@ -105,7 +105,7 @@ function connect(config: ConnectionConfig): Connection { }; socket.onopen = () => { - config.onOpen({ type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: 1024 * 1024 }); + config.onWritableBytes(1024 * 1024); }; socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); @@ -161,10 +161,7 @@ function connect(config: ConnectionConfig): Connection { socket.on('connect', () => { if (socket.destroyed) return; - config.onOpen({ - type: 'single-stream', handshake: 'multistream-select-noise-yamux', - initialWritableBytes: socket.writableHighWaterMark - }); + config.onWritableBytes(socket.writableHighWaterMark); }); socket.on('close', (hasError) => { if (socket.destroyed) return; diff --git a/wasm-node/rust/src/bindings.rs b/wasm-node/rust/src/bindings.rs index 79c327454f..60d4f541ab 100644 --- a/wasm-node/rust/src/bindings.rs +++ b/wasm-node/rust/src/bindings.rs @@ -196,20 +196,15 @@ extern "C" { /// The `id` parameter is an identifier for this connection, as chosen by the Rust code. It /// must be passed on every interaction with this connection. /// - /// At any time, a connection can be in one of the three following states: + /// At any time, a connection can be in either the `Open` (the initial state) or the `Reset` + /// state. + /// When in the `Open` state, the connection can transition to the `Reset` state if the remote + /// closes the connection or refuses the connection altogether. When that happens, + /// [`connection_reset`] must be called. Once in the `Reset` state, the connection cannot + /// transition back to the `Open` state. /// - /// - `Opening` (initial state) - /// - `Open` - /// - `Reset` - /// - /// When in the `Opening` or `Open` state, the connection can transition to the `Reset` state - /// if the remote closes the connection or refuses the connection altogether. When that - /// happens, [`connection_reset`] must be called. Once in the `Reset` state, the connection - /// cannot transition back to another state. - /// - /// Initially in the `Opening` state, the connection can transition to the `Open` state if the - /// remote accepts the connection. When that happens, [`connection_open_single_stream`] or - /// [`connection_open_multi_stream`] must be called depending on the type of connection. + /// If the connection is a multistream connection, then + /// [`connection_multi_stream_set_handshake_info`] must later be called as soon as possible. /// /// There exists two kind of connections: single-stream and multi-stream. Single-stream /// connections are assumed to have a single stream open at all time and the encryption and @@ -490,22 +485,7 @@ pub extern "C" fn timer_finished() { crate::timers::timer_finished(); } -/// Called by the JavaScript code if the connection switches to the `Open` state. The connection -/// must be in the `Opening` state. -/// -/// Must be called at most once per connection object. -/// -/// See also [`connection_new`]. -/// -/// When in the `Open` state, the connection can receive messages. Use [`stream_message`] in order -/// to provide to the Rust code the messages received by the connection. -#[no_mangle] -pub extern "C" fn connection_open_single_stream(connection_id: u32, initial_writable_bytes: u32) { - crate::platform::connection_open_single_stream(connection_id, initial_writable_bytes); -} - -/// Called by the JavaScript code if the connection switches to the `Open` state. The connection -/// must be in the `Opening` state. +/// Called by the JavaScript code in order to provide information about a multistream connection. /// /// Must be called at most once per connection object. /// @@ -520,8 +500,11 @@ pub extern "C" fn connection_open_single_stream(connection_id: u32, initial_writ /// the local node's TLS certificate, followed with the SHA-256 hash of the remote node's TLS /// certificate. #[no_mangle] -pub extern "C" fn connection_open_multi_stream(connection_id: u32, handshake_ty_buffer_index: u32) { - crate::platform::connection_open_multi_stream( +pub extern "C" fn connection_multi_stream_set_handshake_info( + connection_id: u32, + handshake_ty_buffer_index: u32, +) { + crate::platform::connection_multi_stream_set_handshake_info( connection_id, get_buffer(handshake_ty_buffer_index), ); @@ -549,13 +532,8 @@ pub extern "C" fn stream_message(connection_id: u32, stream_id: u32, buffer_inde /// stream (and, in the case of a multi-stream connection, the stream itself) must be in the /// `Open` state. /// -/// `total_sent - total_reported_writable_bytes` must always be `>= 0`, where `total_sent` is the -/// total number of bytes sent on the stream using [`stream_send`] and -/// `total_reported_writable_bytes` is the total number of bytes reported using -/// [`stream_writable_bytes`]. -/// In other words, this function is meant to notify that data sent using [`stream_send`̀] has -/// effectively been sent out. It is not possible to exceed the `initial_writable_bytes` provided -/// when the stream was created. +/// The total of writable bytes must not go beyond reasonable values (e.g. a few megabytes). It +/// is not legal to provide a dummy implementation that simply passes an exceedingly large value. /// /// If `connection_id` is a single-stream connection, then the value of `stream_id` is ignored. /// If `connection_id` is a multi-stream connection, then `stream_id` corresponds to the stream @@ -576,18 +554,8 @@ pub extern "C" fn stream_writable_bytes(connection_id: u32, stream_id: u32, num_ /// value other than `0` if the substream has been opened in response to a call to /// [`connection_stream_open`]. #[no_mangle] -pub extern "C" fn connection_stream_opened( - connection_id: u32, - stream_id: u32, - outbound: u32, - initial_writable_bytes: u32, -) { - crate::platform::connection_stream_opened( - connection_id, - stream_id, - outbound, - initial_writable_bytes, - ); +pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { + crate::platform::connection_stream_opened(connection_id, stream_id, outbound); } /// Can be called at any point by the JavaScript code if the connection switches to the `Reset` diff --git a/wasm-node/rust/src/platform.rs b/wasm-node/rust/src/platform.rs index f3e486b6b5..a7ea4ba987 100644 --- a/wasm-node/rust/src/platform.rs +++ b/wasm-node/rust/src/platform.rs @@ -19,7 +19,7 @@ use crate::{bindings, timers::Delay}; use futures_lite::future::FutureExt as _; -use smoldot_light::platform::{read_write, ConnectError, SubstreamDirection}; +use smoldot_light::platform::{read_write, SubstreamDirection}; use core::{future, iter, mem, ops, pin, str, task, time::Duration}; use std::{ @@ -49,16 +49,14 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { type Instant = Duration; type MultiStream = MultiStreamWrapper; // Entry in the ̀`CONNECTIONS` map. type Stream = StreamWrapper; // Entry in the ̀`STREAMS` map and a read buffer. - type StreamConnectFuture = - pin::Pin> + Send>>; + type StreamConnectFuture = future::Ready; type ReadWriteAccess<'a> = ReadWriteAccess<'a>; type StreamErrorRef<'a> = StreamError; type MultiStreamConnectFuture = pin::Pin< Box< dyn future::Future< - Output = Result< - smoldot_light::platform::MultiStreamWebRtcConnection, - ConnectError, + Output = smoldot_light::platform::MultiStreamWebRtcConnection< + Self::MultiStream, >, > + Send, >, @@ -256,63 +254,34 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { let _prev_value = lock.connections.insert( connection_id, Connection { - inner: ConnectionInner::NotOpen, + inner: ConnectionInner::SingleStreamMsNoiseYamux, something_happened: event_listener::Event::new(), }, ); debug_assert!(_prev_value.is_none()); - Box::pin(async move { - // Wait until the connection state is no longer `ConnectionInner::NotOpen`. - let mut lock = loop { - let something_happened = { - let mut lock = STATE.try_lock().unwrap(); - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - if !matches!(connection.inner, ConnectionInner::NotOpen) { - break lock; - } - - connection.something_happened.listen() - }; - - something_happened.await - }; - let lock = &mut *lock; - - let connection = lock.connections.get_mut(&connection_id).unwrap(); + let _prev_value = lock.streams.insert( + (connection_id, None), + Stream { + reset: false, + messages_queue: VecDeque::with_capacity(8), + messages_queue_total_size: 0, + something_happened: event_listener::Event::new(), + writable_bytes_extra: 0, + }, + ); + debug_assert!(_prev_value.is_none()); - match &mut connection.inner { - ConnectionInner::NotOpen | ConnectionInner::MultiStreamWebRtc { .. } => { - unreachable!() - } - ConnectionInner::SingleStreamMsNoiseYamux => { - debug_assert!(lock.streams.contains_key(&(connection_id, None))); - Ok(StreamWrapper { - connection_id, - stream_id: None, - read_buffer: Vec::new(), - inner_expected_incoming_bytes: Some(1), - is_reset: false, - writable_bytes: 0, - write_closable, - write_closed: false, - when_wake_up: None, - }) - } - ConnectionInner::Reset { - message, - connection_handles_alive, - } => { - // Note that it is possible for the state to have transitionned to (for - // example) `ConnectionInner::SingleStreamMsNoiseYamux` and then immediately - // to `Reset`, but we don't really care about that corner case. - debug_assert_eq!(*connection_handles_alive, 0); - let message = mem::take(message); - lock.connections.remove(&connection_id).unwrap(); - Err(ConnectError { message }) - } - } + future::ready(StreamWrapper { + connection_id, + stream_id: None, + read_buffer: Vec::new(), + inner_expected_incoming_bytes: Some(1), + is_reset: false, + writable_bytes: 0, + write_closable, + write_closed: false, + when_wake_up: None, }) } @@ -357,20 +326,26 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { let _prev_value = lock.connections.insert( connection_id, Connection { - inner: ConnectionInner::NotOpen, + inner: ConnectionInner::MultiStreamUnknownHandshake { + opened_substreams_to_pick_up: VecDeque::with_capacity(0), + connection_handles_alive: 1, + }, something_happened: event_listener::Event::new(), }, ); debug_assert!(_prev_value.is_none()); Box::pin(async move { - // Wait until the connection state is no longer `ConnectionInner::NotOpen`. + // Wait until the connection state is no longer "unknown handshake". let mut lock = loop { let something_happened = { let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); - if !matches!(connection.inner, ConnectionInner::NotOpen) { + if matches!( + connection.inner, + ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. } + ) { break lock; } @@ -384,33 +359,27 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { let connection = lock.connections.get_mut(&connection_id).unwrap(); match &mut connection.inner { - ConnectionInner::NotOpen | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { + ConnectionInner::SingleStreamMsNoiseYamux { .. } + | ConnectionInner::MultiStreamUnknownHandshake { .. } => { unreachable!() } ConnectionInner::MultiStreamWebRtc { - connection_handles_alive, local_tls_certificate_sha256, remote_tls_certificate_sha256, .. - } => { - *connection_handles_alive += 1; - Ok(smoldot_light::platform::MultiStreamWebRtcConnection { + } => smoldot_light::platform::MultiStreamWebRtcConnection { + connection: MultiStreamWrapper(connection_id), + local_tls_certificate_sha256: *local_tls_certificate_sha256, + remote_tls_certificate_sha256: *remote_tls_certificate_sha256, + }, + ConnectionInner::Reset { .. } => { + // If the connection was already reset, we proceed anyway but provide a fake + // certificate hash. This has absolutely no consequence. + smoldot_light::platform::MultiStreamWebRtcConnection { connection: MultiStreamWrapper(connection_id), - local_tls_certificate_sha256: *local_tls_certificate_sha256, - remote_tls_certificate_sha256: *remote_tls_certificate_sha256, - }) - } - ConnectionInner::Reset { - message, - connection_handles_alive, - } => { - // Note that it is possible for the state to have transitionned to (for - // example) `ConnectionInner::SingleStreamMsNoiseYamux` and then immediately - // to `Reset`, but we don't really care about that corner case. - debug_assert_eq!(*connection_handles_alive, 0); - let message = mem::take(message); - lock.connections.remove(&connection_id).unwrap(); - Err(ConnectError { message }) + local_tls_certificate_sha256: [0; 32], + remote_tls_certificate_sha256: [0; 32], // TODO: this is very bad, but this field is getting removed asap + } } } }) @@ -423,7 +392,7 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { let connection_id = *connection_id; Box::pin(async move { - let (stream_id, direction, initial_writable_bytes) = loop { + let (stream_id, direction) = loop { let something_happened = { let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); @@ -434,16 +403,20 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { opened_substreams_to_pick_up, connection_handles_alive, .. + } + | ConnectionInner::MultiStreamUnknownHandshake { + opened_substreams_to_pick_up, + connection_handles_alive, + .. } => { - if let Some((substream, direction, initial_writable_bytes)) = + if let Some((substream, direction)) = opened_substreams_to_pick_up.pop_front() { *connection_handles_alive += 1; - break (substream, direction, initial_writable_bytes); + break (substream, direction); } } - ConnectionInner::NotOpen - | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { + ConnectionInner::SingleStreamMsNoiseYamux { .. } => { unreachable!() } } @@ -461,7 +434,7 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { read_buffer: Vec::new(), inner_expected_incoming_bytes: Some(1), is_reset: false, - writable_bytes: usize::try_from(initial_writable_bytes).unwrap(), + writable_bytes: 0, write_closable: false, // Note: this is currently hardcoded for WebRTC. write_closed: false, when_wake_up: None, @@ -480,11 +453,12 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { .unwrap() .inner { - ConnectionInner::MultiStreamWebRtc { .. } => unsafe { + ConnectionInner::MultiStreamWebRtc { .. } + | ConnectionInner::MultiStreamUnknownHandshake { .. } => unsafe { bindings::connection_stream_open(*connection_id) }, ConnectionInner::Reset { .. } => {} - ConnectionInner::NotOpen | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { + ConnectionInner::SingleStreamMsNoiseYamux { .. } => { unreachable!() } } @@ -537,9 +511,9 @@ impl smoldot_light::platform::PlatformRef for PlatformRef { } if stream_inner.writable_bytes_extra != 0 { - // As documented, the number of writable bytes must never exceed the - // initial writable bytes value. As such, this can't overflow unless there - // is a bug on the JavaScript side. + // As documented, the number of writable bytes must never become + // exceedingly large (a few megabytes). As such, this can't overflow + // unless there is a bug on the JavaScript side. stream.writable_bytes += stream_inner.writable_bytes_extra; stream_inner.writable_bytes_extra = 0; shall_return = true; @@ -714,7 +688,6 @@ impl Drop for StreamWrapper { .unwrap(); let remove_connection = match &mut connection.inner { - ConnectionInner::NotOpen => unreachable!(), ConnectionInner::SingleStreamMsNoiseYamux { .. } => { if !removed_stream.reset { unsafe { @@ -728,6 +701,10 @@ impl Drop for StreamWrapper { ConnectionInner::MultiStreamWebRtc { connection_handles_alive, .. + } + | ConnectionInner::MultiStreamUnknownHandshake { + connection_handles_alive, + .. } => { if !removed_stream.reset { unsafe { @@ -769,12 +746,16 @@ impl Drop for MultiStreamWrapper { let connection = lock.connections.get_mut(&self.0).unwrap(); let (remove_connection, reset_connection) = match &mut connection.inner { - ConnectionInner::NotOpen | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { + ConnectionInner::SingleStreamMsNoiseYamux { .. } => { unreachable!() } ConnectionInner::MultiStreamWebRtc { connection_handles_alive, .. + } + | ConnectionInner::MultiStreamUnknownHandshake { + connection_handles_alive, + .. } => { *connection_handles_alive -= 1; let v = *connection_handles_alive == 0; @@ -832,13 +813,21 @@ struct Connection { } enum ConnectionInner { - NotOpen, SingleStreamMsNoiseYamux, + MultiStreamUnknownHandshake { + /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, + /// but that haven't been reported through + /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. + opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, + /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference + /// this connection. If it switches from 1 to 0, the connection must be removed. + connection_handles_alive: u32, + }, MultiStreamWebRtc { /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, /// but that haven't been reported through /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. - opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection, u32)>, + opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference /// this connection. If it switches from 1 to 0, the connection must be removed. connection_handles_alive: u32, @@ -850,7 +839,8 @@ enum ConnectionInner { /// [`bindings::connection_reset`] has been called Reset { /// Message given by the bindings to justify the closure. - message: String, + // TODO: why is this unused? shouldn't it be not unused? + _message: String, /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference /// this connection. If it switches from 1 to 0, the connection must be removed. connection_handles_alive: u32, @@ -860,9 +850,8 @@ enum ConnectionInner { struct Stream { /// `true` if [`bindings::stream_reset`] has been called. reset: bool, - /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] or - /// `initial_writable_bytes` that haven't been processed yet in a call to - /// `update_stream`. + /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] that + /// haven't been processed yet in a call to `update_stream`. writable_bytes_extra: usize, /// List of messages received through [`bindings::stream_message`]. Must never contain /// empty messages. @@ -874,31 +863,10 @@ struct Stream { something_happened: event_listener::Event, } -pub(crate) fn connection_open_single_stream(connection_id: u32, initial_writable_bytes: u32) { - let mut lock = STATE.try_lock().unwrap(); - let lock = &mut *lock; - - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - debug_assert!(matches!(connection.inner, ConnectionInner::NotOpen)); - connection.inner = ConnectionInner::SingleStreamMsNoiseYamux; - - let _prev_value = lock.streams.insert( - (connection_id, None), - Stream { - reset: false, - messages_queue: VecDeque::with_capacity(8), - messages_queue_total_size: 0, - something_happened: event_listener::Event::new(), - writable_bytes_extra: usize::try_from(initial_writable_bytes).unwrap(), - }, - ); - debug_assert!(_prev_value.is_none()); - - connection.something_happened.notify(usize::max_value()); -} - -pub(crate) fn connection_open_multi_stream(connection_id: u32, handshake_ty: Vec) { +pub(crate) fn connection_multi_stream_set_handshake_info( + connection_id: u32, + handshake_ty: Vec, +) { let (_, (local_tls_certificate_sha256, remote_tls_certificate_sha256)) = nom::sequence::preceded( nom::bytes::streaming::tag::<_, _, nom::error::Error<&[u8]>>(&[0]), @@ -911,12 +879,15 @@ pub(crate) fn connection_open_multi_stream(connection_id: u32, handshake_ty: Vec }), )), )(&handshake_ty[..]) - .expect("invalid handshake type provided to connection_open_multi_stream"); + .expect("invalid handshake type provided to connection_multi_stream_set_handshake_info"); let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); - debug_assert!(matches!(connection.inner, ConnectionInner::NotOpen)); + assert!(matches!( + connection.inner, + ConnectionInner::MultiStreamUnknownHandshake { .. } + )); connection.inner = ConnectionInner::MultiStreamWebRtc { opened_substreams_to_pick_up: VecDeque::with_capacity(8), @@ -935,9 +906,10 @@ pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u // For single stream connections, the docs of this function mentions that `stream_id` can be // any value. let actual_stream_id = match connection.inner { - ConnectionInner::MultiStreamWebRtc { .. } => Some(stream_id), + ConnectionInner::MultiStreamWebRtc { .. } + | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, - ConnectionInner::Reset { .. } | ConnectionInner::NotOpen => unreachable!(), + ConnectionInner::Reset { .. } => unreachable!(), }; let stream = lock @@ -946,8 +918,8 @@ pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u .unwrap(); debug_assert!(!stream.reset); - // As documented, the number of writable bytes must never exceed the initial writable bytes - // value. As such, this can't overflow unless there is a bug on the JavaScript side. + // As documented, the number of writable bytes must never become exceedingly large (a few + // megabytes). As such, this can't overflow unless there is a bug on the JavaScript side. stream.writable_bytes_extra += usize::try_from(bytes).unwrap(); stream.something_happened.notify(usize::max_value()); } @@ -960,9 +932,10 @@ pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Vec Some(stream_id), + ConnectionInner::MultiStreamWebRtc { .. } + | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, - ConnectionInner::Reset { .. } | ConnectionInner::NotOpen => unreachable!(), + ConnectionInner::Reset { .. } => unreachable!(), }; let stream = lock @@ -1010,12 +983,7 @@ pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Vec) { let connection = lock.connections.get_mut(&connection_id).unwrap(); let connection_handles_alive = match &connection.inner { - ConnectionInner::NotOpen => 0, ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something ConnectionInner::MultiStreamWebRtc { connection_handles_alive, .. + } + | ConnectionInner::MultiStreamUnknownHandshake { + connection_handles_alive, + .. } => *connection_handles_alive, ConnectionInner::Reset { .. } => unreachable!(), }; connection.inner = ConnectionInner::Reset { connection_handles_alive, - message: str::from_utf8(&message) + _message: str::from_utf8(&message) .unwrap_or_else(|_| panic!("non-UTF-8 message")) .to_owned(), }; From 71d9f3436c1b62ae96cf740099e8bc11b15175c2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 Nov 2023 12:52:27 +0100 Subject: [PATCH 2/4] Typo --- light-base/src/platform.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/light-base/src/platform.rs b/light-base/src/platform.rs index 96f7f0dd7a..d8e4f18bd9 100644 --- a/light-base/src/platform.rs +++ b/light-base/src/platform.rs @@ -128,7 +128,7 @@ pub trait PlatformRef: Clone + Send + Sync + 'static { /// /// This function returns a `Future`. This `Future` **must** return as soon as possible, and /// must **not** wait for the connection to be established. - /// In the most scenarios, the `Future` returned by this function should immediately produce + /// In most scenarios, the `Future` returned by this function should immediately produce /// an output. /// /// # Panic @@ -142,7 +142,7 @@ pub trait PlatformRef: Clone + Send + Sync + 'static { /// /// This function returns a `Future`. This `Future` **must** return as soon as possible, and /// must **not** wait for the connection to be established. - /// In the most scenarios, the `Future` returned by this function should immediately produce + /// In most scenarios, the `Future` returned by this function should immediately produce /// an output. /// /// # Panic From 4ba73c018678373c34f9f6897ddf5b2eb74a9a30 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 Nov 2023 13:13:10 +0100 Subject: [PATCH 3/4] Rustfmt --- light-base/src/platform/default.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/light-base/src/platform/default.rs b/light-base/src/platform/default.rs index a9f8787833..c63e82377d 100644 --- a/light-base/src/platform/default.rs +++ b/light-base/src/platform/default.rs @@ -19,8 +19,8 @@ #![cfg_attr(docsrs, doc(cfg(feature = "std")))] use super::{ - with_buffers, Address, ConnectionType, IpAddr, MultiStreamAddress, - MultiStreamWebRtcConnection, PlatformRef, SubstreamDirection, + with_buffers, Address, ConnectionType, IpAddr, MultiStreamAddress, MultiStreamWebRtcConnection, + PlatformRef, SubstreamDirection, }; use alloc::{borrow::Cow, sync::Arc}; From fb7c27e06b2d02a3925768c548f405b9fccac345 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 Nov 2023 13:14:07 +0100 Subject: [PATCH 4/4] Docfix --- wasm-node/rust/src/bindings.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wasm-node/rust/src/bindings.rs b/wasm-node/rust/src/bindings.rs index 60d4f541ab..8dc6918405 100644 --- a/wasm-node/rust/src/bindings.rs +++ b/wasm-node/rust/src/bindings.rs @@ -522,7 +522,7 @@ pub extern "C" fn connection_multi_stream_set_handshake_info( /// If `connection_id` is a multi-stream connection, then `stream_id` corresponds to the stream /// on which the data was received, as was provided to [`connection_stream_opened`]. /// -/// See also [`connection_open_single_stream`] and [`connection_open_multi_stream`]. +/// See also [`connection_new`]. #[no_mangle] pub extern "C" fn stream_message(connection_id: u32, stream_id: u32, buffer_index: u32) { crate::platform::stream_message(connection_id, stream_id, get_buffer(buffer_index)); @@ -585,7 +585,7 @@ pub extern "C" fn connection_reset(connection_id: u32, buffer_index: u32) { /// /// It is illegal to call this function on a single-stream connections. /// -/// See also [`connection_open_multi_stream`]. +/// See also [`connection_new`]. #[no_mangle] pub extern "C" fn stream_reset(connection_id: u32, stream_id: u32) { crate::platform::stream_reset(connection_id, stream_id);