源码分析 | MySQL 的 commit 是怎么 commit 的?

作者:李鹏博

爱可生 DBA 团队成员,主要负责 MySQL 故障处理和 SQL 审核优化。对技术执着,为客户负责。

本文来源:原创投稿

* 爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


MySQL 的 commit 命令提交事务时,内部会进行两阶段(Prepare 和 Commit)提交,这篇文章基于 MySQL 8.0.33 对 MySQL 的两阶段提交进行源码分析,带你了解提交事务过程中都经历了什么。

以下是整体逻辑:

一、Prepare 阶段

1. Binlog Prepare

获取上一个事务最大的 sequence number 时间戳。

2. InnoDB Prepare

  1. 事务状态设置为 prepared
  2. 释放 RC 及以下隔离级别的 GAP Lock;
  3. Undo log segment 的状态从 TRX_UNDO_ACTIVE 修改为 TRX_UNDO_PREPARED
  4. Undo log 写入事务 XID。

二、Commit 阶段

1. Stage 0

保证从实例的 commit order。

2. Flush Stage

  1. 根据 innodb_flush_log_at_trx_commit 参数进行 redo log 的刷盘操作
  • 获取并清空 BINLOG_FLUSH_STAGECOMMIT_ORDER_FLUSH_STAGE 队列
  • 存储引擎层将 prepare 状态的 redo log 根据 innodb_flush_log_at_trx_commit 参数刷盘
  • 不再阻塞 slave 的 preserve commit order 的执行
  • 调用 get_server_sidno()Gtid_state::get_automatic_gno() 生成 GTID
  • Flush binlog_cache_mngr
    • Flush stmt_cache
    • Flush trx_cache
      • 生成 last_committed 和 sequence_number
      • flush GTID log event
      • 将 trx_cache 中的数据 flush 到 binlog cache 中
      • 准备提交事务后的 Binlog pos
      • 递增 prepread XID
  • 插桩调用after_flush,将已经 flush 的 binlog file 和 position 注册到半同步复制插件中
  • 如果 sync_binlog!=1,在 flush stage 更新 Binlog 位点,并广播 update 信号,从库的 Dump 线程可以由此感知 Binlog 的更新
  • 3. Sync Stage

    1. 根据 sync_binlog 的参数设置进行刷盘前的等待并调用 fsync() 进行刷盘
    2. 如果 sync_binlog==1,在 sync stage 阶段更新 binog 位点,并广播 update 信号,从库的 Dump 线程可以由此感知 Binlog 的更新

    4. Commit Stage

    1. after_sync hook(半同步复制 after_sync 的钩子)
    2. 更新全局的 m_max_committed_transaction(用作后续事务的 last_committed),并初始化事务上下文的 sequence number
    3. Binlog 层提交,什么也不做
    4. 存储引擎层提交
    • 为持久化 GTID 提前分配 update undo segment
    • 更新数据字典中被修改表的 update_time 时间
    • 分配 Mini-transaction handle和buffer
    • 更新 undo 状态
      • 对于 insert 状态从 TRX_UNDO_ACTIVE  修改为 TRX_UNDO_TO_FREE,update 修改为 TRX_UNDO_TO_PURGE
      • 如果事务为 update 还需要将 rollback segments 分配 trx no,并将其添加到 purge 队列中
    • 将 update undo log header 添加到 history list 开头释放一些内存对象
    • 在系统事务表记录 binlog 位点
    • 关闭 mvcc read view
    • 持久化 GTID
    • 释放insert undo log
    • 唤醒后台线程开始干活,如 master thread、purge thread、page_cleaner
  • 更新整组事务的 executed_gtid
  • 在存储引擎层提交之后,递减 Prepared 状态下的 XID 计数器
  • after_sync hook(半同步复制 after_commit的钩子)
  • 广播 m_stage_cond_binlog 信号变量,唤醒挂起的 follower

  • 了解完整体逻辑,对源码分析感兴趣的请继续往下(建议 PC 端阅读)。


    ha_commit_trans 函数主要判断是否需要写入 GTID 信息,并开始两阶段提交:

    int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {<br>  /*<br>    Save transaction owned gtid into table before transaction prepare<br>    if binlog is disabled, or binlog is enabled and log_replica_updates<br>    is disabled with slave SQL thread or slave worker thread.<br>  */<br>  std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);<br>...<br>  // Prepare 阶段<br>  if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))<br>    error = tc_log->prepare(thd, all);<br>...<br>  // Commit 阶段<br> if (error || (error = tc_log->commit(thd, all))) {<br>    ha_rollback_trans(thd, all);<br>    error = 1;<br>    goto end;<br>  }<br>}<br>

    Prepare 阶段

    两阶段提交的 Prepare 阶段相对简单,以下是 commit 命令入口及 Prepare 阶段的堆栈和相关作用:

    |mysql_execute_command<br>|--trans_commit<br>|----ha_commit_trans<br>|------MYSQL_BIN_LOG::prepare<br><br>// 开启 binlog prepare 和 innodb prepare<br>|--------ha_prepare_low       <br><br>// Binlog prepare:获取上一个事务最大的 sequence number 时间戳<br>|----------binlog_prepare   <br><br>// innodb prepare<br>|----------innobase_xa_prepare                <br>|------------trx_prepare_for_mysql<br><br>// 1. 调用 trx_prepare_low <br>// 2. 事务状态设置为Prepared <br>// 3. 释放 RC 及以下隔离级别的 GAP Lock <br>// 4. 刷盘 Redo(已推迟到 Commit 阶段的 Flush stage)<br>|--------------trx_prepare                        <br>|----------------trx_prepare_low<br><br>// 1. 将 undo log segment 的状态从 TRX_UNDO_ACTIVE 修改为 TRX_UNDO_PREPARED <br>// 2. undo log 写入事务 XID<br>|------------------trx_undo_set_state_at_prepare<br>

    Commit 阶段

    Commit 阶段的功能实现主要集中在 MYSQL_BIN_LOG::ordered_commit 函数中。

    Flush 阶段

    首先看下 Stage 0 和 Stage 1,stage 0 主要是 8.0 新增的一个阶段,主要是针对从库保证 commit order。stage 1 就是大家耳熟能详的 Commit 阶段的三个小阶段其一的 Flush 阶段了:

    int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {<br> <br>  /*<br>    Stage #0: 保证从实例的 SQL 线程按照 Relay log 的事务顺序进行提交<br>  */<br>  if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||<br>      ending_trans(thd, all) ||<br>      Commit_order_manager::get_rollback_status(thd)) {<br>    if (Commit_order_manager::wait(thd)) {<br>      return thd->commit_error;<br>    }<br>  }<br> <br>  /*<br>    Stage #1: flushing transactions to binary log<br> <br>    While flushing, we allow new threads to enter and will process<br>    them in due time. Once the queue was empty, we cannot reap<br>    anything more since it is possible that a thread entered and<br>    appointed itself leader for the flush phase.<br>  */<br> <br>  if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,<br>                   &LOCK_log)) {<br>    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),<br>                          thd->commit_error));<br>    return finish_commit(thd);<br>  }<br> <br>  THD *wait_queue = nullptr, *final_queue = nullptr;<br>  mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;<br>  my_off_t flush_end_pos = 0;<br>  bool update_binlog_end_pos_after_sync;<br> <br>  // Flush 阶段主要的处理逻辑<br>  flush_error =<br>      process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);<br> <br>  if (flush_error == 0 && total_bytes > 0)<br>    /*<br>      flush binlog cache到file cache<br>    */<br>    flush_error = flush_cache_to_file(&flush_end_pos);<br> <br>  // 后面根据 sync_binlog 参数决定更新 binlog pos 的位置并广播 Binlog 更新信号<br>  update_binlog_end_pos_after_sync = (get_sync_period() == 1);<br> <br>  /*<br>    If the flush finished successfully, we can call the after_flush<br>    hook. Being invoked here, we have the guarantee that the hook is<br>    executed before the before/after_send_hooks on the dump thread<br>    preventing race conditions among these plug-ins.<br>  */<br>  if (flush_error == 0) {<br>    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);<br>    assert(flush_end_pos != 0);<br>    /*<br>      插桩调用 after_flush,将已经 flush 的 binlog file 和 position 注册到半同步复制插件中,<br>      用于后续对比 slave 应答接受到的 binlog position。<br>    */<br>    if (RUN_HOOK(binlog_storage, after_flush,<br>                 (thd, file_name_ptr, flush_end_pos))) {<br>      LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);<br>      flush_error = ER_ERROR_ON_WRITE;<br>    }<br> <br>    // 如果 sync_binlog!=1,在 flush stage 更新 binlog 位点并广播 update 信号,从库的 Dump 线程可以由此感知 Binlog 的更新<br>    if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();<br>  }<br>

    Flush stage 的主要处理逻辑集中在 process_flush_stage_queue

    <br>int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,<br>                                             bool *rotate_var,<br>                                             THD **out_queue_var) {<br> <br>  int no_flushes = 0;<br>  my_off_t total_bytes = 0;<br>  mysql_mutex_assert_owner(&LOCK_log);<br>  // 根据 innodb_flush_log_at_trx_commit 参数进行 redo log 的刷盘操作<br>  THD *first_seen = fetch_and_process_flush_stage_queue();<br> <br>  // 调用 get_server_sidno() 和 Gtid_state::get_automatic_gno 生成 GTID<br>  assign_automatic_gtids_to_flush_group(first_seen);<br>  /* Flush thread caches to binary log. */<br>  for (THD *head = first_seen; head; head = head->next_to_commit) {<br>    Thd_backup_and_restore switch_thd(current_thd, head);<br>    /*<br>      flush binlog_cache_mngr 的 stmt_cache和trx_cache。<br>      flush trx_cache:<br>        - 生成 last_committed 和 sequence_number<br>        - flush GTID log event<br>        - 将 trx_cache 中的数据 flush 到 binlog cache 中<br>        - 准备提交事务后的 Binlog pos<br>        - 递增 prepread XID<br>    */<br>    std::pair<int, my_off_t> result = flush_thread_caches(head);<br>    total_bytes += result.second;<br>    if (flush_error == 1) flush_error = result.first;<br>#ifndef NDEBUG<br>    no_flushes++;<br>#endif<br>  }<br> <br>  *out_queue_var = first_seen;<br>  *total_bytes_var = total_bytes;<br>  if (total_bytes > 0 &&<br>      (m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||<br>       DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))<br>    *rotate_var = true;<br>#ifndef NDEBUG<br>  DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));<br>  no_flushes = 0;<br>#endif<br>  return flush_error;<br>}<br>

    redo log 刷盘的堆栈如下:

    <br>// 获取并清空 BINLOG_FLUSH_STAGE 和 COMMIT_ORDER_FLUSH_STAGE 队列,flush 事务到磁盘;不再阻塞 slave 的 preserve commit order 的执行<br>|fetch_and_process_flush_stage_queue  <br><br>// 存储引擎层将 prepare 状态的 redo log 根据 innodb_flush_log_at_trx_commit 参数刷盘<br>|--ha_flush_logs                      <br>|----innobase_flush_logs<br>|------log_buffer_flush_to_disk<br>

    SYNC 阶段

    Sync 阶段的代码如下:

    /*<br>  Stage #2: Syncing binary log file to disk<br>*/<br> <br>if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,<br>                 &LOCK_sync)) {<br>  DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),<br>                        thd->commit_error));<br>  return finish_commit(thd);<br>}<br> <br>/*<br>  - sync_counter:commit group的数量<br>  - get_sync_period():获取sync_binlog参数的值<br>  - 如果sync stage队列中的commit group大于等于sync_binlog的值,当前leader就调用fsync()进行刷盘操作(sync_binlog_file(false)),<br>    在sync之前可能会进行等待,等待更多的commit group入队,等待的时间为binlog_group_commit_sync_no_delay_count或binlog_group_commit_sync_delay,默认都为0。<br>  - 如果sync stage队列中的commit group小于sync_binlog的值,当前leader不会调用fsync()进行刷盘也不会等待<br>  - 如果sync_binlog为0,每个commit group都会触发等待动作,但是不会sync<br>  - 如果sync_binlog为1,每个commit group都会触发等待动作,且会sync<br>*/<br>if (!flush_error && (sync_counter + 1 >= get_sync_period()))<br>  Commit_stage_manager::get_instance().wait_count_or_timeout(<br>      opt_binlog_group_commit_sync_no_delay_count,<br>      opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);<br> <br>final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(<br>    Commit_stage_manager::SYNC_STAGE);<br> <br>if (flush_error == 0 && total_bytes > 0) {<br>  DEBUG_SYNC(thd, "before_sync_binlog_file");<br>  std::pair<bool, bool> result = sync_binlog_file(false);<br>  sync_error = result.first;<br>}<br> <br>/*<br> 如果sync_binlog==1,在sync stage阶段更新binog位点,并广播update信号,从库的Dump线程可以由此感知Binlog的更新<br> (位点在flush stage中的process_flush_stage_queue()<br>                       |--flush_thread_caches()<br>                       |-----set_trans_pos()函数中设置)<br>*/<br>if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {<br>  THD *tmp_thd = final_queue;<br>  const char *binlog_file = nullptr;<br>  my_off_t pos = 0;<br> <br>  while (tmp_thd != nullptr) {<br>    if (tmp_thd->commit_error == THD::CE_NONE) {<br>      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);<br>    }<br>    tmp_thd = tmp_thd->next_to_commit;<br>  }<br> <br>  if (binlog_file != nullptr && pos > 0) {<br>    update_binlog_end_pos(binlog_file, pos);<br>  }<br>}<br> <br>DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");<br> <br>leave_mutex_before_commit_stage = &LOCK_sync;<br>

    COMMIT 阶段

    Commit 阶段的代码如下:

      /*<br>    Stage #3: Commit all transactions in order.<br>  */<br>commit_stage:<br>  /* binlog_order_commits:是否进行 order commit,即保持 redo 和 binlog 的提交顺序一致 */<br>  if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&<br>      (sync_error == 0 || binlog_error_action != ABORT_SERVER)) {<br>    if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,<br>                     leave_mutex_before_commit_stage, &LOCK_commit)) {<br>      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),<br>                            thd->commit_error));<br>      return finish_commit(thd);<br>    }<br>    THD *commit_queue =<br>        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(<br>            Commit_stage_manager::COMMIT_STAGE);<br>    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",<br>                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););<br> <br>    if (flush_error == 0 && sync_error == 0)<br>      /* after_sync hook */<br>      sync_error = call_after_sync_hook(commit_queue);<br> <br>    /*<br>      Commit 阶段的主要处理逻辑<br>    */<br>    process_commit_stage_queue(thd, commit_queue);<br> <br>    /**<br>     * After commit stage<br>     */<br>    if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,<br>                     commit_queue, &LOCK_commit, &LOCK_after_commit)) {<br>      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),<br>                            thd->commit_error));<br>      return finish_commit(thd);<br>    }<br> <br>    THD *after_commit_queue =<br>        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(<br>            Commit_stage_manager::AFTER_COMMIT_STAGE);<br>    /* after_commit hook */<br>    process_after_commit_stage_queue(thd, after_commit_queue);<br> <br>    final_queue = after_commit_queue;<br>    mysql_mutex_unlock(&LOCK_after_commit);<br>  } else {<br>    if (leave_mutex_before_commit_stage)<br>      mysql_mutex_unlock(leave_mutex_before_commit_stage);<br>    if (flush_error == 0 && sync_error == 0)<br>      sync_error = call_after_sync_hook(final_queue);<br>  }<br> <br> <br>  /* 广播 m_stage_cond_binlog 信号变量,唤醒挂起的 follower */<br>  Commit_stage_manager::get_instance().signal_done(final_queue);<br>  DBUG_EXECUTE_IF("block_leader_after_delete", {<br>    const char action[] = "now SIGNAL leader_proceed";<br>    assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));<br>  };);<br> <br>  /*<br>    Finish the commit before executing a rotate, or run the risk of a<br>    deadlock. We don't need the return value here since it is in<br>    thd->commit_error, which is returned below.<br>  */<br>  (void)finish_commit(thd);<br>  DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");<br> <br>  return thd->commit_error == THD::CE_COMMIT_ERROR;<br>}<br>

    Commit 阶段的主要处理逻辑集中在 process_commit_stage_queue 函数中:

    void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {<br>  mysql_mutex_assert_owner(&LOCK_commit);<br>#ifndef NDEBUG<br>  thd->get_transaction()->m_flags.ready_preempt =<br>      true;  // formality by the leader<br>#endif<br>  for (THD *head = first; head; head = head->next_to_commit) {<br>    DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",<br>                         head->thread_id(), head->commit_error,<br>                         YESNO(head->tx_commit_pending)));<br>    DBUG_EXECUTE_IF(<br>        "block_leader_after_delete",<br>        if (thd != head) { DBUG_SET("+d,after_delete_wait"); };);<br>    /*<br>      If flushing failed, set commit_error for the session, skip the<br>      transaction and proceed with the next transaction instead. This<br>      will mark all threads as failed, since the flush failed.<br> <br>      If flush succeeded, attach to the session and commit it in the<br>      engines.<br>    */<br>#ifndef NDEBUG<br>    Commit_stage_manager::get_instance().clear_preempt_status(head);<br>#endif<br>    /*<br>      更新全局的 m_max_committed_transaction(用作后续事务的 last_committed),<br>      并初始本事务上下文的 sequence number<br>    */<br>    if (head->get_transaction()->sequence_number != SEQ_UNINIT) {<br>      mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);<br>      m_dependency_tracker.update_max_committed(head);<br>      mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);<br>    }<br>    /*<br>      Flush/Sync error should be ignored and continue<br>      to commit phase. And thd->commit_error cannot be<br>      COMMIT_ERROR at this moment.<br>    */<br>    assert(head->commit_error != THD::CE_COMMIT_ERROR);<br>    Thd_backup_and_restore switch_thd(thd, head);<br>    bool all = head->get_transaction()->m_flags.real_commit;<br>    assert(!head->get_transaction()->m_flags.commit_low ||<br>           head->get_transaction()->m_flags.ready_preempt);<br>  // Binlog Commit、Innodb Commit<br>    ::finish_transaction_in_engines(head, all, false);<br>    DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",<br>                         head->commit_error, YESNO(head->tx_commit_pending)));<br>  }<br> <br>  /*<br>    锁定 sidno,更新整组事务 的executed_gtid<br>    - 如果没开启 binlog,@@GLOBAL.GTID_PURGED 的值是从 executed_gtid 获取的,<br>      此时 @@GLOBAL.GTID_PURGED 的值和 @@GLOBAL.GTID_EXECUTED 永远是一致的,<br>      就不需要在记录 lost_gtids<br>    - 如果开启了 binlog,但是未开启 log_replica_updates,slave 的 SQL 线程或 slave worker 线程<br>      将自身的 GTID 更新到 executed_gtids、lost_gtids<br>  */<br>  gtid_state->update_commit_group(first);<br> <br>  for (THD *head = first; head; head = head->next_to_commit) {<br>    Thd_backup_and_restore switch_thd(thd, head);<br>    auto all = head->get_transaction()->m_flags.real_commit;<br>    // 只针对外部 XA 事务,在存储引擎层将事务标记为 Prepared<br>    trx_coordinator::set_prepared_in_tc_in_engines(head, all);<br>    /*<br>      在存储引擎层提交之后,递减 Prepared 状态下的 XID 计数器<br>    */<br>    if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);<br>  }<br>}<br>

    其中 ::finish_transaction_in_engines  函数是主要的存储引擎层提交逻辑,相关堆栈如下:

    |::finish_transaction_in_engines<br>|--trx_coordinator::commit_in_engines<br>|----ha_commit_low<br><br>// Binlog 层提交什么也不做(空函数)<br>|------binlog_commit<br><br>// 存储引擎层提交<br>|------innobase_commit                                <br>|--------innobase_commit_low<br>|----------trx_commit_for_mysql<br><br>// 为持久化 GTID 提前分配 update undo segment<br>|------------trx_undo_gtid_add_update_undo  <br><br>// 更新数据字典中被修改表的 update_time 时间<br>|------------trx_update_mod_tables_timestamp     <br><br>// 分配 Mini-transaction handle 和 buffer<br>|------------trx_commit          <br><br>// 提交 mini-transaction<br>|--------------trx_commit_low                         <br>|----------------trx_write_serialisation_history<br><br>// 更新 undo 状态:<br>// 对于 insert 状态从 TRX_UNDO_ACTIVE 修改为 TRX_UNDO_TO_FREE<br>// update 修改为 TRX_UNDO_TO_PURGE<br>// 如果事务为 update 还需要将 rollback segments 分配 trx no,并将其添加到 purge 队列中<br>|------------------trx_undo_set_state_at_finish      <br><br>//将 update undo log header 添加到 history list 开头释放一些内存对象;<br>|------------------trx_undo_update_cleanup  <br><br> // 在系统事务表记录 binlog 位点<br>|------------------trx_sys_update_mysql_binlog_offset <br>|----------------trx_commit_in_memory<br><br>//- 关闭 mvcc read view<br>//- 持久化 GTID<br>//- 释放 insert undo log<br>//- 唤醒后台线程开始干活,如:master thread、purge thread、page_cleaner<br>

    阶段转换

    阶段转换的逻辑主要是由 change_stage 中的 enroll_for 函数实现:

    • 进入队列的第一个线程会作为整组事务的 leader
    • 后续进入队列的线程会作为整组事务的 follower
    • follower 线程挂起等待m_stage_cond_binlog信号变量唤醒
    • leader 负责提交整组事务,提交完成后,发送m_stage_cond_binlog 信号变量唤醒挂起的 follower
    • 队列转化的主要逻辑是线程先入下个阶段的队列,然后再释放上一个阶段的 mutex,然后再获取下一个阶段的 mutex
    • Flush Stage 不会获取 mutex
    • Sync Stage 需要获取 LOCK_sync
    • Commit Stage 需要获取 LOCK_commit mutex
    • After Commit Stage 需要获取 LOCK_after_commit mutex
    bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,<br>                                     mysql_mutex_t *stage_mutex,<br>                                     mysql_mutex_t *enter_mutex) {<br> // 如果队列为空,线程就是 leader<br> thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();<br> bool leader = this->append_to(stage, thd);<br><br> /*<br>  如果 FLUSH stage 队列((BINLOG_FLUSH_STAGE 或 COMMIT_ORDER_FLUSH_STAGE)不为空,此线程就不能成为 leader。leader<br>  需要获取 enter_mutex<br> */<br> if (leader) {<br>   if (stage == COMMIT_ORDER_FLUSH_STAGE) {<br>     leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();<br>   /*<br>     leader 转换的逻辑。<br>     session 的队列有5种:<br>       - Binlog flush queue: flush redo 并写 Binlog File<br>       - Commit order flush queue: 针对 commit order 的事务,但是会参与 group commit 的开头部分,直到引擎层的 flush。<br>       - Sync queue: sync transaction<br>       - Commit queue: 提交事务<br>       - After commit queue: 调用事务的 after_commit hook<br>    */<br>   } else if (stage == BINLOG_FLUSH_STAGE &&  // 当前线程是 BINLOG_FLUSH_STAGE 中的第一个线程;但是 COMMIT_ORDER_FLUSH_STAGE<br>                                              // 已经有了 leader,此时当前线程会挂起,等待 COMMIT_ORDER_FLUSH_STAGE 的 leader 的信号唤醒<br>              !m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {<br>     /*<br>       当前事务是 binlog queue 中的第一个线程,但是在 commit order queue 中已经有了一个 leader。<br>       此时当前线程会作为 leader,而 commit order leader 会转变为 follower。<br>       改变 leader 的原因是 commit order leader 不能作为 binlog 线程的 leader,因为 commit order threads<br>       必须在 binlog threads 操作完之前离开 commit group。<br>       转变 leader 为 followers 的步骤如下:<br>       1. commit order thread 首先进入 flush stage,并成为 commit order leader。<br>       2. commit order leader 尝试获取 stage mutex,这可能会需要一些时间,比如 mutex 已经被上一个<br>       commit group的leader获取。<br>       3. 在此期间,一个 binlog 线程进入了 flush stage。它需要等待来自 commit order leader 的信号。<br>       4. commit order leader 获取了 stage mutex,然后它会检查是否有  binlog thread进入了 flush stage,<br>       如果发现了就转变 leader。<br>       5. commit order leader 给  binlog leader发送一个信号,并成为 follower,等待 commit 的完成<br>       (和其他 follower 的行为一致)。<br>       6. binlog leader 被 commit order leader 的信号唤醒并执行 group commit。<br>     */<br>     CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");<br>     while (thd->tx_commit_pending)<br>       mysql_cond_wait(&m_stage_cond_leader,<br>                       &m_queue_lock[BINLOG_FLUSH_STAGE]);<br>   }<br> }<br><br> unlock_queue(stage);<br><br> /*<br>   通知下一个组提交事务进入队列<br> */<br> if (stage == BINLOG_FLUSH_STAGE) {<br>   Commit_order_manager::finish_one(thd);<br>   CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");<br> } else if (stage == COMMIT_ORDER_FLUSH_STAGE) {<br>   Commit_order_manager::finish_one(thd);<br> }<br><br> /*<br>   当进入第一个 stage 时,可以不用获取 stage mutex<br> */<br> if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);<br><br> /*<br>   如果队列非空,当前线程作为 follower 等待 leader 处理队列<br> */<br> if (!leader) {<br>   CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");<br>   mysql_mutex_lock(&m_lock_done);<br>#ifndef NDEBUG<br>   thd->get_transaction()->m_flags.ready_preempt = true;<br>   if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);<br>#endif<br>   // tx_commit_pending:还有事务 commit 操作未完成<br>   while (thd->tx_commit_pending) {<br>     if (stage == COMMIT_ORDER_FLUSH_STAGE) {<br>       mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);<br>     } else {<br>       // follower 线程在此处挂起,等待 leader 提交事务完成后被唤醒<br>       mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);<br>     }<br>   }<br><br>   mysql_mutex_unlock(&m_lock_done);<br>   return false;<br> }<br><br>#ifndef NDEBUG<br> if (stage == Commit_stage_manager::SYNC_STAGE)<br>   DEBUG_SYNC(thd, "bgc_between_flush_and_sync");<br>#endif<br><br> bool need_lock_enter_mutex = false;<br> if (leader && enter_mutex != nullptr) {<br>   /*<br>     如果由于在轮替 Binlog 时已经获取了 LOCK_log,就不在需要获取 enter_mutex。<br>   */<br>   need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&<br>                             enter_mutex == mysql_bin_log.get_log_lock());<br><br>   if (need_lock_enter_mutex)<br>     mysql_mutex_lock(enter_mutex);<br>   else<br>     mysql_mutex_assert_owner(enter_mutex);<br> }<br><br> // leader 转换的逻辑<br> if (stage == COMMIT_ORDER_FLUSH_STAGE) {<br>   CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(<br>       "after_commit_order_thread_becomes_leader");<br>   lock_queue(stage);<br><br>   if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {<br>     if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);<br><br>     THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();<br>     binlog_leader->tx_commit_pending = false;<br><br>     mysql_cond_signal(&m_stage_cond_leader);<br>     unlock_queue(stage);<br><br>     mysql_mutex_lock(&m_lock_done);<br>     /* wait for signal from binlog leader */<br>     CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(<br>         "before_commit_order_leader_waits_for_binlog_leader");<br>     while (thd->tx_commit_pending)<br>       mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);<br>     mysql_mutex_unlock(&m_lock_done);<br><br>     leader = false;<br>     return leader;<br>   }<br> }<br><br> return leader;<br><br>



                 本文关键字 
                 :#MySQL# #事务# #源码# 
    
              <h5 data-tool="mdnice编辑器"></h5> 

    文章推荐:

    技术分享 |  一文了解 MySQL Optimizer Trace 的神奇功效

    技术分享 | OceanBase 慢查询排查思路

    技术分享 | OceanBase写入限速源码解读

    故障分析 | innodb_thread_concurrency 导致数据库异常的问题分析

    故障分析 | OceanBase 频繁更新数据后读性能下降的排查

    故障分析 | MySQL 升级到 8.0 变慢问题分析

    技术分享 | 一招解决 MySQL 中 DDL 被阻塞的问题

    故障分析 | 一条本该记录到慢日志的 SQL 是如何被漏掉的


    关于 SQLE

    爱可生开源社区的 SQLE 是一款面向数据库使用者和管理者,支持多场景审核,支持标准化上线流程,原生支持 MySQL 审核且数据库类型可扩展的 SQL 审核工具。

    SQLE 获取

      <table> 
    
        <tr> 
         <th>类型</th> 
         <th>地址</th> 
        </tr> 
    
       <tbody> 
        <tr> 
         <td>版本库</td> 
         <td>https://github.com/actiontech/sqle</td> 
        </tr> 
        <tr> 
         <td>文档</td> 
         <td>https://actiontech.github.io/sqle-docs-cn/</td> 
        </tr> 
        <tr> 
         <td>发布信息</td> 
         <td>https://github.com/actiontech/sqle/releases</td> 
        </tr> 
        <tr> 
         <td>数据审核插件开发文档</td> 
         <td>https://actiontech.github.io/sqle-docs-cn/3.modules/3.7_auditplugin/auditplugin_development.html</td> 
        </tr> 
       </tbody> 
      </table> 

    提交有效 pr,高质量 issue,将获赠面值 200-500 元(具体面额依据质量而定)京东卡以及爱可生开源社区精美周边!

    更多关于 SQLE 的信息和交流,请加入官方QQ交流群:637150065

    本文分享自微信公众号 - 爱可生开源社区(ActiontechOSS)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。