iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)
  • 193
分享到

java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

2024-04-02 19:04:59 193人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

目录前言一、项目依赖二、数据源配置三、数据源的注册四、配置数据源对应的sqlSessionFactory五、测试接口六、建立JtaTestContoller.java七、在test.

前言

首先,到底啥是分布式事务呢,比如我们在执行一个业务逻辑的时候有两步分别操作A数据源和B数据源,当我们在A数据源执行数据更改后,在B数据源执行时出现运行时异常,那么我们必须要让B数据源的操作回滚,并回滚对A数据源的操作;这种情况在支付业务时常常出现;比如买票业务在最后支付失败,那之前的操作必须全部回滚,如果之前的操作分布在多个数据源中,那么这就是典型的分布式事务回滚;

了解了什么是分布式事务,那分布式事务在java的解决方案就是JTA(即Java Transaction api);SpringBoot官方提供了 Atomikos or Bitronix的解决思路;

其实,大多数情况下很多公司是使用消息队列的方式实现分布式事务。

本篇文章重点讲解springboot环境下,整合 Atomikos +Mysql+mybatis+Tomcat/jetty;

一、项目依赖

pom.xml中添加atomikos的springboot相关依赖:

<!--分布式事务-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

点进去会发现里面整合好了:​​transactions-jms​​、​​transactions-jta​​、​​transactions-jdbc​​、​​javax.transaction-api​

二、数据源配置

把数据源的相关配置项单独提炼到一个application.yml中:

注意:

  • 这回我们的spring.datasource.type 是com.alibaba.druid.pool.xa.DruidXADataSource;
  • ​spring.jta.transaction-manager-id​​的值在你的电脑中是唯一的,这个详细请阅读官方文档;

完整的yml文件如下:

spring:
datasource:
type: com.alibaba.druid.pool.xa.DruidXADataSource
druid:

systemDB:
name: systemDB
url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
username: root
passWord: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true

businessDB:
name: businessDB

url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
username: root
password: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
#jta相关参数配置
jta:
log-dir: classpath:tx-logs
transaction-manager-id: txManager

三、数据源的注册

在DruidConfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册

package com.zjt.config;
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.support.Http.StatViewServlet;
import com.alibaba.druid.support.http.WEBStatFilter;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.web.servlet.FilterReGIStrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;

@Configuration
public class DruidConfig {
@Bean(name = "systemDataSource")
@Primary
@Autowired
public DataSource systemDataSource(Environment env){
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.systemDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("systemDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;

}
@Autowired
@Bean(name = "businessDataSource")
public AtomikosDataSourceBean businessDataSource(Environment env){

AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.businessDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("businessDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);

return ds;
}

@Bean(name = "xatx")
public JtaTransactionManager regTransactionManager (){
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
private Properties build(Environment env, String prefix){
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
prop.put("timeBetweenEvictionRunsMillis",
env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
prop.put("filters", env.getProperty(prefix + "filters"));

return prop;
}
@Bean
public ServletRegistrationBean druidServlet(){
ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid
@Configuration
// 精确到 mapper 目录,以便跟其他数据源隔离
@MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory")
public class MybatisDatasourceConfig {
@Autowired
@Qualifier("systemDataSource")
private DataSource ds;
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(ds);
//指定mapper xml目录
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resolver.getResources("classpath:mapper
}

MybatisDatasource2Config.java

package com.zjt.config;
import com.zjt.util.MyMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;

@Configuration
// 精确到 mapper 目录,以便跟其他数据源隔离
@MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2")
public class MybatisDatasource2Config {
@Autowired
@Qualifier("businessDataSource")
private DataSource ds;
@Bean
public SqlSessionFactory sqlSessionFactory2() throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(ds);
//指定mapper xml目录
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2
}

由于我们本例中只使用一个事务管理器:xatx,故就不在使用​​TxAdviceInterceptor.java​​和​​TxAdvice2Interceptor.java​​中配置的事务管理器了;有需求的童鞋可以自己配置其他的事务管理器;(见DruidConfig.java中查看)

五、测试接口

新建分布式业务测试接口JtaTestService.java和实现类JtaTestServiceImpl.java

其实就是一个很简单的test01()方法,在该方法中我们分别先后调用​​classService.saveOrUpdateTClass(tClass);​​和​​teacherService.saveOrUpdateTeacher(teacher);​

实现先后操作两个数据源:然后我们可以自己debug跟踪事务的提交时机,此外,也可以在在两个方法全执行结束之后,手动制造一个运行时异常,来检查分布式事务是否全部回滚;

注意:

在实现类的方法中我使用的是:

@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })从而指定了使用哪个事务管理器,事务隔离级别(一般都用我这个默认的),回滚的条件(一般可以使用Exception),这三个可以自己根据业务实际修改;
package com.zjt.service3;
import java.util.Map;
public interface JtaTestService {
public Map<String,Object> test01();
}
package com.zjt.service3.impl;

import com.zjt.entity.TClass;
import com.zjt.entity.Teacher;
import com.zjt.service.TClassService;
import com.zjt.service2.TeacherService;
import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.LinkedHashMap;
import java.util.Map;

@Service("jtaTestServiceImpl")
public class JtaTestServiceImpl implements JtaTestService{

@Autowired
@Qualifier("teacherServiceImpl")
private TeacherService teacherService;
@Autowired
@Qualifier("tclassServiceImpl")
private TClassService tclassService;
@Override
@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })
public Map<String, Object> test01() {
LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
TClass tClass=new TClass();
tClass.setName("8888");
tclassService.saveOrUpdateTClass(tClass);
Teacher teacher=new Teacher();
teacher.setName("8888");
teacherService.saveOrUpdateTeacher(teacher);
System.out.println(1/0);
resultMap.put("state","success");
resultMap.put("message","分布式事务同步成功");
return resultMap;
}
}

六、建立JtaTestContoller.java

建立JtaTestContoller.java,接受一个来自前端的http请求,触发JtaTestService 的test01方法

package com.zjt.web;
import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.LinkedHashMap;
import java.util.Map;
@Controller
@RequestMapping("/jtaTest")
public class JtaTestContoller {
@Autowired
@Qualifier("jtaTestServiceImpl")
private JtaTestService taTestService;
@ResponseBody
@RequestMapping("/test01")
public Map<String,Object> test01(){
LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
try {
return taTestService.test01();
}catch (Exception e){
resultMap.put("state","fail");
resultMap.put("message","分布式事务同步失败");
return resultMap;
}
}
}

七、在test.ftl中增加一个按钮来测试

//分布式事务测试
$("#JTATest").click(function(){
$.ajax({
type: "POST",
url: "${basePath!}/jtaTest/test01",
data: {} ,
async: false,
error: function (request) {
layer.alert("与服务器连接失败/(ㄒoㄒ)/~~");
return false;
},
success: function (data) {
if (data.state == 'fail') {
layer.alert(data.message);
return false;
}else if(data.state == 'success'){
layer.alert(data.message);
}
}
});
});
<button class="layui-btn" id="JTATest">同时向班级和老师表插入名为8888的班级和老师</button>

八、启动服务,验证结果

点击这个按钮,跳转到controller:

当正常执行了sql语句之后,我们可以发现数据库并没有变化,因为整个方法的事务还没有走完,当我们走到1/0这步时:

抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:

在​​this.completeTransactionAfterThrowing(txInfo, var16);​​方法中会将事务全部回滚:

22:09:04.243 logback [http-NIO-8080-exec-5] INFO c.a.i.imp.CompositeTransactionImp - rollback() done of transaction 192.168.1.103.tm0000400006

此时,当我们再次打开数据库验证,依旧没有变化,证明分布式事务配置成功;

大家可以基于我的代码自己练习一下,自己尝试着使用多事务管理器的情况下的灵活配置;

到此这篇关于java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)的文章就介绍到这了,更多相关java SpringBoot 分布式事务 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

本文链接: https://www.lsjlt.com/news/166213.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)
    目录前言一、项目依赖二、数据源配置三、数据源的注册四、配置数据源对应的sqlSessionFactory五、测试接口六、建立JtaTestContoller.java七、在test....
    99+
    2024-04-02
  • JPA多数据源分布式事务处理方案
    目录前言问题背景XA事务方案XADataSource,XA协议数据源XAConnectionXAResource引入Atomikos依赖创建JTA事务管理器MysqlXA事务行为链式...
    99+
    2024-04-02
  • springboot+mybatisplus+druid如何实现多数据源+分布式事务
    这篇文章主要介绍springboot+mybatisplus+druid如何实现多数据源+分布式事务,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!  jdk环境:1.8  springboot:2.1.3.RELEA...
    99+
    2023-06-02
  • java分布式事务解决方案是什么
    Java分布式事务解决方案包括但不限于以下几种: 使用XA协议来管理分布式事务。XA协议是一种由X/Open组织定义的分布式事务...
    99+
    2024-04-02
  • 详解Java分布式事务的 6 种解决方案
    介绍 在分布式系统、微服务架构大行其道的今天,服务间互相调用出现失败已经成为常态。如何处理异常,如何保证数据一致性,成为微服务设计过程中,绕不开的一个难题。 在不同的业务场景下,解决...
    99+
    2024-04-02
  • JPA多数据源分布式事务的示例分析
    这篇文章主要介绍了JPA多数据源分布式事务的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。问题背景在解决mysql字段脱敏处理时,结合sharding-jdbc的脱敏...
    99+
    2023-06-29
  • LCN分布式事务解决方案详解
    目录一、什么是分布式事务?二、lcn的实现思路2.1 本地执行的状态怎么提交给全局事务?2.2 本地事务的提交或回滚怎么实现?三、lcn的使用3.1 下载lcn-manager (全...
    99+
    2024-04-02
  • 一文搞明白Java Spring Boot分布式事务解决方案
    目录前言1. 什么是反向补偿2. 基本概念梳理3. 什么是两阶段提交4. AT 模式5. TCC 模式6. XA 模式7. Saga 模式前言 分布式事务,咱们前边也聊过很多次了,网...
    99+
    2024-04-02
  • 分布式事务的7种解决方案是怎样的
    分布式事务的7种解决方案是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。 分布式事务最经典的七种解决方案随着...
    99+
    2024-04-02
  • 基于PostgreSQL/openGauss 的分布式数据库解决方案
    在 MySQL ShardingSphere-Proxy 逐渐成熟并被广泛采用的同时,ShardingSphere 团队也在 PostgreSQL ShardingSphere-Pr...
    99+
    2024-04-02
  • 图文精讲java常见分布式事务理论与解决方案
    目录CAP理论C(Consistence):一致性A(Availability):可用性P(Partition tolerance):分区容错性BASE理论BA(Basically ...
    99+
    2024-04-02
  • Oracle数据库分布式事务ORA-01591错误的解决方法
    这篇文章将为大家详细讲解有关Oracle数据库分布式事务ORA-01591错误的解决方法,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定...
    99+
    2024-04-02
  • 浅谈Java实现分布式事务的三种方案
    目录一、问题描述二、分布式事务2.1、什么是分布式系统2.2、什么是事务2.3、什么是本地事务2.4、什么是分布式事务三、如何进行分布式事务控制3.1、CAP理论3.2、分布式系统如...
    99+
    2024-04-02
  • java分布式事务之可靠消息最终一致性解决方案
    目录一、什么是可靠消息最终一致性事务1、本地事务与消息发送的原子性问题2、事务参与方接收消息的可靠性3、消息重复消费的问题二、解决方案1、本地消息表方案2、RocketMQ事务消息方...
    99+
    2022-11-13
    java分布式事务消息一致性 java分布式事务
  • 微服务分布式事务4种解决方案是怎么样的
    本篇文章给大家分享的是有关微服务分布式事务4种解决方案是怎么样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。分布式事务分布式事务是指事务的参与者,支持事务的服务器,资源服务器...
    99+
    2023-06-02
  • SpringBoot内置数据源的持久化与解决方案
    目录数据层解决方案与SQL有关的解决方案SpringBoot提供了3种内嵌的数据源对象供开发者选择内置持久化解决方案Springboot内置数据库数据层解决方案 SQLNoSQL 与...
    99+
    2024-04-02
  • Spring Cloud + Nacos + Seata整合过程(分布式事务解决方案)
    目录一、简介二、seata-server部署1、官网下载2、解压到本地3、修改配置文件4、seata数据库初始化5、业务数据库6、启动seata-server三、微服务项目集成Sea...
    99+
    2024-04-02
  • 关于MySQL与Golan分布式事务经典的七种解决方案
    目录1、基础理论1.1 事务1.2 分布式事务2、分布式事务的解决方案2.1 两阶段提交/XA2.2 SAGA2.3 TCC2.4 本地消息表2.5 事务消息2.6 最大努力通知2....
    99+
    2024-04-02
  • Unix系统下的大数据分析:跨平台分布式解决方案?
    在当今的信息时代,数据成为了企业和组织最宝贵的财富之一。处理海量数据的能力成为了企业发展的关键因素之一。而Unix系统作为一种高性能、高可靠性的操作系统,为大数据分析提供了一个优秀的平台。 随着数据量的不断增加,单机处理已经无法满足需求。...
    99+
    2023-07-26
    大数据 unix 分布式
  • MongoDB技术开发中遇到的分布式事务问题解决方案分析
    MongoDB技术开发中遇到的分布式事务问题解决方案分析随着互联网的迅猛发展,分布式系统变得越来越重要。在分布式系统中,数据库的一致性和事务的处理变得尤为关键。MongoDB作为一种流行的NoSQL数据库,也面临着分布式事务的挑战。本文将分...
    99+
    2023-10-22
    解决方案 MongoDB 分布式事务
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作