Skip to content

Commit

Permalink
Merge pull request #38 from wqld/feat/apply-tcp-acceleration
Browse files Browse the repository at this point in the history
feat: apply eBPF acceleration for TCP transmission
  • Loading branch information
wqld authored May 1, 2024
2 parents c7a6407 + 4a7f3ff commit 3b6d46f
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 21 deletions.
78 changes: 78 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,81 @@ just deploy-agent
- [ ] Build an XDP-based BGP Peering Router
- [ ] Implement Service Load Balancing
- [ ] Collect Network Telemetry with eBPF

### TCP Acceleration

An eBPF program has been applied to accelerate TCP transmission between pods communicating on the same host machine. This avoids unnecessary traversing through the Linux network stack, enabling efficient communication between local socket pairs.

#### Without eBPF Acceleration

```sh
Get "http://10.244.1.2" with for 10s using 50 connections
Statistics Avg Stdev Max
Reqs/sec 77688.89 1898.47 80410.00
Latency 592.08µs 411.46µs 8.69ms
Latency Distribution
50% 309.77µs
75% 409.98µs
90% 490.44µs
99% 571.86µs
HTTP codes:
1XX - 0, 2XX - 777807, 3XX - 0, 4XX - 0, 5XX - 0
others - 0
Throughput: 84447.90/s

Get "http://10.244.1.2" with for 30s using 500 connections
Statistics Avg Stdev Max
Reqs/sec 73050.03 1374.04 75450.00
Latency 791.10µs 822.26µs 54.64ms
Latency Distribution
50% 361.55µs
75% 503.02µs
90% 622.58µs
99% 749.10µs
HTTP codes:
1XX - 0, 2XX - 2192021, 3XX - 0, 4XX - 0, 5XX - 0
others - 0
Throughput: 632031.35/s
```

### With eBPF Acceleration

```sh
Get "http://10.244.1.2" with for 10s using 50 connections
Statistics Avg Stdev Max
Reqs/sec 81633.44 1638.01 84030.00
Latency 539.51µs 366.21µs 11.88ms
Latency Distribution
50% 285.54µs
75% 377.27µs
90% 449.91µs
99% 521.56µs
HTTP codes:
1XX - 0, 2XX - 812374, 3XX - 0, 4XX - 0, 5XX - 0
others - 0
Throughput: 92676.69/s

Get "http://10.244.1.2" with for 30s using 500 connections
Statistics Avg Stdev Max
Reqs/sec 76810.21 1745.58 79262.00
Latency 650.09µs 714.47µs 61.40ms
Latency Distribution
50% 305.78µs
75% 422.25µs
90% 518.34µs
99% 616.42µs
HTTP codes:
1XX - 0, 2XX - 2305881, 3XX - 0, 4XX - 0, 5XX - 0
others - 0
Throughput: 769121.91/s
```

Tests were conducted with 50 connections for 10 seconds and 500 connections for 30 seconds.
The results indicate an increase in the average request rate and throughput, and a decrease in latency.

| Test Case | Requests/sec | Throughput | Latency |
| --- | --- | --- | --- |
| Without Acceleration (10s) | 77688.89 | 84447.90/s | 592.08µs |
| With Acceleration (10s) | 81633.44 | 92676.69/s | 539.51µs |
| Without Acceleration (30s) | 73050.03 | 632031.35/s | 791.10µs |
| With Acceleration (30s) | 76810.21 | 769121.91/s | 650.09µs |
6 changes: 2 additions & 4 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ ws = ["kube/ws"]
sinabro-config = { path = "../config" }

axum = "0.7.2"
aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", features = [
"async_tokio",
] }
aya-log = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
aya = { version = "0.12", features = ["async_tokio"] }
aya-log = "0.2"
clap = { version = "4.1", features = ["derive"] }
common = { path = "../common", features = ["user"] }
anyhow = "1"
Expand Down
24 changes: 20 additions & 4 deletions agent/src/bpf_loader.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::net::Ipv4Addr;

use anyhow::Result;
use aya::maps::HashMap;
use aya::programs::{tc, SchedClassifier, TcAttachType};
use aya::maps::{HashMap, SockHash};
use aya::programs::{tc, SchedClassifier, SkMsg, SockOps, TcAttachType};
use aya::{include_bytes_aligned, Bpf};
use common::{NetworkInfo, CLUSTER_CIDR_KEY, HOST_IP_KEY};
use common::{NetworkInfo, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};

pub struct BpfLoader {
pub bpf: Bpf,
iface: String,
cgroup_path: String,
}

impl BpfLoader {
pub fn load(iface: &str) -> Result<Self> {
pub fn load(iface: &str, cgroup_path: &str) -> Result<Self> {
#[cfg(debug_assertions)]
let bpf = Bpf::load(include_bytes_aligned!(
"../../target/bpfel-unknown-none/debug/ebpf"
Expand All @@ -25,6 +26,7 @@ impl BpfLoader {
Ok(Self {
bpf,
iface: iface.to_string(),
cgroup_path: cgroup_path.to_string(),
})
}

Expand Down Expand Up @@ -75,6 +77,20 @@ impl BpfLoader {
.expect("failed to insert node ip");
});

let tcp_accelerate: &mut SockOps =
self.bpf.program_mut("tcp_accelerate").unwrap().try_into()?;
let cgroup = std::fs::File::open(&self.cgroup_path)?;
tcp_accelerate.load()?;
tcp_accelerate.attach(cgroup)?;

let sock_ops_map: SockHash<_, SockKey> =
self.bpf.map("SOCK_OPS_MAP").unwrap().try_into()?;
let map_fd = sock_ops_map.fd().try_clone()?;

let tcp_bypass: &mut SkMsg = self.bpf.program_mut("tcp_bypass").unwrap().try_into()?;
tcp_bypass.load()?;
tcp_bypass.attach(&map_fd)?;

Ok(())
}
}
5 changes: 4 additions & 1 deletion agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use crate::netlink::Netlink;
struct Opt {
#[clap(short, long, default_value = "eth0")]
iface: String,

#[clap(short, long, default_value = "/sys/fs/cgroup")]
cgroup_path: String,
}

#[tokio::main]
Expand All @@ -42,7 +45,7 @@ async fn main() -> Result<()> {
setup_cni_config(&cluster_cidr, &host_route.pod_cidr)?;
setup_network(&host_ip, host_route, &node_routes)?;

let mut bpf_loader = BpfLoader::load(&opt.iface)?;
let mut bpf_loader = BpfLoader::load(&opt.iface, &opt.cgroup_path)?;
BpfLogger::init(&mut bpf_loader.bpf)?;

bpf_loader
Expand Down
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ default = []
user = ["aya"]

[dependencies]
aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", optional = true }
aya = { version = "0.12", optional = true }

[lib]
path = "src/lib.rs"
13 changes: 13 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,16 @@ pub struct NetworkInfo {

#[cfg(feature = "user")]
unsafe impl aya::Pod for NetworkInfo {}

#[repr(C)]
#[derive(Clone, Copy)]
pub struct SockKey {
pub src_ip: u32,
pub dst_ip: u32,
pub src_port: u32,
pub dst_port: u32,
pub family: u32,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for SockKey {}
4 changes: 2 additions & 2 deletions ebpf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
aya-bpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
aya-log-ebpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
aya-ebpf = "0.1.0"
aya-log-ebpf = "0.1.0"
common = { path = "../common" }
network-types = "0.0.5"
memoffset = "0.9"
Expand Down
131 changes: 125 additions & 6 deletions ebpf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@

use core::mem;

use aya_bpf::{
bindings::{BPF_F_PSEUDO_HDR, TC_ACT_PIPE, TC_ACT_SHOT},
use aya_ebpf::bindings::sk_action::SK_PASS;
use aya_ebpf::bindings::{
sk_msg_md, BPF_ANY, BPF_F_INGRESS, BPF_F_PSEUDO_HDR, BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB,
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB, BPF_SOCK_OPS_STATE_CB_FLAG, TC_ACT_PIPE, TC_ACT_SHOT,
};
use aya_ebpf::maps::SockHash;
use aya_ebpf::{
cty::c_long,
helpers::{bpf_csum_diff, bpf_get_prandom_u32},
macros::{classifier, map},
macros::{classifier, map, sk_msg, sock_ops},
maps::HashMap,
programs::TcContext,
programs::{SkMsgContext, SockOpsContext, TcContext},
};
use aya_log_ebpf::info;
use common::{NatKey, NetworkInfo, OriginValue, CLUSTER_CIDR_KEY, HOST_IP_KEY};
use aya_log_ebpf::{error, info};
use common::{NatKey, NetworkInfo, OriginValue, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};
use memoffset::offset_of;
use network_types::{
eth::{EthHdr, EtherType},
ip::{IpProto, Ipv4Hdr},
tcp::TcpHdr,
};

#[map]
pub static mut SOCK_OPS_MAP: SockHash<SockKey> = SockHash::with_max_entries(65535, 0);

#[map]
static mut NET_CONFIG_MAP: HashMap<u8, NetworkInfo> = HashMap::with_max_entries(2, 0);

Expand Down Expand Up @@ -275,6 +283,117 @@ fn is_node_ip(ip: u32) -> bool {
unsafe { NODE_MAP.get(&ip).is_some() }
}

#[sock_ops]
pub fn tcp_accelerate(ctx: SockOpsContext) -> u32 {
try_tcp_accelerate(ctx).unwrap_or(0)
}

fn try_tcp_accelerate(ctx: SockOpsContext) -> Result<u32, ()> {
let family = ctx.family();

// currently only support IPv4
if family != 2 {
return Ok(0);
}

match ctx.op() {
BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB | BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB => {
// info!(
// &ctx,
// "<<< ipv4 op = {}, src {:i}:{} => dst {:i}:{}",
// ctx.op(),
// u32::from_be(ctx.local_ip4()),
// ctx.local_port(),
// u32::from_be(ctx.remote_ip4()),
// u32::from_be(ctx.remote_port())
// );

let mut sock_key = extract_sock_key_from(&ctx);

unsafe {
SOCK_OPS_MAP
.update(&mut sock_key, &mut *ctx.ops, BPF_ANY.into())
.map_err(|e| {
error!(&ctx, "failed to update SOCK_OPS_MAP: {}", e);
})?;
}

ctx.set_cb_flags(BPF_SOCK_OPS_STATE_CB_FLAG as i32)
.map_err(|e| {
error!(&ctx, "failed to set BPF_SOCK_OPS_STATE_CB_FLAG: {}", e);
})?;
}
// BPF_SOCK_OPS_STATE_CB => match ctx.arg(1) {
// BPF_TCP_CLOSE | BPF_TCP_CLOSE_WAIT | BPF_TCP_LAST_ACK => {
// // info!(
// // &ctx,
// // ">>> ipv4 op = {}, src {:i}:{} => dst {:i}:{}, state: {}",
// // ctx.op(),
// // u32::from_be(ctx.local_ip4()),
// // ctx.local_port(),
// // u32::from_be(ctx.remote_ip4()),
// // u32::from_be(ctx.remote_port()),
// // ctx.arg(1)
// // );
// }
// _ => {}
// },
_ => {}
}

Ok(0)
}

fn extract_sock_key_from(ctx: &SockOpsContext) -> SockKey {
SockKey {
src_ip: u32::from_be(ctx.local_ip4()),
dst_ip: u32::from_be(ctx.remote_ip4()),
src_port: ctx.local_port(),
dst_port: u32::from_be(ctx.remote_port()),
family: ctx.family(),
}
}

#[sk_msg]
pub fn tcp_bypass(ctx: SkMsgContext) -> u32 {
try_tcp_bypass(ctx).unwrap_or(SK_PASS)
}

fn try_tcp_bypass(ctx: SkMsgContext) -> Result<u32, ()> {
// info!(&ctx, "received a message on the socket");

let msg = unsafe { &*ctx.msg };

if msg.family != 2 {
return Ok(SK_PASS);
}

let mut sock_key = sk_msg_extract_key(msg);

unsafe { SOCK_OPS_MAP.redirect_msg(&ctx, &mut sock_key, BPF_F_INGRESS as u64) };
// info!(
// &ctx,
// "tcp_bypass: {:i}:{} <-> {:i}:{} / ret: {}",
// sock_key.src_ip,
// sock_key.src_port,
// sock_key.dst_ip,
// sock_key.dst_port,
// ret
// );

Ok(SK_PASS)
}

fn sk_msg_extract_key(msg: &sk_msg_md) -> SockKey {
SockKey {
src_ip: u32::from_be(msg.remote_ip4),
dst_ip: u32::from_be(msg.local_ip4),
src_port: u32::from_be(msg.remote_port),
dst_port: msg.local_port,
family: msg.family,
}
}

#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
unsafe { core::hint::unreachable_unchecked() }
Expand Down
9 changes: 6 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ setup-kind-cluster: build-image
kind create cluster --config tests/e2e/kind-config.yaml
kind load docker-image sinabro:test

clean-kind-cluster:
delete-kind-cluster:
kind delete cluster

deploy-agent: setup-kind-cluster
create-kind-cluster-with-sinabro: setup-kind-cluster
kubectl apply -f tests/e2e/deploy-test/agent.yaml

deploy-test-pods:
Expand All @@ -26,4 +26,7 @@ e2e-test: build-image
kubectl kuttl test --config ./tests/kuttl-test.yaml

launch-rust-env:
docker run --rm --privileged -it -v $(pwd):/source rust sh
docker run --rm --privileged -it -v $(pwd):/source rust sh

run-rsb:
kubectl run rsb --image gamelife1314/rsb -- -d 30 -c 500 -l http://$(kubectl get pod nginx-worker -o jsonpath='{.status.podIP}')

0 comments on commit 3b6d46f

Please sign in to comment.