From 9754f2d1c51dee70c3833192926809fbd2608619 Mon Sep 17 00:00:00 2001 From: Jai A Date: Tue, 4 Apr 2023 21:17:19 -0700 Subject: [PATCH] Add limiter for forge downloading --- .env | 1 - daedalus_client/src/forge.rs | 36 +++++++++++++++++++++++++++++--- daedalus_client/src/main.rs | 22 +++++++++---------- daedalus_client/src/minecraft.rs | 17 ++++++++++++++- 4 files changed, 59 insertions(+), 17 deletions(-) diff --git a/.env b/.env index 34bf98c..c7298bc 100644 --- a/.env +++ b/.env @@ -1,7 +1,6 @@ RUST_LOG=info,error BASE_URL=https://modrinth-cdn-staging.nyc3.digitaloceanspaces.com -BASE_FOLDER=gamedata S3_ACCESS_TOKEN=none S3_SECRET=none diff --git a/daedalus_client/src/forge.rs b/daedalus_client/src/forge.rs index 0de8cf1..7cbe847 100644 --- a/daedalus_client/src/forge.rs +++ b/daedalus_client/src/forge.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::io::Read; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::{Mutex, Semaphore}; lazy_static! { @@ -458,7 +458,22 @@ pub async fn retrieve_data( }.await }); - futures::future::try_join_all(loaders_futures).await?; + { + let mut versions = loaders_futures.into_iter().peekable(); + let mut chunk_index = 0; + while versions.peek().is_some() { + let now = Instant::now(); + + let chunk: Vec<_> = versions.by_ref().take(1).collect(); + futures::future::try_join_all(chunk).await?; + + chunk_index += 1; + + let elapsed = now.elapsed(); + info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed); + } + } + //futures::future::try_join_all(loaders_futures).await?; } versions.lock().await.push(daedalus::modded::Version { @@ -472,7 +487,22 @@ pub async fn retrieve_data( } } - futures::future::try_join_all(version_futures).await?; + { + let mut versions = version_futures.into_iter().peekable(); + let mut chunk_index = 0; + while versions.peek().is_some() { + let now = Instant::now(); + + let chunk: Vec<_> = versions.by_ref().take(10).collect(); + futures::future::try_join_all(chunk).await?; + + chunk_index += 1; + + let elapsed = now.elapsed(); + info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed); + } + } + //futures::future::try_join_all(version_futures).await?; if let Ok(versions) = Arc::try_unwrap(versions) { let mut versions = versions.into_inner(); diff --git a/daedalus_client/src/main.rs b/daedalus_client/src/main.rs index de97bc0..56b524b 100644 --- a/daedalus_client/src/main.rs +++ b/daedalus_client/src/main.rs @@ -1,4 +1,4 @@ -use log::{error, warn}; +use log::{error, info, warn}; use s3::creds::Credentials; use s3::error::S3Error; use s3::{Bucket, Region}; @@ -112,7 +112,6 @@ fn check_env_vars() -> bool { } failed |= check_var::("BASE_URL"); - failed |= check_var::("BASE_FOLDER"); failed |= check_var::("S3_ACCESS_TOKEN"); failed |= check_var::("S3_SECRET"); @@ -154,7 +153,8 @@ pub async fn upload_file_to_bucket( semaphore: Arc, ) -> Result<(), Error> { let _permit = semaphore.acquire().await?; - let key = format!("{}/{}", &*dotenvy::var("BASE_FOLDER").unwrap(), path); + info!("{} started uploading", path); + let key = path.clone(); for attempt in 1..=4 { let result = if let Some(ref content_type) = content_type { @@ -166,16 +166,13 @@ pub async fn upload_file_to_bucket( } .map_err(|err| Error::S3Error { inner: err, - file: format!( - "{}/{}", - &*dotenvy::var("BASE_FOLDER").unwrap(), - path - ), + file: path.clone(), }); match result { Ok(_) => { { + info!("{} done uploading", path); let mut uploaded_files = uploaded_files.lock().await; uploaded_files.push(key); } @@ -188,15 +185,13 @@ pub async fn upload_file_to_bucket( } } } - unreachable!() } pub fn format_url(path: &str) -> String { format!( - "{}/{}/{}", + "{}/{}", &*dotenvy::var("BASE_URL").unwrap(), - &*dotenvy::var("BASE_FOLDER").unwrap(), path ) } @@ -207,7 +202,9 @@ pub async fn download_file( semaphore: Arc, ) -> Result { let _permit = semaphore.acquire().await?; + info!("{} started downloading", url); let val = daedalus::download_file(url, sha1).await?; + info!("{} finished downloading", url); Ok(val) } @@ -218,8 +215,9 @@ pub async fn download_file_mirrors( semaphore: Arc, ) -> Result { let _permit = semaphore.acquire().await?; - + info!("{} started downloading", base); let val = daedalus::download_file_mirrors(base, mirrors, sha1).await?; + info!("{} finished downloading", base); Ok(val) } diff --git a/daedalus_client/src/minecraft.rs b/daedalus_client/src/minecraft.rs index 256eef7..024ce3d 100644 --- a/daedalus_client/src/minecraft.rs +++ b/daedalus_client/src/minecraft.rs @@ -147,7 +147,22 @@ pub async fn retrieve_data( }) } - futures::future::try_join_all(version_futures).await?; + { + let mut versions = version_futures.into_iter().peekable(); + let mut chunk_index = 0; + while versions.peek().is_some() { + let now = Instant::now(); + + let chunk: Vec<_> = versions.by_ref().take(100).collect(); + futures::future::try_join_all(chunk).await?; + + chunk_index += 1; + + let elapsed = now.elapsed(); + info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed); + } + } + //futures::future::try_join_all(version_futures).await?; upload_file_to_bucket( format!(