Skip to content

Commit

Permalink
chore(adapter): change trait error type
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op committed Oct 3, 2024
1 parent ce278f7 commit 9fcacc4
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 161 deletions.
16 changes: 8 additions & 8 deletions boltconn/src/adapter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::adapter::{
UdpTransferType,
};
use async_trait::async_trait;
use std::io;
use std::sync::Arc;

use crate::common::duplex_chan::DuplexChan;
use crate::common::StreamOutboundTrait;
use crate::proxy::error::TransportError;
use crate::proxy::ConnAbortHandle;
use crate::transport::UdpSocketAdapter;
use tokio::task::JoinHandle;
Expand All @@ -32,7 +32,7 @@ impl ChainOutbound {
mut inbound_tcp_container: Option<Connector>,
mut inbound_udp_container: Option<AddrConnector>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(async move {
let mut not_first_jump = false;
let mut need_next_jump = true;
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Outbound for ChainOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<std::io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
self.clone().spawn(true, Some(inbound), None, abort_handle)
}

Expand All @@ -146,17 +146,17 @@ impl Outbound for ChainOutbound {
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_tcp_with_outbound() should not be called with ChainOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}

fn spawn_udp(
&self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
self.clone().spawn(false, None, Some(inbound), abort_handle)
}

Expand All @@ -167,8 +167,8 @@ impl Outbound for ChainOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with ChainUdpOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalod outbound"))
}
}
21 changes: 12 additions & 9 deletions boltconn/src/adapter/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::proxy::error::TransportError;
use crate::proxy::{ConnAbortHandle, NetworkAddr};
use crate::transport::UdpSocketAdapter;
use async_trait::async_trait;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
Expand Down Expand Up @@ -37,7 +36,11 @@ impl DirectOutbound {
}
}

async fn run_tcp(self, inbound: Connector, abort_handle: ConnAbortHandle) -> io::Result<()> {
async fn run_tcp(
self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> Result<(), TransportError> {
let dst_addr = if let Some(dst) = self.resolved_dst {
dst
} else {
Expand All @@ -53,7 +56,7 @@ impl DirectOutbound {
self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
) -> io::Result<()> {
) -> Result<(), TransportError> {
let outbound = Arc::new(Egress::new(&self.iface_name).udpv4_socket().await?);
established_udp(
self.id(),
Expand Down Expand Up @@ -81,7 +84,7 @@ impl Outbound for DirectOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(self.clone().run_tcp(inbound, abort_handle))
}

Expand All @@ -91,17 +94,17 @@ impl Outbound for DirectOutbound {
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_tcp_with_outbound() should not be called with DirectOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}

fn spawn_udp(
&self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(self.clone().run_udp(inbound, abort_handle))
}

Expand All @@ -112,9 +115,9 @@ impl Outbound for DirectOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with DirectOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}
}

Expand Down
31 changes: 14 additions & 17 deletions boltconn/src/adapter/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use crate::common::{io_err, StreamOutboundTrait};
use crate::config::AuthData;
use crate::network::dns::Dns;
use crate::network::egress::Egress;
use crate::proxy::error::TransportError;
use crate::proxy::{ConnAbortHandle, NetworkAddr};
use crate::transport::UdpSocketAdapter;
use async_trait::async_trait;
use base64::Engine;
use httparse::Response;
use std::io;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -53,7 +53,7 @@ impl HttpOutbound {
inbound: Connector,
mut outbound: S,
abort_handle: ConnAbortHandle,
) -> io::Result<()>
) -> Result<(), TransportError>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Expand All @@ -79,25 +79,25 @@ impl HttpOutbound {
let mut resp = String::new();
while !resp.ends_with("\r\n\r\n") {
if buf_reader.read_line(&mut resp).await? == 0 {
return Err(io_err("EOF"));
return Err(TransportError::Http("EOF"));
}
if resp.len() > 4096 {
return Err(io_err("Too long resp"));
return Err(TransportError::Http("Response too long"));
}
}
let mut buf = [httparse::EMPTY_HEADER; 16];
let mut resp_struct = Response::new(buf.as_mut());
resp_struct
.parse(resp.as_bytes())
.map_err(|_| io_err("Parse response failed"))?;
.map_err(|_| TransportError::Http("Parsing failed"))?;
if let Some(200) = resp_struct.code {
let tcp_stream = buf_reader.into_inner();
established_tcp(self.name, inbound, tcp_stream, abort_handle).await;
Ok(())
} else {
Err(io_err(
Err(TransportError::Io(io_err(
format!("Http Connect Failed: {:?}", resp_struct.code).as_str(),
))
)))
}
}
}
Expand All @@ -116,18 +116,15 @@ impl Outbound for HttpOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
let self_clone = self.clone();
tokio::spawn(async move {
let server_addr =
lookup(self_clone.dns.as_ref(), &self_clone.config.server_addr).await?;
let tcp_stream = Egress::new(&self_clone.iface_name)
.tcp_stream(server_addr)
.await?;
self_clone
.run_tcp(inbound, tcp_stream, abort_handle)
.await
.map_err(|e| io_err(e.to_string().as_str()))
self_clone.run_tcp(inbound, tcp_stream, abort_handle).await
})
}

Expand All @@ -137,10 +134,10 @@ impl Outbound for HttpOutbound {
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid HTTP proxy tcp spawn");
return Err(io::ErrorKind::InvalidData.into());
return Err(TransportError::Internal("Invalid outbound"));
}
let self_clone = self.clone();
tokio::spawn(async move {
Expand All @@ -157,7 +154,7 @@ impl Outbound for HttpOutbound {
_inbound: AddrConnector,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tracing::error!("spawn_udp() should not be called with HttpOutbound");
empty_handle()
}
Expand All @@ -169,8 +166,8 @@ impl Outbound for HttpOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with HttpOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}
}
19 changes: 9 additions & 10 deletions boltconn/src/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub use direct::*;
pub use socks5::*;
pub use ssh::*;
use std::future::Future;
use std::io::ErrorKind;
pub use tcp_adapter::*;
pub use trojan::*;
pub use udp_adapter::*;
Expand Down Expand Up @@ -178,7 +177,7 @@ pub trait Outbound: Send + Sync {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>>;
) -> JoinHandle<Result<(), TransportError>>;

/// Return whether outbound is used
async fn spawn_tcp_with_outbound(
Expand All @@ -187,15 +186,15 @@ pub trait Outbound: Send + Sync {
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> io::Result<bool>;
) -> Result<bool, TransportError>;

/// Run with tokio::spawn.
fn spawn_udp(
&self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> JoinHandle<io::Result<()>>;
) -> JoinHandle<Result<(), TransportError>>;

/// Return whether outbound is used
async fn spawn_udp_with_outbound(
Expand All @@ -205,11 +204,11 @@ pub trait Outbound: Send + Sync {
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> io::Result<bool>;
) -> Result<bool, TransportError>;
}

fn empty_handle() -> JoinHandle<io::Result<()>> {
tokio::spawn(async move { Err(io_err("Invalid spawn")) })
fn empty_handle() -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(async move { Err(TransportError::Internal("Invalid spawn")) })
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -451,14 +450,14 @@ pub(super) async fn get_dst(dns: &Dns, dst: &NetworkAddr) -> io::Result<SocketAd
})
}

pub(super) async fn connect_timeout<F: Future<Output = io::Result<()>>>(
pub(super) async fn connect_timeout<F: Future<Output = Result<(), TransportError>>>(
future: F,
component_str: &str,
) -> io::Result<()> {
) -> Result<(), TransportError> {
tokio::time::timeout(Duration::from_secs(10), future)
.await
.unwrap_or_else(|_| {
tracing::debug!("{} timeout after 10s", component_str);
Err(ErrorKind::TimedOut.into())
Err(TransportError::Timeout("connect"))
})
}
22 changes: 12 additions & 10 deletions boltconn/src/adapter/shadowsocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::adapter::{
established_tcp, established_udp, lookup, AddrConnector, Connector, Outbound, OutboundType,
};

use crate::common::{io_err, StreamOutboundTrait};
use crate::common::StreamOutboundTrait;
use crate::network::dns::Dns;
use crate::network::egress::Egress;
use crate::proxy::error::TransportError;
Expand Down Expand Up @@ -101,7 +101,7 @@ impl SSOutbound {
outbound: S,
server_addr: SocketAddr,
abort_handle: ConnAbortHandle,
) -> io::Result<()>
) -> Result<(), TransportError>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Expand All @@ -119,7 +119,7 @@ impl SSOutbound {
server_addr: SocketAddr,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> io::Result<()> {
) -> Result<(), TransportError> {
let (_, context, resolved_config) = self.create_internal(server_addr).await;
let proxy_socket = ShadowsocksUdpAdapter::new(context, &resolved_config, adapter_or_socket);
established_udp(
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Outbound for SSOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
let self_clone = self.clone();
tokio::spawn(async move {
let server_addr = self_clone.get_server_addr().await?;
Expand All @@ -167,10 +167,10 @@ impl Outbound for SSOutbound {
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid Shadowsocks tcp spawn");
return Err(io::ErrorKind::InvalidData.into());
return Err(TransportError::Internal("Invalid outbound"));
}
let self_clone = self.clone();
tokio::spawn(async move {
Expand All @@ -187,14 +187,16 @@ impl Outbound for SSOutbound {
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
let self_clone = self.clone();
tokio::spawn(async move {
let server_addr = self_clone.get_server_addr().await?;
let out_sock = {
let socket = match server_addr {
SocketAddr::V4(_) => Egress::new(&self_clone.iface_name).udpv4_socket().await?,
SocketAddr::V6(_) => return Err(io_err("ss ipv6 udp not supported now")),
SocketAddr::V6(_) => {
return Err(TransportError::Internal("IPv6 not supported"))
}
};
socket.connect(server_addr).await?;
socket
Expand All @@ -218,10 +220,10 @@ impl Outbound for SSOutbound {
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
if tcp_outbound.is_some() || udp_outbound.is_none() {
tracing::error!("Invalid Shadowsocks UDP outbound ancestor");
return Err(io::ErrorKind::InvalidData.into());
return Err(TransportError::Internal("Invalid outbound"));
}
let udp_outbound = udp_outbound.unwrap();
let self_clone = self.clone();
Expand Down
Loading

0 comments on commit 9fcacc4

Please sign in to comment.