bug: Incorrect behavior of receive message in sqs fifo queue with same group ID #8724

@shawn-ann

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

Prerequisites: Spring JMS and the AWS SDK for Java 1.x are used to receive messages.

  1. Create a FIFO queue:
awslocal sqs create-queue --queue-name example.fifo --attributes '{"VisibilityTimeout":"60","FifoQueue":"true","ContentBasedDeduplication":"true","DeduplicationScope":"messageGroup","FifoThroughputLimit":"perMessageGroupId"}'
  2. Send a few messages to the FIFO queue with the same message group ID.
  3. Use 5 threads to receive and process messages through Spring JMS. Each message takes 50 seconds to process (simulated with Thread.sleep):
public static void main(String[] args) throws Exception {
        AmazonSQSAsync sqsClient = AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(SQSConfig.serviceEndpoint, "us-east-1"))
                .build();
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqsClient);

        MessageListener messageListener = new ReceiverCallback();
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestinationName(SQSConfig.queueName);

        listenerContainer.setSessionTransacted(false);
        listenerContainer.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        listenerContainer.setMessageListener(messageListener);
        listenerContainer.setConcurrentConsumers(5); 
        listenerContainer.setAutoStartup(true);
        listenerContainer.initialize();
        listenerContainer.start();
    }

    public static class ReceiverCallback implements MessageListener {
        // Per-group message bookkeeping; not used in this snippet.
        private final Map<String, List<String>> messageGroups = new ConcurrentHashMap<>();

        @Override
        public void onMessage(Message message) {
            try {
                String groupId = message.getStringProperty("JMSXGroupID");
                System.out.println(String.format("%s %s receive,Group ID: %s, start..",
                        Thread.currentThread().getName(),
                        System.currentTimeMillis(), groupId));
                Thread.sleep(50000);
                message.acknowledge();

                System.out.println(String.format("%s %s receive,Group ID: %s ,done",
                        Thread.currentThread().getName(),
                        System.currentTimeMillis(), groupId));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

According to the [FIFO delivery logic](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html), the second message in a message group should not be delivered until the first one has been processed. This means only one thread should be receiving and processing a message at a time while the others wait.

With LocalStack, however, all 5 threads receive and process messages at the same time: the second message is processed before the first one has finished.
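
To make the expected rule concrete, here is a minimal in-memory sketch of per-group FIFO delivery. It is a toy model written for this issue, not LocalStack or AWS code: the class `FifoGroupModel` and its methods are hypothetical names, and the visibility timeout is ignored (the group stays locked until the message is acknowledged).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;

/** Toy model of per-message-group FIFO delivery (hypothetical, for illustration only). */
final class FifoGroupModel {
    /** A queued message: its message group ID and body. */
    static final class Msg {
        final String groupId;
        final String body;

        Msg(String groupId, String body) {
            this.groupId = groupId;
            this.body = body;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    // Groups that currently have a received-but-not-acknowledged (in-flight) message.
    private final Set<String> lockedGroups = new HashSet<>();

    synchronized void send(String groupId, String body) {
        queue.addLast(new Msg(groupId, body));
    }

    /** Returns the oldest message whose group has nothing in flight, if there is one. */
    synchronized Optional<Msg> receive() {
        for (Iterator<Msg> it = queue.iterator(); it.hasNext();) {
            Msg m = it.next();
            // Set.add returns false when the group is already locked, so the message is skipped.
            if (lockedGroups.add(m.groupId)) {
                it.remove();
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    /** Acknowledging the in-flight message unlocks its group for the next receive. */
    synchronized void acknowledge(Msg m) {
        lockedGroups.remove(m.groupId);
    }

    public static void main(String[] args) {
        FifoGroupModel q = new FifoGroupModel();
        q.send("Default", "m1");
        q.send("Default", "m2");

        Msg first = q.receive().get();
        System.out.println("received " + first.body);
        System.out.println("second receive while m1 is in flight: " + q.receive().isPresent());

        q.acknowledge(first);
        System.out.println("after ack: " + q.receive().map(m -> m.body).orElse("none"));
    }
}
```

In this model, a second consumer calling `receive()` while `m1` is still in flight gets nothing for group `Default`, which matches the behavior observed against AWS SQS in this report.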

The logs look like this:

DefaultMessageListenerContainer-4 1689768077073 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768077309 receive,Group ID: Default, start..
DefaultMessageListenerContainer-1 1689768078329 receive,Group ID: Default, start..
DefaultMessageListenerContainer-3 1689768078438 receive,Group ID: Default, start..
DefaultMessageListenerContainer-2 1689768080545 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768127339 receive,Group ID: Default ,done
DefaultMessageListenerContainer-5 1689768127340 receive,Group ID: Default, start..
DefaultMessageListenerContainer-1 1689768128372 receive,Group ID: Default ,done

Expected Behavior

When the same client connects to AWS SQS instead of LocalStack, the second message is not processed until the first one has been processed.

The log looks like this:

DefaultMessageListenerContainer-5 1689768861280 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768913178 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768913182 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689768963529 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768963543 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689769013939 receive,Group ID: Default ,done
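
The difference between the two logs can be checked mechanically. The following is a small sketch that counts how many messages are being processed at once, assuming the log lines are in chronological order and end in `start..` or `done` as printed by the demo listener. The class name `OverlapCheck` and the method `maxConcurrent` are hypothetical helpers, not part of the demo project.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: measures peak concurrent processing from the demo's log lines. */
final class OverlapCheck {
    /** Returns the peak number of messages in progress, given lines ending in "start.." or "done". */
    static int maxConcurrent(List<String> logLines) {
        int inFlight = 0;
        int peak = 0;
        for (String line : logLines) {
            String trimmed = line.trim();
            if (trimmed.endsWith("start..")) {
                inFlight++;
                peak = Math.max(peak, inFlight);
            } else if (trimmed.endsWith("done")) {
                inFlight--;
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        // Log lines copied from the LocalStack run in this report.
        List<String> localstack = Arrays.asList(
            "DefaultMessageListenerContainer-4 1689768077073 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-5 1689768077309 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-1 1689768078329 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-3 1689768078438 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-2 1689768080545 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default ,done");
        // Log lines copied from the AWS SQS run in this report.
        List<String> aws = Arrays.asList(
            "DefaultMessageListenerContainer-5 1689768861280 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-5 1689768913178 receive,Group ID: Default ,done",
            "DefaultMessageListenerContainer-4 1689768913182 receive,Group ID: Default, start..",
            "DefaultMessageListenerContainer-4 1689768963529 receive,Group ID: Default ,done");

        System.out.println("LocalStack peak: " + maxConcurrent(localstack)); // prints 5
        System.out.println("AWS peak: " + maxConcurrent(aws));               // prints 1
    }
}
```

A peak above 1 for a single message group means messages from that group were processed concurrently, which is the behavior this issue reports for LocalStack.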

Command used to create the queue on AWS:

aws sqs create-queue --queue-name example.fifo --attributes '{"VisibilityTimeout":"60","FifoQueue":"true","ContentBasedDeduplication":"true","DeduplicationScope":"messageGroup","FifoThroughputLimit":"perMessageGroupId"}'

How are you starting LocalStack?

With a docker run command

Steps To Reproduce

How are you starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)

docker run localstack/localstack

Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)

awslocal s3 mb s3://mybucket

Environment

- OS: Ubuntu and macOS
- LocalStack: 2.1.0
- Java demo: https://github.com/shawn-ann/sqs_demo

Anything else?

I found an existing issue that was fixed in 2.1.0, but the behavior still does not match AWS.

Labels

  • aws:sqs (Amazon Simple Queue Service)
  • status: backlog (Triaged but not yet being worked on)
  • type: bug (Bug report)
