Add AmqpMessageListenerContainer for AMQP 1.0#3281
Add AmqpMessageListenerContainer for AMQP 1.0#3281cppwfs merged 2 commits intospring-projects:mainfrom
AmqpMessageListenerContainer for AMQP 1.0#3281Conversation
* Implement the container a similar way to the `RabbitAmqpListenerContainer` * Use some reflection for calling internal ProtonJ API * Therefore, expose `AmqpRuntimeHints` for AOT * Add useful `setReceivedRoutingKey(message.to())` mapping into the `ProtonUtils.fromProtonMessage()` * Guard client exceptions parsing against possible nullable `ErrorCondition` * Document `AmqpMessageListenerContainer`
|
This is like initial PoC. |
cppwfs
left a comment
There was a problem hiding this comment.
Great work here.
Helllooooo AmqpMessageListenerContainer!
spring-amqp-client/src/main/java/org/springframework/amqp/client/aot/package-info.java
Outdated
Show resolved
Hide resolved
...ent/src/main/java/org/springframework/amqp/client/listener/AmqpMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
...ent/src/main/java/org/springframework/amqp/client/listener/AmqpMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
| consumer.pause(); | ||
| try (consumer) { | ||
| while (consumer.queuedDeliveries() > 0) { | ||
| Thread.sleep(100); |
There was a problem hiding this comment.
Should this be configurable?
There was a problem hiding this comment.
I don't think so. This is a busy-wait tick to avoid blocking CPU.
Kinda typical pattern for simple loop-based short condition barriers.
| @SuppressWarnings("NullAway") | ||
| AmqpConsumer(ClientReceiver receiver) { | ||
| this.receiver = receiver; | ||
| this.protonReceiver = (ProtonReceiver) ReflectionUtils.invokeMethod(PROTON_RECEIVER_METHOD, receiver); |
There was a problem hiding this comment.
I see where we suppress the nulls for this method. But is it possible that these invokes would return a null? Just curious.
There was a problem hiding this comment.
See ClientReceiverLinkType.protonReceiver property and how it is populated.
Essentially that is done in the ClientReceiverBuilder.receiver().
So, this protonLink() never returns null.
...rc/test/java/org/springframework/amqp/client/listener/AmqpMessageListenerContainerTests.java
Show resolved
Hide resolved
* Improve Javadoc in the `aot/package-info.java` * Improve `AmqpMessageListenerContainerTests.pauseAndResumeContainer()` to check receiver credits before and after `pause/resume` * Verify the `AmqpMessageListenerContainerTests` configuration for AOP proxy * Make `AmqpMessageListenerContainer.autoAccept` as `true` by default as it was implied originally * Consistency in the code style for the `AmqpMessageListenerContainer.proxy` assignment * Remove redundant `Assert.notNull()`from the `AmqpMessageListenerContainer.setTaskExecutor()` since NullAway covers expectations
|
Thank you for review, @cppwfs ! I left a couple comments unresolved for your consideration: no code change, but question is answered. |
cppwfs
left a comment
There was a problem hiding this comment.
LGTM
Thanks for the great work!
On the last commit you forgot to enter a blank line between the title and the body. 😉
RabbitAmqpListenerContainerAmqpRuntimeHintsfor AOTsetReceivedRoutingKey(message.to())mapping into theProtonUtils.fromProtonMessage()ErrorConditionAmqpMessageListenerContainer