Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support machines with multiple NICs #576

Merged
merged 26 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ class S3Client(NativeResource):
client can use for buffering data for requests.
Default values scale with target throughput and are currently
between 2GiB and 8GiB (may change in future)

network_interface_names: (Optional[list(str)]) A list of network interface names. The client will distribute the
connections across network interfaces provided in this list. If any interface name is invalid, goes down,
or has any issues like network access, you will see connection failures.
This option is only supported on Linux, MacOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It
is not supported on Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. On
Linux, SO_BINDTODEVICE is used and requires kernel version >= 5.7 or root privileges.
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -222,13 +229,17 @@ def __init__(
multipart_upload_threshold=None,
throughput_target_gbps=None,
enable_s3express=False,
memory_limit=None):
memory_limit=None,
network_interface_names=None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
assert isinstance(credential_provider, AwsCredentialsProvider) or credential_provider is None
assert isinstance(tls_connection_options, TlsConnectionOptions) or tls_connection_options is None
assert isinstance(part_size, int) or part_size is None
assert isinstance(network_interface_names, list) and all(isinstance(name, str)
for name in network_interface_names) or network_interface_names is None
graebm marked this conversation as resolved.
Show resolved Hide resolved

assert isinstance(
throughput_target_gbps,
int) or isinstance(
Expand Down Expand Up @@ -284,7 +295,8 @@ def on_shutdown():
throughput_target_gbps,
enable_s3express,
memory_limit,
s3_client_core)
s3_client_core,
network_interface_names)
graebm marked this conversation as resolved.
Show resolved Hide resolved

def make_request(
self,
Expand Down
74 changes: 53 additions & 21 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,24 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *py_core; /* O */
PyObject *network_interface_names_py; /* O */

if (!PyArg_ParseTuple(
args,
"OOOOOs#iKKdpKO",
"OOOOOs#iKKdpKOO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -274,7 +276,8 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&throughput_target_gbps,
&enable_s3express,
&mem_limit,
&py_core)) {
&py_core,
&network_interface_names_py)) {
return NULL;
}

Expand Down Expand Up @@ -319,9 +322,22 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
signing_config = &default_signing_config;
}

struct aws_byte_cursor *network_interface_names = NULL;
graebm marked this conversation as resolved.
Show resolved Hide resolved
int num_network_interface_names = 0;

if (network_interface_names_py != Py_None) {
if (!PyList_Check(network_interface_names_py)) {
PyErr_SetString(PyExc_TypeError, "Expected network_interface_names to be a list");
return NULL;
}
Py_ssize_t listSize = PyList_Size(network_interface_names_py);
graebm marked this conversation as resolved.
Show resolved Hide resolved
num_network_interface_names = (size_t)listSize;
}

struct s3_client_binding *s3_client = aws_mem_calloc(allocator, 1, sizeof(struct s3_client_binding));

/* From hereon, we need to clean up if errors occur */
int result = AWS_OP_ERR;
graebm marked this conversation as resolved.
Show resolved Hide resolved

PyObject *capsule = PyCapsule_New(s3_client, s_capsule_name_s3_client, s_s3_client_capsule_destructor);
if (!capsule) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -335,6 +351,17 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

s3_client->py_core = py_core;
Py_INCREF(s3_client->py_core);
if (num_network_interface_names > 0) {
network_interface_names =
aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor));
for (Py_ssize_t i = 0; i < num_network_interface_names; ++i) {
PyObject *strObj = PyList_GetItem(network_interface_names_py, i);
network_interface_names[i] = aws_byte_cursor_from_pyunicode(strObj);
if (network_interface_names[i].ptr == NULL) {
goto cleanup;
}
}
}

struct aws_s3_client_config s3_config = {
.region = region,
Expand All @@ -349,18 +376,23 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.shutdown_callback = s_s3_client_shutdown,
.shutdown_callback_user_data = s3_client,
.enable_s3express = enable_s3express,
.network_interface_names_array = network_interface_names,
.num_network_interface_names = num_network_interface_names,
};

s3_client->native = aws_s3_client_new(allocator, &s3_config);
if (s3_client->native == NULL) {
PyErr_SetAwsLastError();
goto error;
goto cleanup;
}
aws_credentials_release(anonymous_credentials);
return capsule;
result = AWS_OP_SUCCESS;

error:
cleanup:
aws_credentials_release(anonymous_credentials);
Py_DECREF(capsule);
return NULL;
aws_mem_release(allocator, network_interface_names);
if (result != AWS_OP_SUCCESS) {
Py_DECREF(capsule);
return NULL;
}
return capsule;
}
10 changes: 8 additions & 2 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ def s3_client_new(
part_size=0,
is_cancel_test=False,
enable_s3express=False,
mem_limit=None):
mem_limit=None,
network_interface_names=None):

if is_cancel_test:
# for cancellation tests, make things slow, so it's less likely that
Expand Down Expand Up @@ -189,7 +190,8 @@ def s3_client_new(
part_size=part_size,
throughput_target_gbps=throughput_target_gbps,
enable_s3express=enable_s3express,
memory_limit=mem_limit)
memory_limit=mem_limit,
network_interface_names=network_interface_names)

return s3_client

Expand Down Expand Up @@ -221,6 +223,10 @@ def test_sanity_secure(self):
s3_client = s3_client_new(True, self.region)
self.assertIsNotNone(s3_client)

def test_sanity_network_interface_names(self):
s3_client = s3_client_new(True, self.region, network_interface_names=["eth0", "eth1"])
self.assertIsNotNone(s3_client)

def test_wait_shutdown(self):
s3_client = s3_client_new(False, self.region)
self.assertIsNotNone(s3_client)
Expand Down
Loading