From 571cbddb867ed11d9b28216ab47901cea4cc3da7 Mon Sep 17 00:00:00 2001 From: Ishaan Goel Date: Thu, 2 Jan 2025 10:54:44 +0530 Subject: [PATCH] code restructuring - WIP --- zung_torrent/src/client/mod.rs | 4 +- zung_torrent/src/lib.rs | 12 +- zung_torrent/src/sources/mod.rs | 160 ++++++++++----- zung_torrent/src/sources/trackers/mod.rs | 198 ++++++++++++------- zung_torrent/src/sources/trackers/request.rs | 2 +- zung_torrent/tests/sources.rs | 28 --- 6 files changed, 240 insertions(+), 164 deletions(-) diff --git a/zung_torrent/src/client/mod.rs b/zung_torrent/src/client/mod.rs index d9a5fb7..c71f82f 100644 --- a/zung_torrent/src/client/mod.rs +++ b/zung_torrent/src/client/mod.rs @@ -15,7 +15,7 @@ use std::{ use crate::{ meta_info::{FileTree, InfoHash, SortOrd}, - sources::{DownloadSources, HttpSeederList, TrackerList}, + sources::{DownloadSources, HttpSeederList, Tracker}, MetaInfo, }; @@ -356,7 +356,7 @@ impl Client { /// Prints the download sources generated from the [`MetaInfo`] file to stdout. pub fn print_download_sources(&self) { #[inline] - fn print_trackers(tracker_list: TrackerList) { + fn print_trackers(tracker_list: Vec) { print_header("Trackers"); for (mut i, tracker) in tracker_list.iter().enumerate() { i += 1; diff --git a/zung_torrent/src/lib.rs b/zung_torrent/src/lib.rs index 92fdd82..2f28323 100644 --- a/zung_torrent/src/lib.rs +++ b/zung_torrent/src/lib.rs @@ -3,11 +3,11 @@ #[cfg(feature = "client")] mod client; pub mod meta_info; -// pub mod parked_sources; pub mod sources; pub use client::Client; pub use client::PeerID; +use colored::Colorize; use meta_info::MetaInfo; use clap::{Args, Subcommand}; @@ -72,10 +72,14 @@ impl TorrentArgs { } TorrentCommands::Test { file } => { let torrent = Client::new(file)?; - torrent + let tracker = torrent .sources() - .tracker_responses(torrent.info_hash().as_encoded(), torrent.peer_id()) - .await; + .connect(torrent.info_hash().as_encoded(), torrent.peer_id()) + .await + .unwrap(); + for t in tracker { + println!("Recived response: {}", t.url().green()) + } } } diff --git a/zung_torrent/src/sources/mod.rs b/zung_torrent/src/sources/mod.rs index 102ebfa..600f5af 100644 --- a/zung_torrent/src/sources/mod.rs +++ b/zung_torrent/src/sources/mod.rs @@ -5,19 +5,28 @@ //! sources from metadata, allowing a torrent client to efficiently pull data from either or both //! types of sources based on the information contained in the [`MetaInfo`] file. +use std::{ + net::Ipv4Addr, + sync::{Arc, Mutex}, +}; + use crate::{ meta_info::{InfoHashEncoded, MetaInfo}, PeerID, }; use colored::Colorize; -use futures::StreamExt; +use futures::{stream::FuturesUnordered, StreamExt}; mod http_seeders; mod trackers; +use anyhow::Result; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use tokio::{net::UdpSocket, task::JoinHandle}; + pub use http_seeders::{HttpSeeder, HttpSeederList}; -pub use trackers::{Action, Event, Tracker, TrackerList, TrackerRequest}; +pub use trackers::{Action, Event, Tracker, TrackerRequest}; /// Representing different data sources (trackers and HTTP seeders) for a torrent. /// @@ -26,7 +35,7 @@ pub use trackers::{Action, Event, Tracker, TrackerList, TrackerRequest}; pub enum DownloadSources<'a> { /// Genarated if only `announce` or `announce_list` keys are specified in the [`MetaInfo`] /// file. - Trackers { tracker_list: TrackerList }, + Trackers { tracker_list: Vec }, /// Genarated if only `url_list` key is specified in the [`MetaInfo`] file. HttpSeeders { @@ -36,24 +45,24 @@ pub enum DownloadSources<'a> { /// Genarated if both `announce` / `announce_list` and `url_list` keys are specified in the /// [`MetaInfo`] file. Hybrid { - tracker_list: TrackerList, + tracker_list: Vec, http_seeder_list: HttpSeederList<'a>, }, } impl<'a> DownloadSources<'a> { pub fn new(meta_info: &'a MetaInfo) -> Self { - fn tracker_list(meta_info: &MetaInfo) -> TrackerList { + fn tracker_list(meta_info: &MetaInfo) -> Vec { // As per the torrent specification, if the `announce_list` field is present, the // `announce` field is ignored. if let Some(announce_list) = meta_info.announce_list() { - let mut tracker_list = Vec::new(); - for tracker_url in announce_list.iter().flatten() { - tracker_list.push(Tracker::new(tracker_url)); - } - TrackerList::new(tracker_list) + announce_list + .par_iter() + .flatten() + .map(|announce| Tracker::new(announce)) + .collect() } else if let Some(announce) = meta_info.announce() { - TrackerList::new(vec![Tracker::new(announce)]) + vec![Tracker::new(announce)] } else { unreachable!() } @@ -112,7 +121,7 @@ impl<'a> DownloadSources<'a> { /// } /// # } /// ``` - pub fn tracker_list(&self) -> Option<&TrackerList> { + pub fn tracker_list(&self) -> Option<&Vec> { match self { DownloadSources::Trackers { tracker_list } | DownloadSources::Hybrid { tracker_list, .. } => Some(tracker_list), @@ -166,7 +175,7 @@ impl<'a> DownloadSources<'a> { } /// Returns the hybrid_sources, if any, contained in the [`DownloadSources`]. - pub fn hybrid(&self) -> Option<(&TrackerList, &HttpSeederList)> { + pub fn hybrid(&self) -> Option<(&Vec, &HttpSeederList)> { if let Self::Hybrid { tracker_list, http_seeder_list, @@ -186,56 +195,99 @@ impl<'a> DownloadSources<'a> { matches!(self, Self::Hybrid { .. }) } - pub async fn tracker_requests( + pub async fn connect( &self, info_hash: InfoHashEncoded, peer_id: PeerID, - ) -> Option> { + ) -> Option> { if let Some(list) = self.tracker_list() { - let mut result = Vec::with_capacity(list.len()); - let mut request_futures = list.generate_requests(info_hash, peer_id).await; - while let Some(request) = request_futures.next().await { - match request { - Ok(Ok(tracker_request)) => result.push(tracker_request), - Err(e) => eprintln!("{e}"), - Ok(Err(e)) => eprintln!("{e}"), - } - } - Some(result) - } else { - None - } - } + let futures: FuturesUnordered>> = list + .iter() + .cloned() + .map(|tracker| { + tokio::spawn(async move { + let socket = + Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); + tracker.connect(socket, info_hash, peer_id).await + }) + }) + .collect(); - pub async fn tracker_responses( - &self, - info_hash: InfoHashEncoded, - peer_id: PeerID, - ) -> Option<()> { - if let Some(list) = self.tracker_list() { - let request_futures = list.generate_requests(info_hash, peer_id).await; - request_futures - .for_each_concurrent(None, |request| async move { - println!("Connecting"); - match request { - Ok(Ok(tracker_request)) => { - let _ = tokio::spawn(async move { - println!("{}", "from thread".green()); - let a = tracker_request.make_request().await; - if let Ok(resp) = a { - println!("{resp}") - } - }) - .await; + let result = Arc::new(Mutex::new(Vec::with_capacity(list.len()))); + + futures + .for_each_concurrent(None, |connection| { + let result = Arc::clone(&result); + async move { + match connection { + Ok(Ok(value)) => { + println!("Connected with {}", value.url()); + result.lock().expect("thread failed").push(value); + } + Ok(Err(e)) => eprintln!("{}", e.to_string().red()), + Err(e) => eprintln!("{}", e.to_string().red()), } - Err(e) => eprintln!("{e}"), - Ok(Err(e)) => eprintln!("{e}"), } }) .await; - Some(()) - } else { - None + + let vec = Arc::try_unwrap(result).unwrap().into_inner().unwrap(); + return Some(vec); } + + None } + + // pub async fn tracker_requests( + // &self, + // info_hash: InfoHashEncoded, + // peer_id: PeerID, + // ) -> Option> { + // if let Some(list) = self.tracker_list() { + // let mut result = Vec::with_capacity(list.len()); + // let mut request_futures = list.generate_requests(info_hash, peer_id).await; + // while let Some(request) = request_futures.next().await { + // match request { + // Ok(Ok(tracker_request)) => result.push(tracker_request), + // Err(e) => eprintln!("{e}"), + // Ok(Err(e)) => eprintln!("{e}"), + // } + // } + // Some(result) + // } else { + // None + // } + // } + // + // pub async fn tracker_responses( + // &self, + // info_hash: InfoHashEncoded, + // peer_id: PeerID, + // ) -> Option<()> { + // if let Some(list) = self.tracker_list() { + // let request_futures = list.generate_requests(info_hash, peer_id).await; + // request_futures + // .for_each_concurrent(None, |request| async move { + // println!("Connecting"); + // match request { + // Ok(Ok(tracker_request)) => { + // let _ = tokio::spawn(async move { + // println!("{}", "from thread".green()); + // let a = tracker_request.make_request().await; + // if let Ok(resp) = a { + // println!("{resp}") + // } + // }) + // .await; + // } + // Err(e) => eprintln!("{e}"), + // Ok(Err(e)) => eprintln!("{e}"), + // } + // }) + // .await; + // Some(()) + // } else { + // None + // } + // } } diff --git a/zung_torrent/src/sources/trackers/mod.rs b/zung_torrent/src/sources/trackers/mod.rs index 3bfeac7..7ebffad 100644 --- a/zung_torrent/src/sources/trackers/mod.rs +++ b/zung_torrent/src/sources/trackers/mod.rs @@ -9,25 +9,71 @@ mod request; pub use request::*; +use zung_parsers::bencode; use std::sync::Arc; -use std::{net::Ipv4Addr, ops::Deref}; use crate::meta_info::InfoHashEncoded; use crate::PeerID; use anyhow::{bail, Result}; -use futures::stream::FuturesUnordered; -use tokio::{net::UdpSocket, task::JoinHandle}; +use tokio::net::UdpSocket; + +#[derive(Debug, Clone)] +pub struct Tracker { + url: TrackerUrl, + response: Box, + connected: bool, + trys: u8, +} + +impl Tracker { + pub fn new(url: &str) -> Self { + Self { + url: TrackerUrl::new(url), + response: Box::new(bencode::Value::Integer(0)), + connected: false, + trys: 0, + } + } + + pub fn url(&self) -> &str { + self.url.url() + } + + pub async fn connect( + mut self, + socket: Arc, + info_hash: InfoHashEncoded, + peer_id: PeerID, + ) -> Result { + // Generate Tracker request. + // - Http => Generates a url. + // - UDP => Sends a UDP connect request + let request = self + .url + .generate_request(socket, info_hash, peer_id) + .await?; + + // Make the HTTP or UDP request to recive a TrackerResponse + let response = request.make_request().await?; + + self.response = Box::new(response); + self.connected = true; + self.trys += 1; + + Ok(self) + } +} // TODO: Look into SmallStr #[derive(Debug)] -pub enum Tracker { +pub enum TrackerUrl { Http(Arc), Udp(Arc), Invalid(Arc), } -impl Clone for Tracker { +impl Clone for TrackerUrl { fn clone(&self) -> Self { match self { Self::Http(arg0) => Self::Http(Arc::clone(arg0)), @@ -37,7 +83,7 @@ impl Clone for Tracker { } } -impl Tracker { +impl TrackerUrl { pub fn new(tracker_url: &str) -> Self { if tracker_url.starts_with("http") { Self::Http(Arc::from(tracker_url)) @@ -50,9 +96,9 @@ impl Tracker { pub fn url(&self) -> &str { match self { - Tracker::Http(s) => s, - Tracker::Udp(s) => s, - Tracker::Invalid(s) => s, + TrackerUrl::Http(s) => s, + TrackerUrl::Udp(s) => s, + TrackerUrl::Invalid(s) => s, } } @@ -63,11 +109,11 @@ impl Tracker { peer_id: PeerID, ) -> Result { match self { - Tracker::Http(url) => Ok(TrackerRequest::Http { + TrackerUrl::Http(url) => Ok(TrackerRequest::Http { url: url.clone(), params: HttpTrackerRequestParams::new(info_hash, peer_id), }), - Tracker::Udp(url) => { + TrackerUrl::Udp(url) => { let udp_url = url.strip_prefix("udp://").unwrap(); let udp_url = match udp_url.split_once("/") { Some(s) => s.0, @@ -82,73 +128,75 @@ impl Tracker { params: UdpTrackerRequestParams::new(connection_id, info_hash, peer_id), }) } - Tracker::Invalid(url) => bail!("Unsupproted : {url}"), + TrackerUrl::Invalid(url) => bail!("Unsupproted : {url}"), } } } -#[derive(Debug, Clone)] -pub struct TrackerList { - tracker_list: Vec, -} - -impl TrackerList { - pub(crate) fn new(tracker_list: Vec) -> Self { - Self { tracker_list } - } - - fn as_array(&self) -> &[Tracker] { - &self.tracker_list - } - - /// Consumes the tracker list and returns the internal Vec of [`Tracker`]s. - pub fn into_vec(self) -> Vec { - self.tracker_list - } - - /// Asyncly generates the [`TrackerRequest`] - /// - // TODO: Revisit this if there is a faster more efficient way. - pub async fn generate_requests( - &self, - info_hash: InfoHashEncoded, - peer_id: PeerID, - ) -> FuturesUnordered>> { - let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); - - self.as_array() - .iter() - .cloned() // The clone here is just Arc::clone - .map(|tracker| { - let socket = Arc::clone(&socket); - tokio::spawn( - async move { tracker.generate_request(socket, info_hash, peer_id).await }, - ) - }) - .collect() - } -} - -impl Deref for TrackerList { - type Target = [Tracker]; - - fn deref(&self) -> &Self::Target { - self.as_array() - } -} - -// Iterator implementation -impl<'a> IntoIterator for &'a TrackerList { - type Item = &'a Tracker; - type IntoIter = std::slice::Iter<'a, Tracker>; - - fn into_iter(self) -> Self::IntoIter { - self.tracker_list.iter() - } -} +// #[derive(Debug, Clone)] +// pub struct TrackerList { +// tracker_list: Vec, +// } +// +// impl TrackerList { +// pub(crate) fn new(tracker_list: Vec) -> Self { +// Self { tracker_list } +// } +// +// fn as_array(&self) -> &[TrackerUrl] { +// &self.tracker_list +// } +// +// /// Consumes the tracker list and returns the internal Vec of [`Tracker`]s. +// pub fn into_vec(self) -> Vec { +// self.tracker_list +// } +// +// /// Asyncly generates the [`TrackerRequest`] +// /// +// // TODO: Revisit this if there is a faster more efficient way. +// pub async fn generate_requests( +// &self, +// info_hash: InfoHashEncoded, +// peer_id: PeerID, +// ) -> FuturesUnordered>> { +// let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); +// +// self.as_array() +// .iter() +// .cloned() // The clone here is just Arc::clone +// .map(|tracker| { +// let socket = Arc::clone(&socket); +// tokio::spawn( +// async move { tracker.generate_request(socket, info_hash, peer_id).await }, +// ) +// }) +// .collect() +// } +// } +// +// impl Deref for TrackerList { +// type Target = [TrackerUrl]; +// +// fn deref(&self) -> &Self::Target { +// self.as_array() +// } +// } +// +// // Iterator implementation +// impl<'a> IntoIterator for &'a TrackerList { +// type Item = &'a TrackerUrl; +// type IntoIter = std::slice::Iter<'a, TrackerUrl>; +// +// fn into_iter(self) -> Self::IntoIter { +// self.tracker_list.iter() +// } +// } #[cfg(test)] mod tracker_tests { + use std::net::Ipv4Addr; + use super::*; use crate::meta_info::InfoHash; @@ -159,7 +207,7 @@ mod tracker_tests { let info_hash = InfoHash::new(b"test info_hash").as_encoded(); let peer_id = PeerID::default(); let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); - let tracker_request = Tracker::new(sample_url); + let tracker_request = TrackerUrl::new(sample_url); let tracker_request = tracker_request .generate_request(socket, info_hash, peer_id) .await @@ -190,7 +238,7 @@ mod tracker_tests { let info_hash = InfoHash::new(b"test info_hash").as_encoded(); let peer_id = PeerID::default(); let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); - let tracker_request = Tracker::new(url); + let tracker_request = TrackerUrl::new(url); let tracker_request = tracker_request .generate_request(socket, info_hash, peer_id) .await @@ -221,7 +269,7 @@ mod tracker_tests { let info_hash = InfoHash::new(b"test info_hash").as_encoded(); let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); let peer_id = PeerID::default(); - let tracker_request = Tracker::new(url); + let tracker_request = TrackerUrl::new(url); let mut tracker_request = tracker_request .generate_request(socket, info_hash, peer_id) .await @@ -265,7 +313,7 @@ mod tracker_tests { let info_hash = InfoHash::new(b"test info_hash").as_encoded(); let socket = Arc::new(UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await.unwrap()); let peer_id = PeerID::default(); - let tracker_request = Tracker::new(url); + let tracker_request = TrackerUrl::new(url); let mut tracker_request = tracker_request .generate_request(socket, info_hash, peer_id) .await diff --git a/zung_torrent/src/sources/trackers/request.rs b/zung_torrent/src/sources/trackers/request.rs index 5842fc4..87d4701 100644 --- a/zung_torrent/src/sources/trackers/request.rs +++ b/zung_torrent/src/sources/trackers/request.rs @@ -95,7 +95,7 @@ impl TrackerRequest { Ok(response) } TrackerRequest::Udp { .. } => { - println!("To be implemented"); + println!("To be implemented {}", self.to_url()?); Ok(bencode::Value::Integer(3)) } } diff --git a/zung_torrent/tests/sources.rs b/zung_torrent/tests/sources.rs index 8aa0974..0e4ce25 100644 --- a/zung_torrent/tests/sources.rs +++ b/zung_torrent/tests/sources.rs @@ -1,4 +1,3 @@ -use futures::StreamExt; use utilities::torrent::CLIENT; use zung_torrent::sources::DownloadSources; @@ -45,30 +44,3 @@ fn mit_source() { } } } - -#[tokio::test] -async fn kali_source() { - let kali = &CLIENT.kali; - - let mut list = kali - .sources() - .tracker_list() - .unwrap() - .generate_requests(kali.info_hash().as_encoded(), kali.peer_id()) - .await; - - // Waits for ALL futures to complete - while let Some(result) = list.next().await { - let Ok(a) = result else { continue }; - if let Ok(a) = a { - if a.is_http() { - assert!(a - .to_url() - .unwrap() - .contains(&kali.info_hash().to_url_encoded())) - } else if a.is_udp() { - assert!(a.connection_id().is_some()) - } - } - } -}