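Seata (Simple Extensible Autonomous Transaction Architecture) is an open-source distributed transaction middleware from Alibaba. It solves the distributed transaction problems that arise in microservice architectures efficiently and with zero intrusion into business code.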
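This article does not go over distributed transactions or the Seata framework itself in detail. For more on Seata, the official Chinese documentation is recommended (it is thorough and easy to follow): http://seata.io/zh-cn/docs/overview/what-is-seata.html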
Introduction to AT mode

Any discussion of AT mode has to start with its three main roles:
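The figure is another typical illustration of a distributed order flow. Roughly: TC is the server side; it controls the lifeline of the whole order flow and observes the actions performed at every step. TM and RM are clients. RM is the local resource manager: it controls its own local transaction and reports the result to TC. TM defines the scope of the transaction; it exists to unify the commit/rollback operation of multiple local transactions.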
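TC (Transaction Coordinator): maintains the state of global and branch transactions and drives global commit or rollback.
TM (Transaction Manager): defines the scope of a global transaction; begins, commits or rolls back the global transaction.
RM (Resource Manager): manages the resources used by branch transactions, talks to TC to register branches and report their status, and drives branch commit or rollback.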
Besides these three roles, there is the core process flow:
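Phase one: the business data and the undo log are committed in the same local transaction, after which local locks and connection resources are released (the user SQL is executed, resources are released, and data snapshots are recorded).
Phase two: commit is made asynchronous and completes quickly; rollback performs reverse compensation using the undo log recorded in phase one.

Source code analysis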
That is a rough introduction to AT mode in Seata. To understand the details, we now move on to the main part of this article: the source code analysis.
Seata also integrates well into the Spring ecosystem, so at startup it is likewise initialized by scanning beans. The key class here is GlobalTransactionScanner:
Scanning and initializing the clients:
```java
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    // Implements Spring's InitializingBean callback, invoked once the bean's properties are set
    ...
    @Override
    public void afterPropertiesSet() {
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener) this);
        if (disableGlobalTransaction) { // Check whether global transactions are allowed
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        if (initialized.compareAndSet(false, true)) { // Flip the init flag atomically to prevent double initialization
            initClient(); // Initialize the clients
        }
    }
    ...
}
```
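The `initialized.compareAndSet(false, true)` check above is the usual run-once guard: however many threads or callbacks reach it, only the first one flips the flag and performs initialization. The standalone sketch below demonstrates just that idiom; `OnceInitializer` is a hypothetical class, not part of Seata.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical component that must only be initialized once
class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();

    void init() {
        // Only the first caller flips false -> true; every later caller sees true and skips
        if (initialized.compareAndSet(false, true)) {
            initCount.incrementAndGet(); // the expensive client setup would go here
        }
    }

    // Calls init() concurrently from several threads and returns how often setup actually ran
    static int raceInit(int threads) {
        OnceInitializer target = new OnceInitializer();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(target::init);
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return target.initCount.get();
    }
}
```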
The client initialization method below clearly shows that both the TM and the RM clients are initialized:
```java
private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // init TM (applicationId and txServiceGroup come from the configuration)
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    // init RM (applicationId and txServiceGroup come from the configuration)
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();
}
```
Initializing the TM client:
```java
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    // Obtain the TM client instance
    TmNettyRemotingClient tmNettyRemotingClient =
        TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
    tmNettyRemotingClient.init(); // Initialize the client instance
}
```
Obtaining the TM client instance:
```java
public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient tmRpcClient = getInstance(); // Obtain the singleton instance
    // Set the client information
    tmRpcClient.setApplicationId(applicationId);
    tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
    tmRpcClient.setAccessKey(accessKey);
    tmRpcClient.setSecretKey(secretKey);
    return tmRpcClient;
}
```
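The instance getter below uses thread-safe lazy initialization of the singleton (double-checked locking). It also shows that the so-called client is in fact implemented on top of Netty: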
```java
public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                    nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                    KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                    new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                        nettyClientConfig.getClientWorkerThreads()),
                    RejectedPolicies.runsOldestTaskPolicy());
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}
```
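The snippet does not show how the `instance` field is declared. Double-checked locking is only safe under the Java memory model if that field is `volatile`; otherwise another thread may observe a reference to a partially constructed object. A minimal sketch of the pattern, with a hypothetical `LazyClient` standing in for `TmNettyRemotingClient`:

```java
// Hypothetical client type; only the lazy-singleton pattern matters here
final class LazyClient {
    // volatile is what makes double-checked locking safe under the Java memory model
    private static volatile LazyClient instance;
    private static int constructed = 0; // only written under the class lock

    private LazyClient() {
        constructed++;
    }

    static LazyClient getInstance() {
        LazyClient local = instance;          // first check, without locking
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance;             // second check, under the lock
                if (local == null) {
                    local = new LazyClient();
                    instance = local;         // publish the fully built object
                }
            }
        }
        return local;
    }

    static int constructedCount() {
        synchronized (LazyClient.class) {
            return constructed;
        }
    }
}
```

Reading the field into a local variable keeps the fast path to a single volatile read once the instance exists.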
With the TM client instance obtained, the next step is to initialize it:
```java
@Override
public void init() {
    registerProcessor(); // Register the message processors
    if (initialized.compareAndSet(false, true)) {
        super.init(); // Initialize via the parent class
    }
}
```
Registering the processors that handle the responses received by the client:
```java
private void registerProcessor() {
    // Register the processor for TC responses (the matching processor is found from the message type of TC's response)
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // Register the heartbeat processor (the TM-TC connection must stay healthy, so heartbeats are sent continually to probe it)
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
```
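Registering one processor per message type amounts to building a lookup table from type code to handler, which is consulted for every incoming message. The sketch below models only that dispatch idea; `MessageProcessor`, `ProcessorRegistry` and the two type codes are made up for illustration and do not match Seata's processor interface or `MessageType` values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified processor interface
interface MessageProcessor {
    String process(String payload);
}

final class ProcessorRegistry {
    // Made-up type codes for illustration only
    static final short GLOBAL_BEGIN_RESULT = 1;
    static final short HEARTBEAT = 2;

    private final Map<Short, MessageProcessor> processors = new HashMap<>();

    void register(short type, MessageProcessor processor) {
        processors.put(type, processor);
    }

    // Look up the processor by the message's type code and hand the payload to it
    String dispatch(short type, String payload) {
        MessageProcessor p = processors.get(type);
        if (p == null) {
            throw new IllegalArgumentException("No processor registered for type " + type);
        }
        return p.process(payload);
    }
}
```

Registering the same processor instance under several type codes, as the Seata code does with onResponseProcessor, simply means several keys map to one value.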
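It then calls the parent class's init method, which uses the scheduled thread pool timerExecutor to keep attempting to reconnect to TC at a fixed interval, keeping the TM-TC channel alive: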
```java
@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup()); // Keep reconnecting to keep the channel alive
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
```
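The reconnect task relies on `ScheduledExecutorService.scheduleAtFixedRate`. One property worth knowing: if a run throws, the executor suppresses all later runs, so a periodic task like this has to catch its own failures. The sketch below assumes a hypothetical `reconnect()` that fails every other call and shows the loop keeps running anyway; `ReconnectLoop` is not a Seata class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ReconnectLoop {
    // Only touched by the single timer thread
    private static int calls = 0;

    // Hypothetical reconnect that fails every other time
    static void reconnect() {
        if (++calls % 2 == 0) {
            throw new IllegalStateException("TC unreachable");
        }
    }

    // Schedules the reconnect task at a fixed rate; true if it ran `attempts` times within 5 seconds
    static boolean runsRepeatedly(int attempts, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch ran = new CountDownLatch(attempts);
        timer.scheduleAtFixedRate(() -> {
            try {
                reconnect(); // stand-in for clientChannelManager.reconnect(group)
            } catch (RuntimeException e) {
                // Swallow and log: an exception escaping here would cancel all future runs
                System.err.println("reconnect failed: " + e.getMessage());
            }
            ran.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }
}
```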
Reconnecting repeatedly to keep the channel alive:
```java
void reconnect(String transactionServiceGroup) {
    List<String> availList = null;
    try {
        // Look up the available TC addresses for the transaction service group defined in the configuration
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        // No address found: the configuration is probably wrong, so log and return without connecting
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                transactionServiceGroup);
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    for (String serverAddress : availList) { // Try every available address
        try {
            acquireChannel(serverAddress); // Connect
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                serverAddress, e.getMessage(), e);
        }
    }
}
```
Connecting to a TC address:
```java
Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress); // Look up the channel for this address
    if (channelToServer != null) { // A channel exists, so this address was connected before
        // Check whether it is still alive: return it if so, otherwise reconnect below
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) { // Connect to the TC address while holding a per-address lock
        return doConnect(serverAddress);
    }
}
```
```java
private Channel doConnect(String serverAddress) {
    // If a channel for this address already exists and is active, return it directly
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {
        return channelToServer;
    }
    // Connect to the address and create a channel
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        // Connected: record the channel in channels so later reconnect checks can find it
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}
```
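acquireChannel and doConnect combine a lock-free fast path with a lock per server address, so connecting to one TC node never blocks callers that want a different node, and the second check inside the lock avoids opening duplicate connections. Here is that structure in isolation, assuming a hypothetical `connector` function in place of the Netty connection pool and a hypothetical `isAlive` check in place of `Channel.isActive()`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical per-address connection cache; C is the connection type
final class ChannelCache<C> {
    private final Map<String, C> channels = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();
    private final Function<String, C> connector;
    private final Predicate<C> isAlive;

    ChannelCache(Function<String, C> connector, Predicate<C> isAlive) {
        this.connector = connector;
        this.isAlive = isAlive;
    }

    C acquire(String address) {
        C existing = channels.get(address);
        if (existing != null && isAlive.test(existing)) {
            return existing; // fast path, no locking
        }
        Object lock = locks.computeIfAbsent(address, k -> new Object());
        synchronized (lock) { // only callers for the same address wait on each other
            C again = channels.get(address);
            if (again != null && isAlive.test(again)) {
                return again; // another thread connected while we were waiting
            }
            C fresh = connector.apply(address);
            channels.put(address, fresh);
            return fresh;
        }
    }
}
```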
That covers the TM client's initialization logic.
Initializing the RM client:
RM client initialization is largely the same as TM client initialization, so logic already covered for TM is not repeated below.
```java
public static void init(String applicationId, String transactionServiceGroup) {
    // Obtain the RM client instance
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get()); // Set the resource manager
    // Set the message callback listener, which receives the phase-two commit/rollback requests sent by the server
    // (important; covered in detail later in this analysis)
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init(); // Initialize the client instance
}
```
```java
@Override
public void init() {
    registerProcessor(); // Register the request/response processors
    if (initialized.compareAndSet(false, true)) {
        super.init(); // Initialize via the parent class

        // Found one or more resources that were registered before initialization
        if (resourceManager != null
                && !resourceManager.getManagedResources().isEmpty()
                && StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}
```
That is the overall flow of initializing the TM and RM clients and connecting them to the Seata server.
Phase one commit:
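Once TM and RM are initialized, the real core of the distributed transaction follows. If you have used Seata's AT mode, you know that its usage cost is very low: you only add the @GlobalTransactional annotation to the entry method of the overall distributed transaction. Where there is an annotation, there is an interceptor, so let's focus on the invoke method of the GlobalTransactionalInterceptor class: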
```java
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Read the @GlobalTransactional annotation on the intercepted method
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) { // Check whether distributed transactions are enabled locally
            if (globalTransactionalAnnotation != null) {
                // The method carries @GlobalTransactional: start a distributed transaction
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    // Distributed transactions are disabled locally, or the method has no @GlobalTransactional: run the original method
    return methodInvocation.proceed();
}
```
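GlobalTransactionalInterceptor is a Spring AOP `MethodInterceptor`. Its essential idea, checking a runtime annotation on the invoked method and wrapping the call in begin/commit/rollback only when it is present, can be reproduced with a plain JDK dynamic proxy. In this sketch `@GlobalTx`, `OrderService` and the string log are hypothetical; it reads the annotation from the interface method (Seata resolves the most specific method on the target class) and rolls back on any exception, ignoring rollback rules.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation playing the role of @GlobalTransactional
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface GlobalTx { }

interface OrderService {
    @GlobalTx
    String placeOrder(String item);

    String query(String item);
}

final class TxInterceptor {
    static final List<String> LOG = new ArrayList<>();

    static <T> T wrap(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getAnnotation(GlobalTx.class) == null) {
                return call(target, method, args); // no annotation: plain call, like methodInvocation.proceed()
            }
            LOG.add("begin " + method.getName());
            try {
                Object result = call(target, method, args);
                LOG.add("commit " + method.getName());
                return result;
            } catch (Throwable t) {
                LOG.add("rollback " + method.getName());
                throw t;
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    private static Object call(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // unwrap so callers see the business exception
        }
    }
}
```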
```java
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                               final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        // Define the transactional executor and hand it to the transaction template, which begins and runs the transaction
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed(); // Run the original method
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod()); // Format the original method name (not important)
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                // Use the timeout declared on the annotation if it is valid; otherwise fall back to the default
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                // Copy the user-defined attributes into the transaction info and return it (straightforward)
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) { // The transaction failed
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
```
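The four loops turn `rollbackFor`, `rollbackForClassName`, `noRollbackFor` and `noRollbackForClassName` into a set of rules that is consulted when the business method throws. Their matching semantics are not shown here. The sketch below assumes a Spring-style strategy in which the rule whose exception type is closest to the thrown exception in the class hierarchy wins, and an exception matching no rule triggers rollback; `Rule` and `RollbackDecider` are hypothetical names, not Seata's `RollbackRule`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rule: exceptions of `type` (or its subclasses) should / should not trigger rollback
final class Rule {
    final Class<? extends Throwable> type;
    final boolean rollback;

    Rule(Class<? extends Throwable> type, boolean rollback) {
        this.type = type;
        this.rollback = rollback;
    }

    // Superclass hops from ex's class up to `type`, or -1 if ex is not an instance of `type`
    int depth(Throwable ex) {
        int d = 0;
        for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass()) {
            if (c.equals(type)) {
                return d;
            }
            d++;
        }
        return -1;
    }
}

final class RollbackDecider {
    private final List<Rule> rules = new ArrayList<>();

    RollbackDecider rollbackFor(Class<? extends Throwable> t) {
        rules.add(new Rule(t, true));
        return this;
    }

    RollbackDecider noRollbackFor(Class<? extends Throwable> t) {
        rules.add(new Rule(t, false));
        return this;
    }

    // The closest (smallest-depth) matching rule wins; with no match we assume rollback
    boolean shouldRollback(Throwable ex) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule r : rules) {
            int d = r.depth(ex);
            if (d >= 0 && d < best) {
                best = d;
                winner = r;
            }
        }
        return winner == null || winner.rollback;
    }
}
```

For example, with `rollbackFor(Exception.class)` and `noRollbackFor(IllegalArgumentException.class)`, a `NumberFormatException` is one hop from `IllegalArgumentException` but three from `Exception`, so the no-rollback rule wins.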
As handleGlobalTransaction shows, it mainly defines the methods of the transactional executor; the actual work of beginning and executing the transaction is done by transactionalTemplate.execute:
```java
public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo(); // Get the transaction info
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // Get the current global transaction (determined mainly by whether a global XID is present)
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // Get the propagation level from the transaction info
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // Each propagation level gets special handling; not covered in detail here
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                            , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // No current transaction object means no transaction has been started yet: create a new one
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            beginTransaction(txInfo, tx); // Begin the transaction (key step)

            Object rs;
            try {
                rs = business.execute(); // Run the original method
            } catch (Throwable ex) {
                // The original method threw: complete the transaction, rolling back unless the rules say otherwise (key step)
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            commitTransaction(tx); // The original method succeeded: commit (key step)

            return rs;
        } finally {
            // clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
```
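Stripped of the Seata types, the propagation switch reduces to a decision table keyed on the propagation level and on whether a global transaction already exists. The enums below are hypothetical simplifications that follow the branches of execute() above. REQUIRED and SUPPORTS join an existing transaction because begin() only contacts TC when the current role is Launcher.

```java
// Hypothetical mirror of the propagation levels handled in execute()
enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

// What execute() ends up doing for each combination
enum Action { JOIN_EXISTING, BEGIN_NEW, SUSPEND_AND_BEGIN_NEW, RUN_WITHOUT_TX, SUSPEND_AND_RUN_WITHOUT_TX, FAIL }

final class PropagationRules {
    static Action decide(Propagation p, boolean existing) {
        switch (p) {
            case REQUIRED:
                return existing ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case REQUIRES_NEW:
                return existing ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case NOT_SUPPORTED:
                return existing ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case SUPPORTS:
                return existing ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case NEVER:
                return existing ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return existing ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                throw new IllegalArgumentException("Unsupported propagation: " + p);
        }
    }
}
```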
Beginning the transaction:
```java
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin(); // Run the before-begin hooks
        tx.begin(txInfo.getTimeOut(), txInfo.getName()); // Begin the transaction
        triggerAfterBegin(); // Run the after-begin hooks
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
}
```
```java
@Override
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout); // Begin the transaction on TC, which returns the global XID
    status = GlobalStatus.Begin; // Mark the transaction as begun
    RootContext.bind(xid); // Bind the global XID to the transaction context
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}
```
```java
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Send a synchronous request to TC; the response carries the XID
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
```
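The launcher's begin() boils down to: refuse to start if an XID is already bound, obtain a new XID from TC synchronously, and bind it to the current thread's context so that later code, including the SQL proxies, can find it. A simplified sketch under the assumption that the context is a plain `ThreadLocal`; `TxContext`, `Launcher` and the fake XID format are hypothetical, and the TC round trip is replaced by a local stub.

```java
import java.util.UUID;

// Hypothetical per-thread transaction context (a simplified analogue, not Seata's RootContext)
final class TxContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static String getXid() {
        return XID.get();
    }

    static void bind(String xid) {
        XID.set(xid);
    }

    static String unbind() {
        String old = XID.get();
        XID.remove();
        return old;
    }
}

final class Launcher {
    // Stub for the synchronous GlobalBeginRequest round trip; the XID format is made up
    static String beginOnCoordinator(String name, int timeoutMillis) {
        return "127.0.0.1:8091:" + UUID.randomUUID();
    }

    static String begin(String name, int timeoutMillis) {
        String current = TxContext.getXid();
        if (current != null) {
            throw new IllegalStateException("Global transaction already exists, currentXid = " + current);
        }
        String xid = beginOnCoordinator(name, timeoutMillis);
        TxContext.bind(xid); // later code on this thread reads the XID from here
        return xid;
    }
}
```

Because the binding is per thread, work handed to another thread (or another service) does not see the XID unless it is passed along explicitly.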
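That is the source logic for beginning a transaction. Commit and rollback work the same way: build a request, send it to TC, and return the result code. Since the logic is identical, it is not repeated here.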
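However, this is only the start of phase one's core. Seata's AT mode is driven by transactions on the underlying data source. It can roll back a local branch transaction because it wraps the original data source in a proxy layer; inside the proxy it records the information from which rollback SQL can be built, which is what allows the data to be restored. The classes Seata proxies include DataSourceProxy, ConnectionProxy, StatementProxy and PreparedStatementProxy.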
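Among these, the part to focus on is SQL parsing during execution, i.e. the Statement-related proxy classes. We take the most commonly used method, PreparedStatementProxy.execute, as the example (the other execute methods end up at the same entry point):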
```java
public class PreparedStatementProxy extends AbstractPreparedStatementProxy
        implements PreparedStatement, ParametersHolder {
    ...
    @Override
    public boolean execute() throws SQLException {
        // Delegate to the common ExecuteTemplate.execute, which parses and executes the SQL
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }
    ...
}
```
```java
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    return execute(null, statementProxy, statementCallback, args);
}

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    // No global lock is required (@GlobalLock) and the current branch is not an AT branch:
    // run the original statement directly, without any AT processing
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        // Get SQL recognizers for the SQL and database type (SQL dialects differ, so parsing is DB-specific)
        sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            switch (sqlRecognizer.getSQLType()) { // Pick an executor according to the SQL type
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args); // Execute the SQL
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}
```
Calling the SQL executor to parse and execute the SQL:
```java
@Override
public T execute(Object... args) throws Throwable {
    // Bind the global transaction XID to the current proxied database connection
    String xid = RootContext.getXID();
    if (xid != null) {
        statementProxy.getConnectionProxy().bind(xid);
    }

    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) { // Pick the execution path according to the auto-commit setting
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}
```
doExecute branches into two methods: one for auto-commit and one for manual commit. To see how they differ, first look at the auto-commit one, executeAutoCommitTrue:
```java
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit(); // Switch auto-commit to false
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args); // Parse and execute the SQL via the non-auto-commit path
            connectionProxy.commit(); // Then commit manually
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true); // Restore auto-commit to true after the SQL has run
    }
}
```
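The auto-commit method ultimately calls the manual-commit method to parse and execute the SQL. This is because, around the SQL execution, Seata has to produce the undo log (undoSql) together with the data images before and after the SQL, and these must be committed in the same local transaction as the business SQL. So the auto-commit method first switches auto-commit to false, performs the operation, and then restores the setting.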
So the focus now shifts to the manual-commit method, executeAutoCommitFalse:
```java
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    TableRecords beforeImage = beforeImage(); // Image of the affected rows before the SQL runs
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // Execute the SQL
    TableRecords afterImage = afterImage(beforeImage); // Image of the affected rows after the SQL runs
    prepareUndoLog(beforeImage, afterImage); // Prepare the undo log used if the global transaction rolls back
    return result;
}
```
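The before image / execute / after image / undo log sequence can be illustrated with a toy in-memory table holding one value per primary key. Rollback restores the before image, but only if the row still equals the after image. Seata also validates data before undoing (configurable); the check here is a simplified stand-in for that idea, not Seata's implementation. `UndoRecord` and `ToyAtBranch` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical undo record: the row images captured around one UPDATE
final class UndoRecord {
    final long pk;
    final String before;
    final String after;

    UndoRecord(long pk, String before, String after) {
        this.pk = pk;
        this.before = before;
        this.after = after;
    }
}

final class ToyAtBranch {
    final Map<Long, String> table = new HashMap<>(); // pk -> column value, our "database"

    // Phase one: before image, business UPDATE, after image, undo record
    UndoRecord update(long pk, String newValue) {
        String before = table.get(pk);            // like beforeImage()
        table.put(pk, newValue);                  // like statementCallback.execute(...)
        String after = table.get(pk);             // like afterImage(beforeImage)
        return new UndoRecord(pk, before, after); // like prepareUndoLog(before, after)
    }

    // Phase-two rollback: restore the before image, refusing if the row changed since phase one
    void undo(UndoRecord r) {
        String current = table.get(r.pk);
        if (!Objects.equals(current, r.after)) {
            throw new IllegalStateException("row " + r.pk + " was modified outside the global transaction");
        }
        table.put(r.pk, r.before);
    }
}
```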
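This concludes the source analysis of phase one. It covered how TM and RM initialize and connect to the TC server, and how Seata proxies the parsing and execution of SQL at the database layer.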
Phase two commit:
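Once the business SQL has run and the UndoLog has been prepared, the commit happens in ConnectionProxy. Strictly speaking, by Seata's own definition of the phases, the branch registration, UndoLog write and local commit shown next still complete phase one; the TC-driven phase-two handling comes after them.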
```java
@Override
public void commit() throws SQLException {
    try {
        LOCK_RETRY_POLICY.execute(() -> {
            doCommit(); // Commit
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback(); // Roll back on error
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}
```
Committing:
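```java
private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit(); // Inside a global transaction: use the global-transaction commit path (key step)
    } else if (context.isGlobalLockRequire()) {
        // Not in a global transaction, but a global lock is required (@GlobalLock): commit with global lock checks
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit(); // Otherwise, a plain local commit
    }
}
```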
```java
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register(); // Register the branch transaction
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // Write the UndoLog to the database
        targetConnection.commit(); // Commit the local transaction (business data and UndoLog together)
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false); // Local commit failed: report the branch failure to TC
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true); // Local commit succeeded: report success to TC
    }
    context.reset(); // The branch is done: clear the connection context
}
```
RM registers the branch transaction with TC:
```java
private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    // Register the branch with TC using the local transaction info; TC returns the branch ID
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId); // Bind the branch ID to the connection context
}
```
Persisting the UndoLog:
```java
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }
    // Build the UndoLog payload from the transaction info
    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();

    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);

    CompressorType compressorType = CompressorType.NONE;
    if (needCompress(undoLogContent)) {
        compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
        undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
    }

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
    }

    // Insert the UndoLog row
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent,
        cp.getTargetConnection());
}
```
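needCompress and the compressor only kick in for large payloads, so small undo logs are stored as-is, and the chosen CompressorType is recorded in the undo log context so it can be decoded later. The sketch below shows the same compress-above-threshold idea with the JDK's GZIP streams; the threshold parameter and the `UndoPayload` type are assumptions for illustration, not Seata's defaults or classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical encoded undo log: the flag plays the role of the recorded compressor type
final class UndoPayload {
    final boolean compressed;
    final byte[] bytes;

    private UndoPayload(boolean compressed, byte[] bytes) {
        this.compressed = compressed;
        this.bytes = bytes;
    }

    // Compress only when the encoded undo log exceeds the threshold
    static UndoPayload encode(byte[] raw, int thresholdBytes) {
        if (raw.length <= thresholdBytes) {
            return new UndoPayload(false, raw);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The GZIP stream is closed (and finished) here, so the buffer holds the complete output
        return new UndoPayload(true, out.toByteArray());
    }

    static byte[] decode(UndoPayload p) {
        if (!p.compressed) {
            return p.bytes;
        }
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(p.bytes))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```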
RM reports the local commit result to TC:
```java
private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT; // Number of attempts
    while (retry > 0) {
        try {
            // Report the local branch commit result to TC
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                + commitDone + "] Retry Countdown: " + retry);
            retry--; // Sending failed: one fewer attempt left
            if (retry == 0) { // No attempts left: throw
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}
```
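report() is a bounded retry loop: try, and on failure decrement the remaining attempts until none are left, then surface the error. A generic version of the same loop, with a hypothetical `Reporter` interface standing in for `branchReport`:

```java
// Hypothetical generic retry helper modeled on report()
final class ReportRetry {
    // Stand-in for the branchReport call: returns normally on success, throws on failure
    interface Reporter {
        void report() throws Exception;
    }

    // Try up to maxAttempts times; returns the number of attempts it took, or throws after the last failure
    static int reportWithRetry(Reporter reporter, int maxAttempts) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                reporter.report();
                return attempt;
            } catch (Exception ex) {
                if (attempt >= maxAttempts) {
                    throw new IllegalStateException("Failed to report branch status after " + attempt + " attempts", ex);
                }
            }
        }
    }
}
```

As in the original loop, there is no delay between attempts; a variant for unreliable networks might add a backoff between retries.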
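At this point the phase-two commit logic may seem complete, but it is not. This step only has the RM report its local branch commit result to TC. The report has been sent, but how are TC's follow-up instructions handled (asynchronous UndoLog deletion, data rollback)?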
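When RM client initialization was covered at the start of the source analysis, one of its steps was to "set the message callback listener, which receives the phase-two commit/rollback requests sent by the server" (see above). That message callback listener is exactly what handles TC's requests: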
```java
public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);

    // Phase-two commit request from TC
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    // Phase-two rollback request from TC
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
    ...
}
```
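If the global transaction succeeds (that is, every RM branch transaction invoked by the current TM has executed and committed successfully), the branches are committed. Each branch already committed its local transaction before reporting to TC, so all that is left here is to delete the UndoLogs, which are no longer needed: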
```java
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    // Commit the branch transaction (key step)
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}
```
The branch info is put into a queue and processed asynchronously, for efficiency:
```java
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId); // Hand the branch info to the async worker's queue
}
```
```java
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    // Wrap the info as a queue element, enqueue it, and immediately return the branch status
    Phase2Context context = new Phase2Context(xid, branchId, resourceId);
    addToCommitQueue(context);
    return BranchStatus.PhaseTwo_Committed;
}
```
```java
private void addToCommitQueue(Phase2Context context) {
    // commitQueue is a bounded blocking queue; offer() returns false when it is full
    if (commitQueue.offer(context)) {
        return;
    }
    // Queue full: drain it asynchronously on scheduledExecutor, then retry adding this context
    CompletableFuture
        .runAsync(this::doBranchCommitSafely, scheduledExecutor)
        .thenRun(() -> addToCommitQueue(context));
}
```
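The key detail is that `offer()` on a bounded `LinkedBlockingQueue` never blocks; it returns false when the queue is full. Seata then drains the queue asynchronously on scheduledExecutor and retries the insertion afterwards. The sketch below keeps the same offer-drain-retry shape but drains synchronously on the caller's thread for simplicity; `CommitBuffer` and its `flushed` list (standing in for deleting undo logs) are hypothetical, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded model of the bounded commit queue
final class CommitBuffer {
    final BlockingQueue<String> queue;
    final List<String> flushed = new ArrayList<>(); // stands in for "undo logs deleted"

    CommitBuffer(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
    }

    void add(String branch) {
        // offer() fails fast when the queue is full; make room and try again
        while (!queue.offer(branch)) {
            drain();
        }
    }

    void drain() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        flushed.addAll(batch);
    }
}
```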
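So far we have only seen branch info being added to the queue. How is the queued info processed? The answer is the scheduled thread pool set up in AsyncWorker's constructor: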
```java
public AsyncWorker(DataSourceManager dataSourceManager) {
    this.dataSourceManager = dataSourceManager;

    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
    commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);

    ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
    scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory); // The scheduled thread pool
    // Run doBranchCommitSafely at a fixed rate (10 ms initial delay, then every 1000 ms) to process queued branches
    scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
}
```
The entry point that actually processes the queued commits:
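```java
void doBranchCommitSafely() {
    try {
        doBranchCommit();
    } catch (Throwable e) {
        // Catch everything: an exception escaping a scheduleAtFixedRate task would cancel its future runs
        LOGGER.error("Exception occur when doing branch commit", e);
    }
}
```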
```java
private void doBranchCommit() {
    if (commitQueue.isEmpty()) { // Nothing queued: return
        return;
    }

    List<Phase2Context> allContexts = new LinkedList<>();
    commitQueue.drainTo(allContexts); // Move everything in the queue into allContexts
    // Group the branch info by resource ID (i.e. by data source), so each group is handled with one connection
    Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);

    groupedContexts.forEach(this::dealWithGroupedContexts);
}
```
```java
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
    DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
    if (dataSourceProxy == null) {
        LOGGER.warn("Failed to find resource for {}", resourceId);
        return;
    }

    Connection conn;
    try {
        conn = dataSourceProxy.getPlainConnection();
    } catch (SQLException sqle) {
        LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
        return;
    }

    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

    // Cap each batch of UndoLog deletions at UNDOLOG_DELETE_LIMIT_SIZE (1000) entries
    List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
    splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition)); // Delete the UndoLogs batch by batch
}
```
```java
private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    // An ordinary JDBC batch delete; nothing special here
    Set<String> xids = new LinkedHashSet<>(contexts.size());
    Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
    contexts.forEach(context -> {
        xids.add(context.xid);
        branchIds.add(context.branchId);
    });

    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    } finally {
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
        }
    }
}
```
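Putting doBranchCommit, dealWithGroupedContexts and deleteUndoLog together: drain the queue in one call, group the drained entries by resource ID, then cut each group into fixed-size batches. A sketch of just that planning step, with a hypothetical `Phase2Item` in place of `Phase2Context` and a plain-JDK `partition` helper in place of Guava's `Lists.partition`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

// Hypothetical queued branch entry
final class Phase2Item {
    final String xid;
    final long branchId;
    final String resourceId;

    Phase2Item(String xid, long branchId, String resourceId) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
    }
}

final class AsyncDeletePlanner {
    // Drain everything currently queued, group by data source, and cut each group into batches
    static Map<String, List<List<Phase2Item>>> plan(LinkedBlockingQueue<Phase2Item> queue, int batchSize) {
        List<Phase2Item> all = new ArrayList<>();
        queue.drainTo(all);
        Map<String, List<Phase2Item>> byResource = all.stream()
            .collect(Collectors.groupingBy(i -> i.resourceId, LinkedHashMap::new, Collectors.toList()));
        Map<String, List<List<Phase2Item>>> result = new LinkedHashMap<>();
        byResource.forEach((resource, items) -> result.put(resource, partition(items, batchSize)));
        return result;
    }

    // Plain-JDK equivalent of Lists.partition (returns copies rather than views)
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            batches.add(new ArrayList<>(items.subList(from, Math.min(from + size, items.size()))));
        }
        return batches;
    }
}
```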
That is the whole flow for a successful global commit: each branch transaction deletes its undo logs asynchronously.
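Finally, what remains is how a failed global transaction is handled, that is, how each branch transaction rolls back. The RM receives the TC's rollback request in doBranchRollback. That method delegates to the resource manager's branchRollback, which in turn calls the undo log manager's undo method: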
```java
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    // Roll back the branch transaction
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId,
        resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + status);
    }
}
```
```java
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        // Replay the undo log to roll the data back
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationdata:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
```
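branchRollback reports one of three outcomes back to the TC. The sketch below mirrors that decision using hypothetical enums that borrow the names from the code above. These enums are not Seata's classes. The sketch also adds a hypothetical caller-side loop that retries only while the status is retryable. How Seata's TC actually schedules rollback retries is not covered in this article.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class BranchRollbackStatusDemo {
    // Hypothetical mirrors of the names used in branchRollback(); not Seata's own classes.
    enum ErrorCode { BranchRollbackFailed_Retriable, BranchRollbackFailed_Unretriable }

    enum BranchStatus { PhaseTwo_Rollbacked, PhaseTwo_RollbackFailed_Retryable, PhaseTwo_RollbackFailed_Unretryable }

    // Same decision as branchRollback(): null means undo() succeeded, otherwise map the failure code.
    static BranchStatus toStatus(ErrorCode failure) {
        if (failure == null) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
        return failure == ErrorCode.BranchRollbackFailed_Unretriable
                ? BranchStatus.PhaseTwo_RollbackFailed_Unretryable
                : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }

    // Hypothetical caller-side loop (not Seata's TC logic): retry only while the status is retryable.
    static BranchStatus rollbackWithRetry(Supplier<BranchStatus> attempt, int maxAttempts) {
        BranchStatus status = attempt.get();
        for (int i = 1; i < maxAttempts && status == BranchStatus.PhaseTwo_RollbackFailed_Retryable; i++) {
            status = attempt.get();
        }
        return status;
    }

    public static void main(String[] args) {
        // A transient failure followed by a successful retry.
        Iterator<ErrorCode> outcomes =
                Arrays.asList(ErrorCode.BranchRollbackFailed_Retriable, null).iterator();
        System.out.println(rollbackWithRetry(() -> toStatus(outcomes.next()), 3));  // PhaseTwo_Rollbacked
        // An unretriable failure is reported at once, without further attempts.
        System.out.println(rollbackWithRetry(
                () -> toStatus(ErrorCode.BranchRollbackFailed_Unretriable), 3));  // PhaseTwo_RollbackFailed_Unretryable
    }
}
```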
```java
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;
    for (; ; ) {
        try {
            conn = dataSourceProxy.getPlainConnection();
            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            // Query this branch's undo log, using branchId and xid as the conditions
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();
            boolean exists = false;
            while (rs.next()) {
                // The undo log was written in phase one: mark it as found
                exists = true;
                // Check the undo log's status so that only undo logs in the normal state are processed
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);
                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    // Undo the statements in the reverse order of their original execution
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType())
                            .getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        // Roll the data back
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }
            if (exists) {
                // The undo log exists and the data has been rolled back: delete the undo log
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                // No undo log: this branch's phase-one local transaction has not committed yet
                // (the business SQL and its undo log always commit together). Insert an undo log in
                // the GlobalFinished state as a placeholder, so that a late phase-one commit fails on
                // the undo log's unique key and rolls back instead of committing
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String.format(
                "Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId,
                e.getMessage()), e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}
```
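The two branches of undo() are easiest to see on a toy model. The sketch below is an in-memory simulation under stated assumptions, not Seata code:

- A map keyed by "xid:branchId" stands in for the undo_log table's unique key on (xid, branch_id).
- A second map stands in for a business table.
- Each undo entry is a list of compensating actions that restore before-images.

The simulation shows two things. First, compensations must run in reverse order: a row updated 10 → 7 → 5 is undone by restoring 7 first, then 10. Second, a rollback that arrives before phase one commits leaves a GlobalFinished placeholder, and that placeholder makes the late phase-one commit fail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UndoFenceDemo {
    // Hypothetical in-memory model of one undo_log row.
    static final class UndoRow {
        final boolean globalFinished;        // true for the placeholder row
        final List<Runnable> compensations;  // one before-image restore per business SQL, in execution order

        UndoRow(boolean globalFinished, List<Runnable> compensations) {
            this.globalFinished = globalFinished;
            this.compensations = compensations;
        }
    }

    // Keyed by "xid:branchId" to model the unique key on (xid, branch_id).
    final Map<String, UndoRow> undoLog = new HashMap<>();
    // Hypothetical business table: item -> stock count.
    final Map<String, Integer> stock = new HashMap<>();

    static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Phase one: business updates and the undo row commit together; if the undo row key
    // already exists, nothing is applied (the local transaction rolls back).
    boolean phaseOneCommit(String xid, long branchId, String item, int... deltas) {
        String k = key(xid, branchId);
        if (undoLog.containsKey(k)) {
            return false;  // models the duplicate-key failure on the undo_log insert
        }
        List<Runnable> compensations = new ArrayList<>();
        for (int delta : deltas) {
            int before = stock.getOrDefault(item, 0);
            compensations.add(() -> stock.put(item, before));  // before-image of this statement
            stock.put(item, before + delta);
        }
        undoLog.put(k, new UndoRow(false, compensations));
        return true;
    }

    // Phase-two rollback, following the two branches of undo().
    void undo(String xid, long branchId) {
        String k = key(xid, branchId);
        UndoRow row = undoLog.get(k);
        if (row == null) {
            // Phase one has not committed yet: leave a GlobalFinished placeholder behind.
            undoLog.put(k, new UndoRow(true, Collections.emptyList()));
            return;
        }
        if (row.globalFinished) {
            return;  // like canUndo(state) returning false: nothing to undo
        }
        List<Runnable> reversed = new ArrayList<>(row.compensations);
        Collections.reverse(reversed);  // undo the last statement first
        reversed.forEach(Runnable::run);
        undoLog.remove(k);              // like deleteUndoLog(...) followed by commit
    }

    public static void main(String[] args) {
        UndoFenceDemo db = new UndoFenceDemo();
        db.stock.put("sku-1", 10);

        // Case 1: phase one applied two updates (10 -> 7 -> 5), then the global transaction rolls back.
        db.phaseOneCommit("xid-1", 1L, "sku-1", -3, -2);
        db.undo("xid-1", 1L);
        System.out.println("stock after undo: " + db.stock.get("sku-1"));        // 10

        // Case 2: the rollback arrives before phase one commits (e.g. the branch timed out).
        db.undo("xid-2", 2L);
        boolean late = db.phaseOneCommit("xid-2", 2L, "sku-1", -4);
        System.out.println("late phase-one commit accepted: " + late);          // false
        System.out.println("stock unchanged: " + db.stock.get("sku-1"));        // 10
    }
}
```

In the real code, the placeholder insert can itself collide if phase one commits at the same moment. That collision is the SQLIntegrityConstraintViolationException branch, after which the for (;;) loop runs again and finds the real undo log.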
This concludes the source code analysis of the Seata framework's AT mode.
Feel free to share; when reposting, please credit the source: 内存溢出