常见事务管理器综述

目录

常见事务管理器综述

事务管理器功能

  • 开启事务
  • 提交事务
  • 回滚事务
  • 管理多个关联事务

spring事务管理器设计

事务管理器接口定义为PlatformTransactionManager,默认实现为DataSourceTransactionManager,通过Connection接口提供的功能实现事务相关功能的管理,其本身不具备事务功能

主要功能:

  1. 负责事务的创建、提交、回滚
  2. 负责管理关联事务的事务传播

实现原理:

// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
  	//执行SQL
    executeSQLWithSameConnnection();
  	//提交事务
    transactionManager.commit(status);
} catch (Exception e) {
    //回滚事务
    transactionManager.rollback(status);
}

sharding-sphere事务管理器设计

img

sharding-sphere事务管理器(STM)与spring事务管理器(PTM)功能有所不同,STM负责为Connection提供事务相关功能,其本身并不会负责事务的创建、提交和回滚.

实现原理:

public final class ShardingConnection extends AbstractConnectionAdapter {
  @Override
    public void setAutoCommit(final boolean autoCommit) throws SQLException {
        if (TransactionType.LOCAL == transactionType) {
            super.setAutoCommit(autoCommit);
            return;
        }
        if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {
            return;
        }
        if (autoCommit && shardingTransactionManager.isInTransaction()) {
            shardingTransactionManager.commit();
            return;
        }
        if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
            closeCachedConnections();
            shardingTransactionManager.begin();
        }
    }
    
    @Override
    public void commit() throws SQLException {
        if (TransactionType.LOCAL == transactionType) {
            super.commit();
        } else {
            shardingTransactionManager.commit();
        }
    }
    
    @Override
    public void rollback() throws SQLException {
        if (TransactionType.LOCAL == transactionType) {
            super.rollback();
        } else {
            shardingTransactionManager.rollback();
        }
    }
}

STM通过SPI机制可以实现不同的事务功能

LOCAL:包装本地事务(Connection)

XA:包装XA事务管理器提供的事务功能

BASE:包装其他事务管理器(如seata)提供的事务功能

seata事务管理器

全局事务管理器:

负责全局事务的创建,存在于TM服务中,一般由@GlobalTransactional注解开启

全局事务与Connection是否存在无关

实现原理:

类似@Transactional声明式本地事务管理方式

参考代码:

声明式:

io.seata.tm.api.DefaultGlobalTransaction

io.seata.tm.DefaultTransactionManager

io.seata.spring.annotation.GlobalTransactionalInterceptor

手动事务:

io.seata.tm.api.TransactionalTemplate

分支事务管理:

不论本地事务创建在TM还是在RM,每一个本地事务均会创建一个分支事务

分支事务通过DataSourceProxy,ConnectionProxy,StatementProxy,PreparedStatementProxy等实现代理JDBC标准接口实现本地事务执行过程中分支事务的创建、提交、回滚

实现原理:

//通过targetConnection实现真正的事务功能
public class ConnectionProxy extends AbstractConnectionProxy {
  	@Override
    public void commit() throws SQLException {
        try {
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
  
    private void doCommit() throws SQLException {
          if (context.inGlobalTransaction()) {
              processGlobalTransactionCommit();
          } else if (context.isGlobalLockRequire()) {
              processLocalCommitWithGlobalLocks();
          } else {
              targetConnection.commit();
          }
      }
  	
  	private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
}

事务管理器协作

spring+seata

整合方式:

  • 通过SeataAutoDataSourceProxyCreator这个BeanPostProcessor将数据源替换为seata的DatasourceProxy
  • 通过 注册GlobalTransactionScanner,处理GlobalTransactional注解
  • 通过各框架提供的机制在远程调用时传递全局事务id

使用示例:

//TM1:未操作数据库
@GlobalTransactional
public void test(){
  test1();
  test2();
}

//TM2:操作数据库
@GlobalTransactional
public void test(){
  testLocal1();
  testLocal2();
  test1();
  test2();
}

//TM2:本地事务1
@Transactional
public void testLocal1(){
  
}

//TM2:本地事务2
@Transactional
public void testLocal2(){
  
}

//RM1
@Transactional
public void test1();

//RM1
@Transactional
public void test2();

实现方式:

seata事务管理器负责全局事务的处理

spring事务管理器使用seata代理数据源和代理连接在处理本地事务的同时执行了分支事务的处理

调用模式:

//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
  	//执行SQL
    executeSQLWithSameConnnection(){
      ConnectionProxy.executeSQL(){
        //seata enhance
        MysqlConnection.executeSQL()
      }
    }
  	//提交事务
    transactionManager.commit(status){
      ConnectionProxy.commit(){
        //seata enhance
        MysqlConnection.commit()
      }
    }
} catch (Exception e) {
    //回滚事务
    transactionManager.rollback(status){
      ConnectionProxy.rollback(){
        //seata enhance
        MysqlConnection.rollback()
      }
    }
}

spring+sharding-sphere

整合方式:

  • 最新版本通过自定义JDBC协议实现自定义JDBC驱动以简化与spring整合的复杂度

    # 配置 DataSource Driver
    spring.datasource.driver-class-name=org.apache.shardingsphere.driver.ShardingSphereDriver
    # 指定 YAML 配置文件
    spring.datasource.url=jdbc:shardingsphere:classpath:xxx.yaml
    
  • 之前版本通过stater整合

使用示例:

@Transactional
//方式一:注解式
@ShardingTransactionType(TransactionType.LOCAL)
public void test(){
  //方式二:编程式
  //TransactionTypeHolder.set(TransactionType.LOCAL);
  test1();
  test2();
}

@Transactional
public void test1();

@Transactional
public void test2();

实现方式:

spring负责本地事务的创建,提交和回滚

Sharding-sphere通过SPI机制提供的事务管理器实现Connection的事务能力

Local:本地事务,由mysql connection提供事务实现

XA:XA事务,由第三方XA框架提供XA事务能力

BASE:柔性事务,目前由seata提供分布式事务能力

调用模式:

//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
  	//执行SQL
    executeSQLWithSameConnnection(){
      ShardingConnection.executeSQL(){
        //sharding enhance
        MysqlConnection.executeSQL()
      }
    }
  	//提交事务
    transactionManager.commit(status){
      ShardingConnection.commit(){
        //sharding enhance
        MysqlConnection.commit()
      }
    }
} catch (Exception e) {
    //回滚事务
    transactionManager.rollback(status){
      ShardingConnection.rollback(){
        //sharding enhance
        MysqlConnection.rollback()
      }
    }
}

spring+sharding-sphere+seata

整合方式:

  1. 关闭seata数据源自动代理功能
  2. 引入sharding-sphere BASE事务管理seata spi包
  3. 设置使用sharding-sphere BASE模式

使用示例:

//TM1:未操作数据库
@GlobalTransactional
public void test(){
  test1();
  test2();
}

//RM1:sharding-sphere,seata
@Transactional
@ShardingTransactionType(TransactionType.BASE)
public void test1();

//RM2:sharding-sphere,seata
@Transactional
@ShardingTransactionType(TransactionType.BASE)
public void test2();

调用模式:

//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
  	//执行SQL
    executeSQLWithSameConnnection(){
      ShardingConnection.executeSQL(){
        //sharding enhance
        ShardingConnection.createConnection(){
          seataConnection = shardingTransactionManager.getConnection()
        }
        seataConnection.executeSQL(){
          //seata enhance
        	MysqlConnection.executeSQL()
        }
      }
    }
  	//提交事务
    transactionManager.commit(status){
      ShardingConnection.commit(){
        //sharding enhance
        seataConnection.commit(){
          //seata enhance
        	MysqlConnection.commit()
        }
      }
    }
} catch (Exception e) {
    //回滚事务
    transactionManager.rollback(status){
      ShardingConnection.rollback(){
        //sharding enhance
        seataConnection.rollback(){
          //seata enhance
        	MysqlConnection.rollback()
        }
      }
    }
}

sharding-sphere通过事务管理器SPI获取到seata的connection实现ConnectionProxy,使用ConnectionProxy实现分布式事务功能

Q:为什么不能使用seata数据源代理进行整合?

必须先执行sharding-sphere功能后才能获取到执行的真实SQL,必须由sharding-sphere调用seata.

参考资料:

详解Spring的事务管理PlatformTransactionManager