Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio for pynamodb #968

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pynamodb/async_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import asyncio
import functools


def wrap_secretly_sync_async_fn(async_fn):
@functools.wraps(async_fn)
def wrap(*args, **kwargs):
asyncio.run(async_fn(*args, **kwargs))
return wrap
205 changes: 115 additions & 90 deletions pynamodb/connection/base.py

Large diffs are not rendered by default.

74 changes: 43 additions & 31 deletions pynamodb/connection/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""

import asyncio
from pynamodb.async_util import wrap_secretly_sync_async_fn
from typing import Any, Dict, Mapping, Optional, Sequence

from pynamodb.connection.base import Connection, MetaTable, OperationSettings
from pynamodb.constants import DEFAULT_BILLING_MODE, KEY
from pynamodb.expressions.condition import Condition
from pynamodb.expressions.update import Action

class TableMeta(type):
def __init__(self, name, bases, attrs):
super().__init__(name, bases, attrs)

class TableConnection:
for attr_name, attr_value in attrs.items():
suffix = "_async"
if attr_name.endswith(suffix) and asyncio.iscoroutinefunction(attr_value):
setattr(self, attr_name[:-len(suffix)], wrap_secretly_sync_async_fn(attr_value))


class TableConnection(metaclass=TableMeta):
"""
A higher level abstraction over botocore
"""
Expand All @@ -32,6 +43,7 @@ def __init__(
aws_session_token: Optional[str] = None,
) -> None:
self.table_name = table_name

self.connection = Connection(region=region,
host=host,
connect_timeout_seconds=connect_timeout_seconds,
Expand All @@ -46,13 +58,13 @@ def __init__(
aws_secret_access_key,
aws_session_token)

def get_meta_table(self, refresh: bool = False) -> MetaTable:
async def get_meta_table_async(self, refresh: bool = False) -> MetaTable:
"""
Returns a MetaTable
"""
return self.connection.get_meta_table(self.table_name, refresh=refresh)
return await self.connection.get_meta_table_async(self.table_name, refresh=refresh)

def get_operation_kwargs(
async def get_operation_kwargs_async(
self,
hash_key: str,
range_key: Optional[str] = None,
Expand All @@ -67,7 +79,7 @@ def get_operation_kwargs(
return_item_collection_metrics: Optional[str] = None,
return_values_on_condition_failure: Optional[str] = None,
) -> Dict:
return self.connection.get_operation_kwargs(
return await self.connection.get_operation_kwargs(
self.table_name,
hash_key,
range_key=range_key,
Expand All @@ -83,7 +95,7 @@ def get_operation_kwargs(
return_values_on_condition_failure=return_values_on_condition_failure
)

def delete_item(
async def delete_item_async(
self,
hash_key: str,
range_key: Optional[str] = None,
Expand All @@ -96,7 +108,7 @@ def delete_item(
"""
Performs the DeleteItem operation and returns the result
"""
return self.connection.delete_item(
return await self.connection.delete_item(
self.table_name,
hash_key,
range_key=range_key,
Expand All @@ -107,7 +119,7 @@ def delete_item(
settings=settings,
)

def update_item(
async def update_item_async(
self,
hash_key: str,
range_key: Optional[str] = None,
Expand All @@ -121,7 +133,7 @@ def update_item(
"""
Performs the UpdateItem operation
"""
return self.connection.update_item(
return await self.connection.update_item(
self.table_name,
hash_key,
range_key=range_key,
Expand All @@ -133,7 +145,7 @@ def update_item(
settings=settings,
)

def put_item(
async def put_item_async(
self,
hash_key: str,
range_key: Optional[str] = None,
Expand All @@ -147,7 +159,7 @@ def put_item(
"""
Performs the PutItem operation and returns the result
"""
return self.connection.put_item(
return await self.connection.put_item(
self.table_name,
hash_key,
range_key=range_key,
Expand All @@ -159,7 +171,7 @@ def put_item(
settings=settings,
)

def batch_write_item(
async def batch_write_item_async(
self,
put_items: Optional[Any] = None,
delete_items: Optional[Any] = None,
Expand All @@ -170,7 +182,7 @@ def batch_write_item(
"""
Performs the batch_write_item operation
"""
return self.connection.batch_write_item(
return await self.connection.batch_write_item(
self.table_name,
put_items=put_items,
delete_items=delete_items,
Expand All @@ -179,7 +191,7 @@ def batch_write_item(
settings=settings,
)

def batch_get_item(
async def batch_get_item_async(
self,
keys: Sequence[str],
consistent_read: Optional[bool] = None,
Expand All @@ -190,7 +202,7 @@ def batch_get_item(
"""
Performs the batch get item operation
"""
return self.connection.batch_get_item(
return await self.connection.batch_get_item(
self.table_name,
keys,
consistent_read=consistent_read,
Expand All @@ -199,7 +211,7 @@ def batch_get_item(
settings=settings,
)

def get_item(
async def get_item_async(
self,
hash_key: str,
range_key: Optional[str] = None,
Expand All @@ -210,7 +222,7 @@ def get_item(
"""
Performs the GetItem operation and returns the result
"""
return self.connection.get_item(
return await self.connection.get_item(
self.table_name,
hash_key,
range_key=range_key,
Expand All @@ -219,7 +231,7 @@ def get_item(
settings=settings,
)

def scan(
async def scan_async(
self,
filter_condition: Optional[Any] = None,
attributes_to_get: Optional[Any] = None,
Expand All @@ -235,7 +247,7 @@ def scan(
"""
Performs the scan operation
"""
return self.connection.scan(
return await self.connection.scan(
self.table_name,
filter_condition=filter_condition,
attributes_to_get=attributes_to_get,
Expand All @@ -249,7 +261,7 @@ def scan(
settings=settings,
)

def query(
async def query_async(
self,
hash_key: str,
range_key_condition: Optional[Condition] = None,
Expand All @@ -267,7 +279,7 @@ def query(
"""
Performs the Query operation and returns the result
"""
return self.connection.query(
return await self.connection.query(
self.table_name,
hash_key,
range_key_condition=range_key_condition,
Expand All @@ -283,25 +295,25 @@ def query(
settings=settings,
)

def describe_table(self) -> Dict:
async def describe_table_async(self) -> Dict:
"""
Performs the DescribeTable operation and returns the result
"""
return self.connection.describe_table(self.table_name)
return await self.connection.describe_table_async(self.table_name)

def delete_table(self) -> Dict:
async def delete_table_async(self) -> Dict:
"""
Performs the DeleteTable operation and returns the result
"""
return self.connection.delete_table(self.table_name)
return await self.connection.delete_table_async(self.table_name)

def update_time_to_live(self, ttl_attr_name: str) -> Dict:
async def update_time_to_live_async(self, ttl_attr_name: str) -> Dict:
"""
Performs the UpdateTimeToLive operation and returns the result
"""
return self.connection.update_time_to_live(self.table_name, ttl_attr_name)
return await self.connection.update_time_to_live_async(self.table_name, ttl_attr_name)

def update_table(
async def update_table_async(
self,
read_capacity_units: Optional[int] = None,
write_capacity_units: Optional[int] = None,
Expand All @@ -310,13 +322,13 @@ def update_table(
"""
Performs the UpdateTable operation and returns the result
"""
return self.connection.update_table(
return await self.connection.update_table_async(
self.table_name,
read_capacity_units=read_capacity_units,
write_capacity_units=write_capacity_units,
global_secondary_index_updates=global_secondary_index_updates)

def create_table(
async def create_table_async(
self,
attribute_definitions: Optional[Any] = None,
key_schema: Optional[Any] = None,
Expand All @@ -331,7 +343,7 @@ def create_table(
"""
Performs the CreateTable operation and returns the result
"""
return self.connection.create_table(
return await self.connection.create_table_async(
self.table_name,
attribute_definitions=attribute_definitions,
key_schema=key_schema,
Expand Down
2 changes: 1 addition & 1 deletion pynamodb/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class IndexMeta(GenericMeta):
that contains the index settings
"""
def __init__(self, name, bases, attrs, *args, **kwargs):
super().__init__(name, bases, attrs, *args, **kwargs) # type: ignore
super().__init__(name, bases, attrs, *args, **kwargs)
if isinstance(attrs, dict):
for attr_name, attr_obj in attrs.items():
if attr_name == META_CLASS_NAME:
Expand Down
Loading