
Feature request: add msk to batch processing #5362

Closed
2 tasks done
cpossinger opened this issue Oct 10, 2024 · 7 comments
Labels
feature-request feature request

Comments

@cpossinger

Use case

MSK uses the same polling Lambda invocation model as SQS, DynamoDB Streams, and Kinesis Data Streams and would benefit from batch processing, but it is not available as an option in the batch processor class.

This article makes it seem like batch processing is available for MSK and should be used as a best practice, but it is not supported for MSK.

Solution/User Experience

KafkaMskEventModel and KafkaEvent are already implemented, so I assume it should be straightforward to add.

Alternative solutions

No response

Acknowledgment

@cpossinger cpossinger added feature-request feature request triage Pending triage from maintainers labels Oct 10, 2024

boring-cyborg bot commented Oct 10, 2024

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

@leandrodamascena
Contributor

Hey @cpossinger! Thanks for opening this issue to ask for this support to improve Powertools. Let me share some information with you and explain why we can't add this support right now.

The Batch Processing utility is designed to handle partial failures when processing batches from specific AWS services: SQS, DynamoDB Streams, and Kinesis. It reports batch item failures to reduce the number of retries for failed records, providing a simple interface to process each batch record individually, synchronously or asynchronously. This approach allows for more efficient processing by retrying only the failed items rather than the entire batch.

Streaming from Apache Kafka (MSK) to AWS Lambda does not support partial batch failure reporting: if any record in the batch fails to process, Lambda considers the entire batch as failed and retries the whole batch. So there is no gain in adding partial failure reporting for batch processing from Amazon MSK to Lambda, as this feature is not supported.

We will continue to monitor AWS updates for any future support of this feature and will add it to Powertools once it becomes available.

Event Source Mapping documentation - https://docs.aws.amazon.com/cli/latest/reference/lambda/create-event-source-mapping.html

--function-response-types (list)
(Kinesis, DynamoDB Streams, and Amazon SQS) A list of current response type enums applied to the event source mapping.
Where valid values are:
ReportBatchItemFailures
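To make the constraint concrete, a hedged sketch of how partial batch failure reporting is enabled for the supported sources (the UUID is a placeholder; this flag is simply not accepted for MSK event source mappings):

```shell
# Hypothetical example: enabling partial batch responses on an existing
# SQS/Kinesis/DynamoDB Streams event source mapping. This has no effect
# (and is not valid) for an Amazon MSK event source mapping.
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-example-uuid" \
    --function-response-types "ReportBatchItemFailures"
```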

Please let me know if you have any additional questions.

@leandrodamascena leandrodamascena removed the triage Pending triage from maintainers label Oct 10, 2024
@cpossinger
Author

I see, this makes sense; thanks for the clarification. You should also let your Principal Solutions Architect know, because that article is misleading if you're trying to use MSK.

It looks like the MSK event source has an on_failure destination that can send the failed batch to a dead-letter queue, which is fine.
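For reference, the on-failure destination mentioned above can be configured on the event source mapping; a hedged sketch (UUID and queue ARN are placeholders):

```shell
# Hypothetical sketch: route metadata about failed MSK batches to an SQS
# dead-letter queue via the event source mapping's OnFailure destination.
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-example-uuid" \
    --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:111122223333:my-dlq"}}'
```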

I'd still like to have independent concurrent processes for each record in the batch using the multiprocess module, and to log failed processes in the Lambda function: log the child process errors and raise a parent exception if there are one or more child errors.

Do you have any guidance/best practices for how to do this?

@leandrodamascena
Contributor

I see this makes sense thanks for the clarification. You should also let your principal solutions architect know because that article is misleading if you're trying to use msk.

I have read the article a few times, but I couldn't find the part that might be misleading about the use of MSK. However, I understand that I might have missed something. Could you please help me identify the specific areas you think need improvement? Then we can work on making it better, if necessary.

I'd like to still have independent concurrent processes for each record in the batch with the multiprocess module and log failed processes in the lambda function. Have the child process errors be logged and raise a parent exception if there are one or more child errors.

Interesting question. If I understood correctly, you want to process each record in the batch independently using the multiprocess module to take advantage of parallelism and speed up execution with many tasks. Additionally, you need to be able to catch exceptions from each task and log them, right? I worked on some quick code to do this. While I wouldn't recommend you put this into production without proper testing and changes, it might be a good starting point to try.

code

import json
import base64
import asyncio
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()

@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext):
    try:
        records = event['records']['mytopic-0']
        logger.info(f"Processing {len(records)} records from mytopic-0")

        results = process_batch(records)

        logger.info(f"Successfully processed {len(results)} records")

    except Exception as e:
        logger.exception(e)

def process_batch(records):
    logger.info(f"Starting batch processing with {len(records)} records")

    async def gather_records():
        return await asyncio.gather(*[process_record(record) for record in records])

    # asyncio.run creates a fresh event loop and closes it when done;
    # asyncio.get_event_loop() is deprecated for this pattern since Python 3.10
    results = asyncio.run(gather_records())

    errors = [r for r in results if r is None]
    if errors:
        raise Exception(f"{len(errors)} records failed processing")

    return results

@tracer.capture_method
async def process_record(record):
    decoded_key = "<unknown>"  # fallback so the error log works even if decoding the key fails
    try:
        decoded_key = base64.b64decode(record['key']).decode('utf-8')
        decoded_value = base64.b64decode(record['value']).decode('utf-8')
        json_value = json.loads(decoded_value)

        logger.info(f"Processing record: Key={decoded_key}, Offset={record['offset']}")

        # Simulate some work
        await asyncio.sleep(0.5)

        # Simulate an error for keys containing 'recordKey5'. Comment this out to process the entire batch
        if 'recordKey5' in decoded_key:
            raise ValueError(f"Simulated error for key: {decoded_key}")

        logger.info(f"Completed processing record: Key={decoded_key}, Offset={record['offset']}")
        return json_value
    except Exception as e:
        logger.error(f"Error processing record with key {decoded_key}: {str(e)}")
        return None

payload

{
    "eventSource":"aws:SelfManagedKafka",
    "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5MQ==",  
                "value": "eyJpZCI6IDEsICJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDMwfQ==",
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 16,
                "timestamp": 1545084651000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5Mg==",
                "value": "eyJpZCI6IDIsICJuYW1lIjogIkphbmUgU21pdGgiLCAiYWdlIjogMjV9", 
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 17,
                "timestamp": 1545084652000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5Mw==",
                "value": "eyJpZCI6IDMsICJuYW1lIjogIkFsaWNlIEpvaG5zb24iLCAiYWdlIjogMjh9",
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 18,
                "timestamp": 1545084653000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5NA==", 
                "value": "eyJpZCI6IDQsICJuYW1lIjogIkJvYiBCcm93biIsICJhZ2UiOiA0MH0=", 
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 19,
                "timestamp": 1545084654000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5NQ==",  
                "value": "eyJpZCI6IDUsICJuYW1lIjogIkNoYXJsaWUgV2lsc29uIiwgImFnZSI6IDM1fQ==",  
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 20,
                "timestamp": 1545084655000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5Ng==",  
                "value": "eyJpZCI6IDYsICJuYW1lIjogIkRhdmlkIExlZSIsICJhZ2UiOiA0NX0=", 
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 21,
                "timestamp": 1545084656000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5Nw==",  
                "value": "eyJpZCI6IDcsICJuYW1lIjogIkV2YSBHcmVlbiIsICJhZ2UiOiAzM30=",
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 22,
                "timestamp": 1545084657000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5OA==", 
                "value": "eyJpZCI6IDgsICJuYW1lIjogIkZyYW5rIFdoaXRlIiwgImFnZSI6IDM4fQ==",  
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 23,
                "timestamp": 1545084658000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5OQ==", 
                "value": "eyJpZCI6IDksICJuYW1lIjogIkdyYWNlIFRheWxvciIsICJhZ2UiOiAyOX0=",  
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            },
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 24,
                "timestamp": 1545084659000,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5MTA=",
                "value": "eyJpZCI6IDEwLCAibmFtZSI6ICJIYW5uYWggQnJvd24iLCAiYWdlIjogMzF9", 
                "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
            }
        ]
    }
  }
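The key and value fields in the payload above are base64-encoded; decoding the first record as a quick sanity check:

```python
import base64
import json

# First record from the sample payload above
key_b64 = "cmVjb3JkS2V5MQ=="
value_b64 = "eyJpZCI6IDEsICJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDMwfQ=="

key = base64.b64decode(key_b64).decode("utf-8")
value = json.loads(base64.b64decode(value_b64).decode("utf-8"))

print(key)    # recordKey1
print(value)  # {'id': 1, 'name': 'John Doe', 'age': 30}
```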

output

START RequestId: e40a3ae3-fdc4-4071-8737-a4b79650bf0b Version: $LATEST
{"level":"INFO","location":"lambda_handler:14","message":"Processing 10 records from mytopic-0","timestamp":"2024-10-10 22:04:36,562+0000","service":"service_undefined"}
{"level":"INFO","location":"process_batch:24","message":"Starting batch processing with 10 records","timestamp":"2024-10-10 22:04:36,564+0000","service":"service_undefined"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey1, Offset=15","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-1"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey2, Offset=16","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-2"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey3, Offset=17","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-3"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey4, Offset=18","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-4"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey5, Offset=19","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-5"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey6, Offset=20","timestamp":"2024-10-10 22:04:36,566+0000","service":"service_undefined","taskName":"Task-6"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey7, Offset=21","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-7"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey8, Offset=22","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-8"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey9, Offset=23","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-9"}
{"level":"INFO","location":"process_record:44","message":"Processing record: Key=recordKey10, Offset=24","timestamp":"2024-10-10 22:04:36,567+0000","service":"service_undefined","taskName":"Task-10"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey1, Offset=15","timestamp":"2024-10-10 22:04:37,075+0000","service":"service_undefined","taskName":"Task-1"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey2, Offset=16","timestamp":"2024-10-10 22:04:37,078+0000","service":"service_undefined","taskName":"Task-2"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey3, Offset=17","timestamp":"2024-10-10 22:04:37,079+0000","service":"service_undefined","taskName":"Task-3"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey4, Offset=18","timestamp":"2024-10-10 22:04:37,080+0000","service":"service_undefined","taskName":"Task-4"}
{"level":"ERROR","location":"process_record:56","message":"Error processing record with key recordKey5: Simulated error for key: recordKey5","timestamp":"2024-10-10 22:04:37,080+0000","service":"service_undefined","taskName":"Task-5"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey6, Offset=20","timestamp":"2024-10-10 22:04:37,081+0000","service":"service_undefined","taskName":"Task-6"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey7, Offset=21","timestamp":"2024-10-10 22:04:37,082+0000","service":"service_undefined","taskName":"Task-7"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey8, Offset=22","timestamp":"2024-10-10 22:04:37,082+0000","service":"service_undefined","taskName":"Task-8"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey9, Offset=23","timestamp":"2024-10-10 22:04:37,083+0000","service":"service_undefined","taskName":"Task-9"}
{"level":"INFO","location":"process_record:53","message":"Completed processing record: Key=recordKey10, Offset=24","timestamp":"2024-10-10 22:04:37,083+0000","service":"service_undefined","taskName":"Task-10"}
{"level":"ERROR","location":"lambda_handler:21","message":"1 records failed processing","timestamp":"2024-10-10 22:04:37,085+0000","service":"service_undefined","exception":"Traceback (most recent call last):\n  File \"/var/task/app.py\", line 16, in lambda_handler\n    results = process_batch(records)\n              ^^^^^^^^^^^^^^^^^^^^^^\n  File \"/var/task/app.py\", line 33, in process_batch\n    raise Exception(f\"{len(errors)} records failed processing\")\nException: 1 records failed processing","exception_name":"Exception","stack_trace":{"type":"Exception","value":"1 records failed processing","module":"builtins","frames":[{"file":"/var/task/app.py","line":16,"function":"lambda_handler","statement":"results = process_batch(records)"},{"file":"/var/task/app.py","line":33,"function":"process_batch","statement":"raise Exception(f\"{len(errors)} records failed processing\")"}]}}
END RequestId: ac8f886e-9a73-4955-b1db-c3d265c37cdc
REPORT RequestId: ac8f886e-9a73-4955-b1db-c3d265c37cdc	Init Duration: 0.24 ms	Duration: 828.81 ms	Billed Duration: 829 ms	Memory Size: 128 MB	Max Memory Used: 128 MB	
null

You can improve this code by adding the event source data class for better type hinting, using the parser for validation, or implementing the Idempotency decorator to ensure each record is processed only once.

Thanks for this great discussion; I really enjoy talking about these topics!

@cpossinger
Author

Thanks for the code snippet. I ended up changing some things for my use case and got things working: I reduced my average Lambda execution time from 80 ms to 50 ms and achieved better fault tolerance with the added concurrency.

I think my confusion with the article is in the first paragraph, where MSK is grouped with the other streams and queues. I assumed that Powertools could do batch processing with all of the tools listed together.

@leandrodamascena
Contributor

Hi @cpossinger, it's great to hear that you're now processing messages faster!
I'm closing this issue, and please open a new one or a discussion if you need additional help.

Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.
