Skip to content

Commit 158d5cd

Browse files
authored
GH-1491: Support Optional/null Payloads
Resolves #1491 **cherry-pick to 2.4.x** * Improve connection factory bean in test.
1 parent d96aa71 commit 158d5cd

6 files changed

Lines changed: 271 additions & 6 deletions

File tree

spring-amqp/src/main/java/org/springframework/amqp/support/converter/AbstractJackson2MessageConverter.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.lang.reflect.Type;
2222
import java.nio.charset.Charset;
2323
import java.nio.charset.StandardCharsets;
24+
import java.util.Optional;
2425

2526
import org.apache.commons.logging.Log;
2627
import org.apache.commons.logging.LogFactory;
@@ -96,6 +97,8 @@ public abstract class AbstractJackson2MessageConverter extends AbstractMessageCo
9697

9798
private boolean alwaysConvertToInferredType;
9899

100+
private boolean nullAsOptionalEmpty;
101+
99102
/**
100103
* Construct with the provided {@link ObjectMapper} instance.
101104
* @param objectMapper the {@link ObjectMapper} to use.
@@ -148,6 +151,15 @@ public void setSupportedContentType(MimeType supportedContentType) {
148151
this.supportedCTCharset = this.supportedContentType.getParameter("charset");
149152
}
150153

154+
/**
155+
* When true, if jackson decodes the body as {@code null} convert to {@link Optional#empty()}
156+
* instead of returning the original body. Default false.
157+
* @param nullAsOptionalEmpty true to return empty.
158+
* @since 2.4.7
159+
*/
160+
public void setNullAsOptionalEmpty(boolean nullAsOptionalEmpty) {
161+
this.nullAsOptionalEmpty = nullAsOptionalEmpty;
162+
}
151163

152164
@Nullable
153165
public ClassMapper getClassMapper() {
@@ -316,7 +328,12 @@ public Object fromMessage(Message message, @Nullable Object conversionHint) thro
316328
}
317329
}
318330
if (content == null) {
319-
content = message.getBody();
331+
if (this.nullAsOptionalEmpty) {
332+
content = Optional.empty();
333+
}
334+
else {
335+
content = message.getBody();
336+
}
320337
}
321338
return content;
322339
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.AnnotatedElement;
2020
import java.lang.reflect.Method;
21+
import java.lang.reflect.Type;
2122
import java.nio.charset.Charset;
2223
import java.nio.charset.StandardCharsets;
2324
import java.util.ArrayList;
@@ -28,6 +29,7 @@
2829
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.Optional;
3133
import java.util.Set;
3234
import java.util.concurrent.ConcurrentHashMap;
3335
import java.util.concurrent.ConcurrentMap;
@@ -73,6 +75,7 @@
7375
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
7476
import org.springframework.context.EnvironmentAware;
7577
import org.springframework.context.expression.StandardBeanExpressionResolver;
78+
import org.springframework.core.MethodParameter;
7679
import org.springframework.core.Ordered;
7780
import org.springframework.core.annotation.AnnotationUtils;
7881
import org.springframework.core.annotation.MergedAnnotations;
@@ -82,16 +85,22 @@
8285
import org.springframework.core.convert.support.DefaultConversionService;
8386
import org.springframework.core.env.Environment;
8487
import org.springframework.core.task.TaskExecutor;
88+
import org.springframework.format.support.DefaultFormattingConversionService;
8589
import org.springframework.lang.Nullable;
90+
import org.springframework.messaging.Message;
91+
import org.springframework.messaging.converter.GenericMessageConverter;
8692
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
8793
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
94+
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
95+
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
8896
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
8997
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
9098
import org.springframework.util.Assert;
9199
import org.springframework.util.ClassUtils;
92100
import org.springframework.util.CollectionUtils;
93101
import org.springframework.util.ReflectionUtils;
94102
import org.springframework.util.StringUtils;
103+
import org.springframework.validation.ObjectError;
95104
import org.springframework.validation.Validator;
96105

97106
/**
@@ -980,6 +989,9 @@ private String resolve(String value) {
980989
*/
981990
private class RabbitHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
982991

992+
private final DefaultFormattingConversionService defaultFormattingConversionService =
993+
new DefaultFormattingConversionService();
994+
983995
private MessageHandlerMethodFactory factory;
984996

985997
RabbitHandlerMethodFactoryAdapter() {
@@ -1008,20 +1020,70 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
10081020
defaultFactory.setValidator(validator);
10091021
}
10101022
defaultFactory.setBeanFactory(RabbitListenerAnnotationBeanPostProcessor.this.beanFactory);
1011-
DefaultConversionService conversionService = new DefaultConversionService();
1012-
conversionService.addConverter(
1023+
this.defaultFormattingConversionService.addConverter(
10131024
new BytesToStringConverter(RabbitListenerAnnotationBeanPostProcessor.this.charset));
1014-
defaultFactory.setConversionService(conversionService);
1025+
defaultFactory.setConversionService(this.defaultFormattingConversionService);
10151026

1016-
List<HandlerMethodArgumentResolver> customArgumentsResolver =
1017-
new ArrayList<>(RabbitListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
1027+
List<HandlerMethodArgumentResolver> customArgumentsResolver = new ArrayList<>(
1028+
RabbitListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
10181029
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
1030+
GenericMessageConverter messageConverter = new GenericMessageConverter(
1031+
this.defaultFormattingConversionService);
1032+
defaultFactory.setMessageConverter(messageConverter);
1033+
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
1034+
customArgumentsResolver.add(new OptionalEmptyAwarePayloadArgumentResolver(messageConverter, validator));
10191035
defaultFactory.afterPropertiesSet();
10201036
return defaultFactory;
10211037
}
10221038

10231039
}
10241040

1041+
private static class OptionalEmptyAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
1042+
1043+
OptionalEmptyAwarePayloadArgumentResolver(
1044+
org.springframework.messaging.converter.MessageConverter messageConverter,
1045+
@Nullable Validator validator) {
1046+
1047+
super(messageConverter, validator);
1048+
}
1049+
1050+
@Override
1051+
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
1052+
Object resolved = null;
1053+
try {
1054+
resolved = super.resolveArgument(parameter, message);
1055+
}
1056+
catch (MethodArgumentNotValidException ex) {
1057+
if (message.getPayload().equals(Optional.empty())) {
1058+
Type type = parameter.getGenericParameterType();
1059+
List<ObjectError> allErrors = ex.getBindingResult().getAllErrors();
1060+
if (allErrors.size() == 1
1061+
&& allErrors.get(0).getDefaultMessage().equals("Payload value must not be empty")) {
1062+
return Optional.empty();
1063+
}
1064+
}
1065+
throw ex;
1066+
}
1067+
/*
1068+
* Replace Optional.empty() list elements with null.
1069+
*/
1070+
if (resolved instanceof List) {
1071+
List<?> list = ((List<?>) resolved);
1072+
for (int i = 0; i < list.size(); i++) {
1073+
if (list.get(i).equals(Optional.empty())) {
1074+
list.set(i, null);
1075+
}
1076+
}
1077+
}
1078+
return resolved;
1079+
}
1080+
1081+
@Override
1082+
protected boolean isEmptyPayload(Object payload) {
1083+
return payload == null || payload.equals(Optional.empty());
1084+
}
1085+
1086+
}
10251087
/**
10261088
* The metadata holder of the class with {@link RabbitListener}
10271089
* and {@link RabbitHandler} annotations.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.reflect.WildcardType;
2323
import java.util.Collection;
2424
import java.util.List;
25+
import java.util.Optional;
2526

2627
import org.springframework.amqp.core.MessageProperties;
2728
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
@@ -412,7 +413,15 @@ private Type determineInferredType() { // NOSONAR - complexity
412413
}
413414
}
414415
}
416+
return checkOptional(genericParameterType);
417+
}
418+
419+
protected Type checkOptional(Type genericParameterType) {
420+
if (genericParameterType instanceof ParameterizedType
421+
&& ((ParameterizedType) genericParameterType).getRawType().equals(Optional.class)) {
415422

423+
return ((ParameterizedType) genericParameterType).getActualTypeArguments()[0];
424+
}
416425
return genericParameterType;
417426
}
418427

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.annotation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Optional;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.amqp.AmqpException;
30+
import org.springframework.amqp.core.MessageBuilder;
31+
import org.springframework.amqp.core.MessagePropertiesBuilder;
32+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
33+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
34+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
35+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
36+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
37+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
38+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.messaging.handler.annotation.Payload;
43+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
44+
45+
import com.fasterxml.jackson.core.JsonProcessingException;
46+
import com.fasterxml.jackson.databind.ObjectMapper;
47+
48+
/**
49+
* @author Gary Russell
50+
* @since 2.8
51+
*
52+
*/
53+
@SpringJUnitConfig
54+
@RabbitAvailable(queues = { "op.1", "op.2" })
55+
public class OptionalPayloadTests {
56+
57+
@Test
58+
void optionals(@Autowired RabbitTemplate template, @Autowired Listener listener)
59+
throws JsonProcessingException, AmqpException, InterruptedException {
60+
61+
ObjectMapper objectMapper = new ObjectMapper();
62+
template.send("op.1", MessageBuilder.withBody(objectMapper.writeValueAsBytes("foo"))
63+
.andProperties(MessagePropertiesBuilder.newInstance()
64+
.setContentType("application/json")
65+
.build())
66+
.build());
67+
template.send("op.1", MessageBuilder.withBody(objectMapper.writeValueAsBytes(null))
68+
.andProperties(MessagePropertiesBuilder.newInstance()
69+
.setContentType("application/json")
70+
.build())
71+
.build());
72+
template.send("op.2", MessageBuilder.withBody(objectMapper.writeValueAsBytes("bar"))
73+
.andProperties(MessagePropertiesBuilder.newInstance()
74+
.setContentType("application/json")
75+
.build())
76+
.build());
77+
template.send("op.2", MessageBuilder.withBody(objectMapper.writeValueAsBytes(null))
78+
.andProperties(MessagePropertiesBuilder.newInstance()
79+
.setContentType("application/json")
80+
.build())
81+
.build());
82+
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
83+
assertThat(listener.deOptionaled).containsExactlyInAnyOrder("foo", null, "bar", "baz");
84+
}
85+
86+
@Configuration
87+
@EnableRabbit
88+
public static class Config {
89+
90+
@Bean
91+
RabbitTemplate template() {
92+
return new RabbitTemplate(rabbitConnectionFactory());
93+
}
94+
95+
@Bean
96+
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
97+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
98+
factory.setConnectionFactory(rabbitConnectionFactory());
99+
factory.setMessageConverter(converter());
100+
return factory;
101+
}
102+
103+
@Bean
104+
ConnectionFactory rabbitConnectionFactory() {
105+
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
106+
}
107+
108+
@Bean
109+
Jackson2JsonMessageConverter converter() {
110+
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
111+
converter.setNullAsOptionalEmpty(true);
112+
return converter;
113+
}
114+
115+
@Bean
116+
Listener listener() {
117+
return new Listener();
118+
}
119+
120+
}
121+
122+
static class Listener {
123+
124+
final CountDownLatch latch = new CountDownLatch(4);
125+
126+
List<String> deOptionaled = new ArrayList<>();
127+
128+
@RabbitListener(queues = "op.1")
129+
void listen(@Payload(required = false) String payload) {
130+
this.deOptionaled.add(payload);
131+
this.latch.countDown();
132+
}
133+
134+
@RabbitListener(queues = "op.2")
135+
void listen(Optional<String> optional) {
136+
this.deOptionaled.add(optional.orElse("baz"));
137+
this.latch.countDown();
138+
}
139+
140+
}
141+
142+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4081,6 +4081,38 @@ converter to determine the type.
40814081
IMPORTANT: Starting with version 1.6.11, `Jackson2JsonMessageConverter` and, therefore, `DefaultJackson2JavaTypeMapper` (`DefaultClassMapper`) provide the `trustedPackages` option to overcome https://pivotal.io/security/cve-2017-4995[Serialization Gadgets] vulnerability.
40824082
By default and for backward compatibility, the `Jackson2JsonMessageConverter` trusts all packages -- that is, it uses `*` for the option.
40834083

4084+
Starting with version 2.4.7, the converter can be configured to return `Optional.empty()` if Jackson returns `null` after deserializing the message body.
4085+
This facilitates `@RabbitListener` s to receive null payloads, in two ways:
4086+
4087+
====
4088+
[source, java]
4089+
----
4090+
@RabbitListener(queues = "op.1")
4091+
void listen(@Payload(required = false) Thing payload) {
4092+
handleOptional(payload); // payload might be null
4093+
}
4094+
4095+
@RabbitListener(queues = "op.2")
4096+
void listen(Optional<Thing> optional) {
4097+
handleOptional(optional.orElse(this.emptyThing));
4098+
}
4099+
----
4100+
====
4101+
4102+
To enable this feature, set `setNullAsOptionalEmpty` to `true`; when `false` (default), the converter falls back to the raw message body (`byte[]`).
4103+
4104+
====
4105+
[source, java]
4106+
----
4107+
@Bean
4108+
Jackson2JsonMessageConverter converter() {
4109+
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
4110+
converter.setNullAsOptionalEmpty(true);
4111+
return converter;
4112+
}
4113+
----
4114+
====
4115+
40844116
[[jackson-abstract]]
40854117
====== Deserializing Abstract Classes
40864118

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ Batch listeners can now consume `Collection<?>` as well as `List<?>`.
2828
The batch messaging adapter now ensures that the method is suitable for consuming batches.
2929
When setting the container factory `consumerBatchEnabled` to `true`, the `batchListener` property is also set to `true`.
3030
See <<receiving-batch>> for more infoprmation.
31+
32+
`MessageConverter` s can now return `Optional.empty()` for a null value; this is currently implemented by the `Jackson2JsonMessageConverter`.
33+
See <<Jackson2JsonMessageConverter-from-message>> for more information.

0 commit comments

Comments
 (0)