iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >springboot整合rocketmq如何实现分布式事务
  • 728
分享到

springboot整合rocketmq如何实现分布式事务

2023-06-15 07:06:24 728人浏览 泡泡鱼
摘要

这篇文章给大家分享的是有关SpringBoot整合RocketMQ如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1 执行流程(1) 发送方向 MQ 服务端发送消息。(2) MQ Server 将

这篇文章给大家分享的是有关SpringBoot整合RocketMQ如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

1 执行流程

springboot整合rocketmq如何实现分布式事务

(1) 发送方向 MQ 服务端发送消息。
(2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
(3) 发送方开始执行本地事务逻辑。
(4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

2 工程

springboot整合rocketmq如何实现分布式事务

2.1 pom

<parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.3.0.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-WEB</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastJSON</artifactId>            <version>1.2.71</version>        </dependency>        <dependency>            <groupId>org.apache.commons</groupId>            <artifactId>commons-collections4</artifactId>            <version>4.2</version>        </dependency>        <dependency>            <groupId>org.apache.commons</groupId>            <artifactId>commons-lang3</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-logging</artifactId>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->        <dependency>            <groupId>org.apache.rocketmq</groupId>            <artifactId>rocketmq-spring-boot-starter</artifactId>            <version>2.0.1</version>        </dependency>        <dependency>            <groupId>org.apache.rocketmq</groupId>            <artifactId>rocketmq-client</artifactId>            <version>4.3.2</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-Maven-plugin</artifactId>                <version>2.3.0.RELEASE</version>            </plugin>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.8.1</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build>

2.2 application.yml

rocketmq:  name-server: 192.168.38.50:9876  producer:    group: transcation-group

2.3 TransactionListenerImpl

@RocketMQtransactionListener(txProducerGroup = "transaction-producer-group")@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener {    private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();        @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);        try {            System.out.println("用户A账户减500元.");            System.out.println("用户B账户加500元.");            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);            return RocketMQLocalTransactionState.COMMIT;        } catch (Exception e) {            e.printStackTrace();        }        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);        return RocketMQLocalTransactionState.UNKNOWN;    }        @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);        log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));        return STATE_MAP.get(transId);    }}

2.4 SpringTransactionProducer

@Component@Slf4jpublic class SpringTransactionProducer {    @Autowired    private RocketMQTemplate rocketMQTemplate;        public void sendMsg(String topic, String msg) {        Message<String> message = MessageBuilder.withPayload(msg).build();        this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);        log.info("发送成功");    }}

2.5 SpringTxConsumer

@Component@RocketMQMessageListener(topic = "pay_topic",        consumerGroup = "transaction-consumer-group",        selectorExpression = "*")@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> {    @Override    public void onMessage(String msg) {        log.info("接收到消息 -> {}", msg);    }}

2.6 ProducerController

@RestController@RequestMapping("/producer")public class ProducerController {    @Autowired    private SpringTransactionProducer springTransactionProducer;    @GetMapping("/sendMsg")    public String sendMsg() {        springTransactionProducer.sendMsg("pay_topic", "用户A账户减500元,用户B账户加500元。");        return "发送成功";    }}

2.7 RocketApplication

@SpringBootApplicationpublic class RocketApplication {    public static void main(String[] args) {        SpringApplication.run(RocketApplication.class);    }}

3 测试

3.1 正常消费测试

描述: 正常启动及可。

springboot整合rocketmq如何实现分布式事务

springboot整合rocketmq如何实现分布式事务

3.2 回查代码测试

描述: 执行本地事务时添加异常,重启测试,发现消费者没有收到消息。

springboot整合rocketmq如何实现分布式事务

springboot整合rocketmq如何实现分布式事务

springboot整合rocketmq如何实现分布式事务

感谢各位的阅读!关于“springboot整合rocketmq如何实现分布式事务”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

--结束END--

本文标题: springboot整合rocketmq如何实现分布式事务

本文链接: https://www.lsjlt.com/news/278565.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作