Skip to content

Commit 9250f1c

Browse files
add test for sqs fifo source order/parallelism
1 parent e20e898 commit 9250f1c

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import os
2+
import time
3+
4+
sleep_duration = int(os.getenv("TEST_SLEEP_S", "0"))
5+
6+
7+
def handler(event, context):
8+
print(f"sleeping for {sleep_duration}")
9+
time.sleep(sleep_duration)
10+
print("done sleeping")
11+
return {"status": "ok"}

tests/integration/awslambda/test_lambda_integration_sqs.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pytest
66
from botocore.exceptions import ClientError
77

8+
from localstack.aws.api.lambda_ import Runtime
89
from localstack.services.awslambda.lambda_api import (
910
BATCH_SIZE_RANGES,
1011
INVALID_PARAMETER_VALUE_EXCEPTION,
@@ -1087,3 +1088,71 @@ def test_sqs_invalid_event_filter(
10871088

10881089

10891090
# TODO: test integration with lambda logs
1091+
1092+
1093+
TEST_LAMBDA_SLEEP = os.path.join(THIS_FOLDER, "functions/lambda_sleep.py")
1094+
1095+
1096+
class TestFifoQueueMapping:
1097+
def test_fifo_behavior(
1098+
self,
1099+
sqs_client,
1100+
create_lambda_function,
1101+
lambda_client,
1102+
lambda_su_role,
1103+
logs_client,
1104+
cleanups,
1105+
):
1106+
# create FIFO queue
1107+
queue_name = f"test-queue-{short_uid()}.fifo"
1108+
create_queue_result = sqs_client.create_queue(
1109+
QueueName=queue_name,
1110+
Attributes={
1111+
"FifoQueue": "true",
1112+
"ContentBasedDeduplication": "true",
1113+
"VisibilityTimeout": str(10 * 6),
1114+
},
1115+
)
1116+
queue_url = create_queue_result["QueueUrl"]
1117+
queue_arn = sqs_client.get_queue_attributes(
1118+
QueueUrl=queue_url, AttributeNames=["QueueArn"]
1119+
)["Attributes"]["QueueArn"]
1120+
1121+
message_group_id = "fixed-message-group-id-test"
1122+
1123+
# create a lambda to process messages
1124+
function_name = f"function-name-{short_uid()}"
1125+
1126+
create_lambda_function(
1127+
func_name=function_name,
1128+
handler_file=TEST_LAMBDA_SLEEP,
1129+
runtime=Runtime.python3_9,
1130+
role=lambda_su_role,
1131+
timeout=10,
1132+
Environment={"Variables": {"TEST_SLEEP_S": "5"}},
1133+
)
1134+
1135+
# create event source mapping
1136+
create_esm_result = lambda_client.create_event_source_mapping(
1137+
FunctionName=function_name, EventSourceArn=queue_arn, Enabled=False, BatchSize=1
1138+
)
1139+
esm_uuid = create_esm_result["UUID"]
1140+
cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=esm_uuid))
1141+
1142+
# send messages
1143+
for i in range(5):
1144+
sqs_client.send_message(
1145+
QueueUrl=queue_url, MessageBody=f"message-{i}", MessageGroupId=message_group_id
1146+
)
1147+
1148+
# enable event source mapping
1149+
lambda_client.update_event_source_mapping(UUID=esm_uuid, Enabled=True)
1150+
_await_event_source_mapping_enabled(lambda_client, esm_uuid)
1151+
1152+
# since the lambda has to be called in-order anyway, there shouldn't be any parallel executions
1153+
log_group_name = f"/aws/lambda/{function_name}"
1154+
1155+
time.sleep(60)
1156+
1157+
log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
1158+
assert len(log_streams["logStreams"]) == 1

0 commit comments

Comments
 (0)