RocksDB Source Code Study (8): Write (4) - Memtable


This post analyzes, at the source-code level, how RocksDB creates and writes to the memtable, leaving pipelined_write out of scope. The code version used is v7.7.4.


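Once the WriteGroup has finished writing the WAL, it moves on to the memtable. Memtable writes come in two flavors, parallel and !parallel. In the parallel case, the Leader wakes up every Writer in the WriteGroup and they all insert concurrently. In the !parallel case, the Leader alone inserts the whole WriteGroup serially. The only difference between the two is whether the WriteBatches are written one after another or at the same time; each individual WriteBatch is inserted the same way in both. This post uses the !parallel case to walk through the memtable write code.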
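To make the serial/parallel distinction concrete, here is a minimal sketch that is not RocksDB code: ToyMemTable, ToyBatch, InsertBatch, SerialWrite and ParallelWrite are all hypothetical names. It assumes the starting sequence number of every batch is fixed before insertion begins, so both modes end up with the same memtable contents. A std::mutex stands in for thread safety here; in RocksDB, concurrent memtable inserts instead depend on the allow_concurrent_memtable_write option and a memtable representation that supports them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical memtable stand-in: (key, seq) -> value, guarded by a mutex
// so that several writer threads can insert safely.
struct ToyMemTable {
  std::mutex mu;
  std::map<std::pair<std::string, uint64_t>, std::string> entries;

  void Add(const std::string& key, uint64_t seq, const std::string& value) {
    std::lock_guard<std::mutex> lock(mu);
    entries[std::make_pair(key, seq)] = value;
  }
};

// In this sketch a batch is just a list of key/value puts.
using ToyBatch = std::vector<std::pair<std::string, std::string>>;

// Per-batch insertion is identical in both modes: record i gets first_seq + i.
void InsertBatch(ToyMemTable& mem, const ToyBatch& batch, uint64_t first_seq) {
  for (std::size_t i = 0; i < batch.size(); ++i) {
    mem.Add(batch[i].first, first_seq + i, batch[i].second);
  }
}

// !parallel: the leader inserts every batch of the group itself, in order.
void SerialWrite(ToyMemTable& mem, const std::vector<ToyBatch>& group,
                 uint64_t seq) {
  for (const ToyBatch& b : group) {
    InsertBatch(mem, b, seq);
    seq += b.size();
  }
}

// parallel: each batch's starting seq is fixed up front, then one thread per
// "writer" inserts its own batch concurrently.
void ParallelWrite(ToyMemTable& mem, const std::vector<ToyBatch>& group,
                   uint64_t seq) {
  std::vector<std::thread> writers;
  for (const ToyBatch& b : group) {
    writers.emplace_back([&mem, &b, seq] { InsertBatch(mem, b, seq); });
    seq += b.size();
  }
  for (std::thread& t : writers) t.join();
}
```

Running SerialWrite and ParallelWrite on the same group with the same starting sequence leaves the two toy memtables with identical entries; only the scheduling differs, which is the point made above.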

We are still in WriteImpl(). After the WAL write, some sequence-number and callback bookkeeping is done, and then the memtable write path begins:

// DBImpl::WriteImpl
      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, &trim_history_scheduler_,
            write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
            batch_per_txn_);
      } else {
        write_group.last_sequence = last_sequence;
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing each own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              &trim_history_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_,
              write_options.memtable_insert_hint_per_batch);
        }
      }

As you can see, both the parallel and the non-parallel branch call WriteBatchInternal::InsertInto(). The function has three overloads:

static Status InsertInto(WriteThread::WriteGroup& write_group, xxx)
static Status InsertInto(const WriteBatch* batch, xxx)
static Status InsertInto(WriteThread::Writer* writer, xxx)

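The non-parallel branch calls the first overload, with the Leader passing in the entire WriteGroup, while the parallel branch calls the third one with a single Writer. We use the first overload as the entry point for the analysis.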

InsertInto

The full source of the function:

Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}

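First, what is sequence? As covered in the previous post, when the WAL is written RocksDB assigns the WriteGroup a sequence number equal to last_sequence + 1, i.e. the newest sequence number. The sequence parameter here is that same value, the one that corresponds to the WAL.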
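A toy model of how the loop in InsertInto() hands out sequence numbers starting from that value may help. It is a sketch, not RocksDB code: ToyWriter and AssignSequences are hypothetical names, and it assumes that in seq_per_batch mode every batch consumes exactly one sequence number (batch_cnt == 1). In the default mode each record consumes one number, which matches MemTableInserter advancing its sequence after every inserted entry; a Writer that skips the memtable only advances the sequence in seq_per_batch mode, as the comment in the loop says.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified view of one Writer in a WriteGroup.
struct ToyWriter {
  uint32_t num_records;    // records in this writer's WriteBatch
  bool callback_failed;    // mirrors Writer::CallbackFailed()
  bool write_to_memtable;  // mirrors Writer::ShouldWriteToMemtable()
  uint64_t sequence;       // set by AssignSequences(); untouched if skipped
};

// Walks the group like the InsertInto() loop and returns the next unused
// sequence number. Assumes batch_cnt == 1 when seq_per_batch is true.
uint64_t AssignSequences(std::vector<ToyWriter>& group, uint64_t first_seq,
                         bool seq_per_batch) {
  uint64_t seq = first_seq;
  for (ToyWriter& w : group) {
    if (w.callback_failed) continue;  // skipped without a sequence
    w.sequence = seq;                 // like w->sequence = inserter.sequence()
    if (!w.write_to_memtable) {
      if (seq_per_batch) ++seq;       // like MaybeAdvanceSeq(true)
      continue;
    }
    // Default mode: one seq per record; seq_per_batch mode: one per batch.
    seq += seq_per_batch ? 1 : w.num_records;
  }
  return seq;
}
```

For example, with two batches of 3 and 2 records starting at 101 in the default mode, the writers are stamped 101 and 104 and the next free number is 106.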

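Inside the loop, RocksDB stamps each Writer in the WriteGroup, and the header of its WriteBatch (via SetSequence()), with the inserter's current sequence. The value is not the same for every Writer: inserter.sequence() advances as records are inserted (by one per record by default, or per batch in seq_per_batch mode, which the asserts at the end of the loop check), so consecutive WriteBatches receive consecutive ranges of sequence numbers. Presumably this is what lets the memtable be matched up with the WAL during recovery, though I have not fully worked that out yet. Next, WriteBatch::Iterate() is called on each WriteBatch; its full source is: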

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
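WriteBatch::Iterate() rejects a rep_ shorter than the header and then hands the byte range [kHeader, rep_.size()) to WriteBatchInternal::Iterate(), which decodes the records one by one and calls the matching Handler method on the handler it was given (here, the MemTableInserter). The following is a heavily simplified model of that walk, not RocksDB's implementation. It assumes the record encoding documented in the comments of RocksDB's write_batch.cc, restricted to the default column family: a one-byte tag (0x1 for a value, 0x0 for a deletion, matching ValueType) followed by varint32-length-prefixed strings. ToyHandler, RecordingHandler, AppendVarstring, GetVarstring and IterateRecords are hypothetical names, and every other record type is treated as an error.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Tags for the default column family, following RocksDB's ValueType values.
constexpr unsigned char kToyDeletion = 0x0;
constexpr unsigned char kToyValue = 0x1;
constexpr std::size_t kToyHeader = 12;  // mirrors WriteBatchInternal::kHeader

// Hypothetical handler, loosely modeled on WriteBatch::Handler.
struct ToyHandler {
  virtual ~ToyHandler() = default;
  virtual void Put(const std::string& key, const std::string& value) = 0;
  virtual void Delete(const std::string& key) = 0;
};

// Example handler that just logs the calls it receives.
struct RecordingHandler : ToyHandler {
  std::vector<std::string> log;
  void Put(const std::string& k, const std::string& v) override {
    log.push_back("put " + k + "=" + v);
  }
  void Delete(const std::string& k) override { log.push_back("del " + k); }
};

// varstring = varint32 length (7 bits per byte, high bit means "more") + bytes.
void AppendVarstring(std::string* dst, const std::string& s) {
  uint32_t v = static_cast<uint32_t>(s.size());
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
  dst->append(s);
}

bool GetVarstring(const std::string& in, std::size_t* pos, std::string* out) {
  uint32_t len = 0;
  for (int shift = 0;; shift += 7) {
    if (shift > 28 || *pos >= in.size()) return false;  // truncated or too long
    uint32_t byte = static_cast<unsigned char>(in[(*pos)++]);
    len |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) break;
  }
  if (in.size() - *pos < len) return false;  // string runs past the end
  out->assign(in, *pos, len);
  *pos += len;
  return true;
}

// Walks rep[begin, end) and dispatches each record to the handler.
// Returns the number of records, or -1 if the input is malformed.
long IterateRecords(const std::string& rep, std::size_t begin, std::size_t end,
                    ToyHandler* handler) {
  if (rep.size() < kToyHeader) return -1;  // "malformed WriteBatch (too small)"
  const std::string data = rep.substr(0, end);
  std::size_t pos = begin;
  long found = 0;
  while (pos < data.size()) {
    const unsigned char tag = static_cast<unsigned char>(data[pos++]);
    std::string key, value;
    if (tag == kToyValue) {
      if (!GetVarstring(data, &pos, &key) || !GetVarstring(data, &pos, &value))
        return -1;
      handler->Put(key, value);
    } else if (tag == kToyDeletion) {
      if (!GetVarstring(data, &pos, &key)) return -1;
      handler->Delete(key);
    } else {
      return -1;  // merge, column-family and other tags are out of scope here
    }
    ++found;
  }
  return found;
}
```

In RocksDB the handler is MemTableInserter, which turns each Put or Delete callback into a memtable entry (a tombstone for Delete) stamped with its current sequence number.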

Let's explain what WriteBatchInternal::kHeader is, starting with its comment:

// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static constexpr size_t kHeader = 12;

WriteBatch::rep_ is laid out as an 8-byte sequence number, a 4-byte record count, and then the array of records:

// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]

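Since the header occupies exactly these 8 + 4 = 12 bytes, kHeader is the byte offset at which record[] begins inside the WriteBatch. Next, let's step into WriteBatchInternal::Iterate(); its main source is as follows: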


Author: SrcMiLe
Copyright notice: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit SrcMiLe when reposting.