Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 committed Jul 8, 2024
1 parent 14efed9 commit e21ae79
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 61 deletions.
27 changes: 14 additions & 13 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class S3Client(NativeResource):
client can use for buffering data for requests.
Default values scale with target throughput and are currently
between 2GiB and 8GiB (may change in future)
network_interface_names: (Optional[list(str)])
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -222,13 +225,17 @@ def __init__(
multipart_upload_threshold=None,
throughput_target_gbps=None,
enable_s3express=False,
memory_limit=None):
memory_limit=None,
network_interface_names=None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
assert isinstance(credential_provider, AwsCredentialsProvider) or credential_provider is None
assert isinstance(tls_connection_options, TlsConnectionOptions) or tls_connection_options is None
assert isinstance(part_size, int) or part_size is None
assert isinstance(network_interface_names, list) and all(isinstance(name, str)
for name in network_interface_names) or network_interface_names is None

assert isinstance(
throughput_target_gbps,
int) or isinstance(
Expand Down Expand Up @@ -284,7 +291,8 @@ def on_shutdown():
throughput_target_gbps,
enable_s3express,
memory_limit,
s3_client_core)
s3_client_core,
network_interface_names)

def make_request(
self,
Expand All @@ -302,8 +310,7 @@ def make_request(
on_headers=None,
on_body=None,
on_done=None,
on_progress=None,
network_interface_names=None):
on_progress=None):
"""Create the Request to the the S3 server,
:attr:`~S3RequestType.GET_OBJECT`/:attr:`~S3RequestType.PUT_OBJECT` requests are split it into multi-part
requests under the hood for acceleration.
Expand Down Expand Up @@ -450,8 +457,6 @@ def make_request(
* `**kwargs` (dict): Forward-compatibility kwargs.
network_interface_names: (Optional[list(str)])
Returns:
S3Request
"""
Expand All @@ -471,8 +476,7 @@ def make_request(
on_body=on_body,
on_done=on_done,
on_progress=on_progress,
region=self._region,
network_interface_names=network_interface_names)
region=self._region)


class S3Request(NativeResource):
Expand Down Expand Up @@ -509,16 +513,14 @@ def __init__(
on_body=None,
on_done=None,
on_progress=None,
region=None,
network_interface_names=None):
region=None):
assert isinstance(client, S3Client)
assert isinstance(request, HttpRequest)
assert callable(on_headers) or on_headers is None
assert callable(on_body) or on_body is None
assert callable(on_done) or on_done is None
assert isinstance(part_size, int) or part_size is None
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
assert isinstance(network_interface_names, list) and all(isinstance(name, str) for name in network_interface_names) or network_interface_names is None

if type == S3RequestType.DEFAULT and not operation_name:
raise ValueError("'operation_name' must be set when using S3RequestType.DEFAULT")
Expand Down Expand Up @@ -570,8 +572,7 @@ def __init__(
validate_response_checksum,
part_size,
multipart_upload_threshold,
s3_request_core,
network_interface_names)
s3_request_core)

@property
def finished_future(self):
Expand Down
58 changes: 43 additions & 15 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,24 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *py_core; /* O */
PyObject *network_interface_names_py; /* O */

if (!PyArg_ParseTuple(
args,
"OOOOOs#iKKdpKO",
"OOOOOs#iKKdpKOO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -274,7 +276,8 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&throughput_target_gbps,
&enable_s3express,
&mem_limit,
&py_core)) {
&py_core,
&network_interface_names_py)) {
return NULL;
}

Expand Down Expand Up @@ -319,6 +322,19 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
signing_config = &default_signing_config;
}

struct aws_byte_cursor *network_interface_names = NULL;
int num_network_interface_names = 0;

if (network_interface_names_py != Py_None) {
if (!PyList_Check(network_interface_names_py)) {
// waahm7: todo, correct way to raise errors?
PyErr_SetString(PyExc_TypeError, "Expected a list");
return NULL;
}
Py_ssize_t listSize = PyList_Size(network_interface_names_py);
num_network_interface_names = (size_t)listSize;
}

struct s3_client_binding *s3_client = aws_mem_calloc(allocator, 1, sizeof(struct s3_client_binding));

/* From hereon, we need to clean up if errors occur */
Expand All @@ -335,6 +351,14 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

s3_client->py_core = py_core;
Py_INCREF(s3_client->py_core);
if (num_network_interface_names > 0) {
network_interface_names =
aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor));
for (Py_ssize_t i = 0; i < num_network_interface_names; ++i) {
PyObject *strObj = PyList_GetItem(network_interface_names_py, i);
network_interface_names[i] = aws_byte_cursor_from_pyunicode(strObj);
}
}

struct aws_s3_client_config s3_config = {
.region = region,
Expand All @@ -349,9 +373,13 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.shutdown_callback = s_s3_client_shutdown,
.shutdown_callback_user_data = s3_client,
.enable_s3express = enable_s3express,
.network_interface_names_array = network_interface_names,
.num_network_interface_names = num_network_interface_names,
};

s3_client->native = aws_s3_client_new(allocator, &s3_config);
aws_mem_release(allocator, network_interface_names);

if (s3_client->native == NULL) {
PyErr_SetAwsLastError();
goto error;
Expand Down
32 changes: 2 additions & 30 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,9 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
PyObject *py_core; /* O */
PyObject *network_interface_names_py; /* O */

if (!PyArg_ParseTuple(
args,
"OOOizOOzzs#iipKKOO",
"OOOizOOzzs#iipKKO",
&py_s3_request,
&s3_client_py,
&http_request_py,
Expand All @@ -398,8 +396,7 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
&validate_response_checksum,
&part_size,
&multipart_upload_threshold,
&py_core,
&network_interface_names_py)) {
&py_core)) {
return NULL;
}
struct aws_s3_client *s3_client = aws_py_get_s3_client(s3_client_py);
Expand Down Expand Up @@ -441,19 +438,6 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
.validate_response_checksum = validate_response_checksum != 0,
};

struct aws_byte_cursor *network_interface_names = NULL;
int num_network_interface_names = 0;

if (network_interface_names_py != Py_None) {
if (!PyList_Check(network_interface_names_py)) {
// waahm7: todo, correct way to raise errors?
PyErr_SetString(PyExc_TypeError, "Expected a list");
return NULL;
}
Py_ssize_t listSize = PyList_Size(listObj);
num_network_interface_names = (size_t)listSize;
}

struct s3_meta_request_binding *meta_request = aws_mem_calloc(allocator, 1, sizeof(struct s3_meta_request_binding));
if (!meta_request) {
return PyErr_AwsLastError();
Expand All @@ -480,15 +464,6 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
}
}

if (num_network_interface_names > 0) {
network_interface_names =
aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor));
for (Py_ssize_t i = 0; i < listSize; ++i) {
PyObject *strObj = PyList_GetItem(listObj, i);
network_interface_names[i] = aws_byte_cursor_from_pyunicode(strObj);
}
}

struct aws_s3_meta_request_options s3_meta_request_opt = {
.type = type,
.operation_name = aws_byte_cursor_from_c_str(operation_name),
Expand All @@ -504,15 +479,12 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
.user_data = meta_request,
.network_interface_names = network_interface_names,
.num_network_interface_names = num_network_interface_names,
};

if (aws_high_res_clock_get_ticks(&meta_request->last_sampled_time)) {
goto error;
}
meta_request->native = aws_s3_client_make_meta_request(s3_client, &s3_meta_request_opt);
aws_mem_cleanup(allocator, network_interface_names);
if (meta_request->native == NULL) {
PyErr_SetAwsLastError();
goto error;
Expand Down

0 comments on commit e21ae79

Please sign in to comment.