diff --git a/README.md b/README.md
index 60dd588..13e735a 100644
--- a/README.md
+++ b/README.md
@@ -41,3 +41,81 @@ just deploy-agent
 - [ ] Build an XDP-based BGP Peering Router
 - [ ] Implement Service Load Balancing
 - [ ] Collect Network Telemetry with eBPF
+
+### TCP Acceleration
+
+A pair of eBPF programs (sock_ops and sk_msg) accelerates TCP transmission between pods communicating on the same host machine: data is redirected directly between the local socket pair, avoiding an unnecessary trip through the Linux network stack.
+
+#### Without eBPF Acceleration
+
+```sh
+Get "http://10.244.1.2" with for 10s using 50 connections
+Statistics        Avg       Stdev        Max
+  Reqs/sec     77688.89    1898.47   80410.00
+  Latency      592.08µs   411.46µs     8.69ms
+  Latency Distribution
+     50%   309.77µs
+     75%   409.98µs
+     90%   490.44µs
+     99%   571.86µs
+  HTTP codes:
+    1XX - 0, 2XX - 777807, 3XX - 0, 4XX - 0, 5XX - 0
+    others - 0
+  Throughput:   84447.90/s
+
+Get "http://10.244.1.2" with for 30s using 500 connections
+Statistics        Avg       Stdev        Max
+  Reqs/sec     73050.03    1374.04   75450.00
+  Latency      791.10µs   822.26µs    54.64ms
+  Latency Distribution
+     50%   361.55µs
+     75%   503.02µs
+     90%   622.58µs
+     99%   749.10µs
+  HTTP codes:
+    1XX - 0, 2XX - 2192021, 3XX - 0, 4XX - 0, 5XX - 0
+    others - 0
+  Throughput:   632031.35/s
+```
+
+#### With eBPF Acceleration
+
+```sh
+Get "http://10.244.1.2" with for 10s using 50 connections
+Statistics        Avg       Stdev        Max
+  Reqs/sec     81633.44    1638.01   84030.00
+  Latency      539.51µs   366.21µs    11.88ms
+  Latency Distribution
+     50%   285.54µs
+     75%   377.27µs
+     90%   449.91µs
+     99%   521.56µs
+  HTTP codes:
+    1XX - 0, 2XX - 812374, 3XX - 0, 4XX - 0, 5XX - 0
+    others - 0
+  Throughput:   92676.69/s
+
+Get "http://10.244.1.2" with for 30s using 500 connections
+Statistics        Avg       Stdev        Max
+  Reqs/sec     76810.21    1745.58   79262.00
+  Latency      650.09µs   714.47µs    61.40ms
+  Latency Distribution
+     50%   305.78µs
+     75%   422.25µs
+     90%   518.34µs
+     99%   616.42µs
+  HTTP codes:
+    1XX - 0, 2XX - 2305881, 3XX - 0, 4XX - 0, 5XX - 0
+    others - 0
+  Throughput:   769121.91/s
+```
+
+Tests were run with the rsb load generator (see the `run-rsb` recipe in the justfile) using 50 connections for 10 seconds and 500 connections for 30 seconds.
+In both runs the accelerated path shows a higher average request rate and throughput and lower average latency.
+
+| Test Case | Avg Reqs/sec | Throughput | Avg Latency |
+| --- | --- | --- | --- |
+| Without Acceleration (10s) | 77688.89 | 84447.90/s | 592.08µs |
+| With Acceleration (10s) | 81633.44 | 92676.69/s | 539.51µs |
+| Without Acceleration (30s) | 73050.03 | 632031.35/s | 791.10µs |
+| With Acceleration (30s) | 76810.21 | 769121.91/s | 650.09µs |
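For a quick sense of scale, the relative gains implied by the table above can be checked with a few lines of arithmetic. This is a standalone sketch over the published numbers, not code from the repository:

```rust
// Standalone sketch: relative change between the "without" and "with"
// rows of the README table (values copied from the benchmark output).
fn change_pct(before: f64, after: f64) -> f64 {
    (after - before) / before * 100.0
}

fn main() {
    println!("reqs/sec, 10s run: {:+.1}%", change_pct(77688.89, 81633.44)); // ≈ +5.1%
    println!("reqs/sec, 30s run: {:+.1}%", change_pct(73050.03, 76810.21)); // ≈ +5.1%
    println!("avg latency, 10s:  {:+.1}%", change_pct(592.08, 539.51));     // ≈ -8.9%
    println!("avg latency, 30s:  {:+.1}%", change_pct(791.10, 650.09));     // ≈ -17.8%
}
```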
diff --git a/agent/Cargo.toml b/agent/Cargo.toml
index b1536bd..c79e13a 100644
--- a/agent/Cargo.toml
+++ b/agent/Cargo.toml
@@ -14,10 +14,8 @@ ws = ["kube/ws"]
 sinabro-config = { path = "../config" }
 axum = "0.7.2"
-aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", features = [
-    "async_tokio",
-] }
-aya-log = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
+aya = { version = "0.12", features = ["async_tokio"] }
+aya-log = "0.2"
 clap = { version = "4.1", features = ["derive"] }
 common = { path = "../common", features = ["user"] }
 anyhow = "1"
diff --git a/agent/src/bpf_loader.rs b/agent/src/bpf_loader.rs
index ad3464d..4b8a32a 100644
--- a/agent/src/bpf_loader.rs
+++ b/agent/src/bpf_loader.rs
@@ -1,18 +1,19 @@
 use std::net::Ipv4Addr;
 
 use anyhow::Result;
-use aya::maps::HashMap;
-use aya::programs::{tc, SchedClassifier, TcAttachType};
+use aya::maps::{HashMap, SockHash};
+use aya::programs::{tc, SchedClassifier, SkMsg, SockOps, TcAttachType};
 use aya::{include_bytes_aligned, Bpf};
-use common::{NetworkInfo, CLUSTER_CIDR_KEY, HOST_IP_KEY};
+use common::{NetworkInfo, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};
 
 pub struct BpfLoader {
     pub bpf: Bpf,
     iface: String,
+    cgroup_path: String,
 }
 
 impl BpfLoader {
-    pub fn load(iface: &str) -> Result<Self> {
+    pub fn load(iface: &str, cgroup_path: &str) -> Result<Self> {
         #[cfg(debug_assertions)]
         let bpf = Bpf::load(include_bytes_aligned!(
             "../../target/bpfel-unknown-none/debug/ebpf"
@@ -25,6 +26,7 @@ impl BpfLoader {
         Ok(Self {
             bpf,
             iface: iface.to_string(),
+            cgroup_path: cgroup_path.to_string(),
         })
     }
 
@@ -75,6 +77,20 @@ impl BpfLoader {
                 .expect("failed to insert node ip");
         });
 
+        let tcp_accelerate: &mut SockOps =
+            self.bpf.program_mut("tcp_accelerate").unwrap().try_into()?;
+        let cgroup = std::fs::File::open(&self.cgroup_path)?;
+        tcp_accelerate.load()?;
+        tcp_accelerate.attach(cgroup)?;
+
+        let sock_ops_map: SockHash<_, SockKey> =
+            self.bpf.map("SOCK_OPS_MAP").unwrap().try_into()?;
+        let map_fd = sock_ops_map.fd().try_clone()?;
+
+        let tcp_bypass: &mut SkMsg = self.bpf.program_mut("tcp_bypass").unwrap().try_into()?;
+        tcp_bypass.load()?;
+        tcp_bypass.attach(&map_fd)?;
+
         Ok(())
     }
 }
diff --git a/agent/src/main.rs b/agent/src/main.rs
index b67024a..070ae68 100644
--- a/agent/src/main.rs
+++ b/agent/src/main.rs
@@ -24,6 +24,9 @@ use crate::netlink::Netlink;
 struct Opt {
     #[clap(short, long, default_value = "eth0")]
     iface: String,
+
+    #[clap(short, long, default_value = "/sys/fs/cgroup")]
+    cgroup_path: String,
 }
 
 #[tokio::main]
@@ -42,7 +45,7 @@ async fn main() -> Result<()> {
     setup_cni_config(&cluster_cidr, &host_route.pod_cidr)?;
     setup_network(&host_ip, host_route, &node_routes)?;
 
-    let mut bpf_loader = BpfLoader::load(&opt.iface)?;
+    let mut bpf_loader = BpfLoader::load(&opt.iface, &opt.cgroup_path)?;
     BpfLogger::init(&mut bpf_loader.bpf)?;
 
     bpf_loader
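The loader change above wires the two new programs together: the sock_ops program is attached to the root cgroup, and the sk_msg program is attached to the SOCK_OPS_MAP sockhash it populates. The sketch below restates that attach sequence as a standalone function to make the order of operations explicit; it mirrors the aya 0.12 calls in `BpfLoader::attach`, with error handling simplified (unwrap on missing program/map names), and is not code from the repository:

```rust
// Sketch only: the minimal aya 0.12 sequence used in BpfLoader above to
// wire the sock_ops / sk_msg pair together. Program and map names mirror
// the eBPF code in this diff; SockKey comes from the workspace's `common`
// crate; everything else is illustrative.
use aya::maps::SockHash;
use aya::programs::{SkMsg, SockOps};
use aya::Bpf;
use common::SockKey;

fn attach_tcp_acceleration(bpf: &mut Bpf, cgroup_path: &str) -> anyhow::Result<()> {
    // 1. sock_ops attaches to the cgroup so every TCP connection established
    //    inside it gets recorded in SOCK_OPS_MAP.
    let sock_ops: &mut SockOps = bpf.program_mut("tcp_accelerate").unwrap().try_into()?;
    sock_ops.load()?;
    sock_ops.attach(std::fs::File::open(cgroup_path)?)?;

    // 2. sk_msg attaches to the same sockhash, so sendmsg() on a tracked
    //    socket can be redirected straight to the peer socket.
    let sock_map: SockHash<_, SockKey> = bpf.map("SOCK_OPS_MAP").unwrap().try_into()?;
    let map_fd = sock_map.fd().try_clone()?;
    let sk_msg: &mut SkMsg = bpf.program_mut("tcp_bypass").unwrap().try_into()?;
    sk_msg.load()?;
    sk_msg.attach(&map_fd)?;

    Ok(())
}
```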
diff --git a/common/Cargo.toml b/common/Cargo.toml
index e2963a3..9ebd25d 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -8,7 +8,7 @@ default = []
 user = ["aya"]
 
 [dependencies]
-aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", optional = true }
+aya = { version = "0.12", optional = true }
 
 [lib]
 path = "src/lib.rs"
diff --git a/common/src/lib.rs b/common/src/lib.rs
index a4e4e80..05229b2 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -35,3 +35,16 @@ pub struct NetworkInfo {
 
 #[cfg(feature = "user")]
 unsafe impl aya::Pod for NetworkInfo {}
+
+#[repr(C)]
+#[derive(Clone, Copy)]
+pub struct SockKey {
+    pub src_ip: u32,
+    pub dst_ip: u32,
+    pub src_port: u32,
+    pub dst_port: u32,
+    pub family: u32,
+}
+
+#[cfg(feature = "user")]
+unsafe impl aya::Pod for SockKey {}
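`SockKey` is the key type of the SOCK_OPS_MAP sockhash that the sock_ops and sk_msg programs added in ebpf/src/main.rs below populate and query. The sock_ops program stores each established socket under its own local-to-remote tuple, while the sk_msg program looks up the remote-to-local tuple of the sender, i.e. the key under which the peer socket was stored. A small standalone illustration of that symmetry follows; it mirrors the field mapping of `extract_sock_key_from` and `sk_msg_extract_key` below, with byte-order conversions omitted and made-up pod addresses (not code from the repository):

```rust
// Illustration only: the key sk_msg builds for a message sent by pod A is
// exactly the key sock_ops stored for its peer B, so redirect_msg() finds
// B's socket. Addresses and ports are invented for the example.
use std::net::Ipv4Addr;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SockKey {
    src_ip: u32,
    dst_ip: u32,
    src_port: u32,
    dst_port: u32,
    family: u32,
}

// sock_ops view: key for the socket itself (local -> remote).
fn sock_ops_key(local_ip: u32, local_port: u32, remote_ip: u32, remote_port: u32) -> SockKey {
    SockKey { src_ip: local_ip, dst_ip: remote_ip, src_port: local_port, dst_port: remote_port, family: 2 }
}

// sk_msg view: key for the *peer* socket (remote -> local).
fn sk_msg_key(local_ip: u32, local_port: u32, remote_ip: u32, remote_port: u32) -> SockKey {
    SockKey { src_ip: remote_ip, dst_ip: local_ip, src_port: remote_port, dst_port: local_port, family: 2 }
}

fn main() {
    let (a, a_port) = (u32::from(Ipv4Addr::new(10, 244, 1, 3)), 48152);
    let (b, b_port) = (u32::from(Ipv4Addr::new(10, 244, 1, 2)), 80);

    // Key stored when B's end of the connection is established...
    let stored_for_b = sock_ops_key(b, b_port, a, a_port);
    // ...is the key sk_msg builds when A sends a message on its socket.
    let lookup_by_a = sk_msg_key(a, a_port, b, b_port);

    assert_eq!(stored_for_b, lookup_by_a);
}
```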
diff --git a/ebpf/Cargo.toml b/ebpf/Cargo.toml
index ae4d28b..593dd5c 100644
--- a/ebpf/Cargo.toml
+++ b/ebpf/Cargo.toml
@@ -4,8 +4,8 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-aya-bpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
-aya-log-ebpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
+aya-ebpf = "0.1.0"
+aya-log-ebpf = "0.1.0"
 common = { path = "../common" }
 network-types = "0.0.5"
 memoffset = "0.9"
diff --git a/ebpf/src/main.rs b/ebpf/src/main.rs
index 747abf6..fb80050 100644
--- a/ebpf/src/main.rs
+++ b/ebpf/src/main.rs
@@ -3,16 +3,21 @@
 
 use core::mem;
 
-use aya_bpf::{
-    bindings::{BPF_F_PSEUDO_HDR, TC_ACT_PIPE, TC_ACT_SHOT},
+use aya_ebpf::bindings::sk_action::SK_PASS;
+use aya_ebpf::bindings::{
+    sk_msg_md, BPF_ANY, BPF_F_INGRESS, BPF_F_PSEUDO_HDR, BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB,
+    BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB, BPF_SOCK_OPS_STATE_CB_FLAG, TC_ACT_PIPE, TC_ACT_SHOT,
+};
+use aya_ebpf::maps::SockHash;
+use aya_ebpf::{
     cty::c_long,
     helpers::{bpf_csum_diff, bpf_get_prandom_u32},
-    macros::{classifier, map},
+    macros::{classifier, map, sk_msg, sock_ops},
     maps::HashMap,
-    programs::TcContext,
+    programs::{SkMsgContext, SockOpsContext, TcContext},
 };
-use aya_log_ebpf::info;
-use common::{NatKey, NetworkInfo, OriginValue, CLUSTER_CIDR_KEY, HOST_IP_KEY};
+use aya_log_ebpf::{error, info};
+use common::{NatKey, NetworkInfo, OriginValue, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};
 use memoffset::offset_of;
 use network_types::{
     eth::{EthHdr, EtherType},
@@ -20,6 +25,9 @@ use network_types::{
     tcp::TcpHdr,
 };
 
+#[map]
+pub static mut SOCK_OPS_MAP: SockHash<SockKey> = SockHash::with_max_entries(65535, 0);
+
 #[map]
 static mut NET_CONFIG_MAP: HashMap = HashMap::with_max_entries(2, 0);
 
@@ -275,6 +283,117 @@ fn is_node_ip(ip: u32) -> bool {
     unsafe { NODE_MAP.get(&ip).is_some() }
 }
+#[sock_ops]
+pub fn tcp_accelerate(ctx: SockOpsContext) -> u32 {
+    try_tcp_accelerate(ctx).unwrap_or(0)
+}
+
+fn try_tcp_accelerate(ctx: SockOpsContext) -> Result<u32, ()> {
+    let family = ctx.family();
+
+    // currently only support IPv4
+    if family != 2 {
+        return Ok(0);
+    }
+
+    match ctx.op() {
+        BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB | BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB => {
+            // info!(
+            //     &ctx,
+            //     "<<< ipv4 op = {}, src {:i}:{} => dst {:i}:{}",
+            //     ctx.op(),
+            //     u32::from_be(ctx.local_ip4()),
+            //     ctx.local_port(),
+            //     u32::from_be(ctx.remote_ip4()),
+            //     u32::from_be(ctx.remote_port())
+            // );
+
+            let mut sock_key = extract_sock_key_from(&ctx);
+
+            unsafe {
+                SOCK_OPS_MAP
+                    .update(&mut sock_key, &mut *ctx.ops, BPF_ANY.into())
+                    .map_err(|e| {
+                        error!(&ctx, "failed to update SOCK_OPS_MAP: {}", e);
+                    })?;
+            }
+
+            ctx.set_cb_flags(BPF_SOCK_OPS_STATE_CB_FLAG as i32)
+                .map_err(|e| {
+                    error!(&ctx, "failed to set BPF_SOCK_OPS_STATE_CB_FLAG: {}", e);
+                })?;
+        }
+        // BPF_SOCK_OPS_STATE_CB => match ctx.arg(1) {
+        //     BPF_TCP_CLOSE | BPF_TCP_CLOSE_WAIT | BPF_TCP_LAST_ACK => {
+        //         // info!(
+        //         //     &ctx,
+        //         //     ">>> ipv4 op = {}, src {:i}:{} => dst {:i}:{}, state: {}",
+        //         //     ctx.op(),
+        //         //     u32::from_be(ctx.local_ip4()),
+        //         //     ctx.local_port(),
+        //         //     u32::from_be(ctx.remote_ip4()),
+        //         //     u32::from_be(ctx.remote_port()),
+        //         //     ctx.arg(1)
+        //         // );
+        //     }
+        //     _ => {}
+        // },
+        _ => {}
+    }
+
+    Ok(0)
+}
+
+fn extract_sock_key_from(ctx: &SockOpsContext) -> SockKey {
+    SockKey {
+        src_ip: u32::from_be(ctx.local_ip4()),
+        dst_ip: u32::from_be(ctx.remote_ip4()),
+        src_port: ctx.local_port(),
+        dst_port: u32::from_be(ctx.remote_port()),
+        family: ctx.family(),
+    }
+}
+
+#[sk_msg]
+pub fn tcp_bypass(ctx: SkMsgContext) -> u32 {
+    try_tcp_bypass(ctx).unwrap_or(SK_PASS)
+}
+
+fn try_tcp_bypass(ctx: SkMsgContext) -> Result<u32, ()> {
+    // info!(&ctx, "received a message on the socket");
+
+    let msg = unsafe { &*ctx.msg };
+
+    if msg.family != 2 {
+        return Ok(SK_PASS);
+    }
+
+    let mut sock_key = sk_msg_extract_key(msg);
+
+    unsafe { SOCK_OPS_MAP.redirect_msg(&ctx, &mut sock_key, BPF_F_INGRESS as u64) };
+    // info!(
+    //     &ctx,
+    //     "tcp_bypass: {:i}:{} <-> {:i}:{} / ret: {}",
+    //     sock_key.src_ip,
+    //     sock_key.src_port,
+    //     sock_key.dst_ip,
+    //     sock_key.dst_port,
+    //     ret
+    // );
+
+    Ok(SK_PASS)
+}
+
+fn sk_msg_extract_key(msg: &sk_msg_md) -> SockKey {
+    SockKey {
+        src_ip: u32::from_be(msg.remote_ip4),
+        dst_ip: u32::from_be(msg.local_ip4),
+        src_port: u32::from_be(msg.remote_port),
+        dst_port: msg.local_port,
+        family: msg.family,
+    }
+}
+
 #[panic_handler]
 fn panic(_info: &core::panic::PanicInfo) -> ! {
     unsafe { core::hint::unreachable_unchecked() }
 }
diff --git a/justfile b/justfile
index c4a1f10..863a986 100644
--- a/justfile
+++ b/justfile
@@ -7,10 +7,10 @@ setup-kind-cluster: build-image
     kind create cluster --config tests/e2e/kind-config.yaml
     kind load docker-image sinabro:test
 
-clean-kind-cluster:
+delete-kind-cluster:
     kind delete cluster
 
-deploy-agent: setup-kind-cluster
+create-kind-cluster-with-sinabro: setup-kind-cluster
     kubectl apply -f tests/e2e/deploy-test/agent.yaml
 
 deploy-test-pods:
@@ -26,4 +26,7 @@ e2e-test: build-image
     kubectl kuttl test --config ./tests/kuttl-test.yaml
 
 launch-rust-env:
-    docker run --rm --privileged -it -v $(pwd):/source rust sh
\ No newline at end of file
+    docker run --rm --privileged -it -v $(pwd):/source rust sh
+
+run-rsb:
+    kubectl run rsb --image gamelife1314/rsb -- -d 30 -c 500 -l http://$(kubectl get pod nginx-worker -o jsonpath='{.status.podIP}')
\ No newline at end of file
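The new `run-rsb` recipe drives the benchmark quoted in the README. If you prefer to script the same flow without `just`, the two kubectl invocations can be wrapped in a few lines of Rust; the pod name (`nginx-worker`), the `gamelife1314/rsb` image and the rsb flags are copied from the recipe above, and everything else is illustrative rather than code from the repository:

```rust
// Sketch only: same flow as `just run-rsb`, driven with std::process::Command.
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Resolve the target pod IP, mirroring the jsonpath lookup in the recipe.
    let out = Command::new("kubectl")
        .args(["get", "pod", "nginx-worker", "-o", "jsonpath={.status.podIP}"])
        .output()?;
    let pod_ip = String::from_utf8_lossy(&out.stdout).trim().to_string();
    let url = format!("http://{pod_ip}");

    // Launch the rsb load generator as a pod with the same flags:
    // 30 seconds, 500 connections, against the nginx pod.
    let status = Command::new("kubectl")
        .args(["run", "rsb", "--image", "gamelife1314/rsb", "--", "-d", "30", "-c", "500", "-l"])
        .arg(&url)
        .status()?;

    std::process::exit(status.code().unwrap_or(1));
}
```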