Seata框架底层源码简单分析

Seata框架底层源码简单分析,第1张

Seata框架底层源码简单分析 Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
  • TC:事务管理器
  • TM:事务开始服务
  • RM:事务参与过程的其他服务

底层实现流程

  • TM和RM都连接到我们的事务协调器TC。
  • TM和RM服务的数据源都被Seata代理,执行语句的前后会保存两条记录,一条是执行前的记录,一条是执行后的记录,是方便后期可以逆向的生成sql去回滚事务。具体记录存放在seata的undo_log表中。
  • TM在使用RPC-Feign远程调用的时候,在ThreadLocal中获取xid。
  • RM在请求头中取到该xid,设置到threadLocal中,同时向seata注册本地事务。
  • TM将当前本地事务的结果发送给TC,TC最后通知所有分支事务是提交或回滚。
  • TM如果调用接口成功之后再抛出异常时,告诉协调者TC,协调者TC再通知到所有分支的事务,根据undo_log逆向回滚事务。
  • 最后如果没有任何异常,TM通知TC,TC最后让所有的有该全局xid的undo_log中的记录都删除。
Seata和LCN的回滚之间有什么区别?

基本实现思路一样,唯一区别在于回滚方式,LCN采用代理数据源加关闭连接,暂时不支持提交本地事务,容易造成数据的死锁。
Seata采用undo_log的形式逆向生成sql,去实现回滚。

Seata生成全局ID源码部分

核心注解在于@GlobalTransactional
引入seata核心依赖


    com.alibaba.cloud
    >spring-cloud-starter-alibaba-seata
    2.1.1.RELEASE

先看到spring.factories配置文件
看到com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration,
这个类
点进去,可以看到,这就是初始化seata的一个配置类。
并且注入全局事务扫描器,如下代码

	@Bean
	public GlobalTransactionScanner globalTransactionScanner() {

		String applicationName = applicationContext.getEnvironment()
				.getProperty("spring.application.name");

		String txServiceGroup = seataProperties.getTxServiceGroup();

		if (StringUtils.isEmpty(txServiceGroup)) {
			txServiceGroup = applicationName + "-seata-service-group";
			seataProperties.setTxServiceGroup(txServiceGroup);
		}

		return new GlobalTransactionScanner(applicationName, txServiceGroup);
	}

点到全局事务扫描类中
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean
可以看到全局事务扫描类继承/实现了以下的接口,那么分别是做啥用的呢?
首先我们根据SpringBean和Aop的源码不难发现
AbstractAutoProxyCreator就是Aop原生的一个代理类
而InitializingBean属于Spring的Bean中的一个注解,执行动作在Bean实例化之后执行。
因此对于这个接口的回调方法中,就实现初始化的动作
源码如下

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        initClient();

    }

    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(
                "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
        }
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
                    + txServiceGroup + "]");
        }
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                "Resource Manager is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup
                    + "]");
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();

    }

然后看到可以发现这两行代码

//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);

可以看到,在初始化方法中,就实现了初始化RM、TM然后,注册到TC中。
然后AbstractAutoProxyCreator这个抽象模板又是拿来干啥的呢?
对比下抽象类和这个实现类可以看到,主要是
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey)
重写了父类的这个回调方法。

看到实现类这个方法,
其作用就是,加上@GlobalTransactional这个注解后,当aop创建代理对象的时候,会走这个回调方法,去创建出GlobalTransactionalInterceptor对象。
点进去这个对象发现,哎,实现了MethodInterceptor接口,说明其采用CGLIB代理模式,进行代理,从而进行方法的反射。
Invoke()中就是其会执行的反射方法
具体源码如下

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

其中获取到两个注解

  • GlobalTransactional,判断你方法上有没有加上这个全局事务的注解
  • GlobalLock,作为分布式锁

因此直接点到handleGlobalTransaction()方法中
查看return transactionalTemplate.execute()这个execute方法
具体源码如下

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

先看第一行代码

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
点进去看一下
    private static GlobalTransaction getCurrent() {
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }

这里,也就是获取到我们一个分布式事务协调器的一个全局id,那是从哪里获取来的呢?
继续点到getXid()
一直点到最后

@LoadLevel(name = "ThreadLocalContextCore", order = Integer.MIN_VALUE)
public class ThreadLocalContextCore implements ContextCore {

    private ThreadLocal> threadLocal = new ThreadLocal>() {
        @Override
        protected Map initialValue() {
            return new HashMap();
        }

    };

    @Override
    public String put(String key, String value) {
        return threadLocal.get().put(key, value);
    }

    @Override
    public String get(String key) {
        return threadLocal.get().get(key);
    }

    @Override
    public String remove(String key) {
        return threadLocal.get().remove(key);
    }

    @Override
    public Map entries() {
        return threadLocal.get();
    }
}

发现,哎,原来是从threadLocal里面获取到这样的一个全局id。
那么如何创建的呢
往回看到

public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    if (tx == null) {
        return createNew();
    }
    return tx;
}
//如果第一次为空时,就走createNew()这个方法去创建,再点到构造方法
 DefaultGlobalTransaction() {
     this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
 }

所以此时并不会去设置xid,而是先置空,等待我们的协调器TC,给我们分发全局XID。

代码继续往下看
TransactionInfo txInfo = business.getTransactionInfo();
这里是拿到我们具体事务的某个方法。

继续往下到
beginTransaction(txInfo, tx);
这个方法中,具体源码如下

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }

首先tx.begin(txInfo.getTimeOut(), txInfo.getName());为开启事务。
其begin方法如下

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [" + xid + "]");
        }

    }

核心创建xid逻辑为

xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);

再次之前,必须判断到threadLocal里面的xid非空,不然就直接报错。
然后再连接到TC,去创建xid。
这里面begin源码在点进去看

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

然后发现,在这里就实现到了,发送请求到TC,去请求获取到xid。
最后请求成功拿到xid后通过
RootContext.bind(xid);把XID设置到threadLocal里面。
到这里就完成了Seata的获取xid过程。

Seata记录执行过程前后源码

实现类在io.seata.rm.datasource.exec. AbstractDMLbaseExecutor
看到doExecute方法

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

点到executeAutoCommitFalse()方法中

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

beforeImage和afterImage也就是生成前置和后置镜像,用于反向生成sql语句。
在executeAutoCommitTrue()方法中把自动提交事务设置为了false。因此此时事务不会立马提交。防止事务成功,但是undo_log日志的事务失败,必须保证这两者的数据最终一致性,所以事务不能立马提交,而是先设置为False。
具体源码如下

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            ((ConnectionProxy) connectionProxy).getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

此时设置False,然后在去执行executeAutoCommitFalse()方法,去把undo_log内容记录,完成最后才执行:connectionProxy.commit();去提交事务。

            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });

然后看到executeAutoCommitFalse方法中的prepareUndolog方法

    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
            return;
        }

        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETe ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }

在SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);这里就是记录到undo_log表中事务信息的动作。

因此到现在seata就完成了,前置后置副本在undo_log表中数据的记录。

Seata底层逆向生成sql回滚事务源码

如果发起全局事务的服务没有报错的情况下,通知TC协调器,把undo_log这条记录删除,如果失败的话,则查询本地数据库undo_log表,进行逆向生成sql回滚事务。
具体方法在DataSourceManager#branchRollback()方法中

执行成功,删除undolog日志,源码位置
io.seata.rm.datasource.undo. AbstractUndoLogManager # batchDeleteUndoLog()

    @Override
    public void batchDeleteUndoLog(Set xids, Set branchIds, Connection conn) throws SQLException {
        if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
            return;
        }
        int xidSize = xids.size();
        int branchIdSize = branchIds.size();
        String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
        PreparedStatement deletePST = null;
        try {
            deletePST = conn.prepareStatement(batchDeleteSql);
            int paramsIndex = 1;
            for (Long branchId : branchIds) {
                deletePST.setLong(paramsIndex++,branchId);
            }
            for (String xid: xids){
                deletePST.setString(paramsIndex++, xid);
            }
            int deleteRows = deletePST.executeUpdate();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("batch delete undo log size " + deleteRows);
            }
        }catch (Exception e){
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        } finally {
            if (deletePST != null) {
                deletePST.close();
            }
        }

    }

String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
这里,就是生成删除undo_log的sql语句

具体实现如下

    protected static String toBatchDeleteUndoLogSql(int xidSize, int branchIdSize) {
        StringBuilder sqlBuilder = new StringBuilder(64);
        sqlBuilder.append("DELETE FROM ")
                .append(UNDO_LOG_TABLE_NAME)
                .append(" WHERe  " + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " IN ");
        appendInParam(branchIdSize, sqlBuilder);
        sqlBuilder.append(" AND " + ClientTableColumnsName.UNDO_LOG_XID + " IN ");
        appendInParam(xidSize, sqlBuilder);
        return sqlBuilder.toString();
    }

这就是服务没有发生报错,数据正常情况,删除undo_log表的记录

如果报错的情况,回滚逻辑源码如下

   protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
       throws TransactionException {
       String xid = request.getXid();
       long branchId = request.getBranchId();
       String resourceId = request.getResourceId();
       String applicationData = request.getApplicationData();
       if (LOGGER.isInfoEnabled()) {
           LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
       }
       BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
           applicationData);
       response.setXid(xid);
       response.setBranchId(branchId);
       response.setBranchStatus(status);
       if (LOGGER.isInfoEnabled()) {
           LOGGER.info("Branch Rollbacked result: " + status);
       }
   }

具体实现是在:

BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);

找到DataSourceManager实现类:io.seata.rm.datasource.undo. AbstractUndoLogManager # batchDeleteUndoLog()
源码如下

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("branchRollback failed reason [{}]", te.getMessage());
            }
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

这里最主要看到

UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);

核心回滚是在undo()方法
先看到
查询方法

selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);

具体值定义如下

    protected static final String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME +
            " WHERe " + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " = ? AND " + ClientTableColumnsName.UNDO_LOG_XID + " = ? FOR UPDATE";

回滚的具体源码如下

                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            Tablemeta tablemeta = TablemetaCacheFactory.getTablemetaCache(dataSourceProxy).getTablemeta(dataSourceProxy, sqlUndoLog.getTableName());
                            sqlUndoLog.setTablemeta(tablemeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                    dataSourceProxy.getDbType(),
                                    sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }

找到sqlUndoLogs列表,然后做遍历,进行逆向生成SQL回滚事务
其中,逆向回滚源码如下

    public void executeOn(Connection conn) throws SQLException {

        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }

        try {
            String undoSQL = buildUndoSQL();

            PreparedStatement undoPST = conn.prepareStatement(undoSQL);

            TableRecords undoRows = getUndoRows();

            for (Row undoRow : undoRows.getRows()) {
                ArrayList undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }

                undoPrepare(undoPST, undoValues, pkValue);

                undoPST.executeUpdate();
            }

        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }

        }

    }

看到这个代码

String undoSQL = buildUndoSQL();

这里生成出来的字符串就是逆向生成的SQL语句,如果之前是insert语句,那么这里就是delete语句。
所以这就是Seata逆向回滚的大致原理。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5697129.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存