返回顶部
首页 > 资讯 > 数据库 >Spring方法中调用异步方法进行事务控制详解
  • 200
分享到

Spring方法中调用异步方法进行事务控制详解

springjava数据库mysql 2023-08-20 09:08:40 200人浏览 独家记忆
摘要

spring 异步事务控制 文章目录 Spring 异步事务控制Spring 事务源码逻辑一、事务拦截器拦截二、进行事务控制三、事务开启 / 回滚 /提交操作四、事务完成,清除事务信息简单总结 异步方法事务控制方案一:自身编码

spring 异步事务控制

前言:这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

Spring 事务源码逻辑

在进行 Spring 异步事务控制编码前,先了解下 Spring 是如何进行事务控制的。

一、事务拦截器拦截

定义了@Transactional的方法会被代理,由代理对象执行方法。会进入TransactionInterceptor#invoke()方法

public Object invoke(MethodInvocation invocation) throws Throwable {    Class<?> targetClass = (invocation.getThis() != null ? aopUtils.getTargetClass(invocation.getThis()) : null);    // Adapt to TransactionAspectSupport's invokeWithinTransaction...    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {// ...        }    });}

二、进行事务控制

进入invokeWithinTransaction()方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,             final InvocationCallback invocation) throws Throwable {    // If the transaction attribute is null, the method is non-transactional.    TransactionAttributeSource tas = getTransactionAttributeSource();    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);    // 获取事务管理器    final TransactionManager tm = determineTransactionManager(txAttr);    // ...    PlatfORMTransactionManager ptm = asPlatformTransactionManager(tm);    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {        // 定义事务相关信息,如隔离级别、传播机制等        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);        Object retVal;        try {            // 调用被代理方法            retVal = invocation.proceedWithInvocation();        }        catch (Throwable ex) {            // target invocation exception            completeTransactionAfterThrowing(txInfo, ex);            throw ex;        }        finally {            // 设置ThraedLocal数据为null            cleanupTransactionInfo(txInfo);        }       // ...                // 提交/回滚事务        commitTransactionAfterReturning(txInfo);        return retVal;    }    // ...}

进入 createTransactionIfNecessary()方法

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,                           @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {    // ...    TransactionStatus status = null;    if (txAttr != null) {        if (tm != null) {            // 构建事务状态,如:启动事务、设置数据库连接信息等            status = tm.getTransaction(txAttr);        }        // ...    }    // 返回事务信息    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}

getTransaction() 方法

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)    throws TransactionException {    // 使用默认的事务定义    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// 获取事务对象,进入这个方法可以看到事务信息都是存在 ThreadLocal 中的    Object transaction = doGetTransaction();    boolean debugEnabled = logger.isDebugEnabled();    // ...    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {        SuspendedResourcesHolder suspendedResources = suspend(null);                try {            // 开启事务            return startTransaction(def, transaction, debugEnabled, suspendedResources);        }        // ...    }}

进入 DataSourceTransactionManager # doGetTransaction()方法

protected Object doGetTransaction() {    // DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类    DataSourceTransactionObject txObject = new DataSourceTransactionObject();    txObject.setSavepointAllowed(isNestedTransactionAllowed());    ConnectionHolder conHolder =        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());    txObject.setConnectionHolder(conHolder, false);    return txObject;}// 进入 TransactionSynchronizationManager # getResource()方法private static Object doGetResource(Object actualKey) {    Map<Object, Object> map = resources.get();    if (map == null) {        return null;    }    Object value = map.get(actualKey);    // Transparently remove ResourceHolder that was marked as void...    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {        map.remove(actualKey);        // Remove entire ThreadLocal if empty...        if (map.isEmpty()) {            resources.remove();        }        value = null;    }    return value;}// 观察 TransactionSynchronizationManager 对象public abstract class TransactionSynchronizationManager {private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");private static final ThreadLocal<String> currentTransactionName =new NamedThreadLocal<>("Current transaction name");private static final ThreadLocal<Boolean> currentTransactionReadOnly =new NamedThreadLocal<>("Current transaction read-only status");private static final ThreadLocal<Integer> currentTransactionIsolationLevel =new NamedThreadLocal<>("Current transaction isolation level");private static final ThreadLocal<Boolean> actualTransactionActive =new NamedThreadLocal<>("Actual transaction active");}

三、事务开启 / 回滚 /提交操作

事务的控制操作都是由 DataSourceTransactionManager 来进行的,如:

  • doBegin:开启事务
  • doCommit:提交事务
  • doRollback:回滚事务

其中主要是从 ThreadLocal 中取出 Connection 对象进行事务控制,以 doCommit 为例:

protected void doCommit(DefaultTransactionStatus status) {    // 获取连接对象    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();    Connection con = txObject.getConnectionHolder().getConnection();    if (status.isDebug()) {        logger.debug("Committing JDBC transaction on Connection [" + con + "]");    }    try {        // 提交        con.commit();    }    catch (sqlException ex) {        throw translateException("JDBC commit", ex);    }}

四、事务完成,清除事务信息

在第二步结尾,进入commitTransactionAfterReturning()

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {    if (txInfo != null && txInfo.getTransactionStatus() != null) {        if (logger.isTraceEnabled()) {            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");        }        // 提交/回滚事务        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());    }}public final void commit(TransactionStatus status) throws TransactionException {    if (status.isCompleted()) {        throw new IllegalTransactionStateException(            "Transaction is already completed - do not call commit or rollback more than once per transaction");    }    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;    if (defStatus.isLocalRollbackOnly()) {        if (defStatus.isDebug()) {            logger.debug("Transactional code has requested rollback");        }        // 回滚        processRollback(defStatus, false);        return;    }    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {        if (defStatus.isDebug()) {            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");        }        // 回滚        processRollback(defStatus, true);        return;    }// 提交    processCommit(defStatus);}

以提交为例,进入processCommit()

private void processCommit(DefaultTransactionStatus status) throws TransactionException {    try {        boolean beforeCompletionInvoked = false;        try {            boolean unexpectedRollback = false;            // 各种异常、判断等前置处理        }        finally {            // 清除事务信息            cleanupAfterCompletion(status);        }    }}private void cleanupAfterCompletion(DefaultTransactionStatus status) {    status.setCompleted();    if (status.isNewSynchronization()) {        TransactionSynchronizationManager.clear();    }    if (status.isNewTransaction()) {        doCleanupAfterCompletion(status.getTransaction());    }    if (status.getSuspendedResources() != null) {        if (status.isDebug()) {            logger.debug("Resuming suspended transaction after completion of inner transaction");        }        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());    }}

简单总结

由上面的源码可以知道,Spring 的事务控制主要是一下步骤:

  1. 对标注了事务注解的方法进行动态代理
  2. 代理方法的前置处理是获取数据库连接,定义事务信息等,存储在 ThreadLocal 中
  3. 开启事务
  4. 执行方法逻辑
  5. 提交 / 回滚事务
  6. 清除事务信息

异步方法事务控制

这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

方案一:自身编码控制数据库连接

这个方法是自己结合对 Spring 事务控制理解想到的,许多细节并没有考虑到,只提供一种思路。

说到底 Spring 的事务控制还是基于数据库连接的,只不过它帮助我们简化了操作。所以如果我们自己去维护这个数据库连接,然后再对它进行手动的事务控制即可。

PS:要注意的是,使用 mybatis 结合 Spring 时,Mybatis 获取的数据库连接,是通过 Spring 获得的,所以如果自己在执行 Mybatis 方法前创建数据库连接,再手动控制是没有用的,因为自己创建的数据库连接和 Mybatis 使用的不是同一个。

由于如上情况,如果完全的靠自己获取事务连接进行事务控制,那么就还需要改写 Mybatis 执行的逻辑,这样就很麻烦。所以这里使用的方案是依旧利用 Spring 获取数据库连接对象,只不过要将这个对象拿出来,自己维护。

编码

通过之前的源码知道,Spring 获取数据库连接获取后,将其存储到 DataSourceTransactionObject 类中的

protected void doBegin(Object transaction, TransactionDefinition definition) {    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;    Connection con = null;    try {        if (!txObject.hasConnectionHolder() ||            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {            Connection newCon = obtainDataSource().getConnection();            if (logger.isDebugEnabled()) {                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");            }            // 将获取的 Connection 对象设置到 DataSourceTransactionObject            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);        }        // ...    }}

DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类,所以没法在外部使用它,所以这里利用了反射来使用这个对象。

方法 A

需要注意下面的代码获取到数据库连接对象后只进行了简单处理,将其设置到了该类的一个属性中,实际使用需要考虑如何维护问题

public void asyncTrans(Boolean flag) {    DefaultTransactionStatus status = null;    try {        DefaultTransactionDefinition def = new DefaultTransactionDefinition();        // 设置事务信息(事务名、传播方式)        def.setName("SomeTxName");        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);        // 获取事务状态        status = (DefaultTransactionStatus) transactionManager.getTransaction(def);        // 获取事务信息(实际类型 DataSourceTransactionObject,是DataSourceTransactionManager的私有静态内部类)        Object transaction = status.getTransaction();        // 由于是私有静态内部类,所以根据反射调用,获取数据库连接        Method[] methods = ReflectionUtils.getAllDeclaredMethods(transaction.getClass());        for (Method method : methods) {            // 获取数据库连接            if ("getConnectionHolder".equals(method.getName())) {                ConnectionHolder connectionHolder = (ConnectionHolder) ReflectionUtils.invokeMethod(method, transaction);                Connection connection = connectionHolder.getConnection();                // 将该连接自己维护(这里简单处理,只是设置到了该类的一个属性中,实际使用需要考虑如何维护问题)                this.connection = connection;            }        }        // 执行方法逻辑        AreaZoneRel zoneRel = new AreaZoneRel();        zoneRel.setZoneName("华北");        zoneRel.setAreaName("北京");        zoneRelMapper.insert(zoneRel);        // 执行异步方法        asyncMethod(flag);    } catch (Exception ex) {        // 回滚        transactionManager.rollback(status);        throw new RuntimeException(ex);    } finally {        status.setCompleted();        if (status.isNewSynchronization()) {            TransactionSynchronizationManager.clear();        }    }}

异步方法 B

PS:下面针对事务 A 的回滚 / 提交异常只是简单处理,直接将连接关闭(事务会自动回滚),实际使用需要考量。

此外下面的实现有许多细节没有注意,如:A 提交失败,B 的回滚处理问题。

private void asyncMethod(Boolean flag) {    // 执行异步方法    Callable callable = () -> {        try {            // 子线程本身的事务控制使用Spring的事务控制            transactionTemplate.execute(status -> {                AreaGraphRel graphRel = new AreaGraphRel();                graphRel.setAreaName("上海");                graphRel.setGraphName("上海.svg");                graphRelMapper.insert(graphRel);                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }                // 异常模拟                if (flag) {                    int a = 1 / 0;                }                return null;            });        } catch (Exception e) {            // 如果异步方法抛出异常,则回滚外部线程的事务            try {                connection.rollback();            } catch (Exception exception) {                // 粗暴处理,异常直接关闭连接                connection.close();                throw new RuntimeException(exception);            }            throw new RuntimeException(e);        }        try {            // 提交            connection.commit();        } catch (Exception exception) {            // 粗暴处理,异常直接关闭连接            connection.close();            throw new RuntimeException(exception);        }        return 200;    };    FutureTask task = new FutureTask(callable);    new Thread(task).start();}

方案二:基于快照实现

这个方案是根据 Seata AT 模式的思路实现的

这种方式就是对需要进行控制的方法生成数据快照,如:将 A 方法提交事务前数据信息记录下来,作为前一个版本的数据快照;这时执行 B 方法时,当 B 方法执行失败,其事务回滚,那么就根据快照将 A 方法的数据更新为上一个版本。

PS:这里有如下几点需要注意:

  1. 当利用快照回滚时,需要考虑是否要保证当前数据信息和快照修改后的数据一致问题。
    因为如果在基于快照进行更新时,该数据被其他调用进行数据修改,此时再基于快照更新数据则是将期间其他的正常操作结果都覆盖了,那么要考虑这样是否会对系统数据造成影响。
  2. 当利用快照回滚时,调用失败的异常处理。
    如果是上面 1 比较不一致导致回滚异常,那么只能通过告警,人为处理;如果是超时等情况可以利用 MQ 等消息中间件进行不断的失败重试,最后再结合异常报警通知,来通知人手动处理。

来源地址:https://blog.csdn.net/AhangA/article/details/129978356

您可能感兴趣的文档:

--结束END--

本文标题: Spring方法中调用异步方法进行事务控制详解

本文链接: https://www.lsjlt.com/news/376321.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作