how to make common dead letter exchange to all my queues


how to make common dead letter exchange to all my queues



I have a common library that handles all the configuration for the rabbitMQ and I'm aiming to achieve a common strategy to handle errors messages after certain attempts and what is in my mind have an exchange that receives all dead letter messages and will differentiate each message by having a header that has an originated queue name. this all after a number of attempts and retries.



I have no idea how to implement this approach, your help to achieve this!



here one of my lisnteners


@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${swift.queue.driverOnlineStatus}", durable = "true"),
exchange = @Exchange(value = "${swift.queue.driverExchange}",
type = ExchangeTypes.HEADERS,
ignoreDeclarationExceptions = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "eventName", value = "${swift.queue.driverOnlineStatus}"),
},
key = ""))



and here the configuration


@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(value = {ConnectionFactory.class})
public SimpleMessageListenerContainer simpleMessageListenerContainerForAsyncRabbit(ConnectionFactory connectionFactoryfactory,
@Autowired(required = false) ErrorHandler errorHandler,
RabbitTemplate rabbitTemplate) {
log.debug("creating new SimpleMessageListenerContainer ... for async rabbit: {}", rabbitProperties);
SimpleMessageListenerContainer containerF = new SimpleMessageListenerContainer(connectionFactoryfactory);
containerF.setConnectionFactory(connectionFactoryfactory);
if (errorHandler != null) {
containerF.setErrorHandler(errorHandler);
}
//listener retry
Interceptor retryListenerInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
.recoverer(new RejectAndDontRequeueRecoverer())
.build();

/*
Note on the two below interceptors
use a non-transactional template for the DLQ
When retries reach the maximum number that massage is dead-lettered
*/

//listener try to republish to error exchange
/*Interceptor retryListenerErrorInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
.recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
.build();
*/

//publisher retry
Interceptor retryPublisherInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getTemplate().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getTemplate().getRetry().getInitialInterval(),rabbitProperties.getTemplate().getRetry().getMultiplier(),rabbitProperties.getTemplate().getRetry().getMaxInterval())
.recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
.build();

containerF.setAdviceChain(retryListenerInterceptor,retryPublisherInterceptor);
return containerF;
}



I'm not sure if we need to define the SimpleMessageListenerContainer I think we should rely on the @EnableRabbit


SimpleMessageListenerContainer


@EnableRabbit



my understanding to your approach, we should create a bean for retryListenerInterceptor that have RepublishMessageRecoverer and that will do the jobs


retryListenerInterceptor


RepublishMessageRecoverer




2 Answers
2



The RepublishMessageRecoverer does exactly what you want. When delivery retries are exhausted, it sends the message to a DLX with the following additional headers:


RepublishMessageRecoverer


public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";



You can either route all the messages to the same queue or bind a DLQ with the original routing key(s).



See the documentation.



The RepublishMessageRecoverer publishes the message with additional information in message headers, such as the exception message, stack trace, original exchange and routing key. Additional headers can be added by creating a subclass and overriding additionalHeaders(). The deliveryMode (or any other properties) can be changed in the additionalHeaders() method, too:


RepublishMessageRecoverer


additionalHeaders()


additionalHeaders()





Could u please add example to configure it with retry strategy
– Moayad Abu Jaber
Jun 30 at 20:01





And can make excption for certain queue to not havd this ?
– Moayad Abu Jaber
Jun 30 at 20:01





It depends on how you are using the framework; @RabbitListener, Vs. @Bean Vs. XML configuration - you need to show your listener configuration (edit the question and comment here - don't add code/config to a comment). The retry configuration is at the container level so, if you have one container consuming from multiple queues, they all get the same behavior; if you have multiple containers each can get its own configuration.
– Gary Russell
Jul 1 at 3:08


@RabbitListener


@Bean





I edit the question with how I bind the queue and listen to it and configuration for the republisher
– Moayad Abu Jaber
Jul 1 at 10:10





I reach a good stage for this but I have some issue this will be applicable to all the simple container listeners, which will impact some other listener doesn't need to be included with this logic. I notice there is a group in RabbitListener annotation I think we can use it somehow, but how can differentiate this in the configuration file (application.properties)
– Moayad Abu Jaber
Jul 1 at 12:49



I found that spring-boot has out of the box support of retry for the simple container by adding the bellow properties:


spring.rabbitmq.listener.simple.retry.enabled=true



this will enable retry if you didn't set the other properties related to the retry will take the default. Also, you should make sure that you disable the requeuing feature


spring.rabbitmq.listener.simple.default-requeue-rejected=false



now you should define RepublishMessagRecoverer by adding the below code


RepublishMessagRecoverer


@BeanmatchIfMissing = true)
public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange");
}

@Bean
public RetryOperationsInterceptor rabbitRetryInterceptor(RabbitTemplate rabbitTemplate) {
log.debug("creating retry operation for rabbitmq ");
return RetryInterceptorBuilder.stateless()
.recoverer(republishMessageRecoverer(rabbitTemplate))
.build();
}



now all default container will have this configuration and if the retry exhausted they message will be sent to the exchange defined in the retryInterceptor with these headers


retryInterceptor


public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";



Extra note:
if you want to have different behavior for other queues you can define another listenerContainer and assign it to the listener


listenerContainer



**Example: **


@Bean
RabbitListenerContainerFactory dispatchConnection(ConnectionFactory connectionFactory){
System.out.println("creating the container");
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(true);

return factory;
}



and assign the container to @RabbitListener like the below example


@RabbitListener


@RabbitListener(containerFactory ="beanListenerContainerName", queues = "queueName")



I would also thanks Gary Russell for his inspiration by his answer.






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

PySpark - SparkContext: Error initializing SparkContext File does not exist

django NoReverseMatch Exception

List of Kim Possible characters