LevelDB源码: Compaction的流程(一)


博客只能分析部分,完整分析见代码注释:LevelDB-annotated

上一篇博客分析了 Flush 流程,本篇开始分析 Compaction 流程,后者相对于前者要复杂一些,主要体现在 Compaction SST 的选择。

Flush 和 Compactoin 有统一的入口:BackgroundCompaction(),故从此开始分析

BackgroundCompaction

该函数是一个 BGWork,也即需要通过 env_ -> Schedule() 来另起新线程来执行,因此函数体中的所有内容都是在新的线程中执行的。该函数任务就两个:

  1. imm_ 不为空,则执行 Flush;
  2. 否则执行 Compaction;

任务1上一篇讨论过了,本篇只讨论任务2。

在贴源码之前,先总结出该函数 Compaction 的主要流程,如下:

  1. 生成 Compaction 对象 c,该对象包括要压缩的 level 和 SST,后者称为 input[],其中,input[0] 指层1中需要压缩的 SST,input[1] 指层2中需要压缩的 SST。Compaction 分为两类:
    1. ManualCompaction,手动,即靠用户指定需要压缩的 level 和 input[];
    2. 自动 Compaction,也就是根据策略选择出需要压缩的 level 和 input[],是我们的主要讨论对象;
  2. 判断是否 IsTrivialMove()。IsTrivialMove 指:本次Compaction不需要合并,仅仅将SST往下移一层即可。并不是 Compaction 的核心内容,这里不用过多留意。
  3. 根据对象 c 执行 Compaction。
  4. 执行完毕后对 Compaction 所在的 Version 进行解引用。

函数源码以及注释如下:

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  // imm_不为空的话,则执行Flush
  // 执行完后返回即可
  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  // 开始Compaction
  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  
  // 生成Compaction对象c
  if (is_manual) {
    // 手动Compaction,即指定输入
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    // 如果c为空,那可以认为Compaction已经完成
    m->done = (c == nullptr);
    if (c != nullptr) {
      // 获取input[0]层的最后一个SST的最后一个key,给manual_end
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    // 自动Compaction,即按照策略选择输入
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // IsTrivialMove:
    // 本次Compaction不需要合并,仅仅将SST往下移一层即可
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->RemoveFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
  } else {
    // 常规的Compaction
    CompactionState* compact = new CompactionState(c);
    // 执行Compaction
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    // 对所在的Version解引用
    c->ReleaseInputs();
    RemoveObsoleteFiles();
  }
  delete c;

  // ....
  // 一些收尾工作
}

ManualCompaction 就不深究了,我们重点看 PickCompaction(),也就是选择 Compaction 输入。

PickCompaction

看之前,先概括一下 LevelDB 选择 Compaction 输入的策略:

LevelDB 会给每一个 level 分配一个 score,其值于该层的大小有关,大小约接近阈值 score 就越大,当然 L0 有特殊处理。score >= 1 的 level 为需要 Compaction 。每次要 Compaction 时,LevelDB 会选出 score 最大的 level 及其下一层来作为输入层。

但很不巧的是,score 的计算与 level 的选择并不由 PickCompaction 决定,它只是获取已经计算好的 level 而已。计算工作在 Version & VersionEdit 那一部分中完成,这个 level 记录在当前的 Version 中。

实际上,除了 ManualCompaction,Compaction 的触发有两种形式:

  1. size_compaction:就是上文说的 score >= 1,由大小超过阈值触发;
  2. seek_compaction:由 seek 频率过高触发;

我们只讨论第1种。函数的主要流程如下:

  1. 获取输入层 level;
  2. 根据 level 生成初始化的 Compaction 对象 c,后续的操作多为确定 c 的字段;
  3. 遍历该 level 中的所有 SST,选出第一个 >=compact_pointer_ 的SST作为输入的第一个SST,即 input[0] 中的第一个元素。其中 compact_pointer_ 记录在 VersionSet 中,意为:每一层中下一次 Compaction 的起点;
  4. 如果上一步没有选出输入,那么直接用该 level 的第一个 SST 作为输入;
  5. 给当前 Version 加引用;
  6. 如果输入的层1是 level0,那么就选出所有重叠的 SST;
  7. 获得 level+1 层的输入,作为 input[1],即和 input[0] 中 SST 重叠的所有 SST;

函数源码以及注释如下:

Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  // 两种Compaction触发情况:
  // 1.由大小超过阈值触发
  const bool size_compaction = (current_->compaction_score_ >= 1);
  // 2.由seek频率过高触发
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);

  if (size_compaction) {
    // LevelDB Compaction思路:
    // 给每一level计算一个score,score最高且>=1的level优先Compaction
    // 这个level_会记录在当前Version中

    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    // 生成Compaction对象c
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    // 遍历该level,选出第一个>=compact_pointer_的SST作为输入的第一个SST
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }

    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      // 如果上一步没有选出输入
      // 那么直接用该level的第一个SST作为输入
      c->inputs_[0].push_back(current_->files_[level][0]);
    }

  } else if (seek_compaction) {
    // seek_compaction的输入由file_to_compact_决定
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return nullptr;
  }

  c->input_version_ = current_;
  // 当前Version加引用
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    // 如果是level0,那么就选出所有重叠的SST
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  // 获得level+1层的输入,作为input[1]
  // 即和input[0]中SST重叠的所有SST
  SetupOtherInputs(c);

  return c;
}

回到 BackgroundCompaction() 中,PickCompaction() 构造完对象 c 之后,交给 DoCompactionWork() 执行压缩。也就是输入 SST 的删除与输出 SST 的生成,均由后者完成。

DoCompactionWork


文章作者: SrcMiLe
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 SrcMiLe !
评论
  目录