Spring事务传播机制总结及源码解读

olivee 2年前 ⋅ 1363 阅读
  • 参考:
    • https://www.baeldung.com/spring-transactional-propagation-isolation
    • https://wiyi.org/how-does-transaction-suspension-in-spring.html
    • https://stackoverflow.com/questions/33729810/what-does-suspending-a-transaction-means
    • https://blog.csdn.net/nmgrd/article/details/51883405

1. 原理

Spring creates a proxy, or manipulates the class byte-code, to manage the creation, commit, and rollback of the transaction. In the case of a proxy, Spring ignores @Transactional in internal method calls.

Simply put, if we have a method like callMethod and we mark it as @Transactional, Spring will wrap some transaction management code around the invocation@Transactional method called:

createTransactionIfNecessary();
try {
    callMethod();
    commitTransactionAfterReturning();
} catch (exception) {
    completeTransactionAfterThrowing();
    throw exception;
}

每次数据库操作的时候,会调用 DataSourceUtils的getConnection方法,这个方法会通过TransactionSynchronizationManager从当前线程中获取ConnectionHolder,如果存在表示线程当前在事务中,就可以通过ConnectionHolder来获取事务。当我们自己通过原生的JDBC来操作的时候,而又有jpa、mybatis等混合代码,又想通过Spring的@Transactional来控制事务,就可以通过 DataSourceUtils.getConnection来获取连接,这样获取的连接就会受到事务的控制。

注意: 如果在@Transaction注解内的服务(不管是REQUIRED、SUPPORTS、MANDATORY、REQUIRES_NEW、NOT_SUPPORTED、NEVER、还是NESTED传播机制),用DataSourceUtils获取的链接都不用手动去关闭,关闭了反而会报错。因为spring 的事务会自动去关闭链接。

DataSourceUtils 的 getConnection 的源代码参考:1.1.4.

2. 使用

可以在interfaces, classes,和 method中使用@Transactional。

如果定义在class中,那么class中所有的public mothod都会被应用到@Transactional

然而private 或 protected method 方法中的@Transactional不会生效(由于AOP的代理机制)

例如:

@Transactional
public interface TransferService {
    void transfer(String user1, String user2, double val);
}

TransferServiceImpl中的Transactional会覆盖TransferService的Transactional配置:

@Service
@Transactional
public class TransferServiceImpl implements TransferService {
    @Override
    public void transfer(String user1, String user2, double val) {
        // ...
    }
}

transfer方法的Transactional 会覆盖接口中的Transactional配置:

@Transactional
public void transfer(String user1, String user2, double val) {
    // ...
}

如果调用方法不会应用到AOP代理,那么也不会应用到@Transactional,例如:

public void methodA() {
    // 这里调用mthodB,methodB中的@Transactional不会生效,因为没有走spring的AOP代理
    this.methodB();
}

@Transactional
public void methodB() {
    // ...
  
}

3. 事务传播 Propagation

Spring calls TransactionManager::getTransaction to get or create a transaction according to the propagation. It supports some of the propagations for all types of TransactionManager, but there are a few of them that are only supported by specific implementations of TransactionManager.

3.1 REQUIRED(需要的) - 默认

需要有事事务,有就用,没有就创建。

Spring checks if there is an active transaction, and if nothing exists, it creates a new one. Otherwise, the business logic appends to the currently active transaction:

@Transactional(propagation = Propagation.REQUIRED)
public void requiredExample(String user) { 
    // ... 
}
// 或者
@Transactional
public void requiredExample(String user) { 
    // ... 
}

伪代码:

if (isExistingTransaction()) {
    if (isValidateExistingTransaction()) {
        validateExisitingAndThrowExceptionIfNotValid();
    }
    return existing;
}
return createNewTransaction();

3.2 SUPPORTS(支持的)

有就用,没有就不用。 -- 表示可有可无,是否启用事务依赖外层。

For SUPPORTS, Spring first checks if an active transaction exists. If a transaction exists, then the existing transaction will be used. If there isn't a transaction, it is executed non-transactional:

@Transactional(propagation = Propagation.SUPPORTS)
public void supportsExample(String user) { 
    // ... 
}

spring实现的伪代码:

if (isExistingTransaction()) {
    if (isValidateExistingTransaction()) {
        validateExisitingAndThrowExceptionIfNotValid();
    }
    return existing;
}
return emptyTransaction;

3.3 MANDATORY(强制的)

要求必须在事务内,如果没在就报错。

When the propagation is MANDATORY, if there is an active transaction, then it will be used. If there isn't an active transaction, then Spring throws an exception:

@Transactional(propagation = Propagation.MANDATORY)
public void mandatoryExample(String user) { 
    // ... 
}

spring实现的伪代码:

if (isExistingTransaction()) {
    if (isValidateExistingTransaction()) {
        validateExisitingAndThrowExceptionIfNotValid();
    }
    return existing;
}
throw IllegalTransactionStateException;

3.4 NEVER(从不)

  • 重不需要事务,如果在事务内就报错。
  • 如果没在事务内,则不启用事务,但是服务内的操作都共用一个Connection

For transactional logic with NEVER propagation, Spring throws an exception if there's an active transaction:

@Transactional(propagation = Propagation.NEVER)
public void neverExample(String user) { 
    // ... 
}

spring实现的伪代码:

if (isExistingTransaction()) {
    throw IllegalTransactionStateException;
}
return emptyTransaction;

3.5 NOT_SUPPORTED

  • 不支持事务,如果在事务里面不报错
  • 服务内的操作都共用一个Connection
  • 如果jdbc是auto commit,则执行成功多少就提交多少

If a current transaction exists, first Spring suspends it, and then the business logic is executed without a transaction:

@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void notSupportedExample(String user) { 
    // ... 
}

spring实现的代码片段:

if (isExistingTransaction()) {
    suspend(existing);
}
return emptyTransaction;

3.5 REQUIRES_NEW

  • 新建事务,不影响外层事务

When the propagation is REQUIRES_NEW, Spring suspends the current transaction if it exists, and then creates a new one:

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void requiresNewExample(String user) { 
    // ... 
}

Spring伪代码:

if (isExistingTransaction()) {
    suspend(existing);
    try {
        return createNewTransaction();
    } catch (exception) {
        resumeAfterBeginException();
        throw exception;
    }
}
return createNewTransaction();

3.6 NESTED

如果外层有事务,则创建save point,回滚只回滚内层事务。

如果外层没有事务,则新建事务,这时和REQUIRED差不多。

For NESTED propagation, Spring checks if a transaction exists, and if so, it marks a save point. This means that if our business logic execution throws an exception, then the transaction rollbacks to this save point. If there's no active transaction, it works like REQUIRED.

DataSourceTransactionManager supports this propagation out-of-the-box. Some implementations of JTATransactionManager may also support this.

JpaTransactionManager supports NESTED only for JDBC connections. However, if we set the nestedTransactionAllowed flag to true, it also works for JDBC access code in JPA transactions if our JDBC driver supports save points.

@Transactional(propagation = Propagation.NESTED)
public void nestedExample(String user) { 
    // ... 
}

4. 场景总结

4.1 场景总览图

image-20220222140507463.png

下面对几个主要的场景列举说明:

4.2 外层有事务,内层事务沿用外层Connection(一个事务,一个Connection)

外层(REQUIRED、NESTED、或REQUIRES_NEW)、内层(REQUIRED、SUPPORTS、或MANDATORY)

内网外层共用一个事务,要么全部执行成功 commit事务,要么部分实行失败,全部rollback事务

不能部分rollback,部分 commit(比如:如果想把一个标记为REQUIRED的子服务,在它失败的时候捕获异常,只回滚这个子服务,其它服务执行成功commit,这种)

  • 代码
// TestService 类
    @Autowired
    private TestJdbc1 testJdbc1;

    // 两个内部服务,由于都是REQUIRED,所以要么全部回滚,要么全部执行成功提交事务
    // 下面的方法中:testREQUIRED不会抛出异常,会执行提交事务,而testREQUIREDInError执行失败,会标记为事务为rollback-only;因此执行完testREQUIRED方法再提交事务的时候,会抛出UnexpectedRollbackException异常
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIRED(){
        // 内部服务1,执行成功
        testJdbc1.testREQUIREDInSuccess();
        try {
            // 内部服务2,执行失败,会标记为事务回滚
            testJdbc1.testREQUIREDInError();
        } catch (Exception e) {
            // 这里捕获了异常,但是事务还是标记为回滚了
            // 应为内层服务在外层事务里面,用的是同一个Colletion,内层服务需要rollback,而外层事务需要commit,存在冲突,所以外层的服务在Commit的时候会抛出UnexpectedRollbackException异常
            // 正确的写法是 再把异常抛出去  throw e,因为外层代码有事务,内层代码
        }
    }

// TestJdbc1 类
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIREDInSuccess(){
        // SQL成功
        String sql = "insert into test values ('REQUIRED 0')";
        jdbcTemplate.update(sql);
    }
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIREDInError(){
            // SQL失败 看看事务会不会回滚
            sql = "insert into test values ('REQUIRED 1', 'error')";
            jdbcTemplate.update( sql );
    }

4.3 外层有事务,内层新建事务(两个事务,两个Connetion)

外层(REQUIRED、NESTED、或REQUIRES_NEW)、内层(REQUIRES_NEW)

内层事务rollback不影响外层事务,应为啥两个Connetion

内层抛出的异常,如果在外层没有捕获,那么外层继续抛出异常,会导致外层事务也rollback

  • 代码
// TestService 类
    @Autowired
    private TestJdbc1 testJdbc1;
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIRED_REQUIRES_NEW() {
        //  这里testREQUIREDInSuccess没有事务,相当于就在外层的testREQUIRED_REQUIRES_NEW事务里面
        testJdbc1.testREQUIREDInSuccess();
        try {
            // 这里testREQUIRES_NEWInError抛出异常
            // 由于定义的为REQUIRES_NEW,因此会新创建事务,同时新建Connection
            testJdbc1.testREQUIRES_NEWInError();
        } catch (Exception e) {
            // 这里捕获了异常,就不会影响外层的事务
            // 因为是两个事务,两个Connection,所以内层事务rollback对外层事务不影响
            // 如果不捕获异常,当抛出异常的时候,外层的事务也会rollback
        }
    }

// TestJdbc1 类
    public void testREQUIREDInSuccess(){
        // SQL成功
        String sql = "insert into test values ('REQUIRED 0')";
        jdbcTemplate.update(sql);
    }

    // 内层启动新事务,获取一个新的Conneciton
    @Transactional( propagation = Propagation.REQUIRES_NEW )
    public void testREQUIRES_NEWInError() {
        // 执行失败
        String sql = "insert into test values ('REQUIRED 1', 'error')";
        jdbcTemplate.update( sql );
    }

4.4 外层有事务,内层NESTED事务(一个事务,一个Connetion)

外层(REQUIRED、NESTED、或REQUIRES_NEW)、内层(NESTED)

由于内层事务为NESTED,因此如果内层事务抛出异常,则只回滚到内层服务,不影响外层服务(这种情况的结果和外层有事务,内层新建事务的执行结果是一样的,但是区别是这里只有一个jdbc Connection,内层新建事务会有两个jdbc Connection)

  • 代码
// TestService 类
    @Autowired
    private TestJdbc1 testJdbc1;
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIRED_NESTED() {
        //  这里testREQUIREDInSuccess没有事务,相当于就在外层的testREQUIRED_NESTED事务里面
        testJdbc1.testREQUIREDInSuccess();
        try {
            // 这里testNESTEDInError抛出异常
            // 由于定义的为NESTED,因此会嵌套在外层事务里面,内层事务和外层事务公用一个Connection
            testJdbc1.testNESTEDInError();
        } catch (Exception e) {
            // 这里捕获了异常,就不会影响外层的事务,内层事务rollbak不会影响外层事务
            // 因为用来savepoint机制,虽然是1个Connection,但是rollback的时候只回滚到savepoint到位置,所以不会影响外层的事务
            // 如果不捕获异常,当抛出异常的时候,外层的事务也会rollback
        }
    }

// TestJdbc1 类
    public void testREQUIREDInSuccess(){
        // SQL成功
        String sql = "insert into test values ('REQUIRED 0')";
        jdbcTemplate.update(sql);
    }

    // 内层事务嵌套在外层事务里面,如果内层事务抛出异常,只回滚内层事务
    @Transactional( propagation = Propagation.NESTED )
    public void testNESTEDInError() {
        // 执行失败
        String sql = "insert into test values ('REQUIRED 1', 'error')";
        jdbcTemplate.update( sql );
    }

4.5 外层有事务,内层无事务(一个事务,两个Connetion)

外层(REQUIRED、NESTED、或REQUIRES_NEW)、内层(NOT_SUPPORTED)

内层方法操作了jdbc多次,就会获取多个jdbc连接

内层抛出异常不影响外层事务,应为内层的Connetion和外层不一样

内层抛出的异常,如果在外层没有捕获,那么外层继续抛出异常,会导致外层事务也rollback

  • 代码
// TestService 类
    @Autowired
    private TestJdbc1 testJdbc1;
    @Transactional( propagation = Propagation.REQUIRED )
    public void testREQUIRED_NOT_SUPPORTED() {
        //  这里testREQUIREDInSuccess没有事务,相当于就在外层的testREQUIRED_NOT_SUPPORTED事务里面
        testJdbc1.testREQUIREDInSuccess();
        try {
            // 这里testNOT_SUPPORTEDInError抛出异常
            // 由于定义的为NOT_SUPPORTED,因此不会用到事务,在方法的jdbc操作时候会新获取数据库连接
            testJdbc1.testNOT_SUPPORTEDInError();
        } catch (Exception e) {
            // 这里捕获了异常,就不会影响外层的事务
            // 如果不捕获异常,当抛出异常的时候,外层的事务也会rollback
        }
    }

// TestJdbc1 类
    public void testREQUIREDInSuccess(){
        // SQL成功
        String sql = "insert into test values ('REQUIRED 0')";
        jdbcTemplate.update(sql);
    }
    @Transactional( propagation = Propagation.NOT_SUPPORTED )
    // 虽然不支持事务,但是内层的方法公用一个Connection,如果autocommit为true,则前面执行成功的sql会提交
    public void testNOT_SUPPORTEDInError() {
        // 执行失败
        String sql = "insert into test values ('REQUIRED 1', 'error')";
        // 这里jdbc操作一次,就会获取一次jdbc连接
        jdbcTemplate.update( sql );
    }

4.6 外层有事务,内层NEVER注解(内层不支持事务)

调用内层服务抛出 IllegalTransactionStateException 异常

NEVER和NOT_SUPPORTED区别是

  • NOT_SUPPORTED无事务,但是有一个jdbc connection,会用这个connection执行sql
  • NEVER不支持事务,抛出异常

4.7 外层有事务,内层无@Transactional注解(一个事务,一个Connetion)

外层(REQUIRED、NESTED、或REQUIRES_NEW)、内层无@Transactional注解。相当于只要外层一个AOP,只有一个事务。

相当于没有用到事务的传播机制,内层的服务方法没有被spring的transaction AOP代理

内层的服务方法和外层的服务方法在一个事务里面,内层的服务方法公用外层事务的连接

4.8 内、外层都无@Transactional(0个事务,多个Connetion,每次JDBC都对应一个Connetion)

没有事务,每次JDBC操作都会创建一个连接,用完后立刻释放

  • 优点: 连接释放快
  • 缺点: 频繁获取连接
  • 适用场景:连个jdbc操作间隔的java逻辑代码耗时的场景,这两个jdbc操作就不能放在一个事务里面。如:查询10个订单,对订单进行耗时几秒的处理,更新订单状态,因为中间间隔了几秒的较长时间,所以这个流程建议就不放在一个事务里面。

5. 注意

5.1 关于事务与Connection的关系

  • 开启一个事务就会创建一个数据库连接
  • 当事务结束的时候才会释放数据库连接
  • 如果外层和内层有两个事务,则会创建两个数据库连接,外层的数据库连接将再内层的事务结束后才会释放。
    • 因此在非必要情况下,不要在一个后台业务请求中用嵌套的内、外两个事务。
    • 一个业务请求就创建两个数据库连接,占用数据库连接资源
    • 如果内层事务耗费时间长,外层事务需要等很久才能结束,那么也就是说数据库连接很久才能释放。将会长期霸占数据库连接,导致别的业务请求不能获取数据库连接。
    • 当内层事务为MANDATORY、REQUIRES_NEW的时候会新建事务,因此尽量避免内层事务使用MANDATORY、REQUIRES_NEW
  • 如果内层沿用外层事务,则内层的数据库操作和外层共用一个数据库连接
    • 当内层定义为REQUIED、SUPPORTS的时候,沿用外层事务
  • 如果不开启事务,那么JdbcTemplate、JPA、Mybatis会通过DataSourceUtils的getConnection方法获取连接,这时获取到的连接是不受事务控制的
    • 当不开启事务时,每次JdbcTemplate、或Mybatis操作数据库都会获并释放连接
    • 如果操作了多次JdbcTemplate、或Mybatis,那就会获取多次数据库连接,频繁的获取和释放数据库,可能会影响性能
    • NEVER、NOT_SUPPORTED、或外层方事务定义为SUPPORTS,也相当于不开启事务,内层的数据库连接都不受事务控制
    • 在不开启事务的情况下,DataSource获取Connection、DataSourceUtils的getConnection获取的连接,都不收事务的控制,需要手动去关闭。
    • 在不开启事务的情况下,数据库是否自动commit,依赖于DataSource数据源中配置的数据源是否设置为自动提交
  • 在开启事务的情况下
    • 通过DataSourceUtils的getConnection获取的连接就是事务中的连接,事务会自动关闭或commit这个连接,代码中不需要写关闭数据库、commit等操作
    • 如果是通过DataSource获取Connection,那么这个Connection是不受事务控制的,需要自己手动关闭。这里需要特别注意★★★★,不要误以为定义了@Transactional事务,就可以不关闭通过DataSource获取的Connection数据库连接。

5.2 关于数据库事务的性能

  • 耗费时间长的非数据库操作(比如HTTP请求、加载文件流等)尽量不要放在事务中,这样将导致长期不能释放数据库连接,霸占数据库连接
    • 尽量通过程序设计,把时间长的非数据库操作放到事务之外
  • 每个数据库操作尽量控制在秒级别,最好1秒以内,避免长期占用数据库连接(并发稍微高一点就可能造成排队等待数据库连接)
  • 对于是否开启事务对性能的影响:取决于事务中的非数据库操作的时长
    • 如果非数据库操作的时长较多,则考虑设计成不开启事务,操作完数据库就释放连接资源,这样可以尽快释放连接
    • 如果非数据库操作的时长很短,则考虑开启事务,这样多个数据库操作可以共用数据库连接,可以避免多次获取、释放数据库连接的时间

5.3 关于外层方法的Propagation

从字面意思上将,Propagation翻译为事务传播级别,那么可以理解为改参数仅对内层的@Transactional有效。即理解为当外层定义了@Transactional的时候,内层的@Transactional定义事务是否开启事务(Rqeuired、Supports和MANDATORY)、不开启事务(NOT_SUPPORTED)、新建事务(REQUIRES_NEW)、报错(NEVER)等。然而事实上不是这样的,Propagation定义在外层的@Transactional也是有效的。

  • 当外层的定义的@Transactional的Propagation为REQUIRED、REQUIRES_NEW、NESTED的时候,则会开启事务;
  • 由于默认的Propagation是REQUIRED,因此默认外层方法会开启事务
  • 当外层的Propagation为MANDATORY的时候会抛出异常;
  • 当外层的Propagation为Supports、NOT_SUPPORTED、NEVER的时候,不会创建事务。相当于外层方法就没有事务,和没有写@Transactional的效果是一样的。(在外层方法定义@Transactional(propagation = Propagation.NOT_SUPPORTED)相当于是多此一举,反而增加spring aop的开销)

5.4 关于DataSource数据源

  • DataSource中可以设置数据库连接的autocommit,如果不设置一般默认都是true(DruidDataSource和HikariDataSource默认的autocommit都是true)
  • 如果不启用spring事务,那么所有的数据库操作(Jdbc、JdbcTemplate)执行是否成功,就依赖于DataSource设置的autocommit
  • 如果默认的数据库连接的autocommit为true,在启用Spring的事物后,spring会在事务开始的时候把Connection设置为false,在事务结束后把autocommit还原为true

5.5 关于事务的中的异常

  • 关于事务中抛出异常
    1. 开启事务后,如果在事务内抛出了运行时异常,事务会回滚。
    2. 接着会把这个异常继续往外抛(异常不会因为事务回滚而终止)
    3. 如果事务服务的外层代码中捕获了异常,即便捕获了异常也不影响事务的回滚。如果还事务外还存在外层事务,那么分两种情况:如果内层事务是REQUIRED、SUPPORTS、或MANDATORY,则外层事务也必须回滚;否则(内侧事务为REQUIRES_NEW、REQUIRES_NEW、NESTED)外层事务可以不回滚

spring调用事务内代码捕获的是Trowable异常,因此任何异常都会被spring的代理捕获到,但是默认的回滚只在RuntimeException和Error子异常才回滚。spring内部捕获异常的代码:

      //  org.springframework.transaction.interceptor.TransactionAspectSupport的方法中可以看到:
      try {
                // 调用具体方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 处理异常,根据事务配置,可能 rollback 事务
                completeTransactionAfterThrowing(txInfo, ex);
                // 继续往外抛出异常
                throw ex;
            }

注意:

  • 捕获异常回滚,看的是运行时抛出的真实异常,而不是开的方法定义中声明抛出的异常,如下

    • 比如下面声明抛出的是Exception,但是真实抛出的是RuntimeException,因此也会事务回滚

          @Transactional(propagation = Propagation.REQUIRED)
          public void testREQUIREDIn2() throws Exception{
              try {
                  jdbcTemplate.xxx
              } catch (Exception e) {
                  throw new RuntimeException("error");
              }
          }
      
  • 如果想自定义回滚的异常,通过如下代码设置

    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    

6. spring 源代码解读

6.1 主要类图

image-20220113134403087.png

6. 2 TransactionAspectSupport类

  • 所在package: org.springframework.transaction.interceptor
  • 所在jar: spring-tx-5.3.14.jar

6.2.1 invokeWithinTransaction方法(AOP包裹事务调用业务方法)

            // 如果需要,创建一个事务对象
            // 它会调用 AbstractPlatformTransactionManager 的 getTransaction 方法
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // 调用具体方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 处理异常,这里可能会rollback事务
                completeTransactionAfterThrowing(txInfo, ex);
                // 异常继续往外抛
                throw ex;
            }
            finally {
                // 清理事务信息
                cleanupTransactionInfo(txInfo);
            }

            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }
            // 提交事务,如果可以则 commit 事务
            commitTransactionAfterReturning(txInfo);
            return retVal;

6.3 事务处理类

6.3.1 父类 AbstractPlatformTransactionManager

抽象基类实现Spring的标准事务工作流,实现的子类有:JtaTransactionManager和DataSourceTransactionManager等。

  • 所在package: org.springframework.transaction.support
  • 所在jar: spring-tx-5.3.14.jar

6.3.1.1 getTransaction(获取事务)

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();
        if (isExistingTransaction(transaction)) {
            // 如果存在Transaction
            // 参考后续 AbstractPlatformTransactionManager 的 handleExistingTransaction 方法代码
            return handleExistingTransaction(def, transaction, debugEnabled);
        }
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }
    
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            // 如果设置了必须存在事务里面,而又没有发现事务,则抛出异常
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 如果不存在事务,且事务的传播策略是REQUIRED、REQUIRES_NEW、或 NESTED
            // 则新建事务
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                // debug打印事务的名称,开启org.springframework.transaction.support.AbstractPlatformTransactionManager的debug日志可以看到
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // 不需要事务的情况,仅仅设置事务的状态
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }

6.3.1.2 handleExistingTransaction(处理已存在事务场景)

    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            // 如果已存在事务,而内部的传播策略为 NEVER, 则抛出异常
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            // NOT_SUPPORTED 表示内部不支持事务,则先suspend挂起老事务
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            // 创建一个新的事务
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                return startTransaction(definition, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                // 检查不支持 nested Transaction
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // 使用SavePoint
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // 创建事务
                return startTransaction(definition, transaction, debugEnabled, null);
            }
        }

        // 如果事务传播配置为 PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED,则用外层的事务,这里不做处理
        // 由于和外层共用事务,因此需要校验当前的 Isolation及ReadOnly 配置,需要能够兼容外层事务
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
    
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    // 如果当前的定义事务隔离级别,和外层的事务隔离级别不同,则抛异常
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                // 如果当前@Transactional定义的readonly为false,而外部的@Transactional定义为true
                // 则抛出异常,逻辑上外层为readonly,内层也就只能为readonly
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

6.3.1.3 commit 和 processCommit(提交事务)

public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // 调用具体实现类的 doCommit 方法,提交数据库事务
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}

6.3.2 DataSourceTransactionManager

  • 父类:AbstractPlatformTransactionManager - org.springframework.transaction.support
  • 所在包: org.springframework.jdbc.datasource
  • 所在jar: spring-jdbc-5.3.14.jar

6.3.2.1 doBegin (开始事务)

  • 如果Connection的autocommit为true,那么设置为false
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

6.3.2.2 doCleanupAfterCompletion(事务完成后清理资源)

  • 如果Connection的autocommit在事务开始的时候设置为了false,那么再重置为true
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(obtainDataSource());
        }

        // Reset connection.
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(
                    con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }

        txObject.getConnectionHolder().clear();
    }

6.3.2.3 doCommit(提交事务)

protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.commit();
    }
    catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}

6.3.2.4 doSuspend 和 doResume (挂起和恢复事务)

  • 挂起和恢复事务,实际上就是将线程中绑定的资源清除掉
  // 挂起
  // 当需要开启新的事务的时候,需要将老的事务挂起,也就是将老事务在线程中绑定的资源清除了,从而不会影响新的事务
  protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 清除 txObject 中的 ConnectionHolder
        txObject.setConnectionHolder(null);
        // 解绑 数据源为key的 ConnectionHolder
        return TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }

    // 恢复
    // 重新把老的事务的资源绑定到 线程中,这样就实现了事务的恢复
    protected void doResume(@Nullable Object transaction, Object suspendedResources) {
        // 绑定 数据源为key的 ConnectionHolder
        TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
    }

6.4 DataSourceUtils类

  • 所在package:org.springframework.jdbc.datasource
  • 所在jar: spring-jdbc-5.3.14.jar

6.4.1 getConnection 和 doGetConnection (获取数据库连接)

public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {
        // 调用 doGetConnection
        return doGetConnection(dataSource);
    }
    catch (SQLException ex) {
        throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
    }
    catch (IllegalStateException ex) {
        throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
    }
}

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        // 如果在事务中,则从事务中获取已有Connection
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(fetchConnection(dataSource));
        }
        return conHolder.getConnection();
    }
    logger.debug("Fetching JDBC Connection from DataSource");
    // 不从事务中获取 Connection, 直接从数据源获取 Connection
    Connection con = fetchConnection(dataSource);
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        try {
            // 如果开启了事务同步,则将Connection绑定到线程中,后续将从线程中获取该 Connection
            // 在 transaction 结束的时候, 线程中绑定的 Connection 会被移除
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {
                holderToUse = new ConnectionHolder(con);
            }
            else {
                holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(
                    new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }
        catch (RuntimeException ex) {
            releaseConnection(con, dataSource);
            throw ex;
        }
    }

    return con;
}

6.5 DefaultTransactionAttribute类(默认事务属性)

默认的异常捕获事务回滚,当为RuntimeException或Error的子异常,则会回滚。

package org.springframework.transaction.interceptor;

public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {
  // ...
  
  public boolean rollbackOn(Throwable ex) {
        // rollback 的条件是 RuntimeException 或 Error 的子异常
        return (ex instanceof RuntimeException || ex instanceof Error);
    }
  //...
  
}

6.6 事务处理主要代码时序图

image-20220112160158231.png