bug: concurrent requests made to a FIFO queue return messages from the same MessageGroupId #6766

@tomazfernandes

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

When concurrent receive requests are made to a FIFO queue, both requests return messages from the same MessageGroupId.

Scenario Example

Consider we have a FIFO queue with 10 messages from the same MessageGroupId.

Then we make two concurrent receiveMessage requests for 5 messages each.

According to the SQS FIFO contract, the first request should return 5 messages and the second should return none, as long as the Visibility Timeout of the first batch has not expired.

With LocalStack 1.0.4, we incorrectly receive all 10 messages, 5 in each request.

When the same is done targeting AWS, the behavior is correct: the first request returns 5 messages and the second one returns none.
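The contract described in this scenario can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `FifoGroupModel`, `Msg`, `send`, `receive`, and `releaseGroup` are hypothetical names, not LocalStack or AWS code, and the visibility timeout is reduced to an explicit release call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of FIFO message-group blocking (not LocalStack or AWS code).
final class FifoGroupModel {

    // A queued message: the group it belongs to and its body.
    static final class Msg {
        final String groupId;
        final String body;

        Msg(String groupId, String body) {
            this.groupId = groupId;
            this.body = body;
        }
    }

    private final Deque<Msg> visible = new ArrayDeque<>();
    private final Set<String> groupsInFlight = new HashSet<>();

    synchronized void send(String groupId, String body) {
        visible.addLast(new Msg(groupId, body));
    }

    // Returns up to max messages, skipping groups that already had
    // in-flight messages when this call started.
    synchronized List<Msg> receive(int max) {
        Set<String> blocked = new HashSet<>(groupsInFlight);
        List<Msg> batch = new ArrayList<>();
        for (Iterator<Msg> it = visible.iterator(); it.hasNext() && batch.size() < max; ) {
            Msg m = it.next();
            if (blocked.contains(m.groupId)) {
                continue; // group is locked by an earlier receive
            }
            it.remove();
            groupsInFlight.add(m.groupId);
            batch.add(m);
        }
        return batch;
    }

    // Assumption: stands in for deleting the in-flight messages
    // or for the visibility timeout expiring.
    synchronized void releaseGroup(String groupId) {
        groupsInFlight.remove(groupId);
    }

    public static void main(String[] args) {
        FifoGroupModel queue = new FifoGroupModel();
        for (int i = 0; i < 10; i++) {
            queue.send("group-a", "test - " + i);
        }
        System.out.println(queue.receive(5).size()); // prints 5
        System.out.println(queue.receive(5).size()); // prints 0
    }
}
```

In this model the second `receive` returns nothing because the group is still locked by the first batch, which matches the behavior observed against AWS in this report.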

Expected Behavior

We should not receive messages from the same MessageGroupId for concurrent requests to FIFO queues.

How are you starting LocalStack?

Custom (please describe below)

Steps To Reproduce

How are you starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)

With TestContainers

private static final String LOCAL_STACK_VERSION = "localstack/localstack:1.0.4";

static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION))
	.withServices(SQS).withReuse(true);

I've tried with withReuse(false) and the behavior is unchanged.

Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)

I created a JUnit 5 test that executes the steps described in the previous section and reproduces the behavior. The test passes when targeting AWS and fails when targeting LocalStack. The results are consistent across many test runs.

@Test
void receivesTwoBatchesFromSameMessageGroup() {
	String messageGroupId = UUID.randomUUID().toString();
	String queueName = "my-fifo-queue.fifo";
	logger.info("Create queue {}", queueName);
	String queueUrl = sqsAsyncClient.createQueue(CreateQueueRequest.builder()
			.queueName(queueName)
			.attributes(Collections.singletonMap(QueueAttributeName.FIFO_QUEUE, "true"))
			.build())
		.join()
		.queueUrl();
	logger.info("Send 10 messages to url {} for message group {}", queueUrl, messageGroupId);
	List<SendMessageBatchRequestEntry> entries = IntStream.range(0, 10)
		.mapToObj(index -> SendMessageBatchRequestEntry
			.builder()
			.messageGroupId(messageGroupId)
			.messageBody("test - " + index)
			.messageDeduplicationId(UUID.randomUUID().toString())
			.id(UUID.randomUUID().toString())
			.build())
		.collect(toList());
	sqsAsyncClient.sendMessageBatch(SendMessageBatchRequest
			.builder()
			.entries(entries)
			.queueUrl(queueUrl)
			.build())
		.join();
	logger.info("Receive two batches of 5 messages from queue {}", queueUrl);
	CompletableFuture<ReceiveMessageResponse> receiveMessageFuture1 = sqsAsyncClient.receiveMessage(ReceiveMessageRequest
		.builder()
		.queueUrl(queueUrl)
		.attributeNames(QueueAttributeName.ALL)
		.waitTimeSeconds(10)
		.receiveRequestAttemptId(UUID.randomUUID().toString())
		.maxNumberOfMessages(5)
		.build());
	CompletableFuture<ReceiveMessageResponse> receiveMessageFuture2 = sqsAsyncClient.receiveMessage(ReceiveMessageRequest
		.builder()
		.queueUrl(queueUrl)
		.attributeNames(QueueAttributeName.ALL)
		.receiveRequestAttemptId(UUID.randomUUID().toString())
		.waitTimeSeconds(10)
		.maxNumberOfMessages(5)
		.build());
	CompletableFuture.allOf(receiveMessageFuture1, receiveMessageFuture2).join();
	logger.info("Received two batches from queue {}", queueUrl);
	List<software.amazon.awssdk.services.sqs.model.Message> firstBatch = receiveMessageFuture1.join().messages().stream()
		.filter(message -> message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID).equals(messageGroupId))
		.collect(toList());
	List<software.amazon.awssdk.services.sqs.model.Message> secondBatch = receiveMessageFuture2.join().messages().stream()
		.filter(message -> message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID).equals(messageGroupId))
		.collect(toList());
	logger.info("First batch: {} messages Second Batch: {} messages", firstBatch.size(), secondBatch.size());
	List<software.amazon.awssdk.services.sqs.model.Message> allMessages = new ArrayList<>(firstBatch);
	allMessages.addAll(secondBatch);
	// Make sure we're not receiving the same messages
	long distinctCount = allMessages.stream().map(software.amazon.awssdk.services.sqs.model.Message::body).distinct().count();
	assertThat(distinctCount).isEqualTo(5);
	assertThat(allMessages).hasSize(5);

}
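The assertions in the test above hard-code the count of 5. A more general check, sketched below, is that two concurrent batches share no message group. `GroupOverlap` and `sharedGroups` are hypothetical helper names introduced here for illustration. The group IDs are assumed to have been extracted beforehand, for example from `message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID)` as in the test.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: finds message groups that appear in both batches.
final class GroupOverlap {

    // Returns the group IDs present in both lists; empty when the batches are disjoint.
    static Set<String> sharedGroups(List<String> firstBatchGroups, List<String> secondBatchGroups) {
        Set<String> shared = new HashSet<>(firstBatchGroups);
        shared.retainAll(new HashSet<>(secondBatchGroups));
        return shared;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("group-a", "group-a");
        List<String> second = Arrays.asList("group-b");
        System.out.println(sharedGroups(first, second).isEmpty()); // prints true
    }
}
```

With this helper, the expectation for concurrent receives on a FIFO queue becomes that `sharedGroups` returns an empty set, regardless of how many messages each batch holds.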

Environment

- OS: macOS Monterey 12.5
- LocalStack: 1.0.4

Anything else?

Execution logs targeting LocalStack:

Screen Shot 2022-08-26 at 19 57 16

Execution logs targeting AWS:

Screen Shot 2022-08-26 at 19 56 12
