本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!环境windows1
本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
环境:Windows 10、IntelliJ IDEA、Erlang/OTP(otp_win64_25.0)、RabbitMQ Server 3.10.4(rabbitmq-server-3.10.4)
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问 http://localhost:15672/ 打开管理界面,默认账号和密码均为 guest。
3.在管理界面中手动添加一个名为 admin_host 的虚拟主机,再手动创建一个账号和密码均为 admin 的用户,并为该用户授予访问虚拟主机 admin_host 的权限(否则应用连接时会被拒绝)。
pom.xml
<!-- RabbitMQ: Spring AMQP starter (RabbitTemplate, @RabbitListener, auto-configuration) -->
<!-- NOTE(review): if the project inherits from spring-boot-starter-parent, the explicit
     version is normally managed by the parent and can be dropped - confirm against the full pom. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.0</version>
</dependency>
# RabbitMQ connection and listener settings (application.yml).
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: admin_host
    # Publisher confirms, correlated with the CorrelationData passed on send.
    publisher-confirm-type: correlated
    # Have the broker return messages that cannot be routed to any queue.
    publisher-returns: true
    listener:
      simple:
        # Listener methods acknowledge / reject each delivery themselves.
        acknowledge-mode: manual
        # NOTE(review): listener retry only applies when the listener method lets an
        # exception propagate; a listener that catches everything never triggers it.
        retry:
          enabled: true           # enable retry on failure
          max-attempts: 3         # maximum number of attempts
          initial-interval: 1000  # interval between attempts, in milliseconds
RabbitConfig
package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the business exchange, the dead-letter exchange, their queues and the bindings
 * between them.
 *
 * <p>Each business queue is declared with {@code x-dead-letter-exchange} and
 * {@code x-dead-letter-routing-key} arguments so that messages rejected without requeue are
 * re-published to {@link #EXCHANGE_PHCP_DEAD} under the matching dead-letter routing key.
 *
 * <p>NOTE(review): {@code @Bean} methods on a {@code @Component} class are processed in "lite"
 * mode; that is sufficient here because every dependency is injected as a method parameter.
 */
@Slf4j
@Component
public class RabbitConfig {

    /** Business exchange. */
    public static final String EXCHANGE_PHCP = "phcp";

    /** Business queue 1. */
    public static final String QUEUE_COMPANY = "company";

    /** Routing key of business queue 1. */
    public static final String ROUTINGKEY_COMPANY = "companyKey";

    /** Business queue 2. */
    public static final String QUEUE_PROJECT = "project";

    /** Routing key of business queue 2. */
    public static final String ROUTINGKEY_PROJECT = "projectKey";

    /** Dead-letter exchange. */
    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";

    /** Dead-letter queue 1. */
    public static final String QUEUE_COMPANY_DEAD = "company_dead";

    /** Dead-letter queue 2. */
    public static final String QUEUE_PROJECT_DEAD = "project_dead";

    /** Routing key of dead-letter queue 1. */
    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";

    /** Routing key of dead-letter queue 2. */
    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";

    // Works around a "duplicate acknowledgement" error. Leave it disabled unless that error
    // actually shows up.
    //
    // @Bean
    // public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    //     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //     factory.setConnectionFactory(connectionFactory);
    //     factory.setMessageConverter(new Jackson2JsonMessageConverter());
    //     factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    //     return factory;
    // }

    /**
     * Declares the business exchange.
     *
     * @return a direct exchange named {@link #EXCHANGE_PHCP}
     */
    @Bean("exchangePhcp")
    public DirectExchange exchangePhcp() {
        return new DirectExchange(EXCHANGE_PHCP);
    }

    /**
     * Declares the dead-letter exchange.
     *
     * @return a direct exchange named {@link #EXCHANGE_PHCP_DEAD}
     */
    @Bean("exchangePhcpDead")
    public DirectExchange exchangePhcpDead() {
        return new DirectExchange(EXCHANGE_PHCP_DEAD);
    }

    /**
     * Declares business queue 1, dead-lettering to dead-letter queue 1.
     *
     * @return the durable {@link #QUEUE_COMPANY} queue
     */
    @Bean("queueCompany")
    public Queue queueCompany() {
        return deadLetteredQueue(QUEUE_COMPANY, ROUTINGKEY_COMPANY_DEAD);
    }

    /**
     * Declares business queue 2, dead-lettering to dead-letter queue 2.
     *
     * @return the durable {@link #QUEUE_PROJECT} queue
     */
    @Bean("queueProject")
    public Queue queueProject() {
        return deadLetteredQueue(QUEUE_PROJECT, ROUTINGKEY_PROJECT_DEAD);
    }

    /**
     * Declares dead-letter queue 1.
     *
     * @return the {@link #QUEUE_COMPANY_DEAD} queue
     */
    @Bean("queueCompanyDead")
    public Queue queueCompanyDead() {
        return new Queue(QUEUE_COMPANY_DEAD);
    }

    /**
     * Declares dead-letter queue 2.
     *
     * @return the {@link #QUEUE_PROJECT_DEAD} queue
     */
    @Bean("queueProjectDead")
    public Queue queueProjectDead() {
        return new Queue(QUEUE_PROJECT_DEAD);
    }

    /**
     * Binds business queue 1 to the business exchange.
     *
     * @param queue          business queue 1
     * @param directExchange the business exchange
     * @return the binding under {@link #ROUTINGKEY_COMPANY}
     */
    @Bean
    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue,
                                       @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);
    }

    /**
     * Binds business queue 2 to the business exchange.
     *
     * @param queue          business queue 2
     * @param directExchange the business exchange
     * @return the binding under {@link #ROUTINGKEY_PROJECT}
     */
    @Bean
    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue,
                                       @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);
    }

    /**
     * Binds dead-letter queue 1 to the dead-letter exchange.
     *
     * @param queue          dead-letter queue 1
     * @param directExchange the dead-letter exchange
     * @return the binding under {@link #ROUTINGKEY_COMPANY_DEAD}
     */
    @Bean
    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue,
                                           @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);
    }

    /**
     * Binds dead-letter queue 2 to the dead-letter exchange.
     *
     * @param queue          dead-letter queue 2
     * @param directExchange the dead-letter exchange
     * @return the binding under {@link #ROUTINGKEY_PROJECT_DEAD}
     */
    @Bean
    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue,
                                           @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
    }

    /**
     * Builds a durable queue whose dead letters go to {@link #EXCHANGE_PHCP_DEAD}.
     *
     * @param queueName            name of the business queue
     * @param deadLetterRoutingKey routing key used when a message is dead-lettered
     * @return the configured queue
     */
    private static Queue deadLetteredQueue(String queueName, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", EXCHANGE_PHCP_DEAD);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(arguments).build();
    }
}
生产者
RabbltProducer
package com.example.rabitmqdemo.mydemo.producer;

import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Publishes JSON payloads to the business exchange and receives the publisher confirm /
 * return callbacks of the injected {@link RabbitTemplate}.
 */
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers this bean as the template's confirm and returns callback and turns on the
     * mandatory flag on the template.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * Publisher-confirm callback.
     *
     * @param correlationData the correlation data supplied on send (may be {@code null})
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           optional cause of a nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功{}", correlationData);
        } else {
            log.warn("消息发送失败:{}", cause);
        }
    }

    /**
     * Returned-message callback; logs the body of the returned message.
     *
     * @param returnedMessage the returned message and its metadata
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // Decode explicitly as UTF-8: that is the charset used when the body is encoded in send().
        String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
        log.warn("消息发送失败:{}", body);
    }

    /**
     * Publishes {@code str} to the business exchange with the "company" routing key.
     *
     * @param str JSON payload; must not be {@code null}
     */
    public void sendCompany(String str) {
        send(RabbitConfig.ROUTINGKEY_COMPANY, str);
    }

    /**
     * Publishes {@code str} to the business exchange with the "project" routing key.
     *
     * @param str JSON payload; must not be {@code null}
     */
    public void sendProject(String str) {
        send(RabbitConfig.ROUTINGKEY_PROJECT, str);
    }

    /**
     * Wraps the payload in a UTF-8 {@code application/json} message and publishes it to
     * {@link RabbitConfig#EXCHANGE_PHCP} with a fresh random correlation id.
     *
     * <p>Alternatively the raw String could be handed to {@code convertAndSend} together with
     * the {@link CorrelationData}, leaving the conversion to the template's message converter.
     *
     * @param routingKey routing key to publish under
     * @param payload    JSON payload
     */
    private void send(String routingKey, String payload) {
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");
        Message message = new Message(payload.getBytes(StandardCharsets.UTF_8), properties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP, routingKey, message, correlationData);
    }
}
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;

import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumes the two business queues and acknowledges each delivery manually.
 *
 * <p>Failure handling is driven by the delivery's {@code redelivered} flag: the first failed
 * delivery is nacked with requeue, and a delivery that fails again after being redelivered is
 * rejected without requeue so the broker can dead-letter it.
 */
@Component
@Slf4j
public class RabbitConsumer {

    /**
     * Listener for the "company" business queue.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_COMPANY)
    public void company(Message message, Channel channel) throws IOException {
        handle(message, channel);
    }

    /**
     * Listener for the "project" business queue.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_PROJECT)
    public void project(Message message, Channel channel) throws IOException {
        handle(message, channel);
    }

    /**
     * Processes one delivery and acks it; on any failure requeues or rejects it.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    private void handle(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // The value logged here is the channel-scoped delivery tag.
            log.info("次数{}", deliveryTag);
            // NOTE(review): prefetch is set on every delivery; consider configuring
            // spring.rabbitmq.listener.simple.prefetch instead.
            channel.basicQos(1);
            // Simulated processing time.
            Thread.sleep(2000);
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("处理消息{}", body);
            // To exercise the requeue / dead-letter path, force a failure here, e.g.:
            // String str = null;
            // str.split("1");
            // Processing succeeded: acknowledge this single delivery.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            try {
                rejectOrRequeue(message, channel, deliveryTag, e);
            } finally {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so the caller can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Settles a failed delivery: requeues it the first time, rejects it without requeue when
     * it had already been redelivered.
     *
     * @param message     the failed message
     * @param channel     the channel the message was delivered on
     * @param deliveryTag delivery tag of the failed message
     * @param failure     the exception raised while processing
     * @throws IOException if the nack / reject cannot be sent
     */
    private void rejectOrRequeue(Message message, Channel channel, long deliveryTag, Exception failure)
            throws IOException {
        log.error("处理消息时发生异常:{}", failure.getMessage(), failure);
        // getRedelivered() returns a nullable Boolean; treat null as "not redelivered".
        boolean redelivered = Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
        if (redelivered) {
            log.error("异常重试次数已到达设置次数,将发送到死信交换机");
            channel.basicReject(deliveryTag, false);
        } else {
            log.error("消息即将返回队列处理重试");
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
RabbitConsumerDead
package com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumes the two dead-letter queues and acknowledges each delivery manually.
 */
@Component
@Slf4j
public class RabbitConsumerDead {

    /**
     * Listener for the "company_dead" dead-letter queue.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message was delivered on
     * @throws IOException if settling the delivery fails
     */
    @RabbitListener(queues = "company_dead")
    public void company_dead(Message message, Channel channel) throws IOException {
        handleDeadLetter(message, channel);
    }

    /**
     * Listener for the "project_dead" dead-letter queue.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message was delivered on
     * @throws IOException if settling the delivery fails
     */
    @RabbitListener(queues = "project_dead")
    public void project_dead(Message message, Channel channel) throws IOException {
        handleDeadLetter(message, channel);
    }

    /**
     * Logs a dead letter and acks it. On failure the delivery is always settled instead of
     * being left unacknowledged: it is requeued once, and discarded if it fails again after
     * having been redelivered.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message was delivered on
     * @throws IOException if the nack cannot be sent
     */
    private void handleDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            channel.basicQos(1);
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("处理死信{}", body);
            // Persist to a database, raise an alert, etc. here.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("接收异常:{}", e.getMessage(), e);
            // getRedelivered() returns a nullable Boolean; treat null as "not redelivered".
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
            // Requeue only on the first failure so a poison message cannot loop forever.
            channel.basicNack(deliveryTag, false, requeue);
        }
    }
}
MqController
package com.example.rabitmqdemo.mydemo.controller;

import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * HTTP entry point under {@code /def} that forwards request bodies to the RabbitMQ producer.
 */
@RequestMapping("/def")
@RestController
@Slf4j
public class MsGController {

    @Resource
    private RabbltProducer rabbltProducer;

    /**
     * Publishes the raw request body through {@link RabbltProducer#sendCompany(String)}.
     *
     * <p>NOTE(review): the mapping does not restrict the HTTP method; consider limiting it to
     * POST once existing callers are confirmed.
     *
     * @param jsonStr request body, forwarded unchanged
     */
    @RequestMapping("/handleCompany")
    public void handleCompany(@RequestBody String jsonStr) {
        rabbltProducer.sendCompany(jsonStr);
    }
}
“SpringBoot如何整合RabbitMQ实现死信交换机”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!
--结束END--
本文标题: SpringBoot如何整合RabbitMQ实现死信交换机
本文链接: https://www.lsjlt.com/news/340726.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
2024-05-16
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0