Feature request: add msk to batch processing #5362
Thanks for opening your first issue here! We'll come back to you as soon as we can.
Hey @cpossinger! Thanks for opening this issue to ask for this support to improve Powertools. Let me share some information with you and explain why we can't add this support now. The Batch Processing utility is designed to handle partial failures when processing batches from specific AWS services: SQS, DynamoDB Streams, and Kinesis. It reports batch item failures to reduce the number of retries for failed records, providing a simple interface to process each batch record individually, either sync or async. This approach allows for more efficient processing by retrying only the failed items rather than the entire batch. Streaming from Apache Kafka (MSK) to AWS Lambda does not support partial batch failure reporting: if any record in the batch fails to process, Lambda considers the entire batch failed and retries the whole batch. So there is no gain in adding partial failure reporting for batch processing from Amazon MSK to Lambda, as the feature is not supported there. We will continue to monitor AWS updates for any future support of this feature and add it to Powertools then. Event Source Mapping documentation: https://docs.aws.amazon.com/cli/latest/reference/lambda/create-event-source-mapping.html
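For contrast, this is roughly what partial failure reporting looks like with a supported source such as SQS: the handler returns the identifiers of only the failed records, and Lambda retries just those. A minimal standard-library sketch (the records and the `process` function are made up for illustration; the `batchItemFailures`/`itemIdentifier` field names follow the Lambda partial-batch-response contract):

```python
# Sketch of the partial-batch-failure response an SQS-triggered Lambda
# can return when ReportBatchItemFailures is enabled. Records that raise
# are collected; Lambda then retries only those items, not the batch.

def handle_sqs_batch(records, process):
    failures = []
    for record in records:
        try:
            process(record)
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    # An empty list means the whole batch succeeded.
    return {"batchItemFailures": failures}

# Hypothetical records: the second one fails.
records = [{"messageId": "a", "body": "ok"}, {"messageId": "b", "body": "boom"}]

def process(record):
    if record["body"] == "boom":
        raise ValueError("cannot process")

response = handle_sqs_batch(records, process)
```

With MSK there is no equivalent response contract, which is why the whole batch is retried on any failure.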
Please let me know if you have any additional questions.
I see, this makes sense; thanks for the clarification. You should also let your principal solutions architect know, because that article is misleading if you're trying to use MSK. It looks like the MSK event source has an on_failure destination that can send the failed batch to a dead-letter queue, which is fine. I'd still like to have independent concurrent processes for each record in the batch with the multiprocess module and log failed processes in the Lambda function: log the child process errors, and raise a parent exception if there are one or more child errors. Do you have any guidance/best practices for how to do this?
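One way to sketch the pattern described above (run each record in its own worker, log child failures, then raise a single parent exception if any child failed) is with the standard library's `concurrent.futures`. This is an illustrative sketch, not a recommended implementation: `ThreadPoolExecutor` is used so the example is self-contained, and `ProcessPoolExecutor` has the same interface when the per-record work is CPU-bound.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

def process_record(record):
    # Placeholder per-record work; replace with real processing.
    if record.get("fail"):
        raise ValueError(f"bad record: {record['key']}")
    return record["key"]

def process_batch(records):
    errors, results = [], []
    # ProcessPoolExecutor is a drop-in replacement here for CPU-bound work.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(process_record, r): r for r in records}
        for future, record in futures.items():
            try:
                results.append(future.result())
            except Exception as exc:
                # Log the child failure but keep draining the batch.
                logger.error("record %s failed: %s", record["key"], exc)
                errors.append(exc)
    if errors:
        # One parent exception after all children have finished.
        raise RuntimeError(f"{len(errors)} of {len(records)} records failed")
    return results

# Hypothetical batch: the second record fails.
records = [{"key": "k1"}, {"key": "k2", "fail": True}, {"key": "k3"}]
try:
    process_batch(records)
    failed = False
except RuntimeError as exc:
    failed = True
    message = str(exc)
```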
I have read the article a few times, but I couldn't find the part that might be misleading about the use of MSK. However, I understand that I might have missed something. Could you please help me identify the specific areas you think need improvement? Then we can work on making it better, if necessary.
Interesting question. If I understood correctly, you want to process each record in the batch independently using the multiprocess module to take advantage of parallelism and speed up execution with many tasks. Additionally, you need to be able to catch exceptions from each task and log them, right? I put together some quick code to do this. While I wouldn't recommend putting it into production without proper testing and changes, it might be a good starting point.

Code:

```python
import json
import base64
import asyncio

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext):
    try:
        records = event['records']['mytopic-0']
        logger.info(f"Processing {len(records)} records from mytopic-0")
        results = process_batch(records)
        logger.info(f"Successfully processed {len(results)} records")
    except Exception as e:
        logger.exception(e)


def process_batch(records):
    logger.info(f"Starting batch processing with {len(records)} records")
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(
        *[process_record(record) for record in records]
    ))
    errors = [r for r in results if r is None]
    if errors:
        raise Exception(f"{len(errors)} records failed processing")
    return results


@tracer.capture_method
async def process_record(record):
    try:
        decoded_key = base64.b64decode(record['key']).decode('utf-8')
        decoded_value = base64.b64decode(record['value']).decode('utf-8')
        json_value = json.loads(decoded_value)
        logger.info(f"Processing record: Key={decoded_key}, Offset={record['offset']}")
        # Simulate some work
        await asyncio.sleep(0.5)
        # Simulate an error for keys containing 'recordKey5'. Comment this out to process the entire batch
        if 'recordKey5' in decoded_key:
            raise ValueError(f"Simulated error for key: {decoded_key}")
        logger.info(f"Completed processing record: Key={decoded_key}, Offset={record['offset']}")
        return json_value
    except Exception as e:
        logger.error(f"Error processing record with key {decoded_key}: {str(e)}")
        return None
```

Payload:

```json
{
  "eventSource": "aws:SelfManagedKafka",
  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5MQ==",
        "value": "eyJpZCI6IDEsICJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDMwfQ==",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 16,
        "timestamp": 1545084651000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5Mg==",
        "value": "eyJpZCI6IDIsICJuYW1lIjogIkphbmUgU21pdGgiLCAiYWdlIjogMjV9",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 17,
        "timestamp": 1545084652000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5Mw==",
        "value": "eyJpZCI6IDMsICJuYW1lIjogIkFsaWNlIEpvaG5zb24iLCAiYWdlIjogMjh9",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 18,
        "timestamp": 1545084653000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5NA==",
        "value": "eyJpZCI6IDQsICJuYW1lIjogIkJvYiBCcm93biIsICJhZ2UiOiA0MH0=",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 19,
        "timestamp": 1545084654000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5NQ==",
        "value": "eyJpZCI6IDUsICJuYW1lIjogIkNoYXJsaWUgV2lsc29uIiwgImFnZSI6IDM1fQ==",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 20,
        "timestamp": 1545084655000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5Ng==",
        "value": "eyJpZCI6IDYsICJuYW1lIjogIkRhdmlkIExlZSIsICJhZ2UiOiA0NX0=",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 21,
        "timestamp": 1545084656000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5Nw==",
        "value": "eyJpZCI6IDcsICJuYW1lIjogIkV2YSBHcmVlbiIsICJhZ2UiOiAzM30=",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 22,
        "timestamp": 1545084657000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5OA==",
        "value": "eyJpZCI6IDgsICJuYW1lIjogIkZyYW5rIFdoaXRlIiwgImFnZSI6IDM4fQ==",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 23,
        "timestamp": 1545084658000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5OQ==",
        "value": "eyJpZCI6IDksICJuYW1lIjogIkdyYWNlIFRheWxvciIsICJhZ2UiOiAyOX0=",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 24,
        "timestamp": 1545084659000,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5MTA=",
        "value": "eyJpZCI6IDEwLCAibmFtZSI6ICJIYW5uYWggQnJvd24iLCAiYWdlIjogMzF9",
        "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
      }
    ]
  }
}
```

Output:

```
START RequestId: e40a3ae3-fdc4-4071-8737-a4b79650bf0b Version: $LATEST
{"level":"INFO","location":"lambda_handler:14","message":"Processing 10 records from mytopic-0","timestamp":"2024-10-10 22:04:36,562+0000","service":"service_undefined"}
{"level":"INFO","location":"process_batch:24","message":"Starting batch processing with 10 records","timestamp":"2024-10-10 22:04:36,564+0000","service":"service_undefined"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey1, Offset=15","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-1"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey2, Offset=16","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-2"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey3, Offset=17","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-3"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey4, Offset=18","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-4"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey5, Offset=19","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-5"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey6, Offset=20","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-6"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey7, Offset=21","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-7"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey8, Offset=22","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-8"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey9, Offset=23","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-9"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey10, Offset=24","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-10"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey1, Offset=15","timestamp":"2024-10-10 22:04:37,075+0000","service":"service_undefined","taskName":"Task-1"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey2, Offset=16","timestamp":"2024-10-10 22:04:37,078+0000","service":"service_undefined","taskName":"Task-2"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey3, Offset=17","timestamp":"2024-10-10 22:04:37,079+0000","service":"service_undefined","taskName":"Task-3"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey4, Offset=18","timestamp":"2024-10-10 22:04:37,080+0000","service":"service_undefined","taskName":"Task-4"}
{"level":"ERROR","location":"process_record:56","message":"Error processing record with key recordKey5: Simulated error for key: recordKey5","timestamp":"2024-10-10 22:04:37,080+0000","service":"service_undefined","taskName":"Task-5"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey6, Offset=20","timestamp":"2024-10-10 22:04:37,081+0000","service":"service_undefined","taskName":"Task-6"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey7, Offset=21","timestamp":"2024-10-10 22:04:37,082+0000","service":"service_undefined","taskName":"Task-7"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey8, Offset=22","timestamp":"2024-10-10 22:04:37,082+0000","service":"service_undefined","taskName":"Task-8"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey9, Offset=23","timestamp":"2024-10-10 22:04:37,083+0000","service":"service_undefined","taskName":"Task-9"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey10, Offset=24","timestamp":"2024-10-10 22:04:37,083+0000","service":"service_undefined","taskName":"Task-10"}
{"level":"ERROR","location":"lambda_handler:21","message":"1 records failed processing","timestamp":"2024-10-10 22:04:37,085+0000","service":"service_undefined","exception":"Traceback (most recent call last):\n  File \"/var/task/app.py\", line 16, in lambda_handler\n    results = process_batch(records)\n              ^^^^^^^^^^^^^^^^^^^^^^\n  File \"/var/task/app.py\", line 33, in process_batch\n    raise Exception(f\"{len(errors)} records failed processing\")\nException: 1 records failed processing","exception_name":"Exception","stack_trace":{"type":"Exception","value":"1 records failed processing","module":"builtins","frames":[{"file":"/var/task/app.py","line":16,"function":"lambda_handler","statement":"results = process_batch(records)"},{"file":"/var/task/app.py","line":33,"function":"process_batch","statement":"raise Exception(f\"{len(errors)} records failed processing\")"}]}}
END RequestId: ac8f886e-9a73-4955-b1db-c3d265c37cdc
REPORT RequestId: ac8f886e-9a73-4955-b1db-c3d265c37cdc Init Duration: 0.24 ms Duration: 828.81 ms Billed Duration: 829 ms Memory Size: 128 MB Max Memory Used: 128 MB
null
```

You can improve this code by adding the event source class for better type hinting, for example, or by using the parser for validation and implementing the Idempotency decorator to ensure each record is processed only once. Thanks for this great discussion; I really enjoy talking about these topics!
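As a side note on the payload above: each record's key and value arrive base64-encoded, and header values arrive as lists of byte values. A small standard-library helper to decode one record might look like this (the `decode_record` name and the UTF-8 assumption are illustrative):

```python
import base64
import json

def decode_record(record):
    # MSK delivers key/value base64-encoded; headers are byte-value lists.
    key = base64.b64decode(record["key"]).decode("utf-8")
    value = json.loads(base64.b64decode(record["value"]).decode("utf-8"))
    headers = {
        name: bytes(data).decode("utf-8")
        for header in record.get("headers", [])
        for name, data in header.items()
    }
    return key, value, headers

# First record from the sample payload above.
record = {
    "key": "cmVjb3JkS2V5MQ==",
    "value": "eyJpZCI6IDEsICJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDMwfQ==",
    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
}
key, value, headers = decode_record(record)
```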
Thanks for the code snippet. I ended up changing some things for my use case and got it working: I reduced my average Lambda execution time from 80 ms to 50 ms and achieved better fault tolerance with the added concurrency. I think my confusion with the article is in the first paragraph, where MSK is grouped with the other streams and queues; I assumed that Powertools could do batch processing with all of the tools listed together.
Hi @cpossinger, it's great to hear that you're now processing messages faster!
Use case
MSK uses the same polling Lambda invocation model as SQS, DynamoDB Streams, and Kinesis streams, and would benefit from batch processing, but it is not available as an option in the batch processor class.
This article makes it seem like batch processing is available and should be used as a best practice, but it is not available for MSK.
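Since whole-batch retries are the failure mode for MSK, failed batches can instead be routed to an on-failure destination on the event source mapping. A hedged CLI sketch (the function name, cluster ARN, and destination ARN below are placeholders, not real resources):

```shell
# Sketch: create an MSK event source mapping with an on-failure destination,
# so batches that exhaust retries are sent to a destination (e.g. SNS)
# instead of being lost. All ARNs are placeholders.
aws lambda create-event-source-mapping \
  --function-name my-consumer \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster-1/abc \
  --topics mytopic \
  --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:failed-batches"}}'
```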
Solution/User Experience
KafkaMskEventModel and KafkaEvent are already implemented, so I assume it should be straightforward to add.
Alternative solutions
No response