Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sockets causing actix hangs #3089

Merged
merged 4 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions apps/labrinth/src/file_hosting/s3_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ impl FileHost for S3Host {
content_type,
)
.await
.map_err(|_| {
FileHostingError::S3Error(
"Error while uploading file to S3".to_string(),
)
.map_err(|err| {
FileHostingError::S3Error(format!(
"Error while uploading file {file_name} to S3: {err}"
))
})?;

Ok(UploadFileData {
Expand All @@ -100,10 +100,10 @@ impl FileHost for S3Host {
self.bucket
.delete_object(format!("/{file_name}"))
.await
.map_err(|_| {
FileHostingError::S3Error(
"Error while deleting file from S3".to_string(),
)
.map_err(|err| {
FileHostingError::S3Error(format!(
"Error while deleting file {file_name} to S3: {err}"
))
})?;

Ok(DeleteFileData {
Expand Down
1 change: 1 addition & 0 deletions apps/labrinth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ async fn main() -> std::io::Result<()> {

let prometheus = PrometheusMetricsBuilder::new("labrinth")
.endpoint("/metrics")
.exclude("/_internal/launcher_socket")
.build()
.expect("Failed to create prometheus metrics middleware");

Expand Down
34 changes: 18 additions & 16 deletions apps/labrinth/src/routes/internal/statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::queue::socket::ActiveSockets;
use crate::routes::ApiError;
use actix_web::web::{Data, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use actix_ws::Message;
use chrono::Utc;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

Expand Down Expand Up @@ -128,13 +128,13 @@ pub async fn ws_init(
)
.await?;

let mut stream = msg_stream.aggregate_continuations();
let mut stream = msg_stream.into_stream();

actix_web::rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
Ok(Message::Text(text)) => {
if let Ok(message) =
serde_json::from_str::<ClientToServerMessage>(&text)
{
Expand All @@ -159,10 +159,14 @@ pub async fn ws_init(
status.profile_name = profile_name;
status.last_update = Utc::now();

let user_status = status.clone();
// We drop the pair to avoid holding the lock for too long
drop(pair);

let _ = broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate {
status: status.clone(),
status: user_status,
},
&pool,
&db,
Expand All @@ -175,15 +179,14 @@ pub async fn ws_init(
}
}

Ok(AggregatedMessage::Close(_)) => {
Ok(Message::Close(_)) => {
let _ = close_socket(user.id, &pool, &db).await;
}

Ok(AggregatedMessage::Ping(msg)) => {
if let Some(mut socket) = db.auth_sockets.get_mut(&user.id)
{
let (_, socket) = socket.value_mut();
let _ = socket.pong(&msg).await;
Ok(Message::Ping(msg)) => {
if let Some(socket) = db.auth_sockets.get(&user.id) {
let (_, socket) = socket.value();
let _ = socket.clone().pong(&msg).await;
}
}

Expand Down Expand Up @@ -218,12 +221,11 @@ pub async fn broadcast_friends(
};

if friend.accepted {
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
{
let (_, socket) = socket.value_mut();
if let Some(socket) = sockets.auth_sockets.get(&friend_id.into()) {
let (_, socket) = socket.value();

let _ = socket.text(serde_json::to_string(&message)?).await;
let _ =
socket.clone().text(serde_json::to_string(&message)?).await;
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions apps/labrinth/src/routes/v3/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ pub async fn add_friend(
) -> Result<(), ApiError> {
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
let (friend_status, _) = pair.value();
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
if let Some(socket) =
sockets.auth_sockets.get(&friend_id.into())
{
let (_, socket) = socket.value_mut();
let (_, socket) = socket.value();

let _ = socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::StatusUpdate {
status: friend_status.clone(),
Expand Down Expand Up @@ -120,11 +121,11 @@ pub async fn add_friend(
.insert(&mut transaction)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into())
{
let (_, socket) = socket.value_mut();
if let Some(socket) = db.auth_sockets.get(&friend.id.into()) {
let (_, socket) = socket.value();

if socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequest { from: user.id },
)?)
Expand Down Expand Up @@ -177,10 +178,11 @@ pub async fn remove_friend(
)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) {
let (_, socket) = socket.value_mut();
if let Some(socket) = db.auth_sockets.get(&friend.id.into()) {
let (_, socket) = socket.value();

let _ = socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequestRejected {
from: user.id,
Expand Down
Loading