Skip to content

Commit

Permalink
expose S3 multipart-threshold, and status-code for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
graebm committed Oct 31, 2023
1 parent d62491f commit 0d464f3
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
41 changes: 32 additions & 9 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,17 @@ class S3Client(NativeResource):
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5MB.
(5*1024*1024 by default)
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5 MiB.
(8*1024*1024 by default)
throughput_target_gbps (Optional[float]): Throughput target in Gbps that we are trying to reach.
(5 Gbps by default)
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
Uploads over this size will use the multipart upload strategy.
Uploads this size or less will use a single request.
If not set, `part_size` is used as the threshold.
throughput_target_gbps (Optional[float]): Throughput target in
Gigabits per second (Gbps) that we are trying to reach.
(10.0 Gbps by default)
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -156,6 +162,7 @@ def __init__(
credential_provider=None,
tls_connection_options=None,
part_size=None,
multipart_upload_threshold=None,
throughput_target_gbps=None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
Expand Down Expand Up @@ -193,6 +200,8 @@ def on_shutdown():
tls_mode = 0
if part_size is None:
part_size = 0
if multipart_upload_threshold is None:
multipart_upload_threshold = 0
if throughput_target_gbps is None:
throughput_target_gbps = 0

Expand All @@ -205,6 +214,7 @@ def on_shutdown():
region,
tls_mode,
part_size,
multipart_upload_threshold,
throughput_target_gbps,
s3_client_core)

Expand Down Expand Up @@ -287,10 +297,16 @@ def make_request(
failed because server side sent an unsuccessful response, the headers
of the response is provided here. Else None will be returned.
* `error_body` (Optional[Bytes]): If request failed because server
* `error_body` (Optional[bytes]): If request failed because server
side sent an unsuccessful response, the body of the response is
provided here. Else None will be returned.
* `status_code` (Optional[int]): HTTP response status code (if available).
If request failed because server side sent an unsuccessful response,
this is its status code. If the operation was successful,
this is the final response's status code. If the operation
failed for another reason, None is returned.
* `**kwargs` (dict): Forward-compatibility kwargs.
on_progress: Optional callback invoked when part of the transfer is done to report the progress.
Expand Down Expand Up @@ -461,19 +477,26 @@ def _on_body(self, chunk, offset):
def _on_shutdown(self):
self._shutdown_event.set()

def _on_finish(self, error_code, error_headers, error_body):
def _on_finish(self, error_code, status_code, error_headers, error_body):
# If C layer gives status_code 0, that means "unknown"
if status_code == 0:
status_code = None

error = None
if error_code:
error = awscrt.exceptions.from_code(error_code)
if error_body:
# TODO The error body is XML, will need to parse it to something prettier.
extra_message = ". Body from error request is: " + str(error_body)
error.message = error.message + extra_message
try:
extra_message = ". Body from error request is: " + str(error_body)
error.message = error.message + extra_message
except BaseException:
pass
self._finished_future.set_exception(error)
else:
self._finished_future.set_result(None)
if self._on_done_cb:
self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body)
self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body, status_code=status_code)

def _on_progress(self, progress):
if self._on_progress_cb:
Expand Down
25 changes: 14 additions & 11 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,20 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
double throughput_target_gbps; /* d */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOOOs#iKdO",
"OOOOOs#iKKdO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -120,6 +121,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&region.len,
&tls_mode,
&part_size,
&multipart_upload_threshold,
&throughput_target_gbps,
&py_core)) {
return NULL;
Expand Down Expand Up @@ -185,6 +187,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.tls_mode = tls_mode,
.signing_config = signing_config,
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
.tls_connection_options = tls_options,
.throughput_target_gbps = throughput_target_gbps,
.shutdown_callback = s_s3_client_shutdown,
Expand Down
3 changes: 2 additions & 1 deletion source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ static void s_s3_request_on_finish(
result = PyObject_CallMethod(
request_binding->py_core,
"_on_finish",
"(iOy#)",
"(iiOy#)",
error_code,
meta_request_result->response_status,
header_list ? header_list : Py_None,
(const char *)(error_body.buffer),
(Py_ssize_t)error_body.len);
Expand Down
42 changes: 33 additions & 9 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,20 @@ def _on_request_headers(self, status_code, headers, **kargs):
def _on_request_body(self, chunk, offset, **kargs):
self.received_body_len = self.received_body_len + len(chunk)

def _on_request_done(self, error, error_headers, error_body, status_code, **kwargs):
self.done_error = error
self.done_error_headers = error_headers
self.done_error_body = error_body
self.done_status_code = status_code

def _on_progress(self, progress):
self.transferred_len += progress

def _validate_successful_response(self, is_put_object):
self.assertEqual(self.response_status_code, 200, "status code is not 200")
self.assertEqual(self.done_status_code, self.response_status_code,
"status-code from on_done doesn't match code from on_headers")
self.assertIsNone(self.done_error)
headers = HttpHeaders(self.response_headers)
self.assertIsNone(headers.get("Content-Range"))
body_length = headers.get("Content-Length")
Expand All @@ -235,19 +244,22 @@ def _test_s3_put_get_object(
type=request_type,
on_headers=self._on_request_headers,
on_body=self._on_request_body,
on_done=self._on_request_done,
**kwargs)
finished_future = s3_request.finished_future
try:
finished_future.result(self.timeout)
except Exception as e:
self.assertEqual(e.name, exception_name)
else:
self._validate_successful_response(request_type is S3RequestType.PUT_OBJECT)

finished_future = s3_request.finished_future
shutdown_event = s3_request.shutdown_event
s3_request = None
self.assertTrue(shutdown_event.wait(self.timeout))

if exception_name is None:
finished_future.result()
self._validate_successful_response(request_type is S3RequestType.PUT_OBJECT)
else:
e = finished_future.exception()
self.assertEqual(e.name, exception_name)
self.assertEqual(e, self.done_error)

def test_get_object(self):
request = self._get_object_request(self.get_test_object_path)
self._test_s3_put_get_object(request, S3RequestType.GET_OBJECT)
Expand Down Expand Up @@ -505,11 +517,23 @@ def test_put_object_quick_cancel(self):
return self._put_object_cancel_helper(False)

def test_multipart_upload_with_invalid_request(self):
put_body_stream = open(self.temp_put_obj_file_path, "r+b")
content_length = os.stat(self.temp_put_obj_file_path).st_size
# send upload with incorrect Content-MD5
# need to do single-part upload so the Content-MD5 header is sent along as-is.
content_length = 100
file_path = self.files.create_file_with_size("temp_file", content_length)
put_body_stream = open(file_path, "r+b")
request = self._put_object_request(put_body_stream, content_length)
request.headers.set("Content-MD5", "something")
self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, "AWS_ERROR_S3_INVALID_RESPONSE_STATUS")

# check that data from on_done callback came through correctly
self.assertIsNotNone(self.done_error)
self.assertEqual(self.done_status_code, 400)
self.assertIsNotNone(self.done_error_headers)
self.assertTrue(any(h[0].lower() == 'x-amz-request-id' for h in self.done_error_headers))
self.assertIsNotNone(self.done_error_body)
self.assertTrue(b"InvalidDigest" in self.done_error_body)

put_body_stream.close()

def test_special_filepath_upload(self):
Expand Down

0 comments on commit 0d464f3

Please sign in to comment.