Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
Prerequisites: Spring JMS and the AWS SDK for Java 1.x are used to receive messages.
- Create a fifo queue
awslocal sqs create-queue --queue-name example.fifo --attributes '{"VisibilityTimeout":"60","FifoQueue":"true","ContentBasedDeduplication":"true","DeduplicationScope":"messageGroup","FifoThroughputLimit":"perMessageGroupId"}'
- Send a few messages to the FIFO queue with the same group ID
- Use 5 threads to receive and process messages through Spring JMS; processing each message takes 50 seconds (Thread.sleep).
public static void main(String[] args) throws Exception {
    AmazonSQSAsync sqsClient = AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(SQSConfig.serviceEndpoint, "us-east-1"))
            .build();
    SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqsClient);
    MessageListener messageListener = new ReceiverCallback();
    DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory);
    listenerContainer.setDestinationName(SQSConfig.queueName);
    listenerContainer.setSessionTransacted(false);
    listenerContainer.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    listenerContainer.setMessageListener(messageListener);
    listenerContainer.setConcurrentConsumers(5);
    listenerContainer.setAutoStartup(true);
    listenerContainer.initialize();
    listenerContainer.start();
}
public static class ReceiverCallback implements MessageListener {
    // Used to listen for message silence
    private final Map<String, List<String>> messageGroups = new ConcurrentHashMap<>();
    @Override
    public void onMessage(Message message) {
        try {
            String groupId = message.getStringProperty("JMSXGroupID");
            System.out.println(String.format("%s %s receive,Group ID: %s, start..",
                    Thread.currentThread().getName(),
                    System.currentTimeMillis(), groupId));
            Thread.sleep(50000);
            message.acknowledge();
            System.out.println(String.format("%s %s receive,Group ID: %s ,done",
                    Thread.currentThread().getName(),
                    System.currentTimeMillis(), groupId));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
According to the [FIFO delivery logic](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html), the second message in a message group should not be processed until the first message has been processed.
This means only one thread should receive and process a message at a time while the others wait.
With LocalStack, however, all 5 threads receive and process messages at the same time: the second message is processed even though the first one has not finished.
The LocalStack logs look like this:
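The per-group blocking rule from the FIFO delivery logic can be illustrated with a small in-memory model. This is only a sketch of the expected semantics, not the SQS or LocalStack implementation: the class `FifoGroupModel` and its methods are hypothetical names, each receive returns at most one message, and visibility timeout expiry is ignored, so a group is unblocked only by an explicit delete (the equivalent of `message.acknowledge()` above).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;

/** Toy model of per-group FIFO delivery (hypothetical, for illustration only). */
class FifoGroupModel {
    /** A message tagged with its message group ID. */
    static final class Msg {
        final String groupId;
        final String body;
        Msg(String groupId, String body) { this.groupId = groupId; this.body = body; }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    // Groups that currently have a received but not yet deleted message.
    private final Set<String> groupsInFlight = new HashSet<>();

    synchronized void send(String groupId, String body) {
        queue.addLast(new Msg(groupId, body));
    }

    /** Returns the oldest message whose group has nothing in flight, if any. */
    synchronized Optional<Msg> receive() {
        Iterator<Msg> it = queue.iterator();
        while (it.hasNext()) {
            Msg m = it.next();
            if (!groupsInFlight.contains(m.groupId)) {
                it.remove();
                groupsInFlight.add(m.groupId);
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    /** Deleting (acknowledging) a message unblocks its group. */
    synchronized void delete(Msg m) {
        groupsInFlight.remove(m.groupId);
    }

    public static void main(String[] args) {
        FifoGroupModel q = new FifoGroupModel();
        q.send("Default", "m1");
        q.send("Default", "m2");
        Msg first = q.receive().get();
        System.out.println(q.receive().isPresent()); // prints "false": group is blocked
        q.delete(first);
        System.out.println(q.receive().get().body);  // prints "m2"
    }
}
```

Under this model, five consumers polling a queue whose messages all share the group ID `Default` would see only one message in flight at any time, which is the behavior the reproduction expects.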
DefaultMessageListenerContainer-4 1689768077073 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768077309 receive,Group ID: Default, start..
DefaultMessageListenerContainer-1 1689768078329 receive,Group ID: Default, start..
DefaultMessageListenerContainer-3 1689768078438 receive,Group ID: Default, start..
DefaultMessageListenerContainer-2 1689768080545 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768127339 receive,Group ID: Default ,done
DefaultMessageListenerContainer-5 1689768127340 receive,Group ID: Default, start..
DefaultMessageListenerContainer-1 1689768128372 receive,Group ID: Default ,done
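The overlap in a log like this can be checked mechanically. The following is a minimal sketch, assuming the exact line format printed by `ReceiverCallback` and that lines appear in chronological order; the class `GroupOverlapCheck` and its method are hypothetical helpers, not part of the demo project.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Counts how many messages of one group are being processed at once (hypothetical helper). */
class GroupOverlapCheck {
    // Matches "<thread> <millis> receive,Group ID: <group>, start.." and "... <group> ,done".
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) (\\d+) receive,Group ID: (.*?) ?, ?(start\\.\\.|done)$");

    /** Returns, per group ID, the peak number of messages in progress at the same time. */
    static Map<String, Integer> maxConcurrentPerGroup(List<String> lines) {
        Map<String, Integer> active = new HashMap<>();
        Map<String, Integer> peak = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.matches()) {
                continue; // ignore lines that are not receiver output
            }
            String group = m.group(3);
            int delta = m.group(4).equals("done") ? -1 : 1;
            int now = active.merge(group, delta, Integer::sum);
            peak.merge(group, now, Math::max);
        }
        return peak;
    }

    public static void main(String[] args) {
        // First six lines of the log above.
        List<String> log = Arrays.asList(
                "DefaultMessageListenerContainer-4 1689768077073 receive,Group ID: Default, start..",
                "DefaultMessageListenerContainer-5 1689768077309 receive,Group ID: Default, start..",
                "DefaultMessageListenerContainer-1 1689768078329 receive,Group ID: Default, start..",
                "DefaultMessageListenerContainer-3 1689768078438 receive,Group ID: Default, start..",
                "DefaultMessageListenerContainer-2 1689768080545 receive,Group ID: Default, start..",
                "DefaultMessageListenerContainer-4 1689768127150 receive,Group ID: Default ,done");
        System.out.println(maxConcurrentPerGroup(log)); // prints {Default=5}
    }
}
```

A peak of 1 for a group is consistent with FIFO ordering within that group; any higher value means messages of the same group were processed concurrently.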
Expected Behavior
If we point the client at AWS SQS instead of LocalStack, the second message is not processed until the first message has been processed.
The log looks like this:
DefaultMessageListenerContainer-5 1689768861280 receive,Group ID: Default, start..
DefaultMessageListenerContainer-5 1689768913178 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768913182 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689768963529 receive,Group ID: Default ,done
DefaultMessageListenerContainer-4 1689768963543 receive,Group ID: Default, start..
DefaultMessageListenerContainer-4 1689769013939 receive,Group ID: Default ,done
Create SQS command:
aws sqs create-queue --queue-name example.fifo --attributes '{"VisibilityTimeout":"60","FifoQueue":"true","ContentBasedDeduplication":"true","DeduplicationScope":"messageGroup","FifoThroughputLimit":"perMessageGroupId"}'
How are you starting LocalStack?
With a docker run command
Steps To Reproduce
How are you starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)
docker run localstack/localstack
Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)
awslocal s3 mb s3://mybucket
Environment
- OS: Ubuntu and macOS
- LocalStack: 2.1.0
Java DEMO: https://github.com/shawn-ann/sqs_demo
Anything else?
I found a related issue that was fixed in 2.1.0, but the behavior is still not what I expect.