Source Code Analysis | How Does MySQL's commit Actually Commit?
Author: Li Pengbo
Member of the ActionTech (爱可生) DBA team, mainly responsible for MySQL troubleshooting and SQL audit and optimization. Dedicated to technology and accountable to customers.
Source: original contribution
* Produced by the ActionTech Open Source Community. Original content may not be used without authorization; to republish, please contact the editor and credit the source.
When MySQL's commit command commits a transaction, the server internally performs a two-phase commit (Prepare and Commit). Based on MySQL 8.0.33, this article walks through the source code of that two-phase commit and shows what a transaction goes through when it is committed.
The overall logic is as follows:
I. Prepare Phase

1. Binlog Prepare

- Obtain the largest sequence number (logical timestamp) of the last committed transaction.

2. InnoDB Prepare

- Set the transaction state to prepared.
- Release GAP locks for isolation levels of READ COMMITTED and below.
- Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED.
- Write the transaction XID into the undo log.
II. Commit Phase

1. Stage 0

- Guarantee the commit order on replicas.

2. Flush Stage

- Flush the redo log to disk according to the innodb_flush_log_at_trx_commit parameter:
  - fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues;
  - the storage engine flushes the redo log of prepared transactions to disk according to innodb_flush_log_at_trx_commit;
  - stop blocking the replica's preserve-commit-order execution.
- Generate the GTID via get_server_sidno() and Gtid_state::get_automatic_gno().
- Flush stmt_cache.
- Flush trx_cache:
  - generate last_committed and sequence_number;
  - flush the GTID log event;
  - flush the data in trx_cache into the binlog cache;
  - prepare the binlog position of the committed transaction;
  - increment the prepared XID counter.
- If sync_binlog != 1, update the binlog position in the flush stage and broadcast the update signal, so the replica's dump thread can detect the binlog update.
3. Sync Stage

- Wait as configured by the sync_binlog parameter, then call fsync() to flush the binlog to disk.
- If sync_binlog == 1, update the binlog position in the sync stage and broadcast the update signal, so the replica's dump thread can detect the binlog update.
4. Commit Stage

- after_sync hook (the semi-synchronous replication after_sync hook).
- Update the global m_max_committed_transaction (used as last_committed for subsequent transactions) and initialize the sequence number in the transaction context.
- Binlog-layer commit, which does nothing.
- Storage-engine-layer commit:
  - pre-allocate the update undo segment used to persist the GTID;
  - update the update_time of the modified tables in the data dictionary;
  - allocate the mini-transaction handle and buffer;
  - update the undo state: for insert, from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE; for update, to TRX_UNDO_TO_PURGE;
  - for update transactions, also assign a trx no to the rollback segments and add them to the purge queue;
  - add the update undo log header to the head of the history list and release some in-memory objects;
  - record the binlog position in the system transaction table;
  - close the MVCC read view;
  - persist the GTID;
  - free the insert undo log;
  - wake up background threads to do their work, e.g. the master thread, purge thread, and page_cleaner.
- Broadcast the m_stage_cond_binlog condition variable to wake up the suspended followers.
Now that the overall flow is clear, read on if you are interested in the source code analysis.
The ha_commit_trans function mainly determines whether GTID information needs to be written, and then starts the two-phase commit:
```cpp
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {
  /*
    Save transaction owned gtid into table before transaction prepare
    if binlog is disabled, or binlog is enabled and log_replica_updates
    is disabled with slave SQL thread or slave worker thread.
  */
  std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);
...
  // Prepare phase
  if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
    error = tc_log->prepare(thd, all);
...
  // Commit phase
  if (error || (error = tc_log->commit(thd, all))) {
    ha_rollback_trans(thd, all);
    error = 1;
    goto end;
  }
}
```
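To make the control flow above concrete, here is a minimal, self-contained sketch (not MySQL code) of a two-phase-commit coordinator over a set of participants. The Participant interface and function names are hypothetical, but the ordering mirrors what ha_commit_trans enforces: prepare every participant first, and only commit if all prepares succeed, otherwise roll back.

```cpp
#include <vector>

// Hypothetical participant standing in for a storage engine / binlog handlerton;
// not MySQL's real API.
struct Participant {
  virtual bool prepare() = 0;   // phase 1: make the transaction durable but still undoable
  virtual void commit() = 0;    // phase 2: make it permanent and visible
  virtual void rollback() = 0;  // undo phase-1 work on failure
  virtual ~Participant() = default;
};

// Minimal 2PC driver: commit is attempted only after every participant
// has successfully prepared.
bool two_phase_commit(std::vector<Participant *> &participants) {
  for (auto *p : participants) {
    if (!p->prepare()) {                       // any prepare failure aborts the group
      for (auto *q : participants) q->rollback();
      return false;
    }
  }
  for (auto *p : participants) p->commit();
  return true;
}
```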
Prepare Phase
The Prepare phase of the two-phase commit is relatively simple. Below is the call stack from the commit command entry point through the Prepare phase, with the purpose of each step:
```
|mysql_execute_command
|--trans_commit
|----ha_commit_trans
|------MYSQL_BIN_LOG::prepare

// Start binlog prepare and InnoDB prepare
|--------ha_prepare_low

// Binlog prepare: obtain the largest sequence number (logical timestamp) of the last committed transaction
|----------binlog_prepare

// InnoDB prepare
|----------innobase_xa_prepare
|------------trx_prepare_for_mysql

// 1. Call trx_prepare_low
// 2. Set the transaction state to Prepared
// 3. Release GAP locks for isolation levels of READ COMMITTED and below
// 4. Flush the redo log (deferred to the Flush stage of the Commit phase)
|--------------trx_prepare
|----------------trx_prepare_low

// 1. Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED
// 2. Write the transaction XID into the undo log
|------------------trx_undo_set_state_at_prepare
```
Commit Phase
The Commit phase is implemented mainly in the MYSQL_BIN_LOG::ordered_commit function.
Flush Stage
Let's first look at Stage 0 and Stage 1. Stage 0 is a new stage added in MySQL 8.0 whose purpose is to guarantee the commit order on replicas. Stage 1 is the well-known Flush stage, the first of the three sub-stages of the Commit phase:
```cpp
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {

  /*
    Stage #0: ensure the replica's SQL thread commits transactions in relay log order
  */
  if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
      ending_trans(thd, all) ||
      Commit_order_manager::get_rollback_status(thd)) {
    if (Commit_order_manager::wait(thd)) {
      return thd->commit_error;
    }
  }

  /*
    Stage #1: flushing transactions to binary log

    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
  */

  if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
                   &LOCK_log)) {
    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                          thd->commit_error));
    return finish_commit(thd);
  }

  THD *wait_queue = nullptr, *final_queue = nullptr;
  mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;

  // Main processing logic of the Flush stage
  flush_error =
      process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);

  if (flush_error == 0 && total_bytes > 0)
    /*
      Flush the binlog cache to the file cache
    */
    flush_error = flush_cache_to_file(&flush_end_pos);

  // The sync_binlog parameter decides later where the binlog position is
  // updated and where the binlog-update signal is broadcast
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);

  /*
    If the flush finished successfully, we can call the after_flush
    hook. Being invoked here, we have the guarantee that the hook is
    executed before the before/after_send_hooks on the dump thread
    preventing race conditions among these plug-ins.
  */
  if (flush_error == 0) {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    /*
      Instrumented call to after_flush: register the flushed binlog file and
      position with the semi-synchronous replication plugin, to be compared
      later against the binlog position acknowledged by the slave.
    */
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos))) {
      LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);
      flush_error = ER_ERROR_ON_WRITE;
    }

    // If sync_binlog != 1, update the binlog position in the flush stage and
    // broadcast the update signal, so the replica's dump thread can detect the
    // binlog update
    if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();
  }
```
The main processing logic of the flush stage is in process_flush_stage_queue:
```cpp
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                             bool *rotate_var,
                                             THD **out_queue_var) {

  int no_flushes = 0;
  my_off_t total_bytes = 0;
  mysql_mutex_assert_owner(&LOCK_log);
  // Flush the redo log to disk according to innodb_flush_log_at_trx_commit
  THD *first_seen = fetch_and_process_flush_stage_queue();

  // Generate GTIDs via get_server_sidno() and Gtid_state::get_automatic_gno()
  assign_automatic_gtids_to_flush_group(first_seen);
  /* Flush thread caches to binary log. */
  for (THD *head = first_seen; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(current_thd, head);
    /*
      Flush the stmt_cache and trx_cache of binlog_cache_mngr.
      Flushing the trx_cache:
      - generates last_committed and sequence_number
      - flushes the GTID log event
      - flushes the data in trx_cache into the binlog cache
      - prepares the binlog position of the committed transaction
      - increments the prepared XID counter
    */
    std::pair<int, my_off_t> result = flush_thread_caches(head);
    total_bytes += result.second;
    if (flush_error == 1) flush_error = result.first;
#ifndef NDEBUG
    no_flushes++;
#endif
  }

  *out_queue_var = first_seen;
  *total_bytes_var = total_bytes;
  if (total_bytes > 0 &&
      (m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
       DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
    *rotate_var = true;
#ifndef NDEBUG
  DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));
  no_flushes = 0;
#endif
  return flush_error;
}
```
The call stack for flushing the redo log is as follows:
```
// Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues and flush the transactions to disk;
// stop blocking the replica's preserve-commit-order execution
|fetch_and_process_flush_stage_queue

// The storage engine flushes the redo log of prepared transactions to disk according to innodb_flush_log_at_trx_commit
|--ha_flush_logs
|----innobase_flush_logs
|------log_buffer_flush_to_disk
```
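As a reminder of what innodb_flush_log_at_trx_commit controls, the sketch below models its documented settings at commit time. It is illustrative only; the two helper functions are hypothetical stand-ins, not InnoDB calls.

```cpp
#include <cstdio>

// Hypothetical stand-ins for writing the redo log buffer to the OS page cache
// and for fsync()-ing the redo log file.
static void write_log_buffer_to_os_cache() { std::puts("write log buffer"); }
static void fsync_redo_log_file()          { std::puts("fsync redo log"); }

// Models the documented effect of innodb_flush_log_at_trx_commit at commit time.
void flush_redo_at_commit(int innodb_flush_log_at_trx_commit) {
  switch (innodb_flush_log_at_trx_commit) {
    case 1:   // default: write and fsync the redo log on every commit
      write_log_buffer_to_os_cache();
      fsync_redo_log_file();
      break;
    case 2:   // write to the OS cache at commit; fsync happens roughly once per second
      write_log_buffer_to_os_cache();
      break;
    default:  // 0: nothing at commit; a background thread writes and fsyncs about once per second
      break;
  }
}
```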
Sync Stage
The code for the Sync stage is as follows:
```cpp
/*
  Stage #2: Syncing binary log file to disk
*/

if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
                 &LOCK_sync)) {
  DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                        thd->commit_error));
  return finish_commit(thd);
}

/*
  - sync_counter: the number of commit groups
  - get_sync_period(): returns the value of the sync_binlog parameter
  - If the number of commit groups in the sync stage queue is greater than or
    equal to sync_binlog, the current leader calls fsync() to flush the binlog
    (sync_binlog_file(false)). Before syncing it may wait for more commit
    groups to enqueue, controlled by binlog_group_commit_sync_no_delay_count
    and binlog_group_commit_sync_delay, both 0 by default.
  - If the number of commit groups in the sync stage queue is smaller than
    sync_binlog, the current leader neither calls fsync() nor waits.
  - If sync_binlog is 0, every commit group triggers the wait but never syncs.
  - If sync_binlog is 1, every commit group triggers the wait and then syncs.
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
  Commit_stage_manager::get_instance().wait_count_or_timeout(
      opt_binlog_group_commit_sync_no_delay_count,
      opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);

final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
    Commit_stage_manager::SYNC_STAGE);

if (flush_error == 0 && total_bytes > 0) {
  DEBUG_SYNC(thd, "before_sync_binlog_file");
  std::pair<bool, bool> result = sync_binlog_file(false);
  sync_error = result.first;
}

/*
  If sync_binlog == 1, update the binlog position in the sync stage and
  broadcast the update signal, so the replica's dump thread can detect the
  binlog update.
  (The position itself is set in the flush stage, in
   process_flush_stage_queue()
   |--flush_thread_caches()
   |----set_trans_pos())
*/
if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {
  THD *tmp_thd = final_queue;
  const char *binlog_file = nullptr;
  my_off_t pos = 0;

  while (tmp_thd != nullptr) {
    if (tmp_thd->commit_error == THD::CE_NONE) {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
    }
    tmp_thd = tmp_thd->next_to_commit;
  }

  if (binlog_file != nullptr && pos > 0) {
    update_binlog_end_pos(binlog_file, pos);
  }
}

DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");

leave_mutex_before_commit_stage = &LOCK_sync;
```
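The decision the leader makes in the sync stage can be summarized as a small predicate. The sketch below is a simplified model of the logic described in the comment above; the struct and function are hypothetical, and only the parameter semantics (sync_binlog, the commit-group counter, and the optional delay) come from the source.

```cpp
// Simplified model of the sync-stage behaviour.
// sync_counter: commit groups written since the last fsync;
// sync_period:  the value of sync_binlog.
struct SyncDecision {
  bool wait_for_more_groups;  // wait up to binlog_group_commit_sync_delay / no_delay_count
  bool do_fsync;              // call fsync() on the binlog file
};

SyncDecision decide_sync(unsigned sync_counter, unsigned sync_period) {
  SyncDecision d{false, false};
  // The leader only enters the wait once enough groups have accumulated
  // (always true for sync_binlog = 0 or 1).
  d.wait_for_more_groups = (sync_counter + 1 >= sync_period);
  // fsync only happens for sync_binlog >= 1 once the threshold is reached.
  d.do_fsync = (sync_period > 0) && (sync_counter + 1 >= sync_period);
  return d;
}
```

For example, decide_sync(0, 1) (sync_binlog = 1) waits and then fsyncs on every group, while decide_sync(0, 100) skips both until 100 groups have accumulated.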
Commit Stage
The code for the Commit stage is as follows:
```cpp
  /*
    Stage #3: Commit all transactions in order.
  */
commit_stage:
  /* binlog_order_commits: whether to perform an ordered commit, i.e. keep the
     redo and binlog commit order consistent */
  if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
    if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
                     leave_mutex_before_commit_stage, &LOCK_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::COMMIT_STAGE);
    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););

    if (flush_error == 0 && sync_error == 0)
      /* after_sync hook */
      sync_error = call_after_sync_hook(commit_queue);

    /*
      Main processing logic of the Commit stage
    */
    process_commit_stage_queue(thd, commit_queue);

    /**
     * After commit stage
     */
    if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,
                     commit_queue, &LOCK_commit, &LOCK_after_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }

    THD *after_commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::AFTER_COMMIT_STAGE);
    /* after_commit hook */
    process_after_commit_stage_queue(thd, after_commit_queue);

    final_queue = after_commit_queue;
    mysql_mutex_unlock(&LOCK_after_commit);
  } else {
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }

  /* Broadcast the m_stage_cond_binlog condition variable to wake up the
     suspended followers */
  Commit_stage_manager::get_instance().signal_done(final_queue);
  DBUG_EXECUTE_IF("block_leader_after_delete", {
    const char action[] = "now SIGNAL leader_proceed";
    assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));
  };);

  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
  */
  (void)finish_commit(thd);
  DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");

  return thd->commit_error == THD::CE_COMMIT_ERROR;
}
```
The main processing logic of the Commit stage is in the process_commit_stage_queue function:
```cpp
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
  mysql_mutex_assert_owner(&LOCK_commit);
#ifndef NDEBUG
  thd->get_transaction()->m_flags.ready_preempt =
      true;  // formality by the leader
#endif
  for (THD *head = first; head; head = head->next_to_commit) {
    DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",
                         head->thread_id(), head->commit_error,
                         YESNO(head->tx_commit_pending)));
    DBUG_EXECUTE_IF(
        "block_leader_after_delete",
        if (thd != head) { DBUG_SET("+d,after_delete_wait"); };);
    /*
      If flushing failed, set commit_error for the session, skip the
      transaction and proceed with the next transaction instead. This
      will mark all threads as failed, since the flush failed.

      If flush succeeded, attach to the session and commit it in the
      engines.
    */
#ifndef NDEBUG
    Commit_stage_manager::get_instance().clear_preempt_status(head);
#endif
    /*
      Update the global m_max_committed_transaction (used as last_committed
      for subsequent transactions) and initialize the sequence number in this
      transaction's context.
    */
    if (head->get_transaction()->sequence_number != SEQ_UNINIT) {
      mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);
      m_dependency_tracker.update_max_committed(head);
      mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);
    }
    /*
      Flush/Sync error should be ignored and continue
      to commit phase. And thd->commit_error cannot be
      COMMIT_ERROR at this moment.
    */
    assert(head->commit_error != THD::CE_COMMIT_ERROR);
    Thd_backup_and_restore switch_thd(thd, head);
    bool all = head->get_transaction()->m_flags.real_commit;
    assert(!head->get_transaction()->m_flags.commit_low ||
           head->get_transaction()->m_flags.ready_preempt);
    // Binlog commit and InnoDB commit
    ::finish_transaction_in_engines(head, all, false);
    DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",
                         head->commit_error, YESNO(head->tx_commit_pending)));
  }

  /*
    Lock the sidno and update executed_gtid for the whole group of transactions.
    - If the binlog is disabled, @@GLOBAL.GTID_PURGED is derived from
      executed_gtid, so @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED are
      always identical and there is no need to record lost_gtids.
    - If the binlog is enabled but log_replica_updates is disabled, the slave
      SQL thread or slave worker thread adds its own GTID to executed_gtids
      and lost_gtids.
  */
  gtid_state->update_commit_group(first);

  for (THD *head = first; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(thd, head);
    auto all = head->get_transaction()->m_flags.real_commit;
    // Only for external XA transactions: mark the transaction as Prepared in
    // the storage engine
    trx_coordinator::set_prepared_in_tc_in_engines(head, all);
    /*
      After the commit in the storage engine, decrement the counter of XIDs in
      the Prepared state.
    */
    if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);
  }
}
```
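The update of m_max_committed_transaction is what produces the last_committed / sequence_number pairs that replicas use for parallel applying. The sketch below is a simplified, hypothetical model of such a logical clock, not MySQL's Transaction_dependency_tracker API: a transaction samples the current maximum committed timestamp as its last_committed when it is flushed, receives its own sequence_number, and pushes the maximum forward when it commits.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

// Hypothetical, simplified commit-order logical clock.
class CommitClock {
  std::atomic<int64_t> next_seq_{1};       // next sequence_number to hand out
  std::atomic<int64_t> max_committed_{0};  // highest sequence_number committed so far

 public:
  struct Stamp {
    int64_t last_committed;   // every trx committed at or before this point may be a dependency
    int64_t sequence_number;  // this transaction's own logical timestamp
  };

  // Called while the transaction is flushed to the binlog.
  Stamp on_flush() {
    return {max_committed_.load(), next_seq_.fetch_add(1)};
  }

  // Called in the commit stage: advance the global maximum, which future
  // transactions will observe as their last_committed.
  void on_commit(const Stamp &s) {
    int64_t cur = max_committed_.load();
    while (cur < s.sequence_number &&
           !max_committed_.compare_exchange_weak(cur, s.sequence_number)) {
    }
  }
};

int main() {
  CommitClock clock;
  auto t1 = clock.on_flush();  // last_committed = 0, sequence_number = 1
  clock.on_commit(t1);
  auto t2 = clock.on_flush();  // last_committed = 1, sequence_number = 2
  std::printf("t2: last_committed=%lld seq=%lld\n",
              static_cast<long long>(t2.last_committed),
              static_cast<long long>(t2.sequence_number));
}
```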
The ::finish_transaction_in_engines function contains the main storage-engine commit logic; the related call stack is as follows:
```
|::finish_transaction_in_engines
|--trx_coordinator::commit_in_engines
|----ha_commit_low

// Binlog-layer commit: does nothing (empty function)
|------binlog_commit

// Storage-engine-layer commit
|------innobase_commit
|--------innobase_commit_low
|----------trx_commit_for_mysql

// Pre-allocate the update undo segment used to persist the GTID
|------------trx_undo_gtid_add_update_undo

// Update the update_time of the modified tables in the data dictionary
|------------trx_update_mod_tables_timestamp

// Allocate the mini-transaction handle and buffer
|------------trx_commit

// Commit the mini-transaction
|--------------trx_commit_low
|----------------trx_write_serialisation_history

// Update the undo state:
// for insert, from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE;
// for update, to TRX_UNDO_TO_PURGE.
// For update transactions, also assign a trx no to the rollback segments and add them to the purge queue
|------------------trx_undo_set_state_at_finish

// Add the update undo log header to the head of the history list and release some in-memory objects
|------------------trx_undo_update_cleanup

// Record the binlog position in the system transaction table
|------------------trx_sys_update_mysql_binlog_offset
|----------------trx_commit_in_memory

// - Close the MVCC read view
// - Persist the GTID
// - Free the insert undo log
// - Wake up background threads, e.g. the master thread, purge thread, page_cleaner
```
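The undo-state decision at commit boils down to a small state machine. The sketch below is an illustrative simplification of what trx_undo_set_state_at_finish decides (the enum values mirror InnoDB's constants, but the function is a hypothetical model that ignores details such as cached undo segments): insert undo can be freed immediately, while update undo must go to the purge/history list because old read views may still need it.

```cpp
// Illustrative model of the undo-log state transition at commit.
enum UndoState {
  TRX_UNDO_ACTIVE,
  TRX_UNDO_PREPARED,
  TRX_UNDO_TO_FREE,   // insert undo: can be freed right after commit
  TRX_UNDO_TO_PURGE   // update undo: kept in the history list for purge / MVCC
};

enum UndoType { UNDO_INSERT, UNDO_UPDATE };

UndoState state_at_finish(UndoType type) {
  // Insert undo records are only needed to roll the transaction back, so they
  // can be freed immediately; update undo records may still be needed by
  // older read views, so they are handed to the purge queue instead.
  return (type == UNDO_INSERT) ? TRX_UNDO_TO_FREE : TRX_UNDO_TO_PURGE;
}
```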
Stage Transitions
The stage-transition logic is implemented mainly in the enroll_for function called from change_stage:
- The first thread to enter the queue becomes the leader of the whole group of transactions.
- Threads that enter the queue afterwards become followers.
- Follower threads suspend and wait to be woken by the m_stage_cond_binlog condition variable.
- The leader commits the whole group of transactions; once it is done, it broadcasts the m_stage_cond_binlog condition variable to wake up the suspended followers (see the simplified sketch after the enroll_for code below).
- The core of the queue transition is that a thread first joins the queue of the next stage, then releases the mutex of the previous stage, and then acquires the mutex of the next stage:
  - the Flush stage does not acquire a mutex;
  - the Sync stage needs to acquire LOCK_sync;
  - the Commit stage needs to acquire the LOCK_commit mutex;
  - the After Commit stage needs to acquire the LOCK_after_commit mutex.
```cpp
bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
                                      mysql_mutex_t *stage_mutex,
                                      mysql_mutex_t *enter_mutex) {
  // If the queue is empty, this thread becomes the leader
  thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();
  bool leader = this->append_to(stage, thd);

  /*
    If the FLUSH stage queue (BINLOG_FLUSH_STAGE or COMMIT_ORDER_FLUSH_STAGE)
    is not empty, this thread cannot become the leader. The leader needs to
    acquire enter_mutex.
  */
  if (leader) {
    if (stage == COMMIT_ORDER_FLUSH_STAGE) {
      leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();
      /*
        Leader hand-over logic.
        There are five session queues:
        - Binlog flush queue: flush the redo log and write the binlog file
        - Commit order flush queue: for commit-order transactions; they take
          part in the first half of the group commit, up to the engine-level flush
        - Sync queue: sync the transaction
        - Commit queue: commit the transaction
        - After commit queue: invoke the transaction's after_commit hook
      */
    } else if (stage == BINLOG_FLUSH_STAGE &&
               // The current thread is the first one in BINLOG_FLUSH_STAGE, but
               // COMMIT_ORDER_FLUSH_STAGE already has a leader; the current
               // thread suspends until it is woken by a signal from the
               // COMMIT_ORDER_FLUSH_STAGE leader
               !m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {
      /*
        The current transaction is the first thread in the binlog queue, but the
        commit order queue already has a leader. In this case the current thread
        becomes the leader, and the commit order leader turns into a follower.
        The leadership is changed because the commit order leader cannot lead the
        binlog threads: commit order threads must leave the commit group before
        the binlog threads finish their work.
        The steps of turning the leader into a follower are:
        1. A commit order thread enters the flush stage first and becomes the
           commit order leader.
        2. The commit order leader tries to acquire the stage mutex, which may
           take a while, e.g. if the mutex is still held by the leader of the
           previous commit group.
        3. Meanwhile, a binlog thread enters the flush stage. It has to wait for
           a signal from the commit order leader.
        4. The commit order leader acquires the stage mutex and then checks
           whether a binlog thread has entered the flush stage; if so, it hands
           over the leadership.
        5. The commit order leader sends a signal to the binlog leader and
           becomes a follower, waiting for the commit to finish (just like any
           other follower).
        6. The binlog leader is woken up by the commit order leader's signal and
           performs the group commit.
      */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_leader,
                        &m_queue_lock[BINLOG_FLUSH_STAGE]);
    }
  }

  unlock_queue(stage);

  /*
    Notify the next group-commit transaction to enter the queue
  */
  if (stage == BINLOG_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");
  } else if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
  }

  /*
    When entering the first stage, the stage mutex does not need to be taken
  */
  if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);

  /*
    If the queue was not empty, the current thread waits as a follower for the
    leader to process the queue
  */
  if (!leader) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");
    mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
    thd->get_transaction()->m_flags.ready_preempt = true;
    if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);
#endif
    // tx_commit_pending: the transaction's commit has not finished yet
    while (thd->tx_commit_pending) {
      if (stage == COMMIT_ORDER_FLUSH_STAGE) {
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      } else {
        // Follower threads suspend here and are woken up after the leader has
        // committed the whole group
        mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
      }
    }

    mysql_mutex_unlock(&m_lock_done);
    return false;
  }

#ifndef NDEBUG
  if (stage == Commit_stage_manager::SYNC_STAGE)
    DEBUG_SYNC(thd, "bgc_between_flush_and_sync");
#endif

  bool need_lock_enter_mutex = false;
  if (leader && enter_mutex != nullptr) {
    /*
      If LOCK_log is already held because the binlog is being rotated, there is
      no need to acquire enter_mutex again.
    */
    need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
                              enter_mutex == mysql_bin_log.get_log_lock());

    if (need_lock_enter_mutex)
      mysql_mutex_lock(enter_mutex);
    else
      mysql_mutex_assert_owner(enter_mutex);
  }

  // Leader hand-over logic
  if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
        "after_commit_order_thread_becomes_leader");
    lock_queue(stage);

    if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {
      if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);

      THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();
      binlog_leader->tx_commit_pending = false;

      mysql_cond_signal(&m_stage_cond_leader);
      unlock_queue(stage);

      mysql_mutex_lock(&m_lock_done);
      /* wait for signal from binlog leader */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
          "before_commit_order_leader_waits_for_binlog_leader");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      mysql_mutex_unlock(&m_lock_done);

      leader = false;
      return leader;
    }
  }

  return leader;
}
```
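To illustrate the leader/follower pattern that enroll_for implements, here is a minimal, self-contained sketch of a stage queue protected by one mutex and one condition variable. The class and member names are hypothetical and the logic is heavily simplified (a single queue, no leader hand-over between the binlog and commit-order queues): the first thread to enqueue becomes the leader and processes the whole queue, while later threads block until the leader signals completion.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical, simplified stage queue; not MySQL's Commit_stage_manager.
class StageQueue {
  struct Entry {
    std::function<void()> work;
    bool done = false;
  };
  std::mutex m_;
  std::condition_variable done_cond_;
  std::deque<Entry *> queue_;
  bool leader_active_ = false;

 public:
  // Returns once this thread's work has been executed, either by this thread
  // (as leader) or by the leader that was active when it enrolled.
  void enroll(std::function<void()> work) {
    Entry entry;
    entry.work = std::move(work);

    std::unique_lock<std::mutex> lk(m_);
    queue_.push_back(&entry);

    if (leader_active_) {
      // Follower: suspend until the leader has processed our entry.
      done_cond_.wait(lk, [&] { return entry.done; });
      return;
    }

    // First thread in the queue: become the leader and drain everything,
    // working on behalf of all enqueued followers.
    leader_active_ = true;
    while (!queue_.empty()) {
      Entry *e = queue_.front();
      queue_.pop_front();
      lk.unlock();
      e->work();  // e.g. flush / sync / commit for this transaction
      lk.lock();
      e->done = true;
    }
    leader_active_ = false;
    done_cond_.notify_all();  // wake up the suspended followers
  }
};
```

MySQL applies the same idea per stage (flush, sync, commit, after-commit), with each stage queue guarded by its own mutex as listed above and followers woken via the m_stage_cond_binlog condition variable.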
Keywords: #MySQL# #Transaction# #Source Code#