Skip to content

Error handler for async RabbitListeners (Mono, Future, suspend) not reacting AmqpRejectAndDontRequeueException #3163

@jensbaitingerbosch

Description

@jensbaitingerbosch

In what version(s) of Spring AMQP are you seeing this issue?

For example:

3.2.6

Describe the bug

For sync Message handlers, the error handler (RabbitListenerErrorHandler) can throw an AmqpRejectAndDontRequeueException, the message is then deadlettered (asuming the in-queue has a deadletter-exchange configured).

For an async listeners (methods annotated with @RabbitListener end returning Mono or Future or being a Kotlin suspend functions) that is not the case. When the Error Hander throws any exception (including an AmqpRejectAndDontRequeueException), the message will be nacked with requeue=true resulting an an endless loop. When Error handler returns a value, the messase is not acked either.

The logged error message is:

16:47:50.649 [DefaultDispatcher-worker-2 @coroutine#3288] ERROR o.s.a.r.l.a.MessagingMessageListenerAdapter - Future, Mono, or suspend function was completed with an exception for (Body:'{"this":"is not a valid message"}' MessageProperties [headers={x-delivery-count=3135, traceparent=00-68a5e0018bfa76845f473c7ef2501588-192f6d5acaefac54-00}, type=someEvent, correlationId=123-abc-456, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=..., deliveryTag=63, consumerTag=amq.ctag-FVilYqFsNGCOikPuqNJU6w, consumerQueue=...])

To Reproduce
have a Listener

@Component
class SomeListener {
   @RabbitListener(..., errorHandler = "errorHandler")
   fun doSomething(message: Message) = Mono.error (
      throw IllegalStateException("Oops, I cannot process any message")
   )
}
   
@Component
class ErrorHandler: RabbitListenerErrorHandler {
    override fun handleError(
            amqpMessage: Message,
            channel: Channel,
            message: org.springframework.messaging.Message<*>?,
            exception: ListenerExecutionFailedException,
        ): Unit? {
       // just deadletter everything
       throw AmqpRejectAndDontRequeueException(...)
    }

Expected behavior

The message is deadlettered.

** Workaround **

Let the RabbitListenerErrorHandler for these async rabbit listeners ack/nack the message directly insted of returning null or throwing an exception. NOTE: you cannot reuse that errorhandler for sync rabbit Listeners then.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions