Skip to content

Commit

Permalink
Add network info tab with latency estimates (#16)
Browse files Browse the repository at this point in the history
* Design component for network info summary

* Improve latency display component and give props

* Backend for latency measurements
  • Loading branch information
ekzhang authored Nov 4, 2023
1 parent c296888 commit 2f48d0d
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 13 deletions.
2 changes: 2 additions & 0 deletions crates/sshx-core/proto/sshx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ message ClientUpdate {
TerminalData data = 2; // Stream data from the terminal.
NewShell created_shell = 3; // Acknowledge that a new shell was created.
uint32 closed_shell = 4; // Acknowledge that a shell was closed.
fixed64 pong = 14; // Response for latency measurement.
string error = 15;
}
}
Expand All @@ -80,6 +81,7 @@ message ServerUpdate {
uint32 close_shell = 3; // ID of a shell to close.
SequenceNumbers sync = 4; // Periodic sequence number sync.
TerminalSize resize = 5; // Resize a terminal window.
fixed64 ping = 14; // Request a pong, with the timestamp.
string error = 15;
}
}
Expand Down
30 changes: 26 additions & 4 deletions crates/sshx-server/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Defines gRPC routes and application request logic.
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use base64::prelude::{Engine as _, BASE64_STANDARD};
use hmac::Mac;
Expand All @@ -22,6 +22,9 @@ use crate::ServerState;
/// Interval for synchronizing sequence numbers with the client.
pub const SYNC_INTERVAL: Duration = Duration::from_secs(5);

/// Interval for measuring client latency.
pub const PING_INTERVAL: Duration = Duration::from_secs(2);

/// Server that handles gRPC requests from the sshx command-line client.
#[derive(Clone)]
pub struct GrpcServer(Arc<ServerState>);
Expand Down Expand Up @@ -133,17 +136,25 @@ async fn handle_streaming(
session: &Session,
mut stream: Streaming<ClientUpdate>,
) -> Result<(), &'static str> {
let mut interval = time::interval(SYNC_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut sync_interval = time::interval(SYNC_INTERVAL);
sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut ping_interval = time::interval(PING_INTERVAL);
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
tokio::select! {
// Send periodic sync messages to the client.
_ = interval.tick() => {
_ = sync_interval.tick() => {
let msg = ServerMessage::Sync(session.sequence_numbers());
if !send_msg(tx, msg).await {
return Err("failed to send sync message");
}
}
// Send periodic pings to the client.
_ = ping_interval.tick() => {
send_msg(tx, ServerMessage::Ping(get_time_ms())).await;
}
// Send buffered server updates to the client.
Ok(msg) = session.update_rx().recv() => {
if !send_msg(tx, msg).await {
Expand Down Expand Up @@ -195,6 +206,10 @@ async fn handle_update(tx: &ServerTx, session: &Session, update: ClientUpdate) -
return send_err(tx, format!("close shell: {:?}", err)).await;
}
}
Some(ClientMessage::Pong(ts)) => {
let latency = get_time_ms().saturating_sub(ts);
session.send_latency_measurement(latency);
}
Some(ClientMessage::Error(err)) => {
// TODO: Propagate these errors to listeners on the web interface?
error!(?err, "error received from client");
Expand All @@ -216,3 +231,10 @@ async fn send_msg(tx: &ServerTx, message: ServerMessage) -> bool {
async fn send_err(tx: &ServerTx, err: String) -> bool {
send_msg(tx, ServerMessage::Error(err)).await
}

fn get_time_ms() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system time is before the UNIX epoch")
.as_millis() as u64
}
7 changes: 6 additions & 1 deletion crates/sshx-server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Session {
counter: IdCounter::default(),
last_accessed: Mutex::new(now),
source: watch::channel(Vec::new()).0,
broadcast: broadcast::channel(32).0,
broadcast: broadcast::channel(64).0,
update_tx,
update_rx,
sync_notify: Notify::new(),
Expand Down Expand Up @@ -351,6 +351,11 @@ impl Session {
Ok(())
}

/// Send a measurement of the shell latency.
pub fn send_latency_measurement(&self, latency: u64) {
self.broadcast.send(WsServer::ShellLatency(latency)).ok();
}

/// Register a backend client heartbeat, refreshing the timestamp.
pub fn access(&self) {
*self.last_accessed.lock() = Instant::now();
Expand Down
6 changes: 6 additions & 0 deletions crates/sshx-server/src/web/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum WsServer {
Chunks(Sid, u64, Vec<Bytes>),
/// Get a chat message tuple `(uid, name, text)` from the room.
Hear(Uid, String, String),
/// Forward a latency measurement between the server and backend shell.
ShellLatency(u64),
/// Echo back a timestamp, for the the client's own latency measurement.
Pong(u64),
/// Alert the client of an application error.
Error(String),
}
Expand Down Expand Up @@ -87,4 +91,6 @@ pub enum WsClient {
Subscribe(Sid, u64),
/// Send a a chat message to the room.
Chat(String),
/// Send a ping to the server, for latency measurement.
Ping(u64),
}
3 changes: 3 additions & 0 deletions crates/sshx-server/src/web/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ async fn handle_socket(socket: &mut WebSocket, session: Arc<Session>) -> Result<
WsClient::Chat(msg) => {
session.send_chat(user_id, &msg)?;
}
WsClient::Ping(ts) => {
send(socket, WsServer::Pong(ts)).await?;
}
}
}
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions crates/sshx-server/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ impl ClientSocket {
WsServer::Hear(id, name, msg) => {
self.messages.push((id, name, msg));
}
WsServer::ShellLatency(_) => {}
WsServer::Pong(_) => {}
WsServer::Error(err) => self.errors.push(err),
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/sshx/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ impl Controller {
warn!(%msg.id, "received resize for non-existing shell");
}
}
ServerMessage::Ping(ts) => {
// Echo back the timestamp, for stateless latency measurement.
send_msg(&tx, ClientMessage::Pong(ts)).await?;
}
ServerMessage::Error(err) => {
error!(?err, "error received from server");
}
Expand Down
72 changes: 66 additions & 6 deletions src/lib/Session.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import Chat, { type ChatMessage } from "./ui/Chat.svelte";
import ChooseName from "./ui/ChooseName.svelte";
import NameList from "./ui/NameList.svelte";
import NetworkInfo from "./ui/NetworkInfo.svelte";
import Settings from "./ui/Settings.svelte";
import Toolbar from "./ui/Toolbar.svelte";
import XTerm from "./ui/XTerm.svelte";
Expand Down Expand Up @@ -51,6 +52,10 @@
let center = [0, 0];
let zoom = INITIAL_ZOOM;
let showChat = false; // @hmr:keep
let settingsOpen = false; // @hmr:keep
let showNetworkInfo = false; // @hmr:keep
onMount(() => {
touchZoom = new TouchZoom(fabricEl);
touchZoom.onMove(() => {
Expand All @@ -67,6 +72,8 @@
(document.activeElement as HTMLElement).blur();
}
}
showNetworkInfo = false;
});
});
Expand All @@ -85,9 +92,6 @@
let connected = false;
let exitReason: string | null = null;
let showChat = false; // @hmr:keep
let settingsOpen = false; // @hmr:keep
/** Bound "write" method for each terminal. */
const writers: Record<number, (data: string) => void> = {};
const termWrappers: Record<number, HTMLDivElement> = {};
Expand All @@ -112,6 +116,9 @@
let chatMessages: ChatMessage[] = [];
let newMessages = false;
let serverLatencies: number[] = [];
let shellLatencies: number[] = [];
onMount(async () => {
// The page hash sets the end-to-end encryption key.
const key = window.location.hash?.slice(1) ?? "";
Expand Down Expand Up @@ -172,6 +179,12 @@
chatMessages.push({ uid, name, msg, sentAt: new Date() });
chatMessages = chatMessages;
if (!showChat) newMessages = true;
} else if (message.shellLatency !== undefined) {
const shellLatency = Number(message.shellLatency);
shellLatencies = [...shellLatencies, shellLatency].slice(-10);
} else if (message.pong !== undefined) {
const serverLatency = Date.now() - Number(message.pong);
serverLatencies = [...serverLatencies, serverLatency].slice(-10);
} else if (message.error) {
console.warn("Server error: " + message.error);
}
Expand All @@ -189,6 +202,8 @@
connected = false;
subscriptions.clear();
users = [];
serverLatencies = [];
shellLatencies = [];
},
onClose(event) {
Expand All @@ -203,6 +218,27 @@
onDestroy(() => srocket?.dispose());
// Send periodic ping messages for latency estimation.
onMount(() => {
const pingIntervalId = window.setInterval(() => {
if (srocket?.connected) {
srocket.send({ ping: BigInt(Date.now()) });
}
}, 2000);
return () => window.clearInterval(pingIntervalId);
});
function integerMedian(values: number[]) {
if (values.length === 0) {
return null;
}
const sorted = values.toSorted();
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 !== 0
? sorted[mid]
: Math.round((sorted[mid - 1] + sorted[mid]) / 2);
}
$: if ($settings.name) {
srocket?.send({ setName: $settings.name });
}
Expand Down Expand Up @@ -350,7 +386,24 @@
on:settings={() => {
settingsOpen = true;
}}
on:networkInfo={() => {
showNetworkInfo = !showNetworkInfo;
}}
/>

{#if showNetworkInfo}
<div class="absolute top-20 translate-x-[116.5px]">
<NetworkInfo
status={connected
? "connected"
: exitReason
? "no-shell"
: "no-server"}
serverLatency={integerMedian(serverLatencies)}
shellLatency={integerMedian(shellLatencies)}
/>
</div>
{/if}
</div>

{#if showChat}
Expand Down Expand Up @@ -426,16 +479,23 @@
const cols = ws.cols + 10;
srocket?.send({ move: [id, { ...ws, rows, cols }] });
}}
on:bringToFront={() => srocket?.send({ move: [id, null] })}
on:bringToFront={() => {
showNetworkInfo = false;
srocket?.send({ move: [id, null] });
}}
on:startMove={({ detail: event }) => {
const [x, y] = normalizePosition(event);
moving = id;
movingOrigin = [x - ws.x, y - ws.y];
movingSize = ws;
movingIsDone = false;
}}
on:focus={() => (focused = [...focused, id])}
on:blur={() => (focused = focused.filter((i) => i !== id))}
on:focus={() => {
focused = [...focused, id];
}}
on:blur={() => {
focused = focused.filter((i) => i !== id);
}}
/>

<!-- User avatars -->
Expand Down
3 changes: 3 additions & 0 deletions src/lib/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type WsServer = {
shells?: [Sid, WsWinsize][];
chunks?: [Sid, number, Uint8Array[]];
hear?: [Uid, string, string];
shellLatency?: number | bigint;
pong?: number | bigint;
error?: string;
};

Expand All @@ -40,4 +42,5 @@ export type WsClient = {
data?: [Sid, Uint8Array, bigint];
subscribe?: [Sid, number];
chat?: string;
ping?: bigint;
};
4 changes: 4 additions & 0 deletions src/lib/srocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export class Srocket<T, U> {
this.#reconnect();
}

get connected() {
return this.#connected;
}

/** Queue a message to send to the server, with "at-most-once" semantics. */
send(message: U) {
const data = encode(message);
Expand Down
Loading

0 comments on commit 2f48d0d

Please sign in to comment.