常见事务管理器综述
常见事务管理器综述
事务管理器功能
- 开启事务
- 提交事务
- 回滚事务
- 管理多个关联事务
spring事务管理器设计
事务管理器接口定义为PlatformTransactionManager
,默认实现为DataSourceTransactionManager
,通过Connection
接口提供的功能实现事务相关功能的管理,其本身不具备事务功能
主要功能:
- 负责事务的创建、提交、回滚
- 负责管理关联事务的事务传播
实现原理:
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
//执行SQL
executeSQLWithSameConnnection();
//提交事务
transactionManager.commit(status);
} catch (Exception e) {
//回滚事务
transactionManager.rollback(status);
}
sharding-sphere事务管理器设计
sharding-sphere事务管理器(STM)与spring事务管理器(PTM)功能有所不同,STM负责为Connection
提供事务相关功能,其本身并不会负责事务的创建、提交和回滚.
实现原理:
public final class ShardingConnection extends AbstractConnectionAdapter {
@Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {
if (TransactionType.LOCAL == transactionType) {
super.setAutoCommit(autoCommit);
return;
}
if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {
return;
}
if (autoCommit && shardingTransactionManager.isInTransaction()) {
shardingTransactionManager.commit();
return;
}
if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
closeCachedConnections();
shardingTransactionManager.begin();
}
}
@Override
public void commit() throws SQLException {
if (TransactionType.LOCAL == transactionType) {
super.commit();
} else {
shardingTransactionManager.commit();
}
}
@Override
public void rollback() throws SQLException {
if (TransactionType.LOCAL == transactionType) {
super.rollback();
} else {
shardingTransactionManager.rollback();
}
}
}
STM通过SPI机制可以实现不同的事务功能
LOCAL:包装本地事务(Connection)
XA:包装XA事务管理器提供的事务功能
BASE:包装其他事务管理器(如seata)提供的事务功能
seata事务管理器
全局事务管理器:
负责全局事务的创建,存在于TM服务中,一般由@GlobalTransactional
注解开启
全局事务与Connection是否存在无关
实现原理:
类似@Transactional
声明式本地事务管理方式
参考代码:
声明式:
io.seata.tm.api.DefaultGlobalTransaction
io.seata.tm.DefaultTransactionManager
io.seata.spring.annotation.GlobalTransactionalInterceptor
手动事务:
io.seata.tm.api.TransactionalTemplate
分支事务管理:
不论本地事务创建在TM还是在RM,每一个本地事务均会创建一个分支事务
分支事务通过DataSourceProxy
,ConnectionProxy
,StatementProxy
,PreparedStatementProxy
等实现代理JDBC标准接口实现本地事务执行过程中分支事务的创建、提交、回滚
实现原理:
//通过targetConnection实现真正的事务功能
public class ConnectionProxy extends AbstractConnectionProxy {
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
}
事务管理器协作
spring+seata
整合方式:
- 通过
SeataAutoDataSourceProxyCreator
这个BeanPostProcessor将数据源替换为seata的DatasourceProxy - 通过 注册
GlobalTransactionScanner
,处理GlobalTransactional
注解 - 通过各框架提供的机制在远程调用时传递全局事务id
使用示例:
//TM1:未操作数据库
@GlobalTransactional
public void test(){
test1();
test2();
}
//TM2:操作数据库
@GlobalTransactional
public void test(){
testLocal1();
testLocal2();
test1();
test2();
}
//TM2:本地事务1
@Transactional
public void testLocal1(){
}
//TM2:本地事务2
@Transactional
public void testLocal2(){
}
//RM1
@Transactional
public void test1();
//RM1
@Transactional
public void test2();
实现方式:
seata事务管理器负责全局事务的处理
spring事务管理器使用seata代理数据源和代理连接在处理本地事务的同时执行了分支事务的处理
调用模式:
//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
//执行SQL
executeSQLWithSameConnnection(){
ConnectionProxy.executeSQL(){
//seata enhance
MysqlConnection.executeSQL()
}
}
//提交事务
transactionManager.commit(status){
ConnectionProxy.commit(){
//seata enhance
MysqlConnection.commit()
}
}
} catch (Exception e) {
//回滚事务
transactionManager.rollback(status){
ConnectionProxy.rollback(){
//seata enhance
MysqlConnection.rollback()
}
}
}
spring+sharding-sphere
整合方式:
最新版本通过自定义JDBC协议实现自定义JDBC驱动以简化与spring整合的复杂度
# 配置 DataSource Driver spring.datasource.driver-class-name=org.apache.shardingsphere.driver.ShardingSphereDriver # 指定 YAML 配置文件 spring.datasource.url=jdbc:shardingsphere:classpath:xxx.yaml
之前版本通过stater整合
使用示例:
@Transactional
//方式一:注解式
@ShardingTransactionType(TransactionType.LOCAL)
public void test(){
//方式二:编程式
//TransactionTypeHolder.set(TransactionType.LOCAL);
test1();
test2();
}
@Transactional
public void test1();
@Transactional
public void test2();
实现方式:
spring负责本地事务的创建,提交和回滚
Sharding-sphere通过SPI机制提供的事务管理器实现Connection的事务能力
Local:本地事务,由mysql connection提供事务实现
XA:XA事务,由第三方XA框架提供XA事务能力
BASE:柔性事务,目前由seata提供分布式事务能力
调用模式:
//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
//执行SQL
executeSQLWithSameConnnection(){
ShardingConnection.executeSQL(){
//sharding enhance
MysqlConnection.executeSQL()
}
}
//提交事务
transactionManager.commit(status){
ShardingConnection.commit(){
//sharding enhance
MysqlConnection.commit()
}
}
} catch (Exception e) {
//回滚事务
transactionManager.rollback(status){
ShardingConnection.rollback(){
//sharding enhance
MysqlConnection.rollback()
}
}
}
spring+sharding-sphere+seata
整合方式:
- 关闭seata数据源自动代理功能
- 引入sharding-sphere BASE事务管理seata spi包
- 设置使用sharding-sphere BASE模式
使用示例:
//TM1:未操作数据库
@GlobalTransactional
public void test(){
test1();
test2();
}
//RM1:sharding-sphere,seata
@Transactional
@ShardingTransactionType(TransactionType.BASE)
public void test1();
//RM2:sharding-sphere,seata
@Transactional
@ShardingTransactionType(TransactionType.BASE)
public void test2();
调用模式:
//约定methodA(){methodB()}表示methodA内部会调用methodB
// 开启事务
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
//执行SQL
executeSQLWithSameConnnection(){
ShardingConnection.executeSQL(){
//sharding enhance
ShardingConnection.createConnection(){
seataConnection = shardingTransactionManager.getConnection()
}
seataConnection.executeSQL(){
//seata enhance
MysqlConnection.executeSQL()
}
}
}
//提交事务
transactionManager.commit(status){
ShardingConnection.commit(){
//sharding enhance
seataConnection.commit(){
//seata enhance
MysqlConnection.commit()
}
}
}
} catch (Exception e) {
//回滚事务
transactionManager.rollback(status){
ShardingConnection.rollback(){
//sharding enhance
seataConnection.rollback(){
//seata enhance
MysqlConnection.rollback()
}
}
}
}
sharding-sphere通过事务管理器SPI获取到seata的connection实现ConnectionProxy,使用ConnectionProxy实现分布式事务功能
Q:为什么不能使用seata数据源代理进行整合?
必须先执行sharding-sphere功能后才能获取到执行的真实SQL,必须由sharding-sphere调用seata.
参考资料: