Seata框架源码分析——AT模式

Seata框架源码分析——AT模式,第1张

Seata框架源码分析——AT模式 Seata框架介绍

Seata(Simple Extensible Autonomous Transaction Architecture) 是 阿里巴巴开源的分布式事务中间件,以高效并且对业务 0 侵入的方式,解决微服务场景下面临的分布式事务问题。

对于分布式事务和Seata框架本身的介绍本文就不再多赘述了,想了解更多Seata框架的细节,建议阅读Seata中文文档(相当详细和易懂):http://seata.io/zh-cn/docs/overview/what-is-seata.html

AT模式介绍

说到AT模式,就不得不说AT模式中的三大角色:


此图又是一个典型的分布式订单流程图解,其中大致的意思:TC属于服务器端,控制着整个订单流程的命脉,也是每个环节执行动作的监听者。TM和RM则是客户端,RM属于本地资源管理器,控制着自己本地的事务动作,并将事务执行结果告知TC。而TM则是定义了事务执行的范围,它的存在是为了统一多个本地事务的提交/回滚 *** 作。

TC:维护全局和分支事务的状态,驱动全局事务提交或回滚。TM:定义全局事务的范围,开始全局事务、提交或回滚全局事务。RM:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

除了以上三大重要角色,还有核心的流程控制:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。(执行用户SQL释放资源,生成快照)二阶段:提交异步化,快速完成。回滚通过一阶段的回滚日志进行反向补偿。 源码分析

以上内容便是针对Seata框架中的AT模式进行大致的介绍,接下来为了了解更多的细节,开始本文的重点——源码分析。

而Seata这一框架也很好的融入进Spring体系,因此在初始化时,Seata同样是通过扫描bean的方式进行初始化,其中来关注下重点类GlobalTransactionScanner:

扫描并初始化客户端:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { // 此处实现了Spring体系中初始化bean的接口
	...
	@Override
    public void afterPropertiesSet() {
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
        if (disableGlobalTransaction) { // 判断是否允许使用全局事务
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        if (initialized.compareAndSet(false, true)) { // 切换初始化状态,防止重复初始化
            initClient(); // 初始化客户端
        }
    }
	...
}

从以下初始化客户端的方法中可以明显看到,其中初始化了TM、RM客户端:

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // init TM
    // 初始化TM客户端(applicationId、txServiceGroup由配置文件定义)
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    // init RM
    // 初始化RM客户端(applicationId、txServiceGroup由配置文件定义)
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();
}

初始化TM客户端:

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey); // 获取TM客户端实例
    tmNettyRemotingClient.init(); // 客户端实例初始化
}

获取TM客户端实例:

public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient tmRpcClient = getInstance(); // 获取实例
    // 设置客户端信息
    tmRpcClient.setApplicationId(applicationId);
    tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
    tmRpcClient.setAccessKey(accessKey);
    tmRpcClient.setSecretKey(secretKey);
    return tmRpcClient;
}

以下获取客户端实例方法,可以看到使用了线程安全的获取单例方式。并从中可知,所说的客户端其实是基于Netty实现的:

public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                    nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                    KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                    new linkedBlockingQueue<>(MAX_QUEUE_SIZE),
                    new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                        nettyClientConfig.getClientWorkerThreads()),
                    RejectedPolicies.runsOldestTaskPolicy());
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}

了解完如何获取TM客户端的实例后,回到下一步,初始化TM客户端:

@Override
public void init() {
    registerProcessor(); // 注册处理者
    if (initialized.compareAndSet(false, true)) {
        super.init(); // 使用父类方法初始化
    }
}

注册客户端对应的结果处理者:

private void registerProcessor() {
    // 注册TC响应结果处理者(根据TC的响应码找到对应的处理者,进行下一步处理)
    ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 注册心跳检测处理者(因为TM和TC的连接需要保证畅通,因此需要不断重发心跳信息检测通路)
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

调用父类初始化方法,其中可以看到,利用了计划任务线程池timerExecutor按照指定相隔时间不断的尝试发送TC重连消息,保证TM与TC的正常通路。

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup()); // 不断重连,保证通路
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new linkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

不断尝试重连,保持通路:

void reconnect(String transactionServiceGroup) {
    List availList = null;
    try {
        availList = getAvailServerList(transactionServiceGroup); // 根据配置定义的事务组名获取合法TC网络地址
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) { // 网络地址为空,说明配置信息有误,直接返回,无需进行连接
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);
        if (StringUtils.isBlank(clusterName)) {
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                    ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                    transactionServiceGroup);
            return;
        }
        if (!(registryService instanceof FileRegistryServiceImpl)) {
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    for (String serverAddress : availList) { // 遍历所有合法网络地址
        try {
            acquireChannel(serverAddress); // 请求连接
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}",frameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}

请求连接TC网络地址:

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress); // 根据网络地址获取网络通路
    if (channelToServer != null) { // 网络通路不为空,说明已连接
        // 再判断当前通路是否可用,可用直接返回,不可用再重新进行连接
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) { // 同步方式,连接TC地址
        return doConnect(serverAddress);
    }
}
private Channel doConnect(String serverAddress) {
    // 该网络地址的通路已存在,且可用,则直接返回
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) { 
        return channelToServer;
    }
    // 连接网络地址并创建网络通路
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool); // 连接成功,并加入channels作为连接记录,方便下次重连查询
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.",frameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new frameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

以上就是TM客户端的初始化逻辑。

初始化RM客户端:

RM客户端的初始化其实与TM客户端的初始化逻辑大体一致,因此接下来在源码中遇到相同的逻辑,便不再赘述了

public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); // 获取RM客户端示例
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get()); // 设置资源管理器
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get()); // 设置消息回调监听器,用于接收服务器端在二阶段发出的提交/回滚请求(重点,在之后的源码分析会重点讲解)
    rmNettyRemotingClient.init(); // 客户端实例初始化
}
@Override
public void init() {
    registerProcessor(); // 注册请求响应处理者
    if (initialized.compareAndSet(false, true)) {
        super.init(); // 调用父类方法初始化
        // Found one or more resources that were registered before initialization
        if (resourceManager != null
                && !resourceManager.getManagedResources().isEmpty()
                && StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}

以上便是TM、RM客户端初始化,以及连接Seata服务端的整体流程。

一阶段提交:

TM、RM已初始化完成后,下一步则是分布式事务的真正核心流程。如果你使用过Seata框架的AT模式,会发现在使用层面上的成本非常小,只需要在总的分布式事务执行入口方法加上注解@GlobalTransactional。有注解,相对应的就会有拦截器,接下来重点关注GlobalTransactionalInterceptor类的invoke拦截方法:

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); // 获取拦截方法上的@GlobalTransactional注解
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) { // 判断本地是否允许开启分布式事务
            if (globalTransactionalAnnotation != null) { // 注解不为空,则说明拦截的方法上有@GlobalTransactional注解,开启分布式事务
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed(); // 本地不允许开启分布式事务/拦截方法上没有@GlobalTransactional注解,则按原方法执行
}
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        // 定义事务执行对象,并通过调用事务模板对象开启执行事务
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed(); // 执行原方法
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod()); // 格式化原方法名,不重要
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                // 若注解上有定义的事务执行合法超时时间,则使用定义的超时时间,否则使用默认值
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }
                // 以下内容都是添加使用者自定义的属性值,最后返回相关的事务信息,很简单,不过多描述
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                Set rollbackRules = new linkedHashSet<>();
                for (Class rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // 事务执行失败
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}

从上面的handleGlobalTransaction方法中可以看到,其中主要是定义了事务执行器的各种方法,而真正开启并执行事务的动作,还是通过transactionalTemplate.execute方法:

public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo(); // 获取事务相关信息
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // 获取当前事务对象(主要是通过全局事务XID进行判断)
    Propagation propagation = txInfo.getPropagation(); // 获取事务信息中的事务传播级别
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 针对各个事务级别做特殊处理,在此不多讲,继续往下看
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        if (tx == null) { // 当前事务对象为空,说明当前未开启事务,新建事务对象
            tx = GlobalTransactionContext.createNew();
        }
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
        try {
            beginTransaction(txInfo, tx); // 开启事务(重点)
            Object rs;
            try {
                rs = business.execute(); // 执行原方法
            } catch (Throwable ex) {
                completeTransactionAfterThrowing(txInfo, tx, ex); // 原方法执行异常,回滚(重点)
                throw ex;
            }
            commitTransaction(tx); // 原方法执行无异常,提交(重点)
            return rs;
        } finally {
            // clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

开启事务:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin(); // 执行前置钩子函数,做些开启事务前的动作
        tx.begin(txInfo.getTimeOut(), txInfo.getName()); // 开启事务
        triggerAfterBegin(); // 执行后置钩子函数,做些开启事务后的动作
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
    }
}
@Override
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout); // 开启事务,通知TC,并返回全局事务XID
    status = GlobalStatus.Begin; // 设置事务状态为已开启
    RootContext.bind(xid); // 为事务上下文绑定全局事务XID
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); // 发送同步请求,返回XID
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

以上便是开启事务的源码逻辑,而之后的提交/回滚其实也是同样的 *** 作,封装请求发送,返回响应码,这部分逻辑都是一致,再次不做赘述了。

但是,到此一阶段的核心才刚刚开始。对于Seata的AT模式,它是依赖于底层数据的事务驱动,它之所以能够做到本地分支事务的回滚,是因为它对原本的数据源加了一层代理,在代理中生成能够执行回滚的SQL语句,因此才能让回滚的数据恢复。而Seata所代理的类分别是:


其中我们需要重点关注的是SQL执行过程中的SQL解析部分,也就是Statement相关的代理类。此处选择最常用的PreparedStatementProxy.execute方法进行源码分析(其他方法最终入口都是一致的):

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
	...
    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute()); // 调用ExecuteTemplate.execute通用类方法解析执行SQL
    }
    ...
}
public static  T execute(StatementProxy statementProxy,
                                                 StatementCallback statementCallback,
                                                 Object... args) throws SQLException {
    return execute(null, statementProxy, statementCallback, args);
}


public static  T execute(List sqlRecognizers,
                                                 StatementProxy statementProxy,
                                                 StatementCallback statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // 获取事务全局锁,以及判断当前分支事务类型是否是AT模式
        return statementCallback.execute(statementProxy.getTargetStatement(), args); // 获取锁失败,以及当前分支事务类型不是AT模式,则按原statement执行SQL
    }

    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        // 根据SQL和数据类型获取SQL识别器(为了接下来的SQL解析,不同的数据库可能解析出来的SQL不同,因此需要在此作区分)
        sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
    }
    Executor executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // 区分SQL类型,获取不同的SQL执行器
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                            new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                            new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args); // 执行SQL
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}

调用SQL执行器解析并执行SQL:

@Override
public T execute(Object... args) throws Throwable {
	// 为当前数据库代理连接绑定全局事务id
    String xid = RootContext.getXID();
    if (xid != null) {
        statementProxy.getConnectionProxy().bind(xid); 
    }
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) { // 判断事务是否自动提交,调用不同的执行方法
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

doExecute方法中可以看到两个方法,一个是属于事务自动提交的,另一个是属于事务手动提交的。这两个方法有何差异,先来看下自动提交的executeAutoCommitTrue:

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit(); // 将自动提交改为false
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args); // 调用非自动提交的事务方法,解析执行SQL
            connectionProxy.commit(); // 执行完后手动提交事务
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true); // 执行完SQL后,将自动提交重新改为true
    }
}

从自动提交事务的方法中看到,最终还是调用了手动提交事务的方法解析执行SQL。这是因为在执行SQL之前,需要生成回滚SQL(undoSql),以及执行SQL前后的数据镜像。因此需要在自动提交的方法中,首先将自动提交改为false,执行完 *** 作后,再重置。
因此,重点又到了手动提交事务的方法executeAutoCommitFalse:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    TableRecords beforeImage = beforeImage(); // SQL执行前镜像
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // SQL执行
    TableRecords afterImage = afterImage(beforeImage); // SQL执行后镜像
    prepareUndoLog(beforeImage, afterImage); // 准备回滚日志信息,用于SQL执行异常回滚
    return result;
}

到此,一阶段的部分的源码分析就结束了,这部分主要是针对于TM、RM如何初始化连接TC服务器,以及Seata在数据库底层解析执行SQL的过程代理 *** 作源码进行了分析。

二阶段提交:

执行完业务SQL,以及UndoLog后,接着就是在ConnectionProxy中进行提交commit:

@Override
public void commit() throws SQLException {
    try {
        LOCK_RETRY_POLICY.execute(() -> {
            doCommit(); // 提交
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback(); // 异常回滚
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

提交:

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit(); // 如果处于全局事务当中,则使用全局事务提交方法(重点)
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks(); // 如果处于全局事务,并使用了全局锁注解,则使用全局事务锁提交方法
    } else {
        targetConnection.commit(); // 否则正常事务提交
    }
}
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register(); // 注册分支事务
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // UndoLog数据入库
        targetConnection.commit(); // 本地事务提交
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false); // 本地事务提交异常,向TC汇报分支事务提交失败
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true); // 本地事务提交成功,向TC汇报分支事务提交成功
    }
    context.reset(); // 分支事务执行完毕,数据库连接上下文信息清空
}

RM向TC注册分支事务:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys()); // 根据本地事务信息向TC注册分支,并返回分支id
    context.setBranchId(branchId); // 将分支id绑定到数据库连接上下文
}

UndoLog入库:

@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }
    // 根据事务信息封装UndoLog数据
    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();
    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);
    CompressorType compressorType = CompressorType.NONE;
    if (needCompress(undoLogContent)) {
        compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
        undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
    }
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
    }
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection()); // UndoLog入库
}

RM向TC汇报本地事务提交情况:

private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT; // 重试次数
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null); // 向TC汇报本地分支事务提交情况
            return;
        } catch (Throwable ex) {
            LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done [" + commitDone + "] Retry Countdown: " + retry);
            retry--; // 发送失败,重试次数减一
            if (retry == 0) { // 重试次数达到最大值,抛出异常
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

到此,二阶段提交的执行逻辑似乎已经完毕。其实不然,这一步的完成,只是RM向TC汇报本地分支事务的提交情况,汇报请求是发送了,但是如何处理TC的响应呢(UndoLog的异步删除、数据回滚)?

在本文的源码分析开头部分介绍了RM客户端的初始话,在这个过程中有一步:“设置消息回调监听器,用于接收服务器端在二阶段发出的提交/回滚请求”(向上翻看)。而这个所谓的消息回调监听器,就是用于处理TC响应:

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);
    
    
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) { 
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
    ...
}

全局事务执行成功(也就是当前TM所调用的RM分支事务全部执行并提交成功),分支事务提交。其实分支事务在汇报给TC之前就已经将本地事务提交了,因此在此处只需要将无用的UndoLog日志删除即可:

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); // 分支事务提交(重点)
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}

将事务信息加入异步队列,以异步的方式处理,提高效率:

 @Override
 public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
     return asyncWorker.branchCommit(xid, branchId, resourceId); // 将事务信息加入异步请求队列
 }
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    // 将信息封装成异步队列元素,加入队列中,并返回分支状态
    Phase2Context context = new Phase2Context(xid, branchId, resourceId);
    addToCommitQueue(context);
    return BranchStatus.PhaseTwo_Committed;
}
private void addToCommitQueue(Phase2Context context) {
    if (commitQueue.offer(context)) { // 此处的commitQueue其实是阻塞队列,判断如果事务信息已经存在,则不需要加入
        return;
    }
    CompletableFuture
            .runAsync(this::doBranchCommitSafely, scheduledExecutor)
            .thenRun(() -> addToCommitQueue(context)); // 不断尝试加入阻塞队列
}

到此,只看到了将事务信息添加到异步队列中,那么阻塞队列中的事务信息如何处理?答案是在AsyncWorker的构造器中提供了计划任务线程池来处理:

public AsyncWorker(DataSourceManager dataSourceManager) {
    this.dataSourceManager = dataSourceManager;
    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
    commitQueue = new linkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
    ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
    scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory); // 定义计划任务线程池
    scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS); // 按照指定相隔时间,不断执行doBranchCommitSafely方法,处理事务信息
}

真正处理提交事务信息的方法入口:

void doBranchCommitSafely() {
    try {
        doBranchCommit();
    } catch (Throwable e) {
        LOGGER.error("Exception occur when doing branch commit", e);
    }
}
private void doBranchCommit() {
    if (commitQueue.isEmpty()) { // 阻塞队列为空,无需处理,直接返回
        return;
    }
    List allContexts = new linkedList<>();
    commitQueue.drainTo(allContexts); // 将阻塞队列中的事务信息转移到allContexts列表中
    Map> groupedContexts = groupedByResourceId(allContexts); // 根据TM处理范围进行事务信息分类(同一次TM调用的RM本地分支事务为一组)
    groupedContexts.forEach(this::dealWithGroupedContexts);
}
private void dealWithGroupedContexts(String resourceId, List contexts) {
    DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
    if (dataSourceProxy == null) {
        LOGGER.warn("Failed to find resource for {}", resourceId);
        return;
    }
    Connection conn;
    try {
        conn = dataSourceProxy.getPlainConnection();
    } catch (SQLException sqle) {
        LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
        return;
    }
    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
    List> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE); // 对需要删除的UndoLog数据进行限制,每次删除一千条
    splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition)); // 分批次批量删除UndoLog
}
private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List contexts) {
    // 以下就是很常规的SQL执行 *** 作了,在此不过多讲解
    Set xids = new linkedHashSet<>(contexts.size());
    Set branchIds = new linkedHashSet<>(contexts.size());
    contexts.forEach(context -> {
        xids.add(context.xid);
        branchIds.add(context.branchId);
    });
    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    } finally {
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
        }
    }
}

以上便是全局事务提交成功,分支事务异步删除UndoLog的整体逻辑。

最后,剩下还有如何处理全局事务失败,分支事务回滚:

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData); // 回滚分支事务
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + status);
    }
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); // 执行UndoLog,回滚数据
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationdata:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;
    for (; ; ) {
        try {
            conn = dataSourceProxy.getPlainConnection();
            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); // 根据分支事务ID和全局事务ID为条件,构建查询指定UndoLog数据的SQL
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();
            boolean exists = false;
            while (rs.next()) {
                exists = true; // 在一阶段提交中UndoLog写入成功,标志存在
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS); // 查看UndoLog数据的状态,确保只处理正常状态下UndoLog
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        Tablemeta tablemeta = TablemetaCacheFactory.getTablemetaCache(dataSourceProxy.getDbType()).getTablemeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTablemeta(tablemeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn); // 回滚数据
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }
            if (exists) { // UndoLog存在,并已成功回滚数据
                deleteUndoLog(xid, branchId, conn); // 删除UndoLog
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name());
                }
            } else { // UndoLog不存在,说明在一阶段可能写入UndoLog时发生异常,但业务SQL已执行
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn); // 重新添加UndoLog
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name());
                }
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}

至此,Seata框架的AT模式的源码分析到此结束。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5717626.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)