博客只能分析部分,完整分析见代码注释:LevelDB-annotated
上一篇博客分析了 Flush 流程,本篇开始分析 Compaction 流程,后者相对于前者要复杂一些,主要体现在 Compaction SST 的选择。
Flush 和 Compactoin 有统一的入口:BackgroundCompaction(),故从此开始分析
BackgroundCompaction
该函数是一个 BGWork,也即需要通过 env_ -> Schedule() 来另起新线程来执行,因此函数体中的所有内容都是在新的线程中执行的。该函数任务就两个:
- imm_ 不为空,则执行 Flush;
- 否则执行 Compaction;
任务1上一篇讨论过了,本篇只讨论任务2。
在贴源码之前,先总结出该函数 Compaction 的主要流程,如下:
- 生成 Compaction 对象 c,该对象包括要压缩的 level 和 SST,后者称为 input[],其中,input[0] 指层1中需要压缩的 SST,input[1] 指层2中需要压缩的 SST。Compaction 分为两类:
- ManualCompaction,手动,即靠用户指定需要压缩的 level 和 input[];
- 自动 Compaction,也就是根据策略选择出需要压缩的 level 和 input[],是我们的主要讨论对象;
- 判断是否 IsTrivialMove()。IsTrivialMove 指:本次Compaction不需要合并,仅仅将SST往下移一层即可。并不是 Compaction 的核心内容,这里不用过多留意。
- 根据对象 c 执行 Compaction。
- 执行完毕后对 Compaction 所在的 Version 进行解引用。
函数源码以及注释如下:
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
// imm_不为空的话,则执行Flush
// 执行完后返回即可
if (imm_ != nullptr) {
CompactMemTable();
return;
}
// 开始Compaction
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
// 生成Compaction对象c
if (is_manual) {
// 手动Compaction,即指定输入
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
// 如果c为空,那可以认为Compaction已经完成
m->done = (c == nullptr);
if (c != nullptr) {
// 获取input[0]层的最后一个SST的最后一个key,给manual_end
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
// 自动Compaction,即按照策略选择输入
c = versions_->PickCompaction();
}
Status status;
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// IsTrivialMove:
// 本次Compaction不需要合并,仅仅将SST往下移一层即可
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
// 常规的Compaction
CompactionState* compact = new CompactionState(c);
// 执行Compaction
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
// 对所在的Version解引用
c->ReleaseInputs();
RemoveObsoleteFiles();
}
delete c;
// ....
// 一些收尾工作
}
ManualCompaction 就不深究了,我们重点看 PickCompaction(),也就是选择 Compaction 输入。
PickCompaction
看之前,先概括一下 LevelDB 选择 Compaction 输入的策略:
LevelDB 会给每一个 level 分配一个 score,其值于该层的大小有关,大小约接近阈值 score 就越大,当然 L0 有特殊处理。score >= 1 的 level 为需要 Compaction 。每次要 Compaction 时,LevelDB 会选出 score 最大的 level 及其下一层来作为输入层。
但很不巧的是,score 的计算与 level 的选择并不由 PickCompaction 决定,它只是获取已经计算好的 level 而已。计算工作在 Version & VersionEdit 那一部分中完成,这个 level 记录在当前的 Version 中。
实际上,除了 ManualCompaction,Compaction 的触发有两种形式:
- size_compaction:就是上文说的 score >= 1,由大小超过阈值触发;
- seek_compaction:由 seek 频率过高触发;
我们只讨论第1种。函数的主要流程如下:
- 获取输入层 level;
- 根据 level 生成初始化的 Compaction 对象 c,后续的操作多为确定 c 的字段;
- 遍历该 level 中的所有 SST,选出第一个 >=compact_pointer_ 的SST作为输入的第一个SST,即 input[0] 中的第一个元素。其中 compact_pointer_ 记录在 VersionSet 中,意为:每一层中下一次 Compaction 的起点;
- 如果上一步没有选出输入,那么直接用该 level 的第一个 SST 作为输入;
- 给当前 Version 加引用;
- 如果输入的层1是 level0,那么就选出所有重叠的 SST;
- 获得 level+1 层的输入,作为 input[1],即和 input[0] 中 SST 重叠的所有 SST;
函数源码以及注释如下:
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
// 两种Compaction触发情况:
// 1.由大小超过阈值触发
const bool size_compaction = (current_->compaction_score_ >= 1);
// 2.由seek频率过高触发
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
// LevelDB Compaction思路:
// 给每一level计算一个score,score最高且>=1的level优先Compaction
// 这个level_会记录在当前Version中
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
// 生成Compaction对象c
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level]
// 遍历该level,选出第一个>=compact_pointer_的SST作为输入的第一个SST
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
// 如果上一步没有选出输入
// 那么直接用该level的第一个SST作为输入
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
// seek_compaction的输入由file_to_compact_决定
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}
c->input_version_ = current_;
// 当前Version加引用
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
// 如果是level0,那么就选出所有重叠的SST
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
// 获得level+1层的输入,作为input[1]
// 即和input[0]中SST重叠的所有SST
SetupOtherInputs(c);
return c;
}
回到 BackgroundCompaction() 中,PickCompaction() 构造完对象 c 之后,交给 DoCompactionWork() 执行压缩。也就是输入 SST 的删除与输出 SST 的生成,均由后者完成。