Skip to content

Commit

Permalink
feat(Idempotency): add feature for manipulating idempotent responses (#…
Browse files Browse the repository at this point in the history
…4037)

* feat(idempotent-response-manipulation): Added capability of providing an IdempotentHook functiont to be called when an idempotent response is being returned.

* chore(mypy): resolve myopy static typing issues, make response+hook properly optional

* feat(response_hook): added some documentation, call response_hook after custom de-serialization

* feat(response_hook): review items

* chore(mypy): resolve type erro r in example code - expiry_timestamp can be None

* chore(docs): fix formatting error in markdown

* chore(docs): fix highlighting of example code - lines moved

* Improving doc

* Improving doc

* Addressing Ruben's feedback

---------

Co-authored-by: Leandro Damascena <[email protected]>
  • Loading branch information
walmsles and leandrodamascena authored Apr 4, 2024
1 parent 5a6ae52 commit b149b15
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 47 deletions.
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.hook import (
IdempotentHookFunction,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
Expand All @@ -17,4 +20,5 @@
"idempotent",
"idempotent_function",
"IdempotencyConfig",
"IdempotentHookFunction",
)
14 changes: 12 additions & 2 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple

from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.config import (
IdempotencyConfig,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
Expand Down Expand Up @@ -227,7 +229,15 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Any]:
)
response_dict: Optional[dict] = data_record.response_json_as_dict()
if response_dict is not None:
return self.output_serializer.from_dict(response_dict)
serialized_response = self.output_serializer.from_dict(response_dict)
if self.config.response_hook is not None:
logger.debug("Response hook configured, invoking function")
return self.config.response_hook(
serialized_response,
data_record,
)
return serialized_response

return None

def _get_function_response(self):
Expand Down
5 changes: 5 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.idempotency import IdempotentHookFunction
from aws_lambda_powertools.utilities.typing import LambdaContext


Expand All @@ -15,6 +16,7 @@ def __init__(
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
response_hook: Optional[IdempotentHookFunction] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -37,6 +39,8 @@ def __init__(
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
response_hook: IdempotentHookFunction, optional
Hook function to be called when an idempotent response is returned from the idempotent store.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -47,6 +51,7 @@ def __init__(
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context
self.response_hook: Optional[IdempotentHookFunction] = response_hook

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
Expand Down
13 changes: 13 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Any

from aws_lambda_powertools.shared.types import Protocol
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord


class IdempotentHookFunction(Protocol):
"""
The IdempotentHookFunction.
This class defines the calling signature for IdempotentHookFunction callbacks.
"""

def __call__(self, response: Any, idempotent_data: DataRecord) -> Any: ...
87 changes: 76 additions & 11 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ We currently support Amazon DynamoDB and Redis as a storage layer. The following
If you're not [changing the default configuration for the DynamoDB persistence layer](#dynamodbpersistencelayer), this is the expected default configuration:

| Configuration | Value | Notes |
| ------------------ | ------------ | ----------------------------------------------------------------------------------- |
| Partition key | `id` |
| ------------------ | ------------ |-------------------------------------------------------------------------------------|
| Partition key | `id` | |
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |

???+ tip "Tip: You can share a single state table for all functions"
Expand Down Expand Up @@ -454,6 +454,40 @@ sequenceDiagram
<i>Idempotent successful request cached</i>
</center>

#### Successful request with response_hook configured

<center>
```mermaid
sequenceDiagram
participant Client
participant Lambda
participant Response hook
participant Persistence Layer
alt initial request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
activate Persistence Layer
Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
Lambda-->>Lambda: Call your function
Lambda->>Persistence Layer: Update record with result
deactivate Persistence Layer
Persistence Layer-->>Persistence Layer: Update record
Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
Lambda-->>Client: Response sent to client
else retried request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
activate Persistence Layer
Persistence Layer-->>Response hook: Already exists in persistence layer.
deactivate Persistence Layer
Note over Response hook,Persistence Layer: Record status is COMPLETE and not expired
Response hook->>Lambda: Response hook invoked
Lambda-->>Client: Manipulated idempotent response sent to client
end
```
<i>Successful idempotent request with a response hook</i>
</center>

#### Expired idempotency records

<center>
Expand Down Expand Up @@ -699,15 +733,16 @@ For advanced configurations, such as setting up SSL certificates or customizing

Idempotent decorator can be further configured with **`IdempotencyConfig`** as seen in the previous example. These are the available options for further configuration

| Parameter | Default | Description |
| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| Parameter | Default | Description |
|---------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| **response_hook** | `None` | Function to use for processing the stored Idempotent response. This function hook is called when an existing idempotent response is found. See [Manipulating The Idempotent Response](idempotency.md#manipulating-the-idempotent-response) |

### Handling concurrent executions with the same payload

Expand Down Expand Up @@ -909,6 +944,36 @@ You can create your own persistent store from scratch by inheriting the `BasePer

For example, the `_put_record` method needs to raise an exception if a non-expired record already exists in the data store with a matching key.

### Manipulating the Idempotent Response

You can set up a `response_hook` in the `IdempotentConfig` class to manipulate the returned data when an operation is idempotent. The hook function will be called with the current deserialized response object and the Idempotency record.

=== "Using an Idempotent Response Hook"

```python hl_lines="18 20 23 32"
--8<-- "examples/idempotency/src/working_with_response_hook.py"
```

=== "Sample event"

```json
--8<-- "examples/idempotency/src/working_with_response_hook_payload.json"
```

???+ info "Info: Using custom de-serialization?"

The response_hook is called after the custom de-serialization so the payload you process will be the de-serialized version.

#### Being a good citizen

When using response hooks to manipulate returned data from idempotent operations, it's important to follow best practices to avoid introducing complexity or issues. Keep these guidelines in mind:

1. **Response hook works exclusively when operations are idempotent.** The hook will not be called when an operation is not idempotent, or when the idempotent logic fails.

2. **Catch and Handle Exceptions.** Your response hook code should catch and handle any exceptions that may arise from your logic. Unhandled exceptions will cause the Lambda function to fail unexpectedly.

3. **Keep Hook Logic Simple** Response hooks should consist of minimal and straightforward logic for manipulating response data. Avoid complex conditional branching and aim for hooks that are easy to reason about.

## Compatibility with other utilities

### Batch
Expand Down
56 changes: 56 additions & 0 deletions examples/idempotency/src/working_with_response_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import datetime
import uuid
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer,
IdempotencyConfig,
idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
DataRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


def my_response_hook(response: Dict, idempotent_data: DataRecord) -> Dict:
# Return inserted Header data into the Idempotent Response
response["x-idempotent-key"] = idempotent_data.idempotency_key

# expiry_timestamp could be None so include if set
expiry_timestamp = idempotent_data.expiry_timestamp
if expiry_timestamp:
expiry_time = datetime.datetime.fromtimestamp(int(expiry_timestamp))
response["x-idempotent-expiration"] = expiry_time.isoformat()

# Must return the response here
return response


dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(response_hook=my_response_hook)


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: dict) -> dict:
# create the order_id
order_id = str(uuid.uuid4())

# create your logic to save the order
# append the order_id created
order["order_id"] = order_id

# return the order
return {"order": order}


def lambda_handler(event: dict, context: LambdaContext):
config.register_lambda_context(context) # see Lambda timeouts section
try:
logger.info(f"Processing order id {event.get('order_id')}")
return process_order(order=event.get("order"))
except Exception as err:
return {"status_code": 400, "error": f"Error processing {str(err)}"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"order" : {
"user_id": "xyz",
"product_id": "123456789",
"quantity": 2,
"value": 30
}
}
Loading

0 comments on commit b149b15

Please sign in to comment.