✨ RabbitMQ:死信队列 1.死信队列1.1死信队列基本介绍1.2消息成为死信的三种情况1.3死信队列结构图1.4死信的处理方式 2.TTL消息过期时间2.1基本介绍2.2生产者2.3消费者12.4消费者22.
✨ RabbitMQ:死信队列
📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

通常情况下,消费者是能正常消费消息的,但是出现上面说的三种情况之一,就无法正常消费信息,消息就会进入死信交换机,死信交换机会和死信队列进行绑定,最后由其他消费者来消费死信消息。

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
③ 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和x-dead-letter-routing-key

当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以对消息或者队列设置过期时间,队列中的消息过期是成为死信队列的三种原因之一。

public class Producer { //正常交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //正常队列 public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) { try { Channel channel = ConnectUtil.getChannel(); //声明交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //声明队列 //channel.queueDeclare(NORMAL_QUEUE, true, false, false, null); //把正常交换机和正常队列进行绑定 //channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "tom"); //设置过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); //发送消息 for (int i = 0; i < 10; i++) { String message = "消息:" + i; //发送消息 channel.basicPublish(NORMAL_EXCHANGE, "tom", null, message.getBytes()); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }} public class Consumer1 { //定义交换机(正常交换机,死信交换机) public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_EXCHANGE = "dead_exchange"; //定义队列(正常队列,死信队列) public static final String NORMAL_QUEUE = "normal_queue"; public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) { try { //创建信道对象 Channel channel = ConnectUtil.getChannel(); //声明交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //设置正常队列和死信队列进行绑定,key固定不可以改变 Map map = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "jack"); //声明正常队列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); //正常交换机绑定正常队列 channel.queueBind(NORMAL_QUEUE,NORMAL_QUEUE,"tom"); //声明死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //死信交换机绑定死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"jack"); //消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8")); } }; //监听消息(队列名称,是否自动确认消息,消费对象) channel.basicConsume(NORMAL_QUEUE, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }} public class Consumer2 { //定义交换机(死信交换机) public static final String DEAD_EXCHANGE = "dead_exchange"; //定义队列(死信队列) public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) { try { //创建信道对象 Channel channel = ConnectUtil.getChannel(); //声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //死信交换机绑定死信队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "jack"); //消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8")); } }; //监听消息(队列名称,是否自动确认消息,消费对象) channel.basicConsume(DEAD_QUEUE, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }} 在创建队列的时候设置队列的x-message-ttl属性,例如:
Map map = new HashMap<>();//设置队列有效期为10秒map.put("x-message-ttl",10000);channel.queueDeclare(queueName,durable,exclusive,autoDelete,map); 对每条消息设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes()); 如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃。
如果是消息设置了TTL属性,那么即使消息过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,那么已经过期的消息也许还能存活较长时间。如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃。
来源地址:https://blog.csdn.net/qq_52797170/article/details/127282842
--结束END--
本文标题: RabbitMQ:死信队列
本文链接: https://www.lsjlt.com/news/372179.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0