Merge branch 'main' into freebsd
DmitriyMusatkin authored Nov 8, 2023
2 parents 90e84ae + 16e6492 commit 5c617e8
Showing 7 changed files with 331 additions and 77 deletions.
72 changes: 63 additions & 9 deletions awscrt/s3.py
@@ -19,6 +19,37 @@
from enum import IntEnum


class CrossProcessLock(NativeResource):
"""
Class representing an exclusive cross-process lock, scoped by `lock_scope_name`.
Recommended usage is to either explicitly call acquire() followed by release() when the lock is no longer required, or to use this class in a 'with' statement.
acquire() will raise a RuntimeError with AWS_MUTEX_CALLER_NOT_OWNER as the error code if the lock could not be acquired.
If the lock has not been explicitly released when the process exits, it will be released by the operating system.
Keyword Args:
lock_scope_name (str): Unique string identifying the caller holding the lock.
"""

def __init__(self, lock_scope_name):
super().__init__()
self._binding = _awscrt.s3_cross_process_lock_new(lock_scope_name)

def acquire(self):
_awscrt.s3_cross_process_lock_acquire(self._binding)

def __enter__(self):
self.acquire()

def release(self):
_awscrt.s3_cross_process_lock_release(self._binding)

def __exit__(self, exc_type, exc_value, exc_tb):
self.release()
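
The class above is used either through explicit acquire()/release() calls or as a context manager. A minimal usage sketch based on the class shown here; the scope name "my-app-lock" is an arbitrary example, not something defined in this commit:

from awscrt.s3 import CrossProcessLock

# All cooperating processes must pass the same scope name.
lock = CrossProcessLock("my-app-lock")
try:
    lock.acquire()  # raises RuntimeError if another process already holds the lock
    # ... work that must not run concurrently across processes ...
finally:
    lock.release()

# Equivalent form using the __enter__/__exit__ support:
with CrossProcessLock("my-app-lock"):
    pass  # ... exclusive work ...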


class S3RequestType(IntEnum):
"""The type of the AWS S3 request"""

@@ -137,11 +168,17 @@ class S3Client(NativeResource):
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
- Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5MB.
- (5*1024*1024 by default)
+ Note: for :attr:`S3RequestType.PUT_OBJECT` requests, S3 requires the part size to be at least 5 MiB.
+ (8*1024*1024 by default)
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
Uploads over this size will use the multipart upload strategy.
Uploads this size or less will use a single request.
If not set, `part_size` is used as the threshold.
- throughput_target_gbps (Optional[float]): Throughput target in Gbps that we are trying to reach.
- (5 Gbps by default)
+ throughput_target_gbps (Optional[float]): Throughput target in
+ Gigabits per second (Gbps) that we are trying to reach.
+ (10.0 Gbps by default)
"""

__slots__ = ('shutdown_event', '_region')
@@ -156,6 +193,7 @@ def __init__(
credential_provider=None,
tls_connection_options=None,
part_size=None,
multipart_upload_threshold=None,
throughput_target_gbps=None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
@@ -193,6 +231,8 @@ def on_shutdown():
tls_mode = 0
if part_size is None:
part_size = 0
if multipart_upload_threshold is None:
multipart_upload_threshold = 0
if throughput_target_gbps is None:
throughput_target_gbps = 0

@@ -205,6 +245,7 @@ def on_shutdown():
region,
tls_mode,
part_size,
multipart_upload_threshold,
throughput_target_gbps,
s3_client_core)
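
A construction sketch showing where the new multipart_upload_threshold argument fits among the existing keyword arguments; the bootstrap setup, region, and size values below are illustrative assumptions rather than anything taken from this commit:

from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup
from awscrt.s3 import S3Client

event_loop_group = EventLoopGroup()
host_resolver = DefaultHostResolver(event_loop_group)
bootstrap = ClientBootstrap(event_loop_group, host_resolver)

client = S3Client(
    bootstrap=bootstrap,
    region="us-west-2",                            # example region
    part_size=8 * 1024 * 1024,                     # 8 MiB parts
    multipart_upload_threshold=64 * 1024 * 1024,   # bodies over 64 MiB use multipart upload
    throughput_target_gbps=10.0)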

@@ -287,10 +328,16 @@ def make_request(
failed because server side sent an unsuccessful response, the headers
of the response are provided here. Else None will be returned.
- * `error_body` (Optional[Bytes]): If request failed because server
+ * `error_body` (Optional[bytes]): If request failed because server
side sent an unsuccessful response, the body of the response is
provided here. Else None will be returned.
* `status_code` (Optional[int]): HTTP response status code (if available).
If request failed because server side sent an unsuccessful response,
this is its status code. If the operation was successful,
this is the final response's status code. If the operation
failed for another reason, None is returned.
* `**kwargs` (dict): Forward-compatibility kwargs.
on_progress: Optional callback invoked when part of the transfer is done to report the progress.
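
A sketch of an on_done callback written against the documented fields, including the new status_code; accepting **kwargs keeps the callback forward-compatible with fields added later:

def on_done(error, error_headers, error_body, status_code, **kwargs):
    if error is None:
        print(f"transfer succeeded, HTTP status {status_code}")
    else:
        print(f"transfer failed: {error}")
        if status_code is not None:
            print(f"server responded with HTTP status {status_code}")
        if error_body is not None:
            print(f"error body: {error_body}")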
@@ -461,19 +508,26 @@ def _on_body(self, chunk, offset):
def _on_shutdown(self):
self._shutdown_event.set()

- def _on_finish(self, error_code, error_headers, error_body):
+ def _on_finish(self, error_code, status_code, error_headers, error_body):
+ # If C layer gives status_code 0, that means "unknown"
+ if status_code == 0:
+ status_code = None

error = None
if error_code:
error = awscrt.exceptions.from_code(error_code)
if error_body:
# TODO The error body is XML, will need to parse it to something prettier.
- extra_message = ". Body from error request is: " + str(error_body)
- error.message = error.message + extra_message
+ try:
+ extra_message = ". Body from error request is: " + str(error_body)
+ error.message = error.message + extra_message
+ except BaseException:
+ pass
self._finished_future.set_exception(error)
else:
self._finished_future.set_result(None)
if self._on_done_cb:
- self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body)
+ self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body, status_code=status_code)

def _on_progress(self, progress):
if self._on_progress_cb:
3 changes: 3 additions & 0 deletions source/module.c
@@ -801,6 +801,9 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(s3_meta_request_cancel, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_get_ec2_instance_type, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_is_crt_s3_optimized_for_system, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_new, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_acquire, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_release, METH_VARARGS),

/* WebSocket */
AWS_PY_METHOD_DEF(websocket_client_connect, METH_VARARGS),
4 changes: 4 additions & 0 deletions source/s3.h
@@ -15,6 +15,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args);

PyObject *aws_py_s3_meta_request_cancel(PyObject *self, PyObject *args);

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args);

struct aws_s3_client *aws_py_get_s3_client(PyObject *s3_client);
struct aws_s3_meta_request *aws_py_get_s3_meta_request(PyObject *s3_client);

124 changes: 113 additions & 11 deletions source/s3_client.c
@@ -6,9 +6,11 @@

#include "auth.h"
#include "io.h"
#include <aws/common/cross_process_lock.h>
#include <aws/s3/s3_client.h>

static const char *s_capsule_name_s3_client = "aws_s3_client";
static const char *s_capsule_name_s3_instance_lock = "aws_cross_process_lock";

PyObject *aws_py_s3_get_ec2_instance_type(PyObject *self, PyObject *args) {
(void)self;
@@ -37,6 +39,103 @@ PyObject *aws_py_s3_is_crt_s3_optimized_for_system(PyObject *self, PyObject *arg
Py_RETURN_FALSE;
}

struct cross_process_lock_binding {
struct aws_cross_process_lock *lock;
struct aws_string *name;
};

/* Invoked when the python object gets cleaned up */
static void s_s3_cross_process_lock_destructor(PyObject *capsule) {
struct cross_process_lock_binding *lock_binding = PyCapsule_GetPointer(capsule, s_capsule_name_s3_instance_lock);

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

if (lock_binding->name) {
aws_string_destroy(lock_binding->name);
}

aws_mem_release(aws_py_get_allocator(), lock_binding);
}

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_byte_cursor lock_name; /* s# */

if (!PyArg_ParseTuple(args, "s#", &lock_name.ptr, &lock_name.len)) {
return NULL;
}

struct cross_process_lock_binding *binding =
aws_mem_calloc(allocator, 1, sizeof(struct cross_process_lock_binding));
binding->name = aws_string_new_from_cursor(allocator, &lock_name);

PyObject *capsule = PyCapsule_New(binding, s_capsule_name_s3_instance_lock, s_s3_cross_process_lock_destructor);
if (!capsule) {
aws_string_destroy(binding->name);
aws_mem_release(allocator, binding);
return PyErr_AwsLastError();
}

return capsule;
}

PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (!lock_binding->lock) {
struct aws_cross_process_lock *lock =
aws_cross_process_lock_try_acquire(allocator, aws_byte_cursor_from_string(lock_binding->name));

if (!lock) {
return PyErr_AwsLastError();
}
lock_binding->lock = lock;
}

Py_RETURN_NONE;
}

PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args) {
PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

Py_RETURN_NONE;
}
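
Because aws_cross_process_lock_try_acquire fails immediately instead of blocking when another process owns the lock, the Python-level acquire() surfaces contention as an exception. A hedged sketch of how a caller might poll for the lock; the retry loop and one-second sleep are illustrative choices, not part of this commit:

import time
from awscrt.s3 import CrossProcessLock

lock = CrossProcessLock("my-app-lock")  # example scope name
while True:
    try:
        lock.acquire()
        break
    except RuntimeError:
        # Another process currently holds the lock; retry shortly.
        time.sleep(1.0)
try:
    pass  # ... exclusive work ...
finally:
    lock.release()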

struct s3_client_binding {
struct aws_s3_client *native;

@@ -98,19 +197,20 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

- PyObject *bootstrap_py; /* O */
- PyObject *signing_config_py; /* O */
- PyObject *credential_provider_py; /* O */
- PyObject *tls_options_py; /* O */
- PyObject *on_shutdown_py; /* O */
- struct aws_byte_cursor region; /* s# */
- int tls_mode; /* i */
- uint64_t part_size; /* K */
- double throughput_target_gbps; /* d */
- PyObject *py_core; /* O */
+ PyObject *bootstrap_py; /* O */
+ PyObject *signing_config_py; /* O */
+ PyObject *credential_provider_py; /* O */
+ PyObject *tls_options_py; /* O */
+ PyObject *on_shutdown_py; /* O */
+ struct aws_byte_cursor region; /* s# */
+ int tls_mode; /* i */
+ uint64_t part_size; /* K */
+ uint64_t multipart_upload_threshold; /* K */
+ double throughput_target_gbps; /* d */
+ PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOOOs#iKdO",
"OOOOOs#iKKdO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
@@ -120,6 +220,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&region.len,
&tls_mode,
&part_size,
&multipart_upload_threshold,
&throughput_target_gbps,
&py_core)) {
return NULL;
@@ -185,6 +286,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.tls_mode = tls_mode,
.signing_config = signing_config,
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
.tls_connection_options = tls_options,
.throughput_target_gbps = throughput_target_gbps,
.shutdown_callback = s_s3_client_shutdown,
3 changes: 2 additions & 1 deletion source/s3_meta_request.c
@@ -253,8 +253,9 @@ static void s_s3_request_on_finish(
result = PyObject_CallMethod(
request_binding->py_core,
"_on_finish",
"(iOy#)",
"(iiOy#)",
error_code,
meta_request_result->response_status,
header_list ? header_list : Py_None,
(const char *)(error_body.buffer),
(Py_ssize_t)error_body.len);
