Seata源码(七)Seata事务故障处理
Java极客 | 作者 / 铿然一叶
这是Java极客的第 98 篇原创文章
相关阅读:
萌新快速成长之路 JAVA编程思想(一)通过依赖注入增加扩展性 JAVA编程思想(二)如何面向接口编程 JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则 JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选? Java编程思想(五)事件通知模式解耦过程 Java编程思想(六)事件通知模式解耦过程 Java编程思想(七)使用组合和继承的场景 JAVA基础(一)简单、透彻理解内部类和静态内部类 JAVA基础(二)内存优化-使用Java引用做缓存 JAVA基础(三)ClassLoader实现热加载 JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比 JAVA基础(五)函数式接口-复用,解耦之利刃 HikariPool源码(二)设计思想借鉴 【极客源码】JetCache源码(一)开篇 【极客源码】JetCache源码(二)顶层视图 如何编写软件设计文档 Seata源码(一)初始化 Seata源码(二)事务基础对象 Seata源码(三)事务处理类结构和流程 Seata源码(四)全局锁GlobalLock Seata源码(五)Seata数据库操作 Seata源码(六)Seata的undo日志操作
- 概述
Seata提供了故障处理接口和默认实现类,在发生故障时可以处理。
- 核心类结构
引入不同的库实例化FailureHandler子类的入口类不同:
groupId | artifactId | 入口类 |
---|---|---|
io.seata | seata-spring-boot-starter | SeataAutoConfiguration |
com.alibaba.cloud | spring-cloud-alibaba-seata | GlobalTransactionScanner |
-
代码
3.1 故障处理类实例化
3.1.1 seata-spring-boot-starter库
SeataAutoConfiguration.java
由SeataAutoConfiguration实例化默认实现类DefaultFailureHandlerImpl后通过构造器传入GlobalTransactionScanner:
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) public class SeataAutoConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class); @Bean(BEAN_NAME_FAILURE_HANDLER) @ConditionalOnMissingBean(FailureHandler.class) public FailureHandler failureHandler() { return new DefaultFailureHandlerImpl(); } @Bean @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER}) @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Automatically configure Seata"); } return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler); } }
3.1.2 spring-cloud-alibaba-seata库
GlobalTransactionAutoConfiguration.java
创建GlobalTransactionScanner,但并没有实例化FailureHandler的子类,调用的构造器为return new GlobalTransactionScanner(applicationName, txServiceGroup):
@Bean public GlobalTransactionScanner globalTransactionScanner() { String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name"); String txServiceGroup = this.seataProperties.getTxServiceGroup(); if (StringUtils.isEmpty(txServiceGroup)) { txServiceGroup = applicationName + "-fescar-service-group"; this.seataProperties.setTxServiceGroup(txServiceGroup); } return new GlobalTransactionScanner(applicationName, txServiceGroup); }
在GlobalTransactionScanner类中也没有实例化FailureHandler的子类,传给GlobalTransactionalInterceptor构造器的是个空对象:
if (globalTransactionalInterceptor == null) { // 这种场景下failureHandlerHook为null globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
GlobalTransactionalInterceptor.java 在此类中实例化了默认实现类DefaultFailureHandlerImpl:
// 常量定义默认实现类 private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl(); // 构造器中判断没有实例化则使用默认实现类 public GlobalTransactionalInterceptor(FailureHandler failureHandler) { this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
3.2 故障处理调用点
GlobalTransactionalInterceptor.java
在方法handleGlobalTransaction中对不同异常场景调用对应的接口:
} catch (TransactionalExecutor.ExecutionException e) { TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: succeed = false; failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: succeed = false; failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); case RollbackRetrying: failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); default: throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code)); }
3.3 默认实现类
DefaultFailureHandlerImpl.java
从实现来看,仅仅是实现了日志打印能力:
public class DefaultFailureHandlerImpl implements FailureHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class); /** * Retry 1 hours by default */ private static final int RETRY_MAX_TIMES = 6 * 60; private static final long SCHEDULE_INTERVAL_SECONDS = 10; private static final long TICK_DURATION = 1; private static final int TICKS_PER_WHEEL = 8; private HashedWheelTimer timer = new HashedWheelTimer( new NamedThreadFactory("failedTransactionRetry", 1), TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL); @Override public void onBeginFailure(GlobalTransaction tx, Throwable cause) { LOGGER.warn("Failed to begin transaction. ", cause); } @Override public void onCommitFailure(GlobalTransaction tx, Throwable cause) { LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause); timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS); } @Override public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) { LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException); timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS); } @Override public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) { StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()}); timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS); } protected class CheckTimerTask implements TimerTask { private final GlobalTransaction tx; private final GlobalStatus required; private int count = 0; private boolean isStopped = false; protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) { this.tx = tx; this.required = required; } @Override public void run(Timeout timeout) throws Exception { if (!isStopped) { if (++count > RETRY_MAX_TIMES) { LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES); return; } isStopped = shouldStop(tx, required); timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS); } } } private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) { try { GlobalStatus status = tx.getStatus(); LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status); if (status == required || status == GlobalStatus.Finished) { return true; } } catch (TransactionException e) { LOGGER.error("fetch GlobalTransaction status error", e); } return false; } }
- 扩展性
当使用spring-cloud-alibaba-seata库时,FailureHandler的子类实现是在GlobalTransactionalInterceptor类中使用默认的DefaultFailureHandlerImpl,没有办法扩展自定义实现。
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
当使用seata-spring-boot-starter库时,默认的FailureHandler按照如下方式创建: SeataAutoConfiguration.java
@Bean(BEAN_NAME_FAILURE_HANDLER) @ConditionalOnMissingBean(FailureHandler.class) public FailureHandler failureHandler() { return new DefaultFailureHandlerImpl(); }
此时可以动态创建自定义的FailureHandler实现类来重载默认实现类实现扩展。
end.