Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bind out cross-process lock with unit tests. #519

Merged
merged 12 commits into from
Nov 7, 2023
38 changes: 38 additions & 0 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,44 @@
from typing import Optional
from enum import IntEnum

class InstanceLock(NativeResource):
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
"""
Class representing an exclusive cross-process lock.
It is created by calling crt_instance_lock_acquire().

Recommended usage is to either explicitly call release() when the lock is no longer required,
or use this in a 'with' statement.

If the lock has not been explicitly released when the process exits, it will be released by
the operating system.
"""
def __init__(self, lock_handle):
super().__init__()
self._binding = lock_handle

def __enter__(self):
# do nothing as we already have the lock
return

def release(self):
if self._binding != None:
_awscrt.s3_instance_lock_release(self._binding)
self._binding = None

def __exit__(self, exc_type, exc_value, exc_tb):
self.release()


def crt_instance_lock_acquire(lock_scope_name):
"""
Acquires an exclusive cross-process lock scoped by 'lock_scope_name'.
Throws a Runtime Error with error code: AWS_ERROR_MUTEX_CALLER_NOT_OWNER, if
the lock is already held by another caller. Callers should use this value
in a with block for automatic release when they're finished with it, or explicitly call
release(). When the process exits, this lock will be released regardless of if
release has been invoked. """
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
return InstanceLock(_awscrt.s3_instance_lock_acquire(lock_scope_name))


class S3RequestType(IntEnum):
"""The type of the AWS S3 request"""
Expand Down
6 changes: 5 additions & 1 deletion source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,9 @@ static void s_install_crash_handler(void) {
******************************************************************************/

#define AWS_PY_METHOD_DEF(NAME, FLAGS) \
{ #NAME, aws_py_##NAME, (FLAGS), NULL }
{ \
# NAME, aws_py_##NAME, (FLAGS), NULL \
}

static PyMethodDef s_module_methods[] = {
/* Common */
Expand Down Expand Up @@ -801,6 +803,8 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(s3_meta_request_cancel, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_get_ec2_instance_type, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_is_crt_s3_optimized_for_system, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_instance_lock_acquire, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_instance_lock_release, METH_VARARGS),

/* WebSocket */
AWS_PY_METHOD_DEF(websocket_client_connect, METH_VARARGS),
Expand Down
3 changes: 3 additions & 0 deletions source/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args);

PyObject *aws_py_s3_meta_request_cancel(PyObject *self, PyObject *args);

PyObject *aws_py_s3_instance_lock_acquire(PyObject *self, PyObject *args);
PyObject *aws_py_s3_instance_lock_release(PyObject *self, PyObject *args);

struct aws_s3_client *aws_py_get_s3_client(PyObject *s3_client);
struct aws_s3_meta_request *aws_py_get_s3_meta_request(PyObject *s3_client);

Expand Down
66 changes: 66 additions & 0 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#include "auth.h"
#include "io.h"
#include <aws/common/cross_process_lock.h>
#include <aws/s3/s3_client.h>

static const char *s_capsule_name_s3_client = "aws_s3_client";
static const char *s_capsule_name_s3_instance_lock = "aws_cross_process_lock";

PyObject *aws_py_s3_get_ec2_instance_type(PyObject *self, PyObject *args) {
(void)self;
Expand Down Expand Up @@ -37,6 +39,70 @@ PyObject *aws_py_s3_is_crt_s3_optimized_for_system(PyObject *self, PyObject *arg
Py_RETURN_FALSE;
}

struct instance_lock_binding {
struct aws_cross_process_lock *lock;
};

/* Invoked when the python object gets cleaned up */
static void s_s3_instance_lock_destructor(PyObject *capsule) {
struct instance_lock_binding *lock_binding = PyCapsule_GetPointer(capsule, s_capsule_name_s3_instance_lock);

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

aws_mem_release(aws_py_get_allocator(), lock_binding);
}

PyObject *aws_py_s3_instance_lock_acquire(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_byte_cursor lock_name; /* s# */

if (!PyArg_ParseTuple(args, "s#", &lock_name.ptr, &lock_name.len)) {
return NULL;
}

struct aws_cross_process_lock *lock = aws_cross_process_lock_try_acquire(allocator, lock_name);

if (!lock) {
return PyErr_AwsLastError();
}

struct instance_lock_binding *binding = aws_mem_calloc(allocator, 1, sizeof(struct instance_lock_binding));
binding->lock = lock;

PyObject *capsule = PyCapsule_New(binding, s_capsule_name_s3_instance_lock, s_s3_instance_lock_destructor);
if (!capsule) {
aws_mem_release(allocator, binding);
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
return PyErr_AwsLastError();
}

return capsule;
}

PyObject *aws_py_s3_instance_lock_release(PyObject *self, PyObject *args) {
struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct instance_lock_binding *lock_binding = PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

Py_RETURN_NONE;
}

struct s3_client_binding {
struct aws_s3_client *native;

Expand Down
52 changes: 51 additions & 1 deletion test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import tempfile
import math
import shutil
import time
from test import NativeResourceTest
from concurrent.futures import Future
from multiprocessing import Process

from awscrt.http import HttpHeaders, HttpRequest
from awscrt.s3 import (
Expand All @@ -18,6 +20,8 @@
S3ChecksumLocation,
S3Client,
S3RequestType,
InstanceLock,
crt_instance_lock_acquire,
create_default_s3_signing_config,
)
from awscrt.io import (
Expand All @@ -41,6 +45,53 @@
MB = 1024 ** 2
GB = 1024 ** 3

cross_process_lock_name = "instance_lock_test"

def cross_proc_task():
try:
lock = crt_instance_lock_acquire(cross_process_lock_name)
lock.release()
exit(0)
except RuntimeError as e:
exit(-1)

class InstanceLockTest(NativeResourceTest):
def setUp(self):
self.nonce = time.time()
super().setUp()

def test_with_statement(self):
nonce_str = 'lock_a_{}'.format(self.nonce)
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
with crt_instance_lock_acquire(nonce_str) as lock:
try:
new_lock = crt_instance_lock_acquire(nonce_str)
self.fail("Acquiring a lock by the same nonce should fail when it's already held")
except RuntimeError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the CRT have any formalized error hierarchy in Python? I haven't dug too deeply yet. My only concern here is RuntimeError is very generic so it will be difficult to do control flow handling with this without message introspection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graebm any thoughts here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not great. It's the same problem we have in every language binding. We didn't code-gen error classes, so catching specific CRT errors means users must catch a RuntimeError and introspect it.

In this specific case, we could dance around the issue and just return None or False if the C function returns NULL, since we do often expect it to fail 🤷‍♀️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not happy about this. I really want to do a mildly-breaking-change to all our bindings and address this issue. But it's non-trivial :|

unique_nonce_str = 'lock_b{}'.format(self.nonce)
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
new_lock = crt_instance_lock_acquire(unique_nonce_str)
self.assertTrue(new_lock != None)
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
new_lock.release()

lock_after_with_same_nonce = crt_instance_lock_acquire(nonce_str)
self.assertTrue(lock_after_with_same_nonce != None)
JonathanHenson marked this conversation as resolved.
Show resolved Hide resolved
lock_after_with_same_nonce.release()

def test_cross_proc(self):
with crt_instance_lock_acquire(cross_process_lock_name) as lock:
process = Process(target=cross_proc_task)
process.start()
process.join()
# aquiring this lock in a sub-process should fail since we
# already hold the lock in this process.
self.assertNotEqual(0, process.exitcode)

# now that we've released the lock above, the same sub-process path
# should now succeed.
unlocked_process = Process(target=cross_proc_task)
unlocked_process.start()
unlocked_process.join()
self.assertEqual(0, unlocked_process.exitcode)


class FileCreator(object):
def __init__(self):
Expand Down Expand Up @@ -161,7 +212,6 @@ def test_wait_shutdown(self):
del s3_client
self.assertTrue(shutdown_event.wait(self.timeout))


@unittest.skipUnless(os.environ.get('AWS_TEST_S3'), 'set env var to run test: AWS_TEST_S3')
class S3RequestTest(NativeResourceTest):
def setUp(self):
Expand Down
Loading