Skip to content

Add AmqpMessageListenerContainer for AMQP 1.0#3281

Merged
cppwfs merged 2 commits intospring-projects:mainfrom
artembilan:amqp10-listener-container
Jan 13, 2026
Merged

Add AmqpMessageListenerContainer for AMQP 1.0#3281
cppwfs merged 2 commits intospring-projects:mainfrom
artembilan:amqp10-listener-container

Conversation

@artembilan
Copy link
Copy Markdown
Member

  • Implement the container a similar way to the RabbitAmqpListenerContainer
  • Use some reflection for calling internal ProtonJ API
  • Therefore, expose AmqpRuntimeHints for AOT
  • Add useful setReceivedRoutingKey(message.to()) mapping into the ProtonUtils.fromProtonMessage()
  • Guard client exceptions parsing against possible nullable ErrorCondition
  • Document AmqpMessageListenerContainer

* Implement the container a similar way to the `RabbitAmqpListenerContainer`
* Use some reflection for calling internal ProtonJ API
* Therefore, expose `AmqpRuntimeHints` for AOT
* Add useful `setReceivedRoutingKey(message.to())` mapping into the `ProtonUtils.fromProtonMessage()`
* Guard client exceptions parsing against possible nullable `ErrorCondition`
* Document `AmqpMessageListenerContainer`
@artembilan artembilan added this to the 4.1.0-M1 milestone Jan 9, 2026
@artembilan artembilan requested a review from cppwfs January 9, 2026 20:26
@artembilan
Copy link
Copy Markdown
Member Author

This is like initial PoC.
There are still a lot of work for "native" listener contract.
Then @AmqpListener support.
Separate PRs, though.
Thanks

Copy link
Copy Markdown
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work here.

Helllooooo AmqpMessageListenerContainer!

consumer.pause();
try (consumer) {
while (consumer.queuedDeliveries() > 0) {
Thread.sleep(100);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. This is a busy-wait tick to avoid blocking CPU.
Kinda typical pattern for simple loop-based short condition barriers.

@SuppressWarnings("NullAway")
AmqpConsumer(ClientReceiver receiver) {
this.receiver = receiver;
this.protonReceiver = (ProtonReceiver) ReflectionUtils.invokeMethod(PROTON_RECEIVER_METHOD, receiver);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see where we suppress the nulls for this method. But is it possible that these invokes would return a null? Just curious.

Copy link
Copy Markdown
Member Author

@artembilan artembilan Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See ClientReceiverLinkType.protonReceiver property and how it is populated.
Essentially that is done in the ClientReceiverBuilder.receiver().
So, this protonLink() never returns null.

* Improve Javadoc in the `aot/package-info.java`
* Improve `AmqpMessageListenerContainerTests.pauseAndResumeContainer()`
to check receiver credits before and after `pause/resume`
* Verify the `AmqpMessageListenerContainerTests` configuration for AOP proxy
* Make `AmqpMessageListenerContainer.autoAccept` as `true` by default
as it was implied originally
* Consistency in the code style for the `AmqpMessageListenerContainer.proxy` assignment
* Remove redundant `Assert.notNull()`from the `AmqpMessageListenerContainer.setTaskExecutor()`
since NullAway covers expectations
@artembilan artembilan requested a review from cppwfs January 13, 2026 14:41
@artembilan
Copy link
Copy Markdown
Member Author

Thank you for review, @cppwfs !

I left a couple comments unresolved for your consideration: no code change, but question is answered.

Copy link
Copy Markdown
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Thanks for the great work!

On the last commit you forgot to enter a blank line between the title and the body. 😉

@cppwfs cppwfs merged commit f573758 into spring-projects:main Jan 13, 2026
3 checks passed
@artembilan artembilan deleted the amqp10-listener-container branch January 14, 2026 17:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants