Skip to content

Commit

Permalink
Merge pull request #487 from squeek502/udp-recvmmsg
Browse files Browse the repository at this point in the history
luv_new_udp: Add support for UV_UDP_RECVMMSG
  • Loading branch information
squeek502 authored Oct 21, 2020
2 parents ad576ff + 407f9c3 commit 464b735
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 13 deletions.
22 changes: 18 additions & 4 deletions docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1888,12 +1888,26 @@ UDP handles encapsulate UDP communication for both clients and servers.
### `uv.new_udp([flags])`

**Parameters:**
- `flags`: `string` or `nil`
- `flags`: `table` or `nil`
- `family`: `string` or `nil`
- `mmsgs`: `integer` or `nil` (default: `1`)

Creates and initializes a new `uv_udp_t`. Returns the Lua userdata wrapping
it. The actual socket is created lazily. Flags may be a family string:
`"unix"`, `"inet"`, `"inet6"`, `"ipx"`, `"netlink"`, `"x25"`, `"ax25"`,
`"atmpvc"`, `"appletalk"`, or `"packet"`.
it. The actual socket is created lazily.

When specified, `family` must be one of `"unix"`, `"inet"`, `"inet6"`,
`"ipx"`, `"netlink"`, `"x25"`, `"ax25"`, `"atmpvc"`, `"appletalk"`, or
`"packet"`.

When specified, `mmsgs` determines the number of messages able to be received
at one time via `recvmmsg(2)` (the allocated buffer will be sized to be able
to fit the specified number of max size dgrams). Only has an effect on
platforms that support `recvmmsg(2)`.

**Note:** For backwards compatibility reasons, `flags` can also be a string or
integer. When it is a string, it will be treated like the `family` key above.
When it is an integer, it will be used directly as the `flags` parameter when
calling `uv_udp_init_ex`.

**Returns:** `uv_udp_t userdata` or `fail`

Expand Down
99 changes: 90 additions & 9 deletions src/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,69 @@ static int luv_new_udp(lua_State* L) {
lua_settop(L, 1);
uv_udp_t* handle = (uv_udp_t*)luv_newuserdata(L, sizeof(*handle));
int ret;
#if LUV_UV_VERSION_GEQ(1, 39, 0)
// TODO: This default can potentially be increased, but it's
// not clear what the best default would be, or if unconditionally
// using recvmmsg is always an improvement.
//
// Would probably need to do some extensive benchmarking to
// figure out what a good default might be.
int mmsg_num_msgs = 1;
#endif
#if LUV_UV_VERSION_GEQ(1, 7, 0)
if (lua_isnoneornil(L, 1)) {
ret = uv_udp_init(ctx->loop, handle);
}
else {
unsigned int flags = AF_UNSPEC;
unsigned int flags = AF_UNSPEC;
if (!lua_isnoneornil(L, 1)) {
if (lua_isnumber(L, 1)) {
flags = lua_tointeger(L, 1);
}
else if (lua_isstring(L, 1)) {
const char* family = lua_tostring(L, 1);
flags = luv_af_string_to_num(family);
if (!flags) {
luaL_argerror(L, 1, lua_pushfstring(L, "invalid or unknown address family: '%s'", family));
} else if (lua_istable(L, 1)) {
lua_getfield(L, 1, "family");
if (lua_isnumber(L, -1)) {
// The lower 8 bits of the flags parameter are used as the socket domain
flags = lua_tointeger(L, -1) & 0xFF;
}
else if (lua_isstring(L, -1)) {
flags = luv_af_string_to_num(lua_tostring(L, -1));
}
else if (!lua_isnil(L, -1)) {
luaL_argerror(L, 1, "family must be string or integer if set");
}
lua_pop(L, 1);

#if LUV_UV_VERSION_GEQ(1, 39, 0)
lua_getfield(L, 1, "mmsgs");
if (lua_isnumber(L, -1)) {
mmsg_num_msgs = lua_tonumber(L, -1);
} else if (!lua_isnil(L, -1)) {
luaL_argerror(L, 1, "mmsgs must be integer if set");
}
lua_pop(L, 1);
#endif
}
else {
luaL_argerror(L, 1, "expected string or integer");
luaL_argerror(L, 1, "expected table, string, or integer");
}
ret = uv_udp_init_ex(ctx->loop, handle, flags);
}
#if LUV_UV_VERSION_GEQ(1, 39, 0)
// Libuv intended to enable this by default, but it caused a backwards-incompatibility with how
// the buffer is freed in udp_recv_cb, so it had to be put behind a flag to avoid breaking
// existing libuv users. However, because luv handles UV_UDP_MMSG_CHUNK in luv_udp_recv_cb, we can
// always enable this flag and get the benefits of recvmmsg for platforms that support it.
//
// Relevant links:
// - https://github.com/libuv/libuv/issues/2791
// - https://github.com/libuv/libuv/pull/2792
// - https://github.com/libuv/libuv/pull/2532
// - https://github.com/libuv/libuv/issues/419
//
// But we should only set the flag if we can actually take advantage of it.
if (mmsg_num_msgs > 1)
flags |= UV_UDP_RECVMMSG;
#endif
ret = uv_udp_init_ex(ctx->loop, handle, flags);
#else
ret = uv_udp_init(ctx->loop, handle);
#endif
Expand All @@ -56,6 +98,16 @@ static int luv_new_udp(lua_State* L) {
return luv_error(L, ret);
}
handle->data = luv_setup_handle(L, ctx);
#if LUV_UV_VERSION_GEQ(1, 39, 0)
if (flags & UV_UDP_RECVMMSG) {
// store the number of msgs to be received for use in alloc_cb
int* extra_data = malloc(sizeof(int));
assert(extra_data);
*extra_data = mmsg_num_msgs;
((luv_handle_t*)handle->data)->extra = extra_data;
((luv_handle_t*)handle->data)->extra_gc = free;
}
#endif
return 1;
}

Expand Down Expand Up @@ -276,6 +328,16 @@ static void luv_udp_recv_cb(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf
luv_handle_t* data = (luv_handle_t*)handle->data;
lua_State* L = data->ctx->L;

#if LUV_UV_VERSION_GEQ(1, 40, 0)
// If UV_UDP_MMSG_FREE is set, we can skip calling the callback
// and return early because we know the only purpose of this recv_cb call
// is to free the buffer that was being used by recvmmsg
if (flags & UV_UDP_MMSG_FREE) {
free(buf->base);
return;
}
#endif

// err
if (nread < 0) {
luv_status(L, nread);
Expand Down Expand Up @@ -333,11 +395,30 @@ static void luv_udp_recv_cb(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf
luv_call_callback(L, (luv_handle_t*)handle->data, LUV_RECV, 4);
}

#if LUV_UV_VERSION_GEQ(1, 39, 0)
#define MAX_DGRAM_SIZE (64*1024)

static void luv_udp_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
size_t buffer_size = suggested_size;
if (uv_udp_using_recvmmsg((uv_udp_t*)handle)) {
int num_msgs = *(int*)(((luv_handle_t*)handle->data)->extra);
buffer_size = MAX_DGRAM_SIZE * num_msgs;
}
buf->base = (char*)malloc(buffer_size);
assert(buf->base);
buf->len = buffer_size;
}
#endif

static int luv_udp_recv_start(lua_State* L) {
uv_udp_t* handle = luv_check_udp(L, 1);
int ret;
luv_check_callback(L, (luv_handle_t*)handle->data, LUV_RECV, 2);
#if LUV_UV_VERSION_GEQ(1, 39, 0)
ret = uv_udp_recv_start(handle, luv_udp_alloc_cb, luv_udp_recv_cb);
#else
ret = uv_udp_recv_start(handle, luv_alloc_cb, luv_udp_recv_cb);
#endif
#if LUV_UV_VERSION_LEQ(1, 23, 0)
#if LUV_UV_VERSION_GEQ(1, 10, 0)
// in Libuv <= 1.23.0, uv_udp_recv_start will return untranslated error codes on Windows
Expand Down
36 changes: 36 additions & 0 deletions tests/test-udp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,40 @@ return require('lib/tap')(function (test)
local testfn = multicast_join_test("::", "ff02::1", nil)
return testfn(print, p, expect, uv)
end)

test("udp recvmmsg", function(print, p, expect, uv)
local NUM_SENDS = 8
local NUM_MSGS_PER_ALLOC = 4

local recver = uv.new_udp({mmsgs = NUM_MSGS_PER_ALLOC})
assert(recver:bind("0.0.0.0", TEST_PORT))

local sender = uv.new_udp()

local msgs_recved = 0
local recv_cb = function(err, data, addr, flags)
assert(not err, err)
p(data, addr)

-- empty callback can happen, just return early
if data == nil and addr == nil then
return
end

assert(addr)
assert(data == "PING")

msgs_recved = msgs_recved + 1
if msgs_recved == NUM_SENDS then
sender:close()
recver:close()
end
end

assert(recver:recv_start(recv_cb))

for i=1,NUM_SENDS do
assert(sender:try_send("PING", "127.0.0.1", TEST_PORT))
end
end, "1.39.0")
end)

0 comments on commit 464b735

Please sign in to comment.