广告
返回顶部
首页 > 资讯 > 后端开发 > Python >RabbitMQ消费端ACKNACK及重回队列机制详解
  • 522
分享到

RabbitMQ消费端ACKNACK及重回队列机制详解

RabbitMQ消费端ACKNACKRabbitMQ消费端重回队列 2022-12-26 12:12:05 522人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录ACK和NACK使用方式1、basicNack2、basicAck3、basicReject4、basicRecover消费端的重回队列实际运用ACK和NACK 当我们使用Ra

ACK和NACK

当我们使用RabbitMQ时用于网络异常,业务处理异常或者业务错误导致消息无法立即消费时,在这种情况下,传输中的信息将无法正常投递——它们需要被重新投递。Acknowledgements机制让服务器和客户端知道何时需要重新投递。

当设置autoACK=false 时,就可以使用手工ACK。 其实手工方式包括了手工ACK、手工NACK。

  • 手工 ACK 时,会发送给Broker一个应答,代表消息处理成功,Broker就可回送响应给Pro;
  • NACK 则表示消息处理失败,如果设置了重回队列,Broker端就会将没有成功处理的消息重新发送。

使用方式

1、basicNack

Consumer消费时,若由于业务异常,可手工 NACK 记录日志,然后进行补偿: basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息第三个参数是否requeue,如果requeue 参数设置为true ,则RabbitMQ重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

multiple 参数设置为false 则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack 和basicReject 方法一样;multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。

  • 参数1: 消息
  • 参数2: 是否应用于多消息
  • 参数3: 是否重新放回队列,否则丢弃或者进入死信队列
void basicNack(long deliveryTag, boolean multiple, boolean requeue)

2、basicAck

如果由于服务器宕机等严重问题,就需要手工 ACK 保障Con消费成功:deliveryTag : 每次接收消息+1,可以做此消息处理通道的名字。

void basicAck(long deliveryTag, boolean multiple)

3、basicReject

basic.Reject拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。该方法reject后,该消费者还是会消费到该条被reject的消息。reject一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令。

  • 参数1: 消息
  • 参数2: 是否重新放回队列,否则丢弃或者进入死信队列
void basicReject(long deliveryTag, boolean requeue);

4、basicRecover

basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。

channel.basicRecover(true);

消费端的重回队列

重回队列针对没有处理成功的消息,将消息重新投递给Broker。
重回队列会把消费失败的消息重新添加到队列尾端,供Consumer重新消费。
一般在实际应用中,都会关闭重回队列,即设置为false。

在确认ack后再把消息通过basicPublish到队列尾部,防止正常消息堆积。

channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLaiN,JSON.tojsONBytes(new Object()));

实际运用

@RabbitListener(queues = "${rabbitmq.one-queue}",
        containerFactory = "rabbitListenerContainerFactory")
@RabbitHandler
public void dORMOnduty(Message message, Channel channel) {
    try {
        String body = new String(message.getBody());
        //log.info("消息处理:{}",body);
    } catch (Exception e) {
        //消费异常,设置重回队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    } finally {
        //最终确认消息消费成功
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

ACK机制可以保证Con拉取到了消息,若处理失败了,则队列中还有这个消息,仍然可以给Con处理。ack机制是 Con 告诉 Broker 当前消息是否成功消费,至于 Broker 如何处理 NACK,取决于 Con 是否设置了 requeue:如果 requeue=false, 则NACK 后 Broker 还是会删除消息的。

但一般处理消息失败都是因为代码逻辑出bug,即使队列中后来仍然保留该消息,然后再给Con消费,依旧报错。 当然,若一台机器宕机,消息还有,还可以给另外机器消费,这种情景下 ACK 很有用。

如果不使用 ACK 机制,直接把出错消息存库,便于日后查bug或重新执行。

以上就是RabbitMQ消费端ACK NACK及重回队列机制详解的详细内容,更多关于RabbitMQ消费端ACK NACK的资料请关注编程网其它相关文章!

--结束END--

本文标题: RabbitMQ消费端ACKNACK及重回队列机制详解

本文链接: https://www.lsjlt.com/news/175856.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作