diff --git a/aws_lambda_powertools/utilities/idempotency/__init__.py b/aws_lambda_powertools/utilities/idempotency/__init__.py index ae27330cc1f..0c46553cc59 100644 --- a/aws_lambda_powertools/utilities/idempotency/__init__.py +++ b/aws_lambda_powertools/utilities/idempotency/__init__.py @@ -2,6 +2,9 @@ Utility for adding idempotency to lambda functions """ +from aws_lambda_powertools.utilities.idempotency.hook import ( + IdempotentHookFunction, +) from aws_lambda_powertools.utilities.idempotency.persistence.base import ( BasePersistenceLayer, ) @@ -17,4 +20,5 @@ "idempotent", "idempotent_function", "IdempotencyConfig", + "IdempotentHookFunction", ) diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 771547fe33c..f5ed9e2e476 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -3,7 +3,9 @@ from copy import deepcopy from typing import Any, Callable, Dict, Optional, Tuple -from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig +from aws_lambda_powertools.utilities.idempotency.config import ( + IdempotencyConfig, +) from aws_lambda_powertools.utilities.idempotency.exceptions import ( IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, @@ -227,7 +229,15 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Any]: ) response_dict: Optional[dict] = data_record.response_json_as_dict() if response_dict is not None: - return self.output_serializer.from_dict(response_dict) + serialized_response = self.output_serializer.from_dict(response_dict) + if self.config.response_hook is not None: + logger.debug("Response hook configured, invoking function") + return self.config.response_hook( + serialized_response, + data_record, + ) + return serialized_response + return None def _get_function_response(self): diff --git a/aws_lambda_powertools/utilities/idempotency/config.py b/aws_lambda_powertools/utilities/idempotency/config.py index e78f339fdc9..826dbbe4089 100644 --- a/aws_lambda_powertools/utilities/idempotency/config.py +++ b/aws_lambda_powertools/utilities/idempotency/config.py @@ -1,5 +1,6 @@ from typing import Dict, Optional +from aws_lambda_powertools.utilities.idempotency import IdempotentHookFunction from aws_lambda_powertools.utilities.typing import LambdaContext @@ -15,6 +16,7 @@ def __init__( local_cache_max_items: int = 256, hash_function: str = "md5", lambda_context: Optional[LambdaContext] = None, + response_hook: Optional[IdempotentHookFunction] = None, ): """ Initialize the base persistence layer @@ -37,6 +39,8 @@ def __init__( Function to use for calculating hashes, by default md5. lambda_context: LambdaContext, optional Lambda Context containing information about the invocation, function and execution environment. + response_hook: IdempotentHookFunction, optional + Hook function to be called when an idempotent response is returned from the idempotent store. """ self.event_key_jmespath = event_key_jmespath self.payload_validation_jmespath = payload_validation_jmespath @@ -47,6 +51,7 @@ def __init__( self.local_cache_max_items = local_cache_max_items self.hash_function = hash_function self.lambda_context: Optional[LambdaContext] = lambda_context + self.response_hook: Optional[IdempotentHookFunction] = response_hook def register_lambda_context(self, lambda_context: LambdaContext): """Captures the Lambda context, to calculate the remaining time before the invocation times out""" diff --git a/aws_lambda_powertools/utilities/idempotency/hook.py b/aws_lambda_powertools/utilities/idempotency/hook.py new file mode 100644 index 00000000000..0027399b937 --- /dev/null +++ b/aws_lambda_powertools/utilities/idempotency/hook.py @@ -0,0 +1,13 @@ +from typing import Any + +from aws_lambda_powertools.shared.types import Protocol +from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord + + +class IdempotentHookFunction(Protocol): + """ + The IdempotentHookFunction. + This class defines the calling signature for IdempotentHookFunction callbacks. + """ + + def __call__(self, response: Any, idempotent_data: DataRecord) -> Any: ... diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 17848a7828b..e448d82e28e 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -73,8 +73,8 @@ We currently support Amazon DynamoDB and Redis as a storage layer. The following If you're not [changing the default configuration for the DynamoDB persistence layer](#dynamodbpersistencelayer), this is the expected default configuration: | Configuration | Value | Notes | -| ------------------ | ------------ | ----------------------------------------------------------------------------------- | -| Partition key | `id` | +| ------------------ | ------------ |-------------------------------------------------------------------------------------| +| Partition key | `id` | | | TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console | ???+ tip "Tip: You can share a single state table for all functions" @@ -454,6 +454,40 @@ sequenceDiagram Idempotent successful request cached +#### Successful request with response_hook configured + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Response hook + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Client: Response sent to client + else retried request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Persistence Layer-->>Response hook: Already exists in persistence layer. + deactivate Persistence Layer + Note over Response hook,Persistence Layer: Record status is COMPLETE and not expired + Response hook->>Lambda: Response hook invoked + Lambda-->>Client: Manipulated idempotent response sent to client + end +``` +Successful idempotent request with a response hook +
+ #### Expired idempotency records
@@ -699,15 +733,16 @@ For advanced configurations, such as setting up SSL certificates or customizing Idempotent decorator can be further configured with **`IdempotencyConfig`** as seen in the previous example. These are the available options for further configuration -| Parameter | Default | Description | -| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} | -| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload | -| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request | -| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired | -| **use_local_cache** | `False` | Whether to locally cache idempotency results | -| **local_cache_max_items** | 256 | Max number of items to store in local cache | -| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. | +| Parameter | Default | Description | +|---------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} | +| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload | +| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request | +| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired | +| **use_local_cache** | `False` | Whether to locally cache idempotency results | +| **local_cache_max_items** | 256 | Max number of items to store in local cache | +| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. | +| **response_hook** | `None` | Function to use for processing the stored Idempotent response. This function hook is called when an existing idempotent response is found. See [Manipulating The Idempotent Response](idempotency.md#manipulating-the-idempotent-response) | ### Handling concurrent executions with the same payload @@ -909,6 +944,36 @@ You can create your own persistent store from scratch by inheriting the `BasePer For example, the `_put_record` method needs to raise an exception if a non-expired record already exists in the data store with a matching key. +### Manipulating the Idempotent Response + +You can set up a `response_hook` in the `IdempotentConfig` class to manipulate the returned data when an operation is idempotent. The hook function will be called with the current deserialized response object and the Idempotency record. + +=== "Using an Idempotent Response Hook" + + ```python hl_lines="18 20 23 32" + --8<-- "examples/idempotency/src/working_with_response_hook.py" + ``` + +=== "Sample event" + + ```json + --8<-- "examples/idempotency/src/working_with_response_hook_payload.json" + ``` + +???+ info "Info: Using custom de-serialization?" + + The response_hook is called after the custom de-serialization so the payload you process will be the de-serialized version. + +#### Being a good citizen + +When using response hooks to manipulate returned data from idempotent operations, it's important to follow best practices to avoid introducing complexity or issues. Keep these guidelines in mind: + +1. **Response hook works exclusively when operations are idempotent.** The hook will not be called when an operation is not idempotent, or when the idempotent logic fails. + +2. **Catch and Handle Exceptions.** Your response hook code should catch and handle any exceptions that may arise from your logic. Unhandled exceptions will cause the Lambda function to fail unexpectedly. + +3. **Keep Hook Logic Simple** Response hooks should consist of minimal and straightforward logic for manipulating response data. Avoid complex conditional branching and aim for hooks that are easy to reason about. + ## Compatibility with other utilities ### Batch diff --git a/examples/idempotency/src/working_with_response_hook.py b/examples/idempotency/src/working_with_response_hook.py new file mode 100644 index 00000000000..725a56f32ba --- /dev/null +++ b/examples/idempotency/src/working_with_response_hook.py @@ -0,0 +1,56 @@ +import datetime +import uuid +from typing import Dict + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.idempotency import ( + DynamoDBPersistenceLayer, + IdempotencyConfig, + idempotent_function, +) +from aws_lambda_powertools.utilities.idempotency.persistence.base import ( + DataRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + + +def my_response_hook(response: Dict, idempotent_data: DataRecord) -> Dict: + # Return inserted Header data into the Idempotent Response + response["x-idempotent-key"] = idempotent_data.idempotency_key + + # expiry_timestamp could be None so include if set + expiry_timestamp = idempotent_data.expiry_timestamp + if expiry_timestamp: + expiry_time = datetime.datetime.fromtimestamp(int(expiry_timestamp)) + response["x-idempotent-expiration"] = expiry_time.isoformat() + + # Must return the response here + return response + + +dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +config = IdempotencyConfig(response_hook=my_response_hook) + + +@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) +def process_order(order: dict) -> dict: + # create the order_id + order_id = str(uuid.uuid4()) + + # create your logic to save the order + # append the order_id created + order["order_id"] = order_id + + # return the order + return {"order": order} + + +def lambda_handler(event: dict, context: LambdaContext): + config.register_lambda_context(context) # see Lambda timeouts section + try: + logger.info(f"Processing order id {event.get('order_id')}") + return process_order(order=event.get("order")) + except Exception as err: + return {"status_code": 400, "error": f"Error processing {str(err)}"} diff --git a/examples/idempotency/src/working_with_response_hook_payload.json b/examples/idempotency/src/working_with_response_hook_payload.json new file mode 100644 index 00000000000..85fdd958d59 --- /dev/null +++ b/examples/idempotency/src/working_with_response_hook_payload.json @@ -0,0 +1,8 @@ +{ + "order" : { + "user_id": "xyz", + "product_id": "123456789", + "quantity": 2, + "value": 30 + } +} diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 2591cf8e043..d33469d680f 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -1,7 +1,8 @@ import copy import datetime import warnings -from unittest.mock import MagicMock +from typing import Any +from unittest.mock import MagicMock, Mock import jmespath import pytest @@ -240,6 +241,39 @@ def lambda_handler(event, context): stubber.deactivate() +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_expired( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + lambda_response, + expected_params_update_item, + expected_params_put_item, + lambda_context, +): + """ + Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the + expiry window + """ + + stubber = stub.Stubber(persistence_store.client) + + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + stubber.activate() + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, lambda_context) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + @pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True) def test_idempotent_lambda_first_execution_cached( idempotency_config: IdempotencyConfig, @@ -324,39 +358,6 @@ def lambda_handler(event, context): stubber.deactivate() -@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) -def test_idempotent_lambda_expired( - idempotency_config: IdempotencyConfig, - persistence_store: DynamoDBPersistenceLayer, - lambda_apigw_event, - lambda_response, - expected_params_update_item, - expected_params_put_item, - lambda_context, -): - """ - Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the - expiry window - """ - - stubber = stub.Stubber(persistence_store.client) - - ddb_response = {} - - stubber.add_response("put_item", ddb_response, expected_params_put_item) - stubber.add_response("update_item", ddb_response, expected_params_update_item) - stubber.activate() - - @idempotent(config=idempotency_config, persistence_store=persistence_store) - def lambda_handler(event, context): - return lambda_response - - lambda_handler(lambda_apigw_event, lambda_context) - - stubber.assert_no_pending_responses() - stubber.deactivate() - - @pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) def test_idempotent_lambda_exception( idempotency_config: IdempotencyConfig, @@ -1986,3 +1987,89 @@ def lambda_handler(event, context): # THEN we should not cache a transaction that failed validation assert cache_spy.call_count == 0 + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_responsehook_lambda_first_execution( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + expected_params_update_item, + expected_params_put_item, + lambda_response, + lambda_context, +): + """ + Test response_hook is not called for the idempotent decorator when lambda is executed + with an event with a previously unknown event key + """ + + idempotent_response_hook = Mock() + + stubber = stub.Stubber(persistence_store.client) + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + stubber.activate() + + idempotency_config.response_hook = idempotent_response_hook + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, lambda_context) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + assert not idempotent_response_hook.called + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_already_completed_response_hook_is_called( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + timestamp_future, + hashed_idempotency_key, + serialized_lambda_response, + deserialized_lambda_response, + lambda_context, +): + """ + Test idempotent decorator where event with matching event key has already been successfully processed + """ + + def idempotent_response_hook(response: Any, idempotent_data: DataRecord) -> Any: + """Modify the response provided by adding a new key""" + response["idempotent_response"] = True + + return response + + idempotency_config.response_hook = idempotent_response_hook + + stubber = stub.Stubber(persistence_store.client) + ddb_response = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "data": {"S": serialized_lambda_response}, + "status": {"S": "COMPLETED"}, + }, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response) + stubber.activate() + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + raise Exception + + lambda_resp = lambda_handler(lambda_apigw_event, lambda_context) + + # Then idempotent_response value will be added to the response + assert lambda_resp["idempotent_response"] + + stubber.assert_no_pending_responses() + stubber.deactivate()