Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
When concurrent receive requests are made to a FIFO queue, more than one of them returns messages from the same MessageGroupId.
Scenario Example
Consider a FIFO queue holding 10 messages that all share the same MessageGroupId.
We then make two concurrent receiveMessage requests for 5 messages each.
According to the SQS FIFO contract, the first request should return 5 messages and the second should return none for as long as the visibility timeout has not expired.
With LocalStack 1.0.4, we incorrectly receive all 10 messages, 5 in each request.
When the same is done targeting AWS, the behavior is correct: the first request returns 5 messages and the second returns none.
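To make the expected contract concrete, here is a minimal in-memory sketch of per-group locking. It is a toy model only, not LocalStack or AWS code: the class `FifoGroupModel`, its methods, and the explicit `nowMillis` clock parameter are all hypothetical names introduced for illustration. It also simplifies by treating received messages as consumed, so redelivery after the visibility timeout is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of FIFO message-group locking (hypothetical; not LocalStack or AWS code). */
class FifoGroupModel {
    // Pending messages per group, in send order.
    private final Map<String, Deque<String>> pendingByGroup = new LinkedHashMap<>();
    // Group id -> time (ms) until which the group has in-flight messages.
    private final Map<String, Long> lockedUntil = new HashMap<>();
    private final long visibilityTimeoutMillis;

    FifoGroupModel(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    synchronized void send(String groupId, String body) {
        pendingByGroup.computeIfAbsent(groupId, g -> new ArrayDeque<>()).addLast(body);
    }

    /** Returns up to max messages, skipping any group that still has messages in flight. */
    synchronized List<String> receive(int max, long nowMillis) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> entry : pendingByGroup.entrySet()) {
            if (batch.size() >= max) {
                break;
            }
            Long until = lockedUntil.get(entry.getKey());
            if (until != null && nowMillis < until) {
                continue; // group is locked by an earlier receive
            }
            Deque<String> pending = entry.getValue();
            boolean tookAny = false;
            while (batch.size() < max && !pending.isEmpty()) {
                batch.add(pending.pollFirst());
                tookAny = true;
            }
            if (tookAny) {
                // Lock the whole group until the visibility timeout elapses.
                lockedUntil.put(entry.getKey(), nowMillis + visibilityTimeoutMillis);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        FifoGroupModel queue = new FifoGroupModel(30_000);
        for (int i = 0; i < 10; i++) {
            queue.send("group-1", "test - " + i);
        }
        List<String> first = queue.receive(5, 0);
        List<String> second = queue.receive(5, 1);
        System.out.println("first=" + first.size() + " second=" + second.size());
    }
}
```

In this model the second receive returns an empty batch because the group is still locked, which matches the behavior observed against AWS. The `synchronized` methods are one simple way to keep the group check and the lock update atomic under concurrent callers; whether a missing atomic step is what causes the LocalStack behavior is not something this sketch establishes.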
Expected Behavior
We should not receive messages from the same MessageGroupId for concurrent requests to FIFO queues.
How are you starting LocalStack?
Custom (please describe below)
Steps To Reproduce
How are you starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)
With Testcontainers:
private static final String LOCAL_STACK_VERSION = "localstack/localstack:1.0.4";
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION))
        .withServices(SQS)
        .withReuse(true);
I've also tried withReuse(false), and the behavior is unchanged.
Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)
I created a JUnit 5 test that executes the steps described in the previous section and reproduces the behavior. The test passes when targeting AWS and fails when targeting LocalStack. The results are consistent across many test runs.
@Test
void receivesTwoBatchesFromSameMessageGroup() {
    String messageGroupId = UUID.randomUUID().toString();
    String queueName = "my-fifo-queue.fifo";
    logger.info("Create queue {}", queueName);
    String queueUrl = sqsAsyncClient.createQueue(CreateQueueRequest.builder()
                    .queueName(queueName)
                    .attributes(Collections.singletonMap(QueueAttributeName.FIFO_QUEUE, "true"))
                    .build())
            .join()
            .queueUrl();
    logger.info("Send 10 messages to url {} for message group {}", queueUrl, messageGroupId);
    List<SendMessageBatchRequestEntry> entries = IntStream.range(0, 10)
            .mapToObj(index -> SendMessageBatchRequestEntry
                    .builder()
                    .messageGroupId(messageGroupId)
                    .messageBody("test - " + index)
                    .messageDeduplicationId(UUID.randomUUID().toString())
                    .id(UUID.randomUUID().toString())
                    .build())
            .collect(toList());
    sqsAsyncClient.sendMessageBatch(SendMessageBatchRequest
                    .builder()
                    .entries(entries)
                    .queueUrl(queueUrl)
                    .build())
            .join();
    logger.info("Receive two batches of 5 messages from queue {}", queueUrl);
    CompletableFuture<ReceiveMessageResponse> receiveMessageFuture1 = sqsAsyncClient.receiveMessage(ReceiveMessageRequest
            .builder()
            .queueUrl(queueUrl)
            .attributeNames(QueueAttributeName.ALL)
            .waitTimeSeconds(10)
            .receiveRequestAttemptId(UUID.randomUUID().toString())
            .maxNumberOfMessages(5)
            .build());
    CompletableFuture<ReceiveMessageResponse> receiveMessageFuture2 = sqsAsyncClient.receiveMessage(ReceiveMessageRequest
            .builder()
            .queueUrl(queueUrl)
            .attributeNames(QueueAttributeName.ALL)
            .receiveRequestAttemptId(UUID.randomUUID().toString())
            .waitTimeSeconds(10)
            .maxNumberOfMessages(5)
            .build());
    CompletableFuture.allOf(receiveMessageFuture1, receiveMessageFuture2).join();
    logger.info("Received two batches from queue {}", queueUrl);
    List<software.amazon.awssdk.services.sqs.model.Message> firstBatch = receiveMessageFuture1.join().messages().stream()
            .filter(message -> message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID).equals(messageGroupId))
            .collect(toList());
    List<software.amazon.awssdk.services.sqs.model.Message> secondBatch = receiveMessageFuture2.join().messages().stream()
            .filter(message -> message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID).equals(messageGroupId))
            .collect(toList());
    logger.info("First batch: {} messages Second Batch: {} messages", firstBatch.size(), secondBatch.size());
    List<software.amazon.awssdk.services.sqs.model.Message> allMessages = new ArrayList<>(firstBatch);
    allMessages.addAll(secondBatch);
    // Make sure we're not receiving the same messages
    long distinctCount = allMessages.stream().map(software.amazon.awssdk.services.sqs.model.Message::body).distinct().count();
    assertThat(distinctCount).isEqualTo(5);
    assertThat(allMessages).hasSize(5);
}
Environment
- OS: MacOS Monterey 12.5
- LocalStack: 1.0.4
Anything else?
Execution logs targeting LocalStack:
Execution logs targeting AWS:

