|
5 | 5 | import pytest |
6 | 6 | from botocore.exceptions import ClientError |
7 | 7 |
|
| 8 | +from localstack.aws.api.lambda_ import Runtime |
8 | 9 | from localstack.services.awslambda.lambda_api import ( |
9 | 10 | BATCH_SIZE_RANGES, |
10 | 11 | INVALID_PARAMETER_VALUE_EXCEPTION, |
@@ -1087,3 +1088,71 @@ def test_sqs_invalid_event_filter( |
1087 | 1088 |
|
1088 | 1089 |
|
1089 | 1090 | # TODO: test integration with lambda logs |
| 1091 | + |
| 1092 | + |
| 1093 | +TEST_LAMBDA_SLEEP = os.path.join(THIS_FOLDER, "functions/lambda_sleep.py") |
| 1094 | + |
| 1095 | + |
| 1096 | +class TestFifoQueueMapping: |
| 1097 | + def test_fifo_behavior( |
| 1098 | + self, |
| 1099 | + sqs_client, |
| 1100 | + create_lambda_function, |
| 1101 | + lambda_client, |
| 1102 | + lambda_su_role, |
| 1103 | + logs_client, |
| 1104 | + cleanups, |
| 1105 | + ): |
| 1106 | + # create FIFO queue |
| 1107 | + queue_name = f"test-queue-{short_uid()}.fifo" |
| 1108 | + create_queue_result = sqs_client.create_queue( |
| 1109 | + QueueName=queue_name, |
| 1110 | + Attributes={ |
| 1111 | + "FifoQueue": "true", |
| 1112 | + "ContentBasedDeduplication": "true", |
| 1113 | + "VisibilityTimeout": str(10 * 6), |
| 1114 | + }, |
| 1115 | + ) |
| 1116 | + queue_url = create_queue_result["QueueUrl"] |
| 1117 | + queue_arn = sqs_client.get_queue_attributes( |
| 1118 | + QueueUrl=queue_url, AttributeNames=["QueueArn"] |
| 1119 | + )["Attributes"]["QueueArn"] |
| 1120 | + |
| 1121 | + message_group_id = "fixed-message-group-id-test" |
| 1122 | + |
| 1123 | + # create a lambda to process messages |
| 1124 | + function_name = f"function-name-{short_uid()}" |
| 1125 | + |
| 1126 | + create_lambda_function( |
| 1127 | + func_name=function_name, |
| 1128 | + handler_file=TEST_LAMBDA_SLEEP, |
| 1129 | + runtime=Runtime.python3_9, |
| 1130 | + role=lambda_su_role, |
| 1131 | + timeout=10, |
| 1132 | + Environment={"Variables": {"TEST_SLEEP_S": "5"}}, |
| 1133 | + ) |
| 1134 | + |
| 1135 | + # create event source mapping |
| 1136 | + create_esm_result = lambda_client.create_event_source_mapping( |
| 1137 | + FunctionName=function_name, EventSourceArn=queue_arn, Enabled=False, BatchSize=1 |
| 1138 | + ) |
| 1139 | + esm_uuid = create_esm_result["UUID"] |
| 1140 | + cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=esm_uuid)) |
| 1141 | + |
| 1142 | + # send messages |
| 1143 | + for i in range(5): |
| 1144 | + sqs_client.send_message( |
| 1145 | + QueueUrl=queue_url, MessageBody=f"message-{i}", MessageGroupId=message_group_id |
| 1146 | + ) |
| 1147 | + |
| 1148 | + # enable event source mapping |
| 1149 | + lambda_client.update_event_source_mapping(UUID=esm_uuid, Enabled=True) |
| 1150 | + _await_event_source_mapping_enabled(lambda_client, esm_uuid) |
| 1151 | + |
| 1152 | + # since the lambda has to be called in-order anyway, there shouldn't be any parallel executions |
| 1153 | + log_group_name = f"/aws/lambda/{function_name}" |
| 1154 | + |
| 1155 | + time.sleep(60) |
| 1156 | + |
| 1157 | + log_streams = logs_client.describe_log_streams(logGroupName=log_group_name) |
| 1158 | + assert len(log_streams["logStreams"]) == 1 |
0 commit comments