iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >SpringBoot如何整合RabbitMQ实现死信交换机
  • 377
分享到

SpringBoot如何整合RabbitMQ实现死信交换机

2023-07-02 08:07:03 377人浏览 独家记忆
摘要

本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!环境windows1

本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

环境

windows10,idea,otp_win64_25.0,rabbitMQ-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问Http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

        <!-- RabbitMQ -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>            <version>2.7.0</version>        </dependency>

配置

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    passWord: admin    virtual-host: admin_host    publisher-confirm-type: correlated    publisher-returns: true    listener:      simple:        acknowledge-mode: manual        retry:          enabled: true    #开启失败重试          max-attempts: 3    #最大重试次数          initial-interval: 1000  #重试间隔时间 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;@Slf4j@Componentpublic class RabbitConfig {    //业务交换机    public static final String EXCHANGE_PHCP = "phcp";    //业务队列1    public static final String QUEUE_COMPANY = "company";    //业务队列1的key    public static final String ROUTINGKEY_COMPANY = "companyKey";    //业务队列2    public static final String QUEUE_PROJECT = "project";    //业务队列2的key    public static final String ROUTINGKEY_PROJECT = "projecTKEy";    //死信交换机    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";    //死信队列1    public static final String QUEUE_COMPANY_DEAD = "company_dead";    //死信队列2    public static final String QUEUE_PROJECT_DEAD = "project_dead";    //死信队列1的key    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";    //死信队列2的key    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";//    /     * 解决重复确认报错问题,如果没有报错的话,就不用启用这个//     *//     * @param connectionFactory//     * @return//     *///    @Bean//    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//        factory.setConnectionFactory(connectionFactory);//        factory.setMessageConverter(new Jackson2JSONMessageConverter());//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//        return factory;//    }        @Bean("exchangePhcp")    public DirectExchange exchangePhcp() {        return new DirectExchange(EXCHANGE_PHCP);    }     * 声明死信交换机    @Bean("exchangePhcpDead")    public DirectExchange exchangePhcpDead() {        return new DirectExchange(EXCHANGE_PHCP_DEAD);     * 声明业务队列1     *     * @return    @Bean("queueCompany")    public Queue queueCompany() {        Map<String,Object> arguments = new HashMap<>(2);        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);        //绑定该队列到死信交换机的队列1        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);        return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();     * 声明业务队列2    @Bean("queueProject")    public Queue queueProject() {        //绑定该队列到死信交换机的队列2        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);        return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();     * 声明死信队列1    @Bean("queueCompanyDead")    public Queue queueCompanyDead() {        return new Queue(QUEUE_COMPANY_DEAD);     * 声明死信队列2    @Bean("queueProjectDead")    public Queue queueProjectDead() {        return new Queue(QUEUE_PROJECT_DEAD);     * 绑定业务队列1和业务交换机     * @param queue     * @param directExchange    @Bean    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);     * 绑定业务队列2和业务交换机    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);     * 绑定死信队列1和死信交换机    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);     * 绑定死信队列2和死信交换机    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);}

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;import com.example.rabitmqdemo.mydemo.config.RabbitConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.NIO.charset.StandardCharsets;import java.util.UUID;@Component@Slf4jpublic class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{    @Resource    private RabbitTemplate rabbitTemplate;        @PostConstruct    public void init() {        rabbitTemplate.setConfirmCallback(this);        rabbitTemplate.setReturnsCallback(this);        rabbitTemplate.setMandatory(true);    }        @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (ack) {            System.out.println("消息发送成功" + correlationData);        } else {            System.out.println("消息发送失败:" + cause);        }    }        @Override    public void returnedMessage(ReturnedMessage returnedMessage) {        String str = new String(returnedMessage.getMessage().getBody());        System.out.println("消息发送失败:" + str);    }        public void sendCompany(String str){        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);        //也可以用下面的方式        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);    }        public void sendProject(String str){        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);        //也可以用下面的方式        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);    }}

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component@Slf4jpublic class RabbitConsumer {        @RabbitListener(queues = "company")    public void company(Message message, Channel channel) throws IOException {        try{            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());            channel.basicQos(1);            Thread.sleep(2000);            String s = new String(message.getBody());            log.info("处理消息"+s);            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机            //String str = null;            //str.split("1");            //处理成功,确认应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("处理消息时发生异常:"+e.getMessage());            Boolean redelivered = message.getMessageProperties().getRedelivered();            if(redelivered){                log.error("异常重试次数已到达设置次数,将发送到死信交换机");                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);            }else {                log.error("消息即将返回队列处理重试");                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            }        }    }        @RabbitListener(queues = "project")    public void project(Message message, Channel channel) throws IOException {        try{            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());            channel.basicQos(1);            Thread.sleep(2000);            String s = new String(message.getBody());            log.info("处理消息"+s);            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机            //String str = null;            //str.split("1");            //处理成功,确认应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("处理消息时发生异常:"+e.getMessage());            Boolean redelivered = message.getMessageProperties().getRedelivered();            if(redelivered){                log.error("异常重试次数已到达设置次数,将发送到死信交换机");                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);            }else {                log.error("消息即将返回队列处理重试");                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            }        }    }}

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component@Slf4jpublic class RabbitConsumerDead {        @RabbitListener(queues = "company_dead")    public void company_dead(Message message, Channel channel) throws IOException {        try{            channel.basicQos(1);            String s = new String(message.getBody());            log.info("处理死信"+s);            //在此处记录到数据库、报警之类的操作            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("接收异常:"+e.getMessage());        }    }        @RabbitListener(queues = "project_dead")    public void project_dead(Message message, Channel channel) throws IOException {        try{            channel.basicQos(1);            String s = new String(message.getBody());            log.info("处理死信"+s);            //在此处记录到数据库、报警之类的操作            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("接收异常:"+e.getMessage());        }    }}

测试

MqController

package com.example.rabitmqdemo.mydemo.controller;import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;import lombok.extern.slf4j.Slf4j;import org.springframework.WEB.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RequestMapping("/def")@RestController@Slf4jpublic class MsGController {    @Resource    private RabbltProducer rabbltProducer;        @RequestMapping("/handleCompany")    public void handleCompany(@RequestBody String jsonStr){        rabbltProducer.sendCompany(jsonStr);    }}

“SpringBoot如何整合RabbitMQ实现死信交换机”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: SpringBoot如何整合RabbitMQ实现死信交换机

本文链接: https://www.lsjlt.com/news/340726.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SpringBoot如何整合RabbitMQ实现死信交换机
    本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!环境Windows1...
    99+
    2023-07-02
  • SpringBoot整合RabbitMQ实战教程附死信交换机
    目录前言环境配置配置文件业务消费者死信消费者测试前言 使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那...
    99+
    2024-04-02
  • RabbitMQ交换机与Springboot整合的简单实现
    RabbitMQ-交换机 1、交换机是干什么的? 消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ实现交换机与队列的绑定
    目录简介配置方法概述法1:配置类(简洁方法)(推荐)法2:配置类(繁琐方法)(不推荐)法3:使用方配置(不推荐)法4:MQ服务端网页(不推荐)简介 本文用实例介绍SpringBoot...
    99+
    2024-04-02
  • springboot整合RabbitMQ发送短信的实现
    目录RabbitMQ安装和运行MQ服务器设置创建用户创建虚拟机实现发送短信rabbit-mqservice-baseservice-core中发送消息service-sms中监听消息...
    99+
    2024-04-02
  • springcloud中RabbitMQ死信队列与延迟交换机实现方法
    目录0.引言1. 死信队列1.2 什么是死信?1.3 什么是死信队列?1.4 创建死信交换机、死信队列1.5 实现死信消息1.5.1 基于消费者进行reject或nack实现死信消息...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ处理死信队列和延迟队列
    目录简介实例代码路由配置控制器发送器接收器application.yml实例测试简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列。 Ra...
    99+
    2024-04-02
  • springboot整合RabbitMQ发送短信的实现方法
    这篇文章主要介绍springboot整合RabbitMQ发送短信的实现方法,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!RabbitMQ安装和运行# 安装rpm -ivh erlang-...
    99+
    2023-06-15
  • SpringBoot整合RabbitMQ实现消息确认机制
    前面几篇案例已经将常用的交换器(DirectExchange、TopicExchange、FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认。 ...
    99+
    2024-04-02
  • Springboot集成RabbitMQ死信队列的实现
    目录关于死信队列什么样的消息会进入死信队列?场景分析代码实现场景模拟生产者消费者,设置死信队列监听关于死信队列 在大多数的MQ中间件中,都有死信队列的概念。死信队列同其他的队列一样都...
    99+
    2024-04-02
  • SpringBoot+RabbitMQ 实现死信队列的示例
    前言 死信:无法被消费的消息,称为死信。 如果死信一直留在队列中,会导致一直被消费,却从不消费成功。 所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-lette...
    99+
    2024-04-02
  • Java中SpringBoot如何整合RabbitMQ
    这篇文章主要为大家展示了“Java中SpringBoot如何整合RabbitMQ”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Java中SpringBoot如何整合RabbitMQ”这篇文章吧。...
    99+
    2023-06-05
  • SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列
    今天小编给大家分享一下SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了...
    99+
    2023-06-30
  • 详解SpringBoot整合RabbitMQ如何实现消息确认
    目录简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册ConfirmCallback和ReturnCallback消费者消息确认介绍手动确认...
    99+
    2024-04-02
  • SpringBoot如何实现整合微信支付
    这篇文章将为大家详细讲解有关SpringBoot如何实现整合微信支付,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1.准备工作1.1 数据库表这里涉及微信支付一共两个表:订单表支付记录表1.2 实体类数据...
    99+
    2023-06-22
  • spring boot如何实现对RabbitMQ整合
    本篇文章为大家展示了spring boot如何实现对RabbitMQ整合,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。1.Fanout Exchange介绍Fanout Exchange 消息广播的...
    99+
    2023-05-31
    springboot rabbitmq bit
  • 详解SpringBoot如何实现整合微信登录
    目录1.准备工作1.1 获取微信登录凭证1.2 配置文件1.3 添加依赖1.4 创建读取公共常量的工具类1.5 HttpClient工具类2.实现微信登录2.1 具体流程2.2 生成...
    99+
    2024-04-02
  • RabbitMQ死信机制实现延迟队列的实战
    目录延迟队列应用场景TimeToLive(TTL) DeadLetterExchanges(DLX)延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当...
    99+
    2024-04-02
  • SpringBoot整合RedisTemplate如何实现缓存信息监控
    这篇文章给大家分享的是有关SpringBoot整合RedisTemplate如何实现缓存信息监控的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。SpringBoot 整合 Redis 数据库实现数据缓存的本质是整合...
    99+
    2023-06-28
  • .NET Core中RabbitMQ使用死信队列如何实现
    本篇内容介绍了“.NET Core中RabbitMQ使用死信队列如何实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!在.NET ...
    99+
    2023-07-05
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作