Add @AmqpListener annotation support#3357
Conversation
* Extract `AbstractListenerAnnotationBeanPostProcessor` into the core `spring-amqp` module from the `RabbitListenerAnnotationBeanPostProcessor` for the common infrastructure and API which can be used in other implementations. * Implement `AmqpListenerAnnotationBeanPostProcessor` as an extension of just extracted `AbstractListenerAnnotationBeanPostProcessor` * The `AmqpListenerAnnotationBeanPostProcessor` processes bean methods with `@AmqpListener` * The `@AmqpListeners` is a `@Repeatable` container for `@AmqpListener` * The `MethodAmqpListenerEndpoint` is an `AbstractAmqpListenerEndpoint` extension for POJO method-based listeners. The `@AmqpListener` parsing in the `AmqpListenerAnnotationBeanPostProcessor` creates `MethodAmqpListenerEndpoint` instances for methods they re declared on * The `MethodAmqpMessageListenerContainerFactory` is an extension of the `AmqpMessageListenerContainerFactory` with POJO method-specific listeners * Extract simple `record BytesToStringConverter` into `spring-amqp` module and replace internal type in the `RabbitListenerAnnotationBeanPostProcessor` * Fix nullability for the `RabbitListenerAnnotationBeanPostProcessor` * Fix TODO in the `AmqpListenerEndpoint.getId()` Javadoc: the final id generation decision in done at the moment of bean registration in the `AmqpListenerEndpointRegistry` * Add `AmqpMessageListenerContainerFactory.configureEndpoint()` for target extensions where final endpoint properties are merged with whatever is provided as default values in the container factory * Fix `AmqpMessageListenerContainerFactory.createContainer()` moving `setupMessageListener()` call in the end. Essentially, all the properties must be set into the listener container before we provide a listener over there * Expose `AmqpListenerAnnotationBeanPostProcessor` as a bean in the `AmqpDefaultConfiguration` * Add `io.projectreactor:reactor-core` as test dependency for request-reply test with a `Mono` return type
|
TODO (which could be done in the separate PRs)
I mean there are still more like Thanks |
cppwfs
left a comment
There was a problem hiding this comment.
Looks great!
Just some nitpicks.
| } | ||
| } | ||
|
|
||
| protected @Nullable Integer resolveExpressiontoInteger(String value, String attribute) { |
| methods.add(new ListenerMethod<>(method, listenerAnnotations)); | ||
| } | ||
| if (hasClassLevelListeners) { | ||
| Annotation rabbitHandler = AnnotationUtils.findAnnotation(method, handlerAnnotation); |
There was a problem hiding this comment.
amqpHandler instead of rabbitHandler.
| protected static String noBeanFoundMessage(Object target, String listenerBeanName, String requestedBeanName, | ||
| Class<?> expectedClass) { | ||
|
|
||
| return "Could not register rabbit listener endpoint on [" |
| /** | ||
| * The AMQP 1.0 addresses to listen to. | ||
| * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'. | ||
| * Expression must be resolved to the address. |
There was a problem hiding this comment.
Shouldn't this say that Each expression must be resolved to an address?
| * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'. | ||
| * Expression must be resolved to the address. | ||
| * The addresses must exist. | ||
| * @return the addresses or expressions (SpEL) to listen to from in the target listener container. |
There was a problem hiding this comment.
Change to listen to from in to to listen to in
| /** | ||
| * Override the container factory's {@code headerMapper} used for this listener. | ||
| * @return the header mapper bean name. | ||
| * If a SpEL expression is provided ({@code #{...}}), |
|
|
||
| /** | ||
| * Set the default behavior for messages rejection, for example, when the listener | ||
| * threw an exception. When {@code true} (default), messages will be requeued, otherwise - rejected. |
|
|
||
| private @Nullable BeanResolver beanResolver; | ||
|
|
||
| public MethodAmqpListenerEndpoint(Object bean, Method method, String... addresses) { |
| if (resolved != null && beanType.isAssignableFrom(resolved.getClass())) { | ||
| return (B) resolved; | ||
| } | ||
| else if (resolved instanceof String factoryBeanName && StringUtils.hasText(factoryBeanName)) { |
There was a problem hiding this comment.
Wouldn't we want else if (resolved != null && resolve instance of...?
There was a problem hiding this comment.
The instanceof operator does the trick for null.
So, if resolved == null, then instanceof resolves to false.
| /** | ||
| * A method annotated with {@link A}, together with the annotations. | ||
| * | ||
| * @param method the method with annotations |
* Resolve copy/paste artifacts in the `AbstractListenerAnnotationBeanPostProcessor`: replace `rabbit` with empty string - the `handler` and `listener` are enough for the context * Add Javadoc to `MethodAmqpListenerEndpoint` ctor
AbstractListenerAnnotationBeanPostProcessorinto the corespring-amqpmodule from theRabbitListenerAnnotationBeanPostProcessorfor the common infrastructure and API which can be used in other implementations.AmqpListenerAnnotationBeanPostProcessoras an extension of just extractedAbstractListenerAnnotationBeanPostProcessorAmqpListenerAnnotationBeanPostProcessorprocesses bean methods with@AmqpListener@AmqpListenersis a@Repeatablecontainer for@AmqpListenerMethodAmqpListenerEndpointis anAbstractAmqpListenerEndpointextension for POJO method-based listeners.The
@AmqpListenerparsing in theAmqpListenerAnnotationBeanPostProcessorcreatesMethodAmqpListenerEndpointinstances for methods they re declared onMethodAmqpMessageListenerContainerFactoryis an extension of theAmqpMessageListenerContainerFactorywith POJO method-specific listenersrecord BytesToStringConverterintospring-amqpmodule and replace internal type in theRabbitListenerAnnotationBeanPostProcessorRabbitListenerAnnotationBeanPostProcessorAmqpListenerEndpoint.getId()Javadoc: the final id generation decision in done at the moment of bean registration in theAmqpListenerEndpointRegistryAmqpMessageListenerContainerFactory.configureEndpoint()for target extensions where final endpoint properties are merged with whatever is provided as default values in the container factoryAmqpMessageListenerContainerFactory.createContainer()movingsetupMessageListener()call in the end.Essentially, all the properties must be set into the listener container before we provide a listener over there
AmqpListenerAnnotationBeanPostProcessoras a bean in theAmqpDefaultConfigurationio.projectreactor:reactor-coreas test dependency for request-reply test with aMonoreturn type