HBase Compaction 原理与线上调优实践
作者:vivo 互联网存储技术团队- Hang Zhengbo
本文对 HBase Compaction 的原理、流程以及限流的策略进行了详细的介绍,列举了几个线上进行调优的案例,最后对 Compaction 的相关参数进行了总结。
一、Compaction 介绍
HBase 是基于一种 LSM-Tree(Log-Structured Merge Tree)体系架构的存储模型设计的,写入时先写入 WAL(Write-Ahead-Log)日志,再写入 Memstore 缓存,满足一定条件后,会执行 Flush 操作将缓存数据刷写到磁盘,生成一个 HFile 数据文件。随着数据不断写入,HFile 文件会越来越多,文件太多导致查询数据时 IO 次数增加,进而影响到 HBase 的查询性能。为了优化读的性能,采用合并小 HFile 的方法来减少文件数量,这种合并 HFile 的操作就称为 Compaction。Compaction 是从一个 Region 的一个 Store 中选择部分 HFile 文件进行合并的过程。合并原理是从这些待合并的数据文件中依次读出 KeyValue,由小到大排序后写入一个新的文件中。之后这个新生成的文件就会取代之前已合并的所有文件对外提供服务。
1.1 Compaction 的分类
HBase 根据合并规模将 Compaction 分为两类:Minor Compaction 和 Major Compaction。
-
Minor Compaction 是指选取部分小的、相邻的 HFile,将它们合并成一个更大的 HFile;
-
Major Compaction 是指将一个Store 中所有的 HFile 合并成一个 HFile,这个过程会清理三种无意义的数据:TTL 过期数据、被删除的数据与版本号超过设定版本号的数据。
下图形象的描述了2种 Compaction 的区别:
一般情况下,Major Compaction 持续时间比较长,整个过程消耗大量系统资源,因此线上数据量较大的业务通常推荐关闭自动触发 Major Compaction 功能,改为在业务低峰期手动触发(或设置策略自动在低峰期触发)。
1.2 Compaction的意义
合并小文件,减少文件数,提升读取性能,稳定随机读延迟;
合并的时候会读取远程 DataNode 上的文件写入本地 DataNode,提高数据的本地化率;
清除过期数据和被删除的数据,减少表的存储量。
1.3 Compaction触发时机
HBase 中触发 Compaction 的时机有很多种,最常见的触发时机有三种:后台线程周期性检查时触发、MemStore Flush 触发以及手动触发。
(1)后台线程周期性检查:后台线程 CompactionChecker 会定期检查是否需要执行 Compaction,检查周期为hbase.server.thread.wakefrequency *hbase.server.compactchecker.interval.multiplier,这里主要考虑的是一段时间内没有写入请求导致 Flush 触发不了 Compaction 的情况。其中参数hbase.server.thread.wakefrequency 默认值是10s,是 HBase 服务端线程唤醒时间间隔,参数hbase.server.compactchecker.interval.multiplier 默认值1000,是 Compaction 操作周期性检查乘数因子。10 * 1000 s 约等于 2hrs 46mins 40sec。
(2)MemStore Flush:Compaction 的根源在于 Flush,MemStore 达到一定阈值就会触发 Flush ,将内存中的数据刷写到磁盘生成 HFile 文件,随着 HFile 文件越来越多就需要执行 Compaction。HBase 每次 Flush之后,都会判断是否需要进行 Compaction,一旦满足 Minor Compaction 或 Major Compaction 的条件便会触发执行。
(3)手动:是指通过 HBase API、HBase Shell 或者 Master UI 界面等方式执行 compact、major_compact 等命令。
二、Compaction流程
了解完基本的背景后,接下来介绍 Compaction 的整个过程。
-
RegionServer 启动一个 Compaction 检查线程,定期对 Region 的 Store 进行检查;
-
Compaction 始于特定的触发条件。一旦触发,HBase 会将该 Compaction 交由一个独立的线程处理;
-
从对应的 Store 中选择合适的 HFile 文件,这步是整个 Compaction 的核心,选取文件时需要遵循很多条件,比如文件数既不能太多也不能太少、文件大小不能太大等,尽可能地选取承载 IO 负载重的文件集。基于此,HBase 实现了多种文件选取策略:常用的有RatioBasedCompactionPolicy、ExploringCompactionPolicy 和 StripeCompactionPolicy 等,也支持自定义的 Compaction 算法;
-
选出待合并的文件后,会根据这些 HFile 文件的总大小选择对应的线程池来进行处理;
-
对这些文件执行具体的 Compaction 操作。
下图简单的描述了上述流程。
下面对图2中具体的每一步进行详细说明。
2.1 启动 Compaction 定时线程
在 RegionServer 启动时,会初始化 CompactSplitThread 线程以及定时检查的 CompactionChecker ,默认10s执行一次。
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
其中 CompactSplitThread 是用来实现 Compaction 以及 Split 流程的类,而 CompactChecker 是用来周期性检查是否执行 Compaction 的。
CompactionChecker 是 ScheduledChore 类型,而 ScheduledChore 是 HBase定期执行的一个 Task。
2.2 触发 Compaction
Compaction 的触发时机在上面已经介绍过,下面对这3种触发机制进行详细的介绍。
2.2.1 后台线程周期性检查
后台线程 CompactionChecker 定期检查是否需要执行 Compaction,检查周期为hbase.regionserver.compaction.check.period(默认10s)。
(1)首先检查文件数是否大于可执行 Compaction 的文件数,一旦大于就会触发 Compaction。
(2)如果不满足,会接着检查是否到了 Major Compaction 的执行周期。如果当前 Store 中 HFile 的最早更新时间早于某个值 mcTime,就会触发 Major Compaction,其中 mcTime 是一个浮动值,浮动区间默认为[7-7*0.2,7+7*0.2],其中7为配置项 hbase.hregion.majorcompaction 设置,0.2为配置项 hbase.hregion.majorcompaction.jitter,所以在7天左右就会执行一次 Major Compaction。用户如果想禁用 Major Compaction,只需要将参数hbase.hregion.majorcompaction 设为0。
(3)如果到了 Major Compaction 的执行周期:
-
首先判断有几个 HFile 文件,如果只有1个文件,会判断是否有过期数据、本地化率是否比较低,如果都不满足就不做 Major Compaction;
-
如果大于1个文件,也会做 Major Compaction。
后台线程周期性检查的流程如图3所示。
下面是该线程的关键代码:
//ScheduledChore的run方法会一直调用chore函数
@Override
protected void chore() {
// 遍历instance下的所有online的region 进行循环检测
// onlineRegions是HRegionServer上存储的所有能够提供有效服务的在线Region集合;
for (HRegion r : this.instance.onlineRegions.values()) {
if (r == null)
continue;
// 取出每个region的store
for (Store s : r.getStores().values()) {
try {
// 检查是否需要compact的时间间隔 hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,multiplier默认1000;
long multiplier = s.getCompactionCheckMultiplier();
assert multiplier > 0;
// 未到multiplier的倍数跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查
if (iteration % multiplier != 0) continue;
// 需要合并的话,发起SystemCompaction请求,此处最终比较的是是否当前hfile数量减去正在compacting的文件数大于设置的compact min值。若满足则执行systemcompact
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+ " requests compaction");
} else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null);
} else {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null);
}
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}
2.2.2 Memstore Flush 触发
Memstore Flush 会产生 HFile 文件,文件越来越多就需要 Compaction。因此在每次执行完 Flush 操作之后,都会对当前 Store 中的文件数进行判断,一旦文件数超过 Compaction 的阈值 ,就会触发 Compaction。这里需要强调的是,Compaction 是以 Store 为单位进行的,而在 Flush 触发条件下,整个 Region 的所有 Store 都会执行 Compaction,所以会在短时间内可能会执行多次 Compaction。下面是 Flush 操作触发 Compaction 的代码。
/**
* Flush a region.
* @param region Region to flush.
* @param emergencyFlush Set if we are being force flushed. If true the region
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
* @param forceFlushAllStores whether we want to flush all store.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
private boolean flushRegion(final Region region, final boolean emergencyFlush,
boolean forceFlushAllStores) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe);
}
}
lock.readLock().lock();
try {
// flush
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flush(forceFlushAllStores);
// 检查是否需要compact
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
// 检查是否需要split
boolean shouldSplit = ((HRegion)region).checkSplit() != null;
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
// 发起compact请求
server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server. Abort because hdfs is probably bad (HBASE-644 is a case
// where hdfs was bad but passed the hdfs check).
server.abort("Replay of WAL required. Forcing server shutdown", ex);
return false;
} catch (IOException ex) {
LOG.error("Cache flush failed" + (region != null ? (" for region " +
Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
return false;
}
} finally {
lock.readLock().unlock();
wakeUpIfBlocking();
}
return true;
}
2.2.3 手动触发
手动触发就是通过命令或者 API 接口手动触发 Compaction,手动触发的原因有三个:
-
很多业务担心自动 Major Compaction 影响读写性能,因此会选择低峰期手动触发;
-
用户在执行完修改ttl的属性后希望立刻生效,执行手动触发 Major Compaction;
-
硬盘容量不够的情况下手动触发 Major Compaction 删除大量过期数据。
大多数都是基于第1点原因进行手动触发。
2.3 选择待合并的文件
Compaction 的核心就是选择合适的文件进行合并,因为合并文件的大小以及其当前承载的 IO 直接决定了 Compaction 的效果。希望能找到这样的文件:承载了大量 IO 请求但是文件大小很小,这样 Compaction 本身不会消耗太多 IO,而且合并完成之后对读的性能会有显著提升。现实情况可能大部分都不会是这样。目前 HBase 提供了多种 Minor Compaction 文件选择策略,通过配置项hbase.hstore.engine.class 设置。不管哪种策略,在执行之前都要做对文件做一些筛选操作,排除不符合条件的文件,以减少 Compaction 的工作量,减少对读写的影响。
-
排除当前正在执行 Compaction 的文件;
-
如果一个文件所有的记录都已经过期,则直接将文件删除;
-
排除过大的单个文件,如果文件大小大于 hbase.hstore.compaction.max.size(默认Long最大值)则被排除,不排除会产生大量 IO 消耗。
排除完后剩下的文件称为候选文件,接下来会再判断是否满足 Major Compaction 条件,如果满足,就会选择全部文件进行合并。判断条件有下面三条,只要满足其中一条就会执行 Major Compaction:
-
到了 Compaction 自动执行的周期且候选文件数小于 hbase.hstore.compaction.max(默认10),如果关掉自动 Major Compaction 执行则不适用;
-
Store 中含有 Reference 文件,Reference 文件是 Split Region 产生的临时引用文件,在 Compaction 过程中删除;
-
用户手动执行的 Major Compaction。
如果不满足上述执行条件,则为 Minor compaction。Minor Compaction 的策略有很多种,下面重点介绍 RationBasedCompactionPolicy(0.98之前的版本)、ExploringCompactionPolicy(0.98之后默认的版本) 和 StripeCompactionPolicy 的执行策略。
2.3.1 Compaction文件选择策略的建模
所谓的 Compaction 文件选择策略可以建模为下面的问题:
图中的每个数字表示了文件的 Sequence ID,数字越大则文件越新,很有可能刚刚Flush而成,意味着文件 Size 也可能越小。这样的文件在 Compaction 时优先选择,因此 Store下的 Storefile 文件会依据 Sequence ID 从小到大排序,依次标记为 f[0]、f[1]。。。。f[n-1],筛选策略就是要确定一个连续范围 [Start, End] 内的 Storefile 参与 Compaction。
Compaction 的目的是减少文件数量和删除无用的数据,优化读性能,Compaction 实现是将原文件的内容重写到新的文件,如果文件过大意味着 Compaction 的时间长,Compaction 过程中产生的 IO 放大越明显,因此文件筛选的准则是用最小的 IO 代价去合并减少最多的文件数。
Compaction 依赖两个先决条件:
-
所有 StoreFile 按照顺序进行排序(此顺序为:老文件在前,新文件在后);
-
参与 Compaction 的文件必须是连续的。
2.3.2 RationBasedCompactionPolicy
基本思想就是选择在固定 End 为最后一个文件的前提下(一般情况),从队列头开始滑动寻找 Start,直到 Start 满足下面的条件之一便停止扫描:
当前文件大小 < 比当前文件新的所有文件大小总和 * ratio,就是满足公式 f[start].size = hbase.store.compaction.min(默认为3),因为要保证本次 Compaction 的时候文件个数要大于配置的 Compaction 最小值。
下面附上 RationBasedCompactionPolicy 的具体逻辑代码。
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. > maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
ArrayList applyCompactionPolicy(ArrayList candidates,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
if (candidates.isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
// 获取文件合并比例:取参数hbase.hstore.compaction.ratio,默认为1.2
double ratio = comConf.getCompactionRatio();
if (mayUseOffPeak) {
// 取参数hbase.hstore.compaction.ratio.offpeak,默认为5.0
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
}
// get store file sizes for incremental compacting selection.
final int countOfFiles = candidates.size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
// tooFar表示后移动最大文件数位置的文件大小,也就是刚刚满足达到最大文件数位置的那个文件,从i至tooFar数目为合并时允许的最大文件数
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar = comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start = 0) {
start = filesToLeave;
}
}
candidates.subList(0, start).clear();
return candidates;
}
2.3.3 ExploringCompactionPolicy
该策略继承自 RatioBasedCompactionPolicy,不同的是 Ration 策略在找到一个合适的文件集合之后就停止扫描了,而 Exploring 策略会把 Storefile 列表划分成多个子队列,从中找出一个最优解参与 Compaction。最优解可以理解为:待合并文件数最多或者待合并文件数相同的情况下文件较小,这样有利于减少 Compaction 带来的 IO 消耗。算法流程可以描述为:
从头到尾遍历文件,判断所有符合条件的组合;
选择组合内文件数 >= minFiles,且 comConf.getMaxCompactSize(mayUseOffPeak)) { continue; } ++opts; if (size >= comConf.getMinCompactSize() && !filesInRatio(potentialMatchFiles, currentRatio)) { continue; } ++optsInRatio; if (isBetterSelection(bestSelection, bestSize, potentialMatchFiles, size, mightBeStuck)) { bestSelection = potentialMatchFiles; bestSize = size; bestStart = start; } } } if (bestSelection.size() == 0 && mightBeStuck) { LOG.debug("Exploring compaction algorithm has selected " + smallest.size() + " files of size "+ smallestSize + " because the store might be stuck"); return new ArrayList(smallest); } LOG.debug("Exploring compaction algorithm has selected " + bestSelection.size() + " files of size " + bestSize + " starting at candidate #" + bestStart + " after considering " + opts + " permutations with " + optsInRatio + " in ratio"); return new ArrayList(bestSelection); }
2.3.4 StripeCompactionPolicy
Stripe Compaction (HBASE-7667)还是为了减少 Major Compaction 的压力而提出的。其思想是:减少 Major Compaction 压力最直接办法是减少 Region 的大小,最好整个集群都是由很多小 Region 组成,这样参与 Compaction 的文件总大小就必然不会太大。可是 Region 设置过小会导致 Region 数量很多,这一方面会导致 HBase 管理 Region 的开销很大,另一方面 Region 过多也要求 HBase 能够分配更多的内存作为 Memstore 使用,否则有可能导致整个 RegionServer 级别的 Flush,进而引起长时间的写阻塞。因此单纯地通过将 Region 大小设置过小并不能本质解决问题。
(1) Level Compaction
社区开发者借鉴了 Leveldb 的 Compaction 策略 Level Compaction。Level Compaction 设计思路是将 Store 中的所有数据划分为很多层,每一层都会有一部分数据,如下图所示:
数据组织形式不再按照时间前后进行组织,而是按照 KeyRange 进行组织,每个 KeyRange 中会包含多个文件,这些文件所有数据的 Key 必须分布在同一个范围。比如 Key 分布在 Key0 - KeyN之间的所有数据都会落在第一个 KeyRange 区间的文件中,Key 分布在 KeyN+1 - KeyT 之间的所有数据会分布在第二个区间的文件中,以此类推。
整个数据体系会被划分为很多层,最上层(Level 0)表示最新数据,最下层(Level 6)表示最旧数据。每一层都由大量 KeyRange 块组成(Level 0除外),KeyRange 之间没有 Key 重合。而且层数越大,对应层的每个 KeyRange 块大小越大,下层 KeyRange 块大小是上一层大小的10倍。图中 Range 颜色越深,对应的 Range 块越大。
数据从 Memstore 中 Flush 之后,会首先落入 Level 0,此时落入 Level 0 的数据可能包含所有可能的 Key。此时如果需要执行 Compaction,只需要将 Level 0 中的 KV 一个一个读出来,然后按照 Key 的分布分别插入 Level 1 中对应 KeyRange 块的文件中,如果此时刚好 Level 1 中的某个KeyRange 块大小超过了一定阈值,就会继续往下一层合并。
Level Compaction 依然会有 Major Compaction 的概念,发生 Major Compaction 只需要将 Range 块内的文件执行合并就可以,而不需要合并整个 Region 内的数据文件。
可见,这种 Compaction 在合并的过程中,从上到下只需要部分文件参与,而不需要对所有文件执行 Compaction 操作。另外,Level Compaction 还有另外一个好处,对于很多只读最近写入数据’的业务来说,大部分读请求都会落到 Level 0,这样可以使用 SSD 作为上层 Level 存储介质,进一步优化读。然而,这种 Compaction 因为 Level 层数太多导致 Compaction 的次数明显增多,经过测试,发现这种 Compaction 并没有对 IO 利用率有任何提升。
(2)Stripe Compaction
虽然原生的 Level Compaction 并不适用于 HBase,但是这种 Compaction 的思想却激发了HBase 研发者的灵感,再结合之前提到的小 Region 策略,就形成了 Stripe Compaction。
同 Level Compaction 相同,Stripe Compaction 会将整个 Store 中的文件按照 Key 划分为多个 Range,称为 Stripe,Stripe 的数量可以通过参数设定,相邻的 Stripe 之间 Key 不会重合。Stripe 类似于 Sub-Region 的概念,即将一个大 Region 切分成了很多小的 Sub-Region。
随着数据写入,Memstore 执行 Flush 之后形成 HFile,这些 HFile 并不会马上写入对应的 Stripe,而是放到一个称为 L0 的地方,用户可以配置 L0 可以放置 HFile 的数量。一旦 L0 放置的文件数超过设定值,系统就会将这些 HFile 写入对应的 Stripe:首先读出 HFile 的 KVs,再根据每个 KV 的 Key 定位到具体的 Stripe,将该 KV 插入对应 Stripe 的文件中即可,如图6所示。由于 Stripe 是个小的 Region,所以 Compaction 并不会太多消耗系统资源。另外,读取数据时,根据对应的 Key 查找到对应的 Stripe,然后在 Stripe 内部执行查找,因为 Stripe 内数据量相对很小,所以一定程度上也可以提升数据查找性能。
2.4 执行 Compaction 操作
挑选好待合并文件后,就是执行真正的合并。合并流程主要分为以下几步: