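This post walks through, at the source level, how RocksDB creates and writes to the memtable. pipelined_write is out of scope, and the code is from v7.7.4.
Once the WriteGroup has been written to the WAL, the memtable write begins. There are two modes: parallel and !parallel. In parallel mode the Leader wakes every Writer in the WriteGroup and they all write concurrently; in !parallel mode the Leader alone writes the whole WriteGroup serially. The only difference is whether the WriteBatches are applied serially or in parallel; the way an individual WriteBatch is written is the same in both. This post follows the !parallel path.
We are still in WriteImpl(). After the WAL write, it does some sequence-number and callback bookkeeping and then enters the memtable write path: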
// DBImpl::WriteImpl
if (!parallel) {
  // w.sequence will be set inside InsertInto
  w.status = WriteBatchInternal::InsertInto(
      write_group, current_sequence, column_family_memtables_.get(),
      &flush_scheduler_, &trim_history_scheduler_,
      write_options.ignore_missing_column_families,
      0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
      batch_per_txn_);
} else {
  write_group.last_sequence = last_sequence;
  write_thread_.LaunchParallelMemTableWriters(&write_group);
  in_parallel_group = true;
  // Each parallel follower is doing each own writes. The leader should
  // also do its own.
  if (w.ShouldWriteToMemtable()) {
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    assert(w.sequence == current_sequence);
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/,
        this, true /*concurrent_memtable_writes*/, seq_per_batch_,
        w.batch_cnt, batch_per_txn_,
        write_options.memtable_insert_hint_per_batch);
  }
}
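As you can see, both the parallel and the non-parallel branch call WriteBatchInternal::InsertInto(). They reach different code because the function has three overloads: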
static Status InsertInto(WriteThread::WriteGroup& write_group, xxx)
static Status InsertInto(const WriteBatch* batch, xxx)
static Status InsertInto(WriteThread::Writer* writer, xxx)
The non-parallel case calls the first overload, with the Leader passing in the entire WriteGroup. That is our entry point.
InsertInto
The complete source of the function is:
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
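First, what sequence is. As covered in the previous post, when the WAL is written RocksDB assigns the WriteGroup a sequence number equal to last_sequence + 1, i.e. the newest sequence number. The sequence parameter here is that same value and matches the one recorded in the WAL.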
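To make the sequence bookkeeping in the loop above concrete, here is a minimal toy model of it. ToyWriter and AssignSequences are hypothetical names invented for this sketch, not RocksDB types. It also assumes the default mode (seq_per_batch == false), in which, as far as I can tell, every record the inserter applies advances the sequence number by one; treat that as an assumption rather than a statement about the real MemTableInserter.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for WriteThread::Writer with only the fields this
// sketch needs. Not the real RocksDB type.
struct ToyWriter {
  bool callback_failed = false;  // mirrors w->CallbackFailed()
  bool writes_memtable = true;   // mirrors w->ShouldWriteToMemtable()
  uint32_t entry_count = 0;      // number of records in the writer's batch
  uint64_t sequence = 0;         // filled in by AssignSequences
};

// Walks the group in order, the way the non-parallel InsertInto loop does.
// Assumption: seq_per_batch == false, so each applied record consumes one
// sequence number. Returns the next unused sequence number.
inline uint64_t AssignSequences(std::vector<ToyWriter>& group,
                                uint64_t first_sequence) {
  uint64_t next = first_sequence;
  for (ToyWriter& w : group) {
    if (w.callback_failed) {
      continue;  // skipped entirely; its sequence field is left untouched
    }
    w.sequence = next;  // the writer records where its batch starts
    if (!w.writes_memtable) {
      continue;  // nothing is applied, so nothing is consumed in this mode
    }
    next += w.entry_count;  // one sequence number per applied record
  }
  return next;
}
```

With three writers holding 2, 5 (callback failed) and 3 records and a starting sequence of 100, the first writer starts at 100, the second is skipped, and the third starts at 102.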
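Inside the loop, each Writer whose callback succeeded is stamped with the inserter's current sequence number, and if it writes to the memtable, so is its WriteBatch (via SetSequence). The inserter starts at sequence and moves forward as it goes (the asserts at the end of the loop depend on this), so later Writers get later numbers rather than all sharing one value. I believe this is what ties each batch back to the WAL when the memtable is rebuilt during recovery, but I have not worked that out yet. Next, WriteBatch::Iterate() is called on each WriteBatch. Its full source is: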
Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
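Iterate() takes a Handler, and the MemTableInserter passed in from InsertInto is one such handler: the batch walks its records and calls back into the handler for each one. The sketch below shows that visitor-style shape in isolation. ToyHandler, ToyBatch, ToyRecord and ToyInserter are hypothetical names made up for illustration; the real WriteBatch keeps its records as encoded bytes in rep_ and its Handler has many more callbacks.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified callback interface (not WriteBatch::Handler).
struct ToyHandler {
  virtual ~ToyHandler() = default;
  virtual void Put(const std::string& key, const std::string& value) = 0;
  virtual void Delete(const std::string& key) = 0;
};

// One logical operation stored in the toy batch.
struct ToyRecord {
  bool is_delete;
  std::string key;
  std::string value;
};

// Hypothetical batch: collects records, then replays them into a handler.
class ToyBatch {
 public:
  void Put(std::string key, std::string value) {
    records_.push_back({false, std::move(key), std::move(value)});
  }
  void Delete(std::string key) {
    records_.push_back({true, std::move(key), ""});
  }
  // Replays every record, in insertion order, through the handler.
  void Iterate(ToyHandler* handler) const {
    for (const ToyRecord& r : records_) {
      if (r.is_delete) {
        handler->Delete(r.key);
      } else {
        handler->Put(r.key, r.value);
      }
    }
  }

 private:
  std::vector<ToyRecord> records_;
};

// Plays the role MemTableInserter plays in the post; here it only logs.
struct ToyInserter : ToyHandler {
  std::vector<std::string> log;
  void Put(const std::string& key, const std::string& value) override {
    log.push_back("PUT " + key + "=" + value);
  }
  void Delete(const std::string& key) override {
    log.push_back("DEL " + key);
  }
};
```

The point of the split is that the batch only knows how to walk its own records, while what happens to each record (inserting into a memtable, counting, printing) is decided by whichever handler is passed in.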
A word on what WriteBatchInternal::kHeader is. Here is the comment on its definition:
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static constexpr size_t kHeader = 12;
WriteBatch::rep_ is laid out as an 8-byte sequence number, a 4-byte record count, and then the array of records:
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
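The 12-byte header described above can be built and decoded by hand, which makes the offsets easy to see. This is a standalone sketch: kToyHeader, AppendFixed, ReadFixed, MakeHeader, HeaderSequence and HeaderCount are hypothetical helpers written for this post, not RocksDB functions. It assumes the fixed64 and fixed32 fields are stored little-endian.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// 8-byte sequence number followed by a 4-byte count, as in the comment above.
constexpr std::size_t kToyHeader = 12;

// Appends the `bytes` low-order bytes of v, least significant byte first
// (little-endian; an assumption of this sketch).
inline void AppendFixed(std::string& dst, uint64_t v, std::size_t bytes) {
  for (std::size_t i = 0; i < bytes; ++i) {
    dst.push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// Reads a little-endian integer of `bytes` bytes starting at `offset`.
inline uint64_t ReadFixed(const std::string& src, std::size_t offset,
                          std::size_t bytes) {
  uint64_t v = 0;
  for (std::size_t i = 0; i < bytes; ++i) {
    v |= static_cast<uint64_t>(static_cast<unsigned char>(src[offset + i]))
         << (8 * i);
  }
  return v;
}

// Builds a header-only buffer: sequence at offset 0, count at offset 8.
inline std::string MakeHeader(uint64_t sequence, uint32_t count) {
  std::string rep;
  AppendFixed(rep, sequence, 8);
  AppendFixed(rep, count, 4);
  return rep;
}

inline uint64_t HeaderSequence(const std::string& rep) {
  return ReadFixed(rep, 0, 8);
}

inline uint32_t HeaderCount(const std::string& rep) {
  return static_cast<uint32_t>(ReadFixed(rep, 8, 4));
}
```

Anything appended after MakeHeader() would begin at offset kToyHeader, which is where the records start in this layout.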
kHeader is therefore the offset at which record[] starts inside the WriteBatch. Next we step into WriteBatchInternal::Iterate(); its main source is: