Python 官方文档:入门教程 => 点击学习
目录前言导致消息出现丢失的原因环境准备工作使用confirm机制模拟场景实现RabbitTemplate.ConfirmCallback接口发送端代码实现效果使用return机制模拟
之前看很多网上大佬的防丢失的文章,文章中理论知识偏多,所以自己想着实践一下,实践过程中也踩了一些坑,因此写出了这篇文章。如果文章有误人子弟的地方,望在评论区指出。
下面我们以这三种情况进行实践。
jdk1.8
Spring Boot 2.3.7.RELEASE
spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7
我事先准备了好了交换机以及队列:
message.log.test.exchange
和message.log.test2.exchange
message.loss.test.queue
其中message.loss.test.queue
和message.log.test.exchange
是绑定关系,而message.log.test2.exchange
没有绑定队列
1.发送时失败
发送时失败,rabbitmq有两种情况是属于发送时失败。
第一种的解决方式是使用confirm机制。第二种解决方式则是使用return机制。
confirm机制是当发送端的消息没有到达rabbitmq的交换机(exchange)时,会触发confirm方法,告诉发送端该消息没有到达rabbitmq,需要做业务处理。
这里我们发送消息到rabbitmq不存在的交换机上,就可以模拟上述场景。
@Component
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//第一个坑,如果发送端发送消息时没有对correlationData进行处理,conirm方法接收到的对象都会是null
//当接收失败并且correlationData对象为null,证明目前已经无法追溯回业务,可以做业务日志处理
if(!ack&&correlationData==null){
System.out.println(cause);
//日志处理。。。
return;
}
//如果接收失败
if(!ack){
System.out.println("消息Id:"+correlationData.getId());
Message message=correlationData.getReturnedMessage();
System.out.println("消息体:"+new String(message.getBody()));
//这里可以持久化业务消息体到数据库,然后定时去进行补偿处理或者重试等等
return;
}
//处理完成
}
}
@PostMapping("push")
public boolean push(){
TestMessage testMessage=new TestMessage();
testMessage.setName("mq名称");
testMessage.setBusinessId("业务Id");
//定义CorrelationData对象以及消息属性。不然comfirm方法无论失败还是成功,CorrelationData参数永远是null
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//传递业务数据
correlationData.setReturnedMessage(new Message(JSONObject.tojsON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties()));
//发送消息(这里发送给了message.log.test.exchange11交换机,但实际rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData);
return true;
}
这里是我踩的第一个坑,如果发送端不定义correlationData,那么confirm接收到的correlationData对象参数 都会是null
当消息到达了rabbitmq的交换机的时候,但是又没有到达队列,那么就会触发return方法。
下面我们定义一个没有绑定队列的交换机,然后发送消息到交换机,就可以模拟上述场景
@Component
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息标识:" + message.getMessageProperties().getDeliveryTag());
String messageBody = null;
try {
messageBody = new String(message.getBody(), "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("消息:" + messageBody);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
}
@PostMapping("push2")
public boolean push2(){
TestMessage testMessage=new TestMessage();
testMessage.setName("mq名称2");
testMessage.setBusinessId("业务Id");
template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString());
return true;
}
这里需注意消息体需要JSON序列化,不然returnedMessage方法接收的消息body会是乱码
这个开启rabbitmq的持久化机制就好了,开启之后消息到达rabbitmq服务,会实时转入磁盘。这里怎么设置就不多说了,网上挺多文章可以解答。
不过即使开启了还是会有一种情况会造成消息丢失,那就是消息即将要持久化到磁盘的那一刻,服务挂了,就会造成丢失,不过这种情况我也不知道怎么模拟,所以就暂不实践了。
上面提到默认情况下rabbitmq使用的是自动ack的方式,我们将它改成手动ack的方式,就可以解决这个问题。
rabbitmq:
listener:
simple:
#开启手动确认
acknowledge-mode: manual
#开启失败后的重试机制
retry:
enabled: true
#最多重试3次
max-attempts: 3
下面我们试一下几种消费端消费不成功的场景
@Component
public class TestConsumer {
@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
System.out.println("消费testmessage消息:"+testmessage.getName());
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
效果流程:
总而言之,如果消费端没有做手动确认的操作,那么在消费端还没关闭之前,消息会变成Unacked,不会再次被消费,但一旦消费端关闭了,消息会重新回到队列,让消费端消费。
@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
System.out.println("消费testmessage消息:"+testmessage.getName());
//故意触发异常
if(!StringUtils.isEmpty(testmessage.getName())){
throw new RuntimeException("11211");
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
上面的效果图显示,我在触发了异常之后,消息重试了三次,也就是我在application.yml 配置的重试三次
如果我去掉重试机制会是什么效果。
效果和忘记做ack操作的效果一样,消息没有ack后,消息会变成Unacked状态,消费端关闭后消息会重新回到队列,然后重新链接的时候,就会再消费一次。
到此这篇关于Spring boot Rabbitmq消息防丢失实践的文章就介绍到这了,更多相关Spring boot Rabbitmq 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: SpringbootRabbitmq消息防丢失实践
本文链接: https://www.lsjlt.com/news/168368.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0