From 28fae411ccc8185ed9a3939d5d80c47ee8f45308 Mon Sep 17 00:00:00 2001 From: Jai A Date: Fri, 27 Dec 2024 16:56:05 -0700 Subject: [PATCH 1/4] Fix sockets causing actix hangs --- apps/labrinth/src/routes/internal/statuses.rs | 32 +++++++++++-------- apps/labrinth/src/routes/v3/friends.rs | 18 ++++++----- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index b54af1869..3ac6ee923 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -10,9 +10,9 @@ use crate::queue::socket::ActiveSockets; use crate::routes::ApiError; use actix_web::web::{Data, Payload}; use actix_web::{get, web, HttpRequest, HttpResponse}; -use actix_ws::AggregatedMessage; +use actix_ws::Message; use chrono::Utc; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -128,13 +128,13 @@ pub async fn ws_init( ) .await?; - let mut stream = msg_stream.aggregate_continuations(); + let mut stream = msg_stream.into_stream(); actix_web::rt::spawn(async move { // receive messages from websocket while let Some(msg) = stream.next().await { match msg { - Ok(AggregatedMessage::Text(text)) => { + Ok(Message::Text(text)) => { if let Ok(message) = serde_json::from_str::(&text) { @@ -159,10 +159,14 @@ pub async fn ws_init( status.profile_name = profile_name; status.last_update = Utc::now(); + let user_status = status.clone(); + // We drop the pair to avoid holding the lock for too long + drop(pair); + let _ = broadcast_friends( user.id, ServerToClientMessage::StatusUpdate { - status: status.clone(), + status: user_status, }, &pool, &db, @@ -175,15 +179,14 @@ pub async fn ws_init( } } - Ok(AggregatedMessage::Close(_)) => { + Ok(Message::Close(_)) => { let _ = close_socket(user.id, &pool, &db).await; } - Ok(AggregatedMessage::Ping(msg)) => { - if let Some(mut socket) = db.auth_sockets.get_mut(&user.id) - { - let (_, socket) = socket.value_mut(); - let _ = socket.pong(&msg).await; + Ok(Message::Ping(msg)) => { + if let Some(mut socket) = db.auth_sockets.get(&user.id) { + let (_, socket) = socket.value(); + let _ = socket.clone().pong(&msg).await; } } @@ -219,11 +222,12 @@ pub async fn broadcast_friends( if friend.accepted { if let Some(mut socket) = - sockets.auth_sockets.get_mut(&friend_id.into()) + sockets.auth_sockets.get(&friend_id.into()) { - let (_, socket) = socket.value_mut(); + let (_, socket) = socket.value(); - let _ = socket.text(serde_json::to_string(&message)?).await; + let _ = + socket.clone().text(serde_json::to_string(&message)?).await; } } } diff --git a/apps/labrinth/src/routes/v3/friends.rs b/apps/labrinth/src/routes/v3/friends.rs index 9541c3dd5..f7e5a96ef 100644 --- a/apps/labrinth/src/routes/v3/friends.rs +++ b/apps/labrinth/src/routes/v3/friends.rs @@ -78,12 +78,13 @@ pub async fn add_friend( ) -> Result<(), ApiError> { if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) { let (friend_status, _) = pair.value(); - if let Some(mut socket) = - sockets.auth_sockets.get_mut(&friend_id.into()) + if let Some(socket) = + sockets.auth_sockets.get(&friend_id.into()) { - let (_, socket) = socket.value_mut(); + let (_, socket) = socket.value(); let _ = socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::StatusUpdate { status: friend_status.clone(), @@ -120,11 +121,11 @@ pub async fn add_friend( .insert(&mut transaction) .await?; - if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) - { - let (_, socket) = socket.value_mut(); + if let Some(socket) = db.auth_sockets.get(&friend.id.into()) { + let (_, socket) = socket.value(); if socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::FriendRequest { from: user.id }, )?) @@ -177,10 +178,11 @@ pub async fn remove_friend( ) .await?; - if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) { - let (_, socket) = socket.value_mut(); + if let Some(mut socket) = db.auth_sockets.get(&friend.id.into()) { + let (_, socket) = socket.value(); let _ = socket + .clone() .text(serde_json::to_string( &ServerToClientMessage::FriendRequestRejected { from: user.id, From f195a9dd52623f6a83d4f17ba4ae8f27451a0f00 Mon Sep 17 00:00:00 2001 From: Jai A Date: Fri, 27 Dec 2024 17:30:08 -0700 Subject: [PATCH 2/4] Fix fmt issues --- apps/labrinth/src/routes/internal/statuses.rs | 6 ++---- apps/labrinth/src/routes/v3/friends.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index 3ac6ee923..4b595b1cc 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -184,7 +184,7 @@ pub async fn ws_init( } Ok(Message::Ping(msg)) => { - if let Some(mut socket) = db.auth_sockets.get(&user.id) { + if let Some(socket) = db.auth_sockets.get(&user.id) { let (_, socket) = socket.value(); let _ = socket.clone().pong(&msg).await; } @@ -221,9 +221,7 @@ pub async fn broadcast_friends( }; if friend.accepted { - if let Some(mut socket) = - sockets.auth_sockets.get(&friend_id.into()) - { + if let Some(socket) = sockets.auth_sockets.get(&friend_id.into()) { let (_, socket) = socket.value(); let _ = diff --git a/apps/labrinth/src/routes/v3/friends.rs b/apps/labrinth/src/routes/v3/friends.rs index f7e5a96ef..552e75831 100644 --- a/apps/labrinth/src/routes/v3/friends.rs +++ b/apps/labrinth/src/routes/v3/friends.rs @@ -178,7 +178,7 @@ pub async fn remove_friend( ) .await?; - if let Some(mut socket) = db.auth_sockets.get(&friend.id.into()) { + if let Some(socket) = db.auth_sockets.get(&friend.id.into()) { let (_, socket) = socket.value(); let _ = socket From 736b11371305004a7783c5f338b01f910b0d9bf0 Mon Sep 17 00:00:00 2001 From: Jai A Date: Fri, 27 Dec 2024 17:58:16 -0700 Subject: [PATCH 3/4] Retry failed S3 uploads --- apps/labrinth/src/file_hosting/s3_host.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/labrinth/src/file_hosting/s3_host.rs b/apps/labrinth/src/file_hosting/s3_host.rs index 87be229ab..c48937811 100644 --- a/apps/labrinth/src/file_hosting/s3_host.rs +++ b/apps/labrinth/src/file_hosting/s3_host.rs @@ -74,10 +74,10 @@ impl FileHost for S3Host { content_type, ) .await - .map_err(|_| { - FileHostingError::S3Error( - "Error while uploading file to S3".to_string(), - ) + .map_err(|err| { + FileHostingError::S3Error(format!( + "Error while uploading file {file_name} to S3: {err}" + )) })?; Ok(UploadFileData { @@ -100,10 +100,10 @@ impl FileHost for S3Host { self.bucket .delete_object(format!("/{file_name}")) .await - .map_err(|_| { - FileHostingError::S3Error( - "Error while deleting file from S3".to_string(), - ) + .map_err(|err| { + FileHostingError::S3Error(format!( + "Error while deleting file {file_name} to S3: {err}" + )) })?; Ok(DeleteFileData { From 6ff145c5bb822758d25fa40b586a364ad1ecc599 Mon Sep 17 00:00:00 2001 From: Jai A Date: Fri, 27 Dec 2024 18:05:02 -0700 Subject: [PATCH 4/4] Ignore launcher socket from sentry --- apps/labrinth/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index 336150c8f..6be1aebcf 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -92,6 +92,7 @@ async fn main() -> std::io::Result<()> { let prometheus = PrometheusMetricsBuilder::new("labrinth") .endpoint("/metrics") + .exclude("/_internal/launcher_socket") .build() .expect("Failed to create prometheus metrics middleware");