广告
返回顶部
首页 > 资讯 > 前端开发 > JavaScript >如何利用MQ实现事务补偿
  • 710
分享到

如何利用MQ实现事务补偿

2024-04-02 19:04:59 710人浏览 泡泡鱼
摘要

本篇内容介绍了“如何利用MQ实现事务补偿”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!RabbitMQ 在

本篇内容介绍了“如何利用MQ实现事务补偿”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

RabbitMQ互联网公司有着大规模应用,本篇将实战介绍 SpringBoot 整合 rabbitMQ,同时也将在具体的业务场景中介绍利用 MQ  实现事务补偿操作。

一、介绍

本篇我们一起来实操一下springBoot整合rabbitMQ,为后续业务处理做铺垫。

废话不多说,直奔主题!

二、整合实战

2.1、创建一个 maven 工程,引入 amqp 包

<!--amqp 支持--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.2、在全局文件中配置 rabbitMQ 服务信息

spring.rabbitmq.addresses=197.168.24.206:5672 spring.rabbitmq.username=guest spring.rabbitmq.passWord=guest spring.rabbitmq.virtual-host=/

其中,spring.rabbitmq.addresses参数值为 rabbitmq 服务器地址

2.3、编写 rabbitmq 配置类

@Slf4j @Configuration public class RabbitConfig {           @Bean     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,                                         @Value("${spring.rabbitmq.username}") String userName,                                         @Value("${spring.rabbitmq.password}") String password,                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) {         CachinGConnectionFactory connectionFactory = new CachingConnectionFactory();         connectionFactory.setAddresses(addresses);         connectionFactory.setUsername(userName);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(vhost);         return connectionFactory;     }           @Bean     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){         return new RabbitAdmin(connectionFactory);     }           @Bean     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);         //数据转换为JSON存入消息队列         rabbitTemplate.setMessageConverter(new Jackson2jsonMessageConverter());         return rabbitTemplate;     }           @Bean     public RabbitUtil rabbitUtil(){         return new RabbitUtil();     }  }

2.4、编写 RabbitUtil 工具类

public class RabbitUtil {      private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);      @Autowired     private RabbitAdmin rabbitAdmin;      @Autowired     private RabbitTemplate rabbitTemplate;           public void addExchange(String exchangeType, String exchangeName){         Exchange exchange = createExchange(exchangeType, exchangeName);         rabbitAdmin.declareExchange(exchange);     }           public boolean deleteExchange(String exchangeName){         return rabbitAdmin.deleteExchange(exchangeName);     }           public void addQueue(String queueName){         Queue queue = createQueue(queueName);         rabbitAdmin.declareQueue(queue);     }           public boolean deleteQueue(String queueName){         return rabbitAdmin.deleteQueue(queueName);     }           public void deleteQueue(String queueName, boolean unused, boolean empty){         rabbitAdmin.deleteQueue(queueName,unused,empty);     }           public void purgeQueue(String queueName){         rabbitAdmin.purgeQueue(queueName, false);     }           public boolean existQueue(String queueName){         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;     }           public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         rabbitAdmin.declareBinding(binding);     }           public void addBinding(Binding binding){         rabbitAdmin.declareBinding(binding);     }           public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         removeBinding(binding);     }           public void removeBinding(Binding binding){         rabbitAdmin.removeBinding(binding);     }           public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         //声明交换器         addExchange(exchangeType, exchangeName);         //声明队列         addQueue(queueName);         //声明绑定关系         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);     }           public void convertAndSend(String exchange, String routingKey, final Object object){         rabbitTemplate.convertAndSend(exchange, routingKey, object);     }           public Message getMessage(String messageType, Object msg){         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType(messageType);         Message message = new Message(msg.toString().getBytes(),messageProperties);         return message;     }           private Exchange createExchange(String exchangeType, String exchangeName){         if(ExchangeType.DIRECT.equals(exchangeType)){             return new DirectExchange(exchangeName);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return new TopicExchange(exchangeName);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             return new HeadersExchange(exchangeName);         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return new FanoutExchange(exchangeName);         }         return null;     }           private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         if(ExchangeType.DIRECT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             if(isWhereAll){                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();             }else{                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();             }         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));         }         return null;     }           private Queue createQueue(String queueName){         return new Queue(queueName);     }            public final static class ExchangeType {                   public final static String DIRECT = "DIRECT";                   public final static String TOPIC = "TOPIC";                   public final static String HEADERS = "HEADERS";                   public final static String FANOUT = "FANOUT";     } }

此致, rabbitMQ 核心操作功能操作已经开发完毕!

2.5、编写队列监听类(静态)

@Slf4j @Configuration public class DirectConsumeListener {           @RabbitListener(queues = "mq.direct.1")     public void consume(Message message, Channel channel) throws IOException {         log.info("DirectConsumeListener,收到消息: {}", message.toString());     } }

如果你需要监听指定的队列,只需要方法上加上@RabbitListener(queues = "")即可,同时填写对应的队列名称。

但是,如果你想动态监听队列,而不是通过写死在方法上呢?

请看下面介绍!

2.6、编写队列监听类(动态)

重新实例化一个SimpleMessageListenerContainer对象,这个对象就是监听容器

@Slf4j @Configuration public class DynamicConsumeListener {           @Bean     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);         container.setMessageListener((MessageListener) message -> {             log.info("ConsumerMessageListen,收到消息: {}", message.toString());         });         return container;     } }

如果想向SimpleMessageListenerContainer添加监听队列或者移除队列,只需通过如下方式即可操作。

@Slf4j @RestController @RequestMapping("/consumer") public class ConsumerController {      @Autowired     private SimpleMessageListenerContainer container;      @Autowired     private RabbitUtil rabbitUtil;           @PostMapping("addQueue")     public void addQueue(@RequestBody ConsumerInfo consumerInfo) {         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());         if(!existQueue){             throw new CommonExecption("当前队列不存在");         }         //消费mq消息的类         container.addQueueNames(consumerInfo.getQueueName());         //打印监听容器中正在监听到队列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }           @PostMapping("removeQueue")     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {         //消费mq消息的类         container.removeQueueNames(consumerInfo.getQueueName());         //打印监听容器中正在监听到队列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }           @PostMapping("queryListenerQueue")     public void queryListenerQueue() {         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     } }

2.7、发送消息到交换器

发送消息到交换器,非常简单,只需要通过如下方式即可!

  • 先编写一个请求参数实体类

@Data public class ProduceInfo implements Serializable {      private static final long serialVersionUID = 1l;           private String exchangeName;           private String routingKey;           public String msg; }
  • 编写接口api

@RestController @RequestMapping("/produce") public class ProduceController {      @Autowired     private RabbitUtil rabbitUtil;           @PostMapping("sendMessage")     public void sendMessage(@RequestBody ProduceInfo produceInfo) {         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);     }  }

当然,你也可以直接使用rabbitTemplate操作类,来实现发送消息。

rabbitTemplate.convertAndSend(exchange, routingKey, message);

参数内容解释:

  • exchange:表示交换器名称

  • routingKey:表示路由键key

  • message:表示消息

2.8、交换器、队列维护操作

如果想通过接口对 rabbitMQ 中的交换器、队列以及绑定关系进行维护,通过如下方式接口操作,即可实现!

先编写一个请求参数实体类

@Data public class QueueConfig implements Serializable{      private static final long serialVersionUID = 1l;           private String exchangeType;           private String exchangeName;           private String queueName;           private String routingKey; }

编写接口api

 @RestController @RequestMapping("/config") public class RabbitController {       @Autowired     private RabbitUtil rabbitUtil;           @PostMapping("addExchange")     public void addExchange(@RequestBody QueueConfig config) {         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());     }           @PostMapping("deleteExchange")     public void deleteExchange(@RequestBody QueueConfig config) {         rabbitUtil.deleteExchange(config.getExchangeName());     }           @PostMapping("addQueue")     public void addQueue(@RequestBody QueueConfig config) {         rabbitUtil.addQueue(config.getQueueName());     }           @PostMapping("deleteQueue")     public void deleteQueue(@RequestBody QueueConfig config) {         rabbitUtil.deleteQueue(config.getQueueName());     }           @PostMapping("purgeQueue")     public void purgeQueue(@RequestBody QueueConfig config) {         rabbitUtil.purgeQueue(config.getQueueName());     }           @PostMapping("addBinding")     public void addBinding(@RequestBody QueueConfig config) {         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }           @PostMapping("removeBinding")     public void removeBinding(@RequestBody QueueConfig config) {         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }           @PostMapping("andExchangeBindingQueueOfHeaderAll")     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAll");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);     }           @PostMapping("andExchangeBindingQueueOfHeaderAny")     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAny");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);     } }

至此,rabbitMQ 管理器基本的 crud 全部开发完成!

三、利用 MQ 实现事务补偿

当然,我们花了这么大的力气,绝不仅仅是为了将 rabbitMQ 通过 WEB  项目将其管理起来,最重要的是能投入业务使用中去!

上面的操作只是告诉我们怎么使用 rabbitMQ!

  • 当你仔细回想整个过程的时候,其实还是回到最初那个问题,什么时候使用 MQ ?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:支付订单、扣减库存、生成相应单据、发红包、发短信通知等等。

在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用  MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取 MQ 的消息(或者由 MQ  推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

这种是利用 MQ 实现业务解耦,其它的场景包括最终一致性、广播、错峰流控等等。

利用 MQ 实现业务解耦的过程其实也很简单。

  • 当主流程结束之后,将消息推送到发红包、发短信交换器中即可

@Service public class OrderService {      @Autowired     private RabbitUtil rabbitUtil;           @Transactional     public void createOrder(Order order){         //1、创建订单         //2、调用库存接口,减库存         //3、向客户发放红包         rabbitUtil.convertAndSend("exchange.send.bonus", null, order);         //4、发短信通知         rabbitUtil.convertAndSend("exchange.sms.message", null, order);     }  }
  • 监听发红包操作

 @RabbitListener(queues = "exchange.send.bonus") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //调用发红包接口 }

监听发短信操作

 @RabbitListener(queues = "exchange.sms.message") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //调用发短信接口 }

既然 MQ 这么好用,那是不是完全可以将以前的业务也按照整个模型进行拆分呢?

答案显然不是!

当引入 MQ 之后业务的确是解耦了,但是当 MQ 一旦挂了,所有的服务基本都挂了,是不是很可怕!

但是没关系,俗话说,兵来将挡、水来土掩,这句话同样适用于 IT 开发者,有坑填坑!

“如何利用MQ实现事务补偿”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: 如何利用MQ实现事务补偿

本文链接: https://www.lsjlt.com/news/76053.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何利用MQ实现事务补偿
    本篇内容介绍了“如何利用MQ实现事务补偿”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!rabbitMQ 在...
    99+
    2022-10-19
  • 利用Redis如何实现自动补全功能
    忘了redis从哪个版本开启,能够根据输入的部分命令前缀给出提示,即自动补全。接下来笔者介绍基于redis实现这个很酷的功能。 about sorted set 假设结果中有mara,marabel,ma...
    99+
    2022-10-18
  • 详解如何利用GORM实现MySQL事务
    目录前言禁用默认事务自动事务手动事务嵌套事务SavePoint、RollbackTo小结前言 为了确保数据一致性,在项目中会经常用到事务处理,回滚操作还是比较常见的需求;事务处理可以用来维护数据库的完整性,保证成批的sq...
    99+
    2022-09-07
  • 如何利用WMI实现系统补丁检测分析
    这篇文章主要介绍“如何利用WMI实现系统补丁检测分析”,在日常操作中,相信很多人在如何利用WMI实现系统补丁检测分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何利用WMI实现系统补丁检测分析”的疑惑有所...
    99+
    2023-06-08
  • 如何利用Redis实现分布式事务管理
    如何利用Redis实现分布式事务管理引言:随着互联网的快速发展,分布式系统的使用越来越广泛。在分布式系统中,事务管理是一项重要的挑战。传统的事务管理方式在分布式系统中难以实现,并且效率低下。而利用Redis的特性,我们可以轻松地实现分布式事...
    99+
    2023-11-07
    管理 redis 分布式事务
  • PostgreSQL中怎么利用DBLink实现自治事务
    本篇文章给大家分享的是有关PostgreSQL中怎么利用DBLink实现自治事务,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。安装dblink...
    99+
    2022-10-18
  • JavaBean中怎么利用JDBC实现事务处理
    今天就跟大家聊聊有关JavaBean中怎么利用JDBC实现事务处理,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。JDBC(Java Data Base Connectivity,ja...
    99+
    2023-06-17
  • 怎么在Spring中利用@Transactional实现事务回滚
    今天就跟大家聊聊有关怎么在Spring中利用@Transactional实现事务回滚,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、使用场景举例在了解@Transactional怎...
    99+
    2023-05-30
    spring transactional
  • ORACLE PL/SQL 利用自治事务实现日志记录
    程序中通常都要实现日志记录功能,尤其是事务发生报错时的错误日志。如果把日志记录在数据库中,可以方便后续的查询和分析。但是如果直接把记录日志的共能写在事务中,如果事务发生ROLLBACK,记录的日志也会发生R...
    99+
    2022-10-18
  • Restful服务如何利用Jersey来实现
    这篇文章将为大家详细讲解有关Restful服务如何利用Jersey来实现,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1.创建maven-web工程,后面就是正常的maven工程创建流程。2...
    99+
    2023-05-31
    jersey restful ers
  • 如何利用Redis和Haskell实现事件驱动的应用功能
    如何利用Redis和Haskell实现事件驱动的应用功能引言:Redis是一个高性能的键值存储系统,常用于缓存、消息队列、实时计算等场景。Haskell是一种强类型的函数式编程语言,拥有高度的表达能力和强大的类型系统。Redis和Haske...
    99+
    2023-10-22
    redis Haskell 事件驱动
  • 怎么在Java中利用JDBC实现一个事务功能
    本文章向大家介绍怎么在Java中利用JDBC实现一个事务功能,主要包括怎么在Java中利用JDBC实现一个事务功能的使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。Java是什么Java是一门面...
    99+
    2023-05-30
  • 如何利用原生JS实现触摸滑动监听事件
    前言 今天写一个小Demo,有个地方涉及到了左滑右滑的逻辑,本来想着用插件来着,但是想到自己好久没用原生JS写滑动的监听了,所以试着用原生JS来实现了一下,毕竟温故而知新嘛,同时做...
    99+
    2022-11-12
  • kafka事务是如何实现的
    Kafka提供了基于消息的分布式事务机制,可以确保消息的原子性、一致性和持久性。Kafka事务的实现基于以下两个核心概念:生产者事务...
    99+
    2023-09-14
    kafka
  • MySQL如何实现事务的ACID
    前言 最近在面试,有被问到,MySQL的InnoDB引擎是如何实现事务的,又或者说是如何实现ACID这几个特性的,当时没有答好,所以自己总结出来,记录一下。 事务的四大特性ACID 事务的四大特性ACID分别是,A-...
    99+
    2022-05-20
    MySQL 事务 MySQL 实现事务特性 MySQL 事务 acid
  • 我们如何实现MySQL事务?
    我们知道,在事务中,语句是作为一个单元执行的。如果事务内的任何操作失败,则整个事务将失败并应回滚;否则,语句所做的任何更改都会保存到数据库中。为了实现事务,MySQL 提供了以下语句 -START TRANSACTION顾名思义,事务从此语...
    99+
    2023-10-22
  • Django事务回滚如何实现
    这篇文章主要介绍“Django事务回滚如何实现”,在日常操作中,相信很多人在Django事务回滚如何实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Django事务回滚如何实现”的疑惑有所帮助!接下来,请跟...
    99+
    2023-07-05
  • 如何利用redis实现倒计时任务
    这篇文章主要介绍“如何利用redis实现倒计时任务”,在日常操作中,相信很多人在如何利用redis实现倒计时任务问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何利用redi...
    99+
    2022-10-19
  • 如何用C#实现SAGA分布式事务
    目录背景成功的 SAGA异常的 SAGA子事务屏障写在最后背景 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的...
    99+
    2022-11-13
    Saga分布式事务 C#实现SAGA分布式事务
  • 如何利用ChatGPT和Python实现对话事件的时序管理
    如何利用ChatGPT和Python实现对话事件的时序管理引言:随着人工智能的快速发展,ChatGPT作为一种基于大规模预训练模型的对话生成模型,已经成为自然语言处理领域的热门技术之一。然而,仅凭ChatGPT本身还无法实现对话事件的时序管...
    99+
    2023-10-24
    ChatGPT Python 对话事件管理
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作