Seata源码(四)全局锁GlobalLock
Java极客 | 作者 / 铿然一叶
这是Java极客的第 93 篇原创文章
相关阅读:
萌新快速成长之路 JAVA编程思想(一)通过依赖注入增加扩展性 JAVA编程思想(二)如何面向接口编程 JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则 JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选? Java编程思想(五)事件通知模式解耦过程 Java编程思想(六)事件通知模式解耦过程 Java编程思想(七)使用组合和继承的场景 JAVA基础(一)简单、透彻理解内部类和静态内部类 JAVA基础(二)内存优化-使用Java引用做缓存 JAVA基础(三)ClassLoader实现热加载 JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比 JAVA基础(五)函数式接口-复用,解耦之利刃 HikariPool源码(二)设计思想借鉴 【极客源码】JetCache源码(一)开篇 【极客源码】JetCache源码(二)顶层视图 如何编写软件设计文档 Seata源码(一)初始化 Seata源码(二)事务基础对象 Seata源码(三)事务处理类结构和流程
- 概述
1.1 作用
对某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。
1.2 场景
1.在微服务A中,业务方法A和业务方法C都会修改同一个表T。
2.在全局事务执行过程中,如果业务方法A修改了表T,但全局事务还没执行完,此时业务方法C修改了表T,就会影响业务方法A的回退,因为UNDO镜像和实际数据对不上了。
如果业务方法C也使用全局事务,因全局事务要和TC交互那么代价会很大,而GlobalLock仅使用本地事务,不需要和TC交互,且会判断是否和全局事务冲突,会等待全局事务执行完之后才执行。
- 类结构
类 | 描述 |
---|---|
GlobalLock | 全局锁注解,控制业务方法是否启用全局锁 |
GlobalTransactionalInterceptor | 全局事务拦截器,控制是走全局事务处理流程,还是全局锁处理流程 |
GlobalLockTemplate | 全局锁执行模版,编排全局锁的大流程 |
GlobalLockExecutor | 全员锁执行器,获取参数,调用业务方法,这个类的作用很弱,可要可不要 |
GlobalLockConfig | 全局锁配置,参数来自GlobalLock注解 |
GlobalLockConfigHolder | 全局锁Holder,缓存GlobalLockConfig到线程变量中 |
ConnectionProxy | DB Connection代理,增加事务和全局锁处理逻辑 |
DefaultResourceManager | 默认资源管理类,用于查询全局事务是否开启 |
LockRetryPolicy | 全局锁重试策略 |
LockRetryController | 全局锁重试控制器,根据重试次数,重试间隔参数进行重试 |
-
源码
3.1 入口
GlobalTransactionalInterceptor.java
有GlobalLock注解则调用如下方法:
Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable { return globalLockTemplate.execute(new GlobalLockExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } @Override public GlobalLockConfig getGlobalLockConfig() { GlobalLockConfig config = new GlobalLockConfig(); config.setLockRetryInternal(globalLockAnno.lockRetryInternal()); config.setLockRetryTimes(globalLockAnno.lockRetryTimes()); return config; } }); }
3.2 全局锁控制流
GlobalLockTemplate.java
public Object execute(GlobalLockExecutor executor) throws Throwable { // 判断全局锁是否存在,不存在则绑定全局锁标志 boolean alreadyInGlobalLock = RootContext.requireGlobalLock(); if (!alreadyInGlobalLock) { RootContext.bindGlobalLockFlag(); } // set my config to config holder so that it can be access in further execution // for example, LockRetryController can access it with config holder GlobalLockConfig myConfig = executor.getGlobalLockConfig(); GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig); try { return executor.execute(); } finally { // only unbind when this is the root caller. // otherwise, the outer caller would lose global lock flag if (!alreadyInGlobalLock) { RootContext.unbindGlobalLockFlag(); } // if previous config is not null, we need to set it back // so that the outer logic can still use their config if (previousConfig != null) { GlobalLockConfigHolder.setAndReturnPrevious(previousConfig); } else { GlobalLockConfigHolder.remove(); } } }
3.3 全局锁执行者
在GlobalTransactionalInterceptor类中创建匿名GlobalLockExecutor类来执行业务方法。
GlobalLockExecutor这个类非常简单,也没有提供扩展能力,看起了可以不需要此类,直接在globalLockTemplate中调用业务方法则可。
GlobalTransactionalInterceptor.java
return globalLockTemplate.execute(new GlobalLockExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } @Override public GlobalLockConfig getGlobalLockConfig() { GlobalLockConfig config = new GlobalLockConfig(); config.setLockRetryInternal(globalLockAnno.lockRetryInternal()); config.setLockRetryTimes(globalLockAnno.lockRetryTimes()); return config; } });
3.4 全局锁的使用
GlobalLockExecutor类调用了GlobalLock注解的业务方法,最终调用ConnectionProxy提交事务或回滚,在ConnectionProxy中使用全局锁。
3.4.1 根据是全局锁还是全局事务走不同分支:
ConnectionProxy.java
private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } } private void processLocalCommitWithGlobalLocks() throws SQLException { checkLock(context.buildLockKeys()); try { targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); } context.reset(); }
3.4.2 检查全局锁
检查全局锁,AT模式的的全局事务会参与一起判断,获取不到锁则抛出锁冲突异常:
ConnectionProxy.java
public void checkLock(String lockKeys) throws SQLException { if (StringUtils.isBlank(lockKeys)) { return; } // Just check lock without requiring lock by now. try { // 检查AT模式的全局事务是否存在 boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT, getDataSourceProxy().getResourceId(), context.getXid(), lockKeys); if (!lockable) { throw new LockConflictException(); } } catch (TransactionException e) { recognizeLockKeyConflictException(e, lockKeys); } }
3.4.3 锁重试策略入口
全局锁检查抛出的异常在LockRetryPolicy的execute方法中被捕捉并重试:
ConnectionProxy.java
public void commit() throws SQLException { try { // 锁重试策略,全局锁检查抛出的异常在execute方法中被捕捉并重试 LOCK_RETRY_POLICY.execute(() -> { doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } }
3.4.4 锁重试策略
根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。
LockRetryPolicy.java
public T execute(Callable callable) throws Exception { // 根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。 if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { return callable.call(); } else { // 走重试逻辑 return doRetryOnLockConflict(callable); } } protected T doRetryOnLockConflict(Callable callable) throws Exception { LockRetryController lockRetryController = new LockRetryController(); while (true) { try { return callable.call(); } catch (LockConflictException lockConflict) { onException(lockConflict); lockRetryController.sleep(lockConflict); } catch (Exception e) { onException(e); throw e; } } }
3.4.5 锁重试控制器
根据重试次数和重试间隔决定是否重试:
参数 | 配置参数 |
---|---|
lockRetryInternal | lock.retryInterval |
lockRetryTimes | lock.retryTimes |
LockRetryController.java
public void sleep(Exception e) throws LockWaitTimeoutException {
if (--lockRetryTimes < 0) {
throw new LockWaitTimeoutException("Global lock wait timeout", e);
}
try {
Thread.sleep(lockRetryInternal);
} catch (InterruptedException ignore) {
}
}
end.