3030 get_invoke_init_type ,
3131 update_done ,
3232)
33- from localstack .testing .aws .util import create_client_with_keys , is_aws_cloud
33+ from localstack .testing .aws .util import (
34+ create_client_with_keys ,
35+ is_aws_cloud ,
36+ )
3437from localstack .testing .pytest import markers
3538from localstack .testing .snapshots .transformer_utility import PATTERN_UUID
3639from localstack .utils import files , platform , testutil
123126TEST_LAMBDA_PYTHON_MULTIPLE_HANDLERS = os .path .join (
124127 THIS_FOLDER , "functions/lambda_multiple_handlers.py"
125128)
129+ TEST_LAMBDA_NOTIFIER = os .path .join (THIS_FOLDER , "functions/lambda_notifier.py" )
126130
127131PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED ["python" ]
128132NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED ["nodejs" ]
@@ -2614,18 +2618,37 @@ def _invoke_lambda():
26142618 assert not errored
26152619
26162620 @markers .aws .validated
2617- @pytest .mark .skip (reason = "flaky" )
2618- def test_reserved_concurrency_async_queue (self , create_lambda_function , snapshot , aws_client ):
2621+ def test_reserved_concurrency_async_queue (
2622+ self ,
2623+ create_lambda_function ,
2624+ sqs_create_queue ,
2625+ sqs_collect_messages ,
2626+ snapshot ,
2627+ aws_client ,
2628+ aws_client_no_retry ,
2629+ ):
2630+ """Test async/event invoke retry behavior due to limited reserved concurrency.
2631+ Timeline:
2632+ 1) Set ReservedConcurrentExecutions=1
2633+ 2) sync_invoke_warm_up => ok
2634+ 3) async_invoke_one => ok
2635+ 4) async_invoke_two => gets retried
2636+ 5) sync invoke => fails with TooManyRequestsException
2637+ 6) Set ReservedConcurrentExecutions=3
2638+ 7) sync_invoke_final => ok
2639+ """
26192640 min_concurrent_executions = 10 + 3
26202641 check_concurrency_quota (aws_client , min_concurrent_executions )
26212642
2643+ queue_url = sqs_create_queue ()
2644+
26222645 func_name = f"test_lambda_{ short_uid ()} "
26232646 create_lambda_function (
26242647 func_name = func_name ,
2625- handler_file = TEST_LAMBDA_INTROSPECT_PYTHON ,
2648+ handler_file = TEST_LAMBDA_NOTIFIER ,
26262649 runtime = Runtime .python3_12 ,
26272650 client = aws_client .lambda_ ,
2628- timeout = 20 ,
2651+ timeout = 30 ,
26292652 )
26302653
26312654 fn = aws_client .lambda_ .get_function_configuration (
@@ -2641,46 +2664,75 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot
26412664 snapshot .match ("put_fn_concurrency" , put_fn_concurrency )
26422665
26432666 # warm up the Lambda function to mitigate flakiness due to cold start
2644- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2667+ sync_invoke_warm_up = aws_client .lambda_ .invoke (
2668+ FunctionName = fn_arn , InvocationType = "RequestResponse"
2669+ )
2670+ assert "FunctionError" not in sync_invoke_warm_up
26452671
2646- # simultaneously queue two event invocations
2647- # The first event invoke gets executed immediately
2648- aws_client .lambda_ .invoke (
2649- FunctionName = fn_arn , InvocationType = "Event" , Payload = json .dumps ({"wait" : 15 })
2672+ # Immediately queue two event invocations:
2673+ # 1) The first event invoke gets executed immediately
2674+ async_invoke_one = aws_client .lambda_ .invoke (
2675+ FunctionName = fn_arn ,
2676+ InvocationType = "Event" ,
2677+ Payload = json .dumps ({"notify" : queue_url , "wait" : 15 }),
26502678 )
2651- # The second event invoke gets throttled and re-scheduled with an internal retry
2652- aws_client .lambda_ .invoke (
2653- FunctionName = fn_arn , InvocationType = "Event" , Payload = json .dumps ({"wait" : 10 })
2679+ assert "FunctionError" not in async_invoke_one
2680+ # 2) The second event invoke gets throttled and re-scheduled with an internal retry
2681+ async_invoke_two = aws_client .lambda_ .invoke (
2682+ FunctionName = fn_arn ,
2683+ InvocationType = "Event" ,
2684+ Payload = json .dumps ({"notify" : queue_url }),
26542685 )
2686+ assert "FunctionError" not in async_invoke_two
26552687
2656- # Ensure one event invocation is being executed and the other one is in the queue.
2657- time .sleep (5 )
2688+ # Wait until the first async invoke is being executed while the second async invoke is in the queue.
2689+ messages = sqs_collect_messages (
2690+ queue_url ,
2691+ expected = 1 ,
2692+ timeout = 15 ,
2693+ )
2694+ async_invoke_one_notification = json .loads (messages [0 ]["Body" ])
2695+ assert (
2696+ async_invoke_one_notification ["request_id" ]
2697+ == async_invoke_one ["ResponseMetadata" ]["RequestId" ]
2698+ )
26582699
26592700 # Synchronous invocations raise an exception because insufficient reserved concurrency is available
2701+ # It is important to disable botocore retries because the concurrency limit is time-bound because it only
2702+ # triggers as long as the first async invoke is processing!
26602703 with pytest .raises (aws_client .lambda_ .exceptions .TooManyRequestsException ) as e :
2661- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2704+ aws_client_no_retry .lambda_ .invoke (
2705+ FunctionName = fn_arn , InvocationType = "RequestResponse"
2706+ )
26622707 snapshot .match ("too_many_requests_exc" , e .value .response )
26632708
26642709 # ReservedConcurrentExecutions=2 is insufficient because the throttled async event invoke might be
26652710 # re-scheduled before the synchronous invoke while the first async invoke is still running.
26662711 aws_client .lambda_ .put_function_concurrency (
26672712 FunctionName = func_name , ReservedConcurrentExecutions = 3
26682713 )
2669- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2670-
2671- def assert_events ():
2672- log_events = aws_client .logs .filter_log_events (
2673- logGroupName = f"/aws/lambda/{ func_name } " ,
2674- )["events" ]
2675- invocation_count = len (
2676- [event ["message" ] for event in log_events if event ["message" ].startswith ("REPORT" )]
2677- )
2678- assert invocation_count == 4
2679-
2680- retry (assert_events , retries = 120 , sleep = 2 )
2714+ # Invocations succeed after raising reserved concurrency
2715+ sync_invoke_final = aws_client .lambda_ .invoke (
2716+ FunctionName = fn_arn ,
2717+ InvocationType = "RequestResponse" ,
2718+ Payload = json .dumps ({"notify" : queue_url }),
2719+ )
2720+ assert "FunctionError" not in sync_invoke_final
26812721
2682- # TODO: snapshot logs & request ID for correlation after request id gets propagated
2683- # https://github.com/localstack/localstack/pull/7874
2722+ # Contains the re-queued `async_invoke_two` and the `sync_invoke_final`, but the order might differ
2723+ # depending on whether invoke_two gets re-schedule before or after the final invoke.
2724+ # AWS docs: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async-error-handling.html
2725+ # "The retry interval increases exponentially from 1 second after the first attempt to a maximum of 5 minutes."
2726+ final_messages = sqs_collect_messages (
2727+ queue_url ,
2728+ expected = 2 ,
2729+ timeout = 20 ,
2730+ )
2731+ invoked_request_ids = {json .loads (msg ["Body" ])["request_id" ] for msg in final_messages }
2732+ assert {
2733+ async_invoke_two ["ResponseMetadata" ]["RequestId" ],
2734+ sync_invoke_final ["ResponseMetadata" ]["RequestId" ],
2735+ } == invoked_request_ids
26842736
26852737 @markers .snapshot .skip_snapshot_verify (paths = ["$..Attributes.AWSTraceHeader" ])
26862738 @markers .aws .validated
0 commit comments