Skip to content

Commit

Permalink
remove hyper-tungstenite
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jan 11, 2024
1 parent a806a08 commit 979373c
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 120 deletions.
16 changes: 2 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ http-types = { version = "2", default-features = false }
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper-tungstenite = "0.11"
inotify = "0.10.2"
ipnet = "2.9.0"
itertools = "0.10"
Expand Down Expand Up @@ -156,13 +155,15 @@ tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
tokio-tungstenite = "0.20"
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
tungstenite = "0.20"
url = "2.2"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
Expand Down
3 changes: 2 additions & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ hex.workspace = true
hmac.workspace = true
hostname.workspace = true
humantime.workspace = true
hyper-tungstenite.workspace = true
hyper.workspace = true
ipnet.workspace = true
itertools.workspace = true
Expand Down Expand Up @@ -66,11 +65,13 @@ tls-listener.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio-tungstenite.workspace = true
tokio = { workspace = true, features = ["signal"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
tungstenite.workspace = true
url.workspace = true
utils.workspace = true
uuid.workspace = true
Expand Down
102 changes: 8 additions & 94 deletions proxy/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ mod websocket;
pub use conn_pool::GlobalConnPoolOptions;

use anyhow::bail;
use hyper::ext::Protocol;
use hyper::StatusCode;
use hyper_tungstenite::tungstenite::error::{Error as WSError, ProtocolError};
use hyper_tungstenite::tungstenite::protocol::Role;
use hyper_tungstenite::WebSocketStream;
use metrics::IntCounterPairGuard;
use rand::rngs::StdRng;
use rand::SeedableRng;
Expand All @@ -37,7 +33,6 @@ use hyper::{
};

use std::net::IpAddr;
use std::pin::Pin;
use std::task::Poll;
use std::{future::ready, sync::Arc};
use tls_listener::TlsListener;
Expand Down Expand Up @@ -223,11 +218,13 @@ async fn request_handler(
.and_then(|h| h.split(':').next())
.map(|s| s.to_string());

let ws_config = None;

// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
if websocket::is_upgrade_request(&request) {
info!(session_id = ?session_id, "performing websocket upgrade");

let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
let (response, websocket) = websocket::upgrade(&mut request, ws_config)
.map_err(|e| ApiError::BadRequest(e.into()))?;

ws_connections.spawn(
Expand All @@ -252,11 +249,11 @@ async fn request_handler(

// Return the response so the spawned future can continue.
Ok(response)
} else if is_upgrade2_request(&request) {
} else if websocket::is_connect_request(&request) {
info!(session_id = ?session_id, "performing http2 websocket upgrade");

let (response, websocket) =
upgrade_http2(&mut request).map_err(|e| ApiError::BadRequest(e.into()))?;
let (response, websocket) = websocket::connect(&mut request, ws_config)
.map_err(|e| ApiError::BadRequest(e.into()))?;

ws_connections.spawn(
async move {
Expand Down Expand Up @@ -298,7 +295,7 @@ async fn request_handler(
.header("Access-Control-Allow-Origin", "*")
.header(
"Access-Control-Allow-Headers",
"Neon-Endpoint, Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Neon-Batch-Read-Only, Neon-Batch-Isolation-Level",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Neon-Batch-Read-Only, Neon-Batch-Isolation-Level",
)
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
Expand All @@ -308,86 +305,3 @@ async fn request_handler(
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}

pin_project_lite::pin_project! {
/// A future that resolves to a websocket stream when the associated HTTP2 connect completes.
#[derive(Debug)]
pub struct HyperWebsocket2 {
#[pin]
inner: hyper::upgrade::OnUpgrade,
}
}

/// Try to upgrade a received `hyper::Request` to a websocket connection.
///
/// The function returns a HTTP response and a future that resolves to the websocket stream.
/// The response body *MUST* be sent to the client before the future can be resolved.
///
/// This functions checks `Sec-WebSocket-Version` header.
/// It does not inspect the `Origin`, `Sec-WebSocket-Protocol` or `Sec-WebSocket-Extensions` headers.
/// You can inspect the headers manually before calling this function,
/// and modify the response headers appropriately.
///
/// This function also does not look at the `Connection` or `Upgrade` headers.
/// To check if a request is a websocket upgrade request, you can use [`is_upgrade2_request`].
/// Alternatively you can inspect the `Connection` and `Upgrade` headers manually.
///
fn upgrade_http2<B>(
mut request: impl std::borrow::BorrowMut<Request<B>>,
) -> Result<(Response<Body>, HyperWebsocket2), ProtocolError> {
let request = request.borrow_mut();

if request
.headers()
.get("Sec-WebSocket-Version")
.map(|v| v.as_bytes())
!= Some(b"13")
{
return Err(ProtocolError::MissingSecWebSocketVersionHeader);
}

let response = Response::builder()
.status(hyper::StatusCode::OK)
.body(Body::from("switching to websocket protocol"))
.expect("bug: failed to build response");

let stream = HyperWebsocket2 {
inner: hyper::upgrade::on(request),
};

Ok((response, stream))
}

/// Check if a request is a websocket connect request.
pub fn is_upgrade2_request<B>(request: &hyper::Request<B>) -> bool {
request.method() == Method::CONNECT
&& request
.extensions()
.get::<Protocol>()
.is_some_and(|protocol| protocol.as_str() == "websocket")
}

impl std::future::Future for HyperWebsocket2 {
type Output = Result<WebSocketStream<hyper::upgrade::Upgraded>, WSError>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
let this = self.project();
let upgraded = match this.inner.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(x) => x,
};

let upgraded =
upgraded.map_err(|_| WSError::Protocol(ProtocolError::HandshakeIncomplete))?;

let stream = WebSocketStream::from_raw_socket(upgraded, Role::Server, None);
tokio::pin!(stream);

// The future returned by `from_raw_socket` is always ready.
// Not sure why it is a future in the first place.
match stream.as_mut().poll(cx) {
Poll::Pending => unreachable!("from_raw_socket should always be created ready"),
Poll::Ready(x) => Poll::Ready(Ok(x)),
}
}
}
Loading

0 comments on commit 979373c

Please sign in to comment.