Skip to content

Commit

Permalink
code restructuring - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ishaan26 committed Jan 2, 2025
1 parent acfe62c commit 571cbdd
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 164 deletions.
4 changes: 2 additions & 2 deletions zung_torrent/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{

use crate::{
meta_info::{FileTree, InfoHash, SortOrd},
sources::{DownloadSources, HttpSeederList, TrackerList},
sources::{DownloadSources, HttpSeederList, Tracker},
MetaInfo,
};

Expand Down Expand Up @@ -356,7 +356,7 @@ impl Client {
/// Prints the download sources generated from the [`MetaInfo`] file to stdout.
pub fn print_download_sources(&self) {
#[inline]
fn print_trackers(tracker_list: TrackerList) {
fn print_trackers(tracker_list: Vec<Tracker>) {

Check warning on line 359 in zung_torrent/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/client/mod.rs#L359

Added line #L359 was not covered by tests
print_header("Trackers");
for (mut i, tracker) in tracker_list.iter().enumerate() {
i += 1;
Expand Down
12 changes: 8 additions & 4 deletions zung_torrent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#[cfg(feature = "client")]
mod client;
pub mod meta_info;
// pub mod parked_sources;
pub mod sources;

pub use client::Client;
pub use client::PeerID;
use colored::Colorize;
use meta_info::MetaInfo;

use clap::{Args, Subcommand};
Expand Down Expand Up @@ -72,10 +72,14 @@ impl TorrentArgs {
}
TorrentCommands::Test { file } => {
let torrent = Client::new(file)?;
torrent
let tracker = torrent
.sources()
.tracker_responses(torrent.info_hash().as_encoded(), torrent.peer_id())
.await;
.connect(torrent.info_hash().as_encoded(), torrent.peer_id())
.await
.unwrap();
for t in tracker {
println!("Recived response: {}", t.url().green())
}
}
}

Expand Down
160 changes: 106 additions & 54 deletions zung_torrent/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@
//! sources from metadata, allowing a torrent client to efficiently pull data from either or both
//! types of sources based on the information contained in the [`MetaInfo`] file.
use std::{
net::Ipv4Addr,
sync::{Arc, Mutex},
};

use crate::{
meta_info::{InfoHashEncoded, MetaInfo},
PeerID,
};

use colored::Colorize;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, StreamExt};

mod http_seeders;
mod trackers;

use anyhow::Result;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use tokio::{net::UdpSocket, task::JoinHandle};

pub use http_seeders::{HttpSeeder, HttpSeederList};
pub use trackers::{Action, Event, Tracker, TrackerList, TrackerRequest};
pub use trackers::{Action, Event, Tracker, TrackerRequest};

/// Representing different data sources (trackers and HTTP seeders) for a torrent.
///
Expand All @@ -26,7 +35,7 @@ pub use trackers::{Action, Event, Tracker, TrackerList, TrackerRequest};
pub enum DownloadSources<'a> {
/// Genarated if only `announce` or `announce_list` keys are specified in the [`MetaInfo`]
/// file.
Trackers { tracker_list: TrackerList },
Trackers { tracker_list: Vec<Tracker> },

/// Genarated if only `url_list` key is specified in the [`MetaInfo`] file.
HttpSeeders {
Expand All @@ -36,24 +45,24 @@ pub enum DownloadSources<'a> {
/// Genarated if both `announce` / `announce_list` and `url_list` keys are specified in the
/// [`MetaInfo`] file.
Hybrid {
tracker_list: TrackerList,
tracker_list: Vec<Tracker>,
http_seeder_list: HttpSeederList<'a>,
},
}

impl<'a> DownloadSources<'a> {
pub fn new(meta_info: &'a MetaInfo) -> Self {
fn tracker_list(meta_info: &MetaInfo) -> TrackerList {
fn tracker_list(meta_info: &MetaInfo) -> Vec<Tracker> {
// As per the torrent specification, if the `announce_list` field is present, the
// `announce` field is ignored.
if let Some(announce_list) = meta_info.announce_list() {
let mut tracker_list = Vec::new();
for tracker_url in announce_list.iter().flatten() {
tracker_list.push(Tracker::new(tracker_url));
}
TrackerList::new(tracker_list)
announce_list
.par_iter()
.flatten()
.map(|announce| Tracker::new(announce))
.collect()
} else if let Some(announce) = meta_info.announce() {
TrackerList::new(vec![Tracker::new(announce)])
vec![Tracker::new(announce)]

Check warning on line 65 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L65

Added line #L65 was not covered by tests
} else {
unreachable!()
}
Expand Down Expand Up @@ -112,7 +121,7 @@ impl<'a> DownloadSources<'a> {
/// }
/// # }
/// ```
pub fn tracker_list(&self) -> Option<&TrackerList> {
pub fn tracker_list(&self) -> Option<&Vec<Tracker>> {
match self {
DownloadSources::Trackers { tracker_list }

Check warning on line 126 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L126

Added line #L126 was not covered by tests
| DownloadSources::Hybrid { tracker_list, .. } => Some(tracker_list),
Expand Down Expand Up @@ -166,7 +175,7 @@ impl<'a> DownloadSources<'a> {
}

/// Returns the hybrid_sources, if any, contained in the [`DownloadSources`].
pub fn hybrid(&self) -> Option<(&TrackerList, &HttpSeederList)> {
pub fn hybrid(&self) -> Option<(&Vec<Tracker>, &HttpSeederList)> {

Check warning on line 178 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L178

Added line #L178 was not covered by tests
if let Self::Hybrid {
tracker_list,
http_seeder_list,
Expand All @@ -186,56 +195,99 @@ impl<'a> DownloadSources<'a> {
matches!(self, Self::Hybrid { .. })
}

pub async fn tracker_requests(
pub async fn connect(

Check warning on line 198 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L198

Added line #L198 was not covered by tests
&self,
info_hash: InfoHashEncoded,
peer_id: PeerID,
) -> Option<Vec<TrackerRequest>> {
) -> Option<Vec<Tracker>> {
if let Some(list) = self.tracker_list() {
let mut result = Vec::with_capacity(list.len());
let mut request_futures = list.generate_requests(info_hash, peer_id).await;
while let Some(request) = request_futures.next().await {
match request {
Ok(Ok(tracker_request)) => result.push(tracker_request),
Err(e) => eprintln!("{e}"),
Ok(Err(e)) => eprintln!("{e}"),
}
}
Some(result)
} else {
None
}
}
let futures: FuturesUnordered<JoinHandle<Result<Tracker>>> = list
.iter()
.cloned()
.map(|tracker| {
tokio::spawn(async move {
let socket =
Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap());
tracker.connect(socket, info_hash, peer_id).await
})
})
.collect();

pub async fn tracker_responses(
&self,
info_hash: InfoHashEncoded,
peer_id: PeerID,
) -> Option<()> {
if let Some(list) = self.tracker_list() {
let request_futures = list.generate_requests(info_hash, peer_id).await;
request_futures
.for_each_concurrent(None, |request| async move {
println!("Connecting");
match request {
Ok(Ok(tracker_request)) => {
let _ = tokio::spawn(async move {
println!("{}", "from thread".green());
let a = tracker_request.make_request().await;
if let Ok(resp) = a {
println!("{resp}")
}
})
.await;
let result = Arc::new(Mutex::new(Vec::with_capacity(list.len())));

futures
.for_each_concurrent(None, |connection| {
let result = Arc::clone(&result);
async move {
match connection {
Ok(Ok(value)) => {
println!("Connected with {}", value.url());
result.lock().expect("thread failed").push(value);
}
Ok(Err(e)) => eprintln!("{}", e.to_string().red()),
Err(e) => eprintln!("{}", e.to_string().red()),

Check warning on line 228 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L202-L228

Added lines #L202 - L228 were not covered by tests
}
Err(e) => eprintln!("{e}"),
Ok(Err(e)) => eprintln!("{e}"),
}
})
.await;

Check warning on line 232 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L230-L232

Added lines #L230 - L232 were not covered by tests
Some(())
} else {
None

let vec = Arc::try_unwrap(result).unwrap().into_inner().unwrap();
return Some(vec);

Check warning on line 235 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L234-L235

Added lines #L234 - L235 were not covered by tests
}

None

Check warning on line 238 in zung_torrent/src/sources/mod.rs

View check run for this annotation

Codecov / codecov/patch

zung_torrent/src/sources/mod.rs#L237-L238

Added lines #L237 - L238 were not covered by tests
}

// pub async fn tracker_requests(
// &self,
// info_hash: InfoHashEncoded,
// peer_id: PeerID,
// ) -> Option<Vec<TrackerRequest>> {
// if let Some(list) = self.tracker_list() {
// let mut result = Vec::with_capacity(list.len());
// let mut request_futures = list.generate_requests(info_hash, peer_id).await;
// while let Some(request) = request_futures.next().await {
// match request {
// Ok(Ok(tracker_request)) => result.push(tracker_request),
// Err(e) => eprintln!("{e}"),
// Ok(Err(e)) => eprintln!("{e}"),
// }
// }
// Some(result)
// } else {
// None
// }
// }
//
// pub async fn tracker_responses(
// &self,
// info_hash: InfoHashEncoded,
// peer_id: PeerID,
// ) -> Option<()> {
// if let Some(list) = self.tracker_list() {
// let request_futures = list.generate_requests(info_hash, peer_id).await;
// request_futures
// .for_each_concurrent(None, |request| async move {
// println!("Connecting");
// match request {
// Ok(Ok(tracker_request)) => {
// let _ = tokio::spawn(async move {
// println!("{}", "from thread".green());
// let a = tracker_request.make_request().await;
// if let Ok(resp) = a {
// println!("{resp}")
// }
// })
// .await;
// }
// Err(e) => eprintln!("{e}"),
// Ok(Err(e)) => eprintln!("{e}"),
// }
// })
// .await;
// Some(())
// } else {
// None
// }
// }
}
Loading

0 comments on commit 571cbdd

Please sign in to comment.