Skip to content

Commit 515eb9a

Browse files
garyrussellartembilan
authored andcommitted
GH-1484: RabbitListener.batch() Override
Resolves #1484
1 parent 97a508e commit 515eb9a

7 files changed

Lines changed: 52 additions & 10 deletions

File tree

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListener.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,4 +332,18 @@
332332
*/
333333
String converterWinsContentType() default "true";
334334

335+
/**
336+
* Override the container factory's {@code batchListener} property. The listener
337+
* method signature should receive a {@code List<?>}; refer to the reference
338+
* documentation. This allows a single container factory to be used for both record
339+
* and batch listeners; previously separate container factories were required.
340+
* @return "true" for the annotated method to be a batch listener or "false" for a
341+
* single message listener. If not set, the container factory setting is used. SpEL and
342+
* property place holders are not supported because the listener type cannot be
343+
* variable.
344+
* @since 3.0
345+
* @see Boolean#parseBoolean(String)
346+
*/
347+
String batch() default "";
348+
335349
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,9 @@ protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint en
461461
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
462462
resolveMessageConverter(endpoint, rabbitListener, target, beanName);
463463
resolveReplyContentType(endpoint, rabbitListener);
464+
if (StringUtils.hasText(rabbitListener.batch())) {
465+
endpoint.setBatchListener(Boolean.parseBoolean(rabbitListener.batch()));
466+
}
464467
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);
465468

466469
this.registrar.registerEndpoint(endpoint, factory);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,9 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
381381
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode)
382382
.acceptIfNotNull(endpoint.getBatchingStrategy(), instance::setBatchingStrategy);
383383
instance.setListenerId(endpoint.getId());
384-
endpoint.setBatchListener(this.batchListener);
384+
if (endpoint.getBatchListener() == null) {
385+
endpoint.setBatchListener(this.batchListener);
386+
}
385387
}
386388
applyCommonOverrides(endpoint, instance);
387389

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEn
8484

8585
private TaskExecutor taskExecutor;
8686

87-
private boolean batchListener;
87+
private Boolean batchListener;
8888

8989
private BatchingStrategy batchingStrategy;
9090

@@ -293,7 +293,21 @@ public void setTaskExecutor(TaskExecutor taskExecutor) {
293293
this.taskExecutor = taskExecutor;
294294
}
295295

296+
/**
297+
* True if this endpoint is for a batch listener.
298+
* @return true if batch.
299+
*/
296300
public boolean isBatchListener() {
301+
return this.batchListener == null ? false : this.batchListener;
302+
}
303+
304+
/**
305+
* True if this endpoint is for a batch listener.
306+
* @return {@link Boolean#TRUE} if batch.
307+
* @since 3.0
308+
*/
309+
@Nullable
310+
public Boolean getBatchListener() {
297311
return this.batchListener;
298312
}
299313

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MethodRabbitListenerEndpoint.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -130,7 +130,7 @@ public void setAdapterProvider(AdapterProvider adapterProvider) {
130130
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
131131
Assert.state(this.messageHandlerMethodFactory != null,
132132
"Could not create message listener - MessageHandlerMethodFactory not set");
133-
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
133+
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(getBatchListener());
134134
messageListener.setHandlerAdapter(configureListenerAdapter(messageListener));
135135
String replyToAddress = getDefaultReplyToAddress();
136136
if (replyToAddress != null) {
@@ -159,11 +159,12 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
159159

160160
/**
161161
* Create an empty {@link MessagingMessageListenerAdapter} instance.
162+
* @param batch whether this endpoint is for a batch listener.
162163
* @return the {@link MessagingMessageListenerAdapter} instance.
163164
*/
164-
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
165-
return this.adapterProvider.getAdapter(isBatchListener(), this.bean, this.method, this.returnExceptions,
166-
this.errorHandler, getBatchingStrategy());
165+
protected MessagingMessageListenerAdapter createMessageListenerInstance(@Nullable Boolean batch) {
166+
return this.adapterProvider.getAdapter(batch == null ? isBatchListener() : batch, this.bean, this.method,
167+
this.returnExceptions, this.errorHandler, getBatchingStrategy());
167168
}
168169

169170
@Nullable

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,14 @@ default TaskExecutor getTaskExecutor() {
116116
default void setBatchListener(boolean batchListener) {
117117
}
118118

119+
/**
120+
* Whether this endpoint is for a batch listener.
121+
* @return {@link Boolean#TRUE} if batch.
122+
* @since 3.0
123+
*/
124+
@Nullable
125+
Boolean getBatchListener();
126+
119127
/**
120128
* Set a {@link BatchingStrategy} to use when debatching messages.
121129
* @param batchingStrategy the batching strategy.

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -143,7 +143,7 @@ public DirectRabbitListenerContainerFactory directListenerContainerFactory() {
143143
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
144144
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
145145
factory.setConnectionFactory(connectionFactory());
146-
factory.setBatchListener(true);
146+
factory.setBatchListener(false);
147147
factory.setConsumerBatchEnabled(true);
148148
factory.setBatchSize(2);
149149
return factory;
@@ -202,7 +202,7 @@ public void listen2(List<Message<Foo>> in) {
202202
this.fooMessagesLatch.countDown();
203203
}
204204

205-
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
205+
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory", batch = "true")
206206
public void listen3(List<Foo> in) {
207207
this.foosConsumerBatchToo = in;
208208
this.fooConsumerBatchTooLatch.countDown();

0 commit comments

Comments
 (0)