LevelDB将用户写入的数据临时存储在MemTable中,内存数据足够多后才排序写入SSTable文件(避免写出过小的文件)。那么如果在写磁盘之前,系统崩溃了怎么办呢?作为数据库系统,最基本的要求就是保证数据持久,即向用户承诺已写成功的数据不会丢失,无论是系统重启还是进程退出。经典的解决方案是在向用户返回写操作成功之前,首先在磁盘上将这个写操作记录在日志文件中。一旦写日志成功,后续操作便可以放心的只在内存中进行了。当数据库因为任何原因重启时,它会首先重播日志文件中的操作记录,保证之前承诺过的操作不会丢失。
写操作分为两种:插入和删除。LevelDB允许用户原子地执行若干插入和删除,或者全部成功或者全部失败。这些写操作放在一起称作write batch,write batch是日志记录中的最小单位。
图1:日志中的一条记录,即一个write batch(格式详见include/write_batch.h和db/write_batch.cc)
每个写操作的格式如下:日志文件是由若干个32KB的定长数据块组成的,每个块头有整个块的校验和。过长的write batch会被拆开保存到几个连续的数据块中。LevelDB的作者在文档中提到了使用定长数据块的好处(doc/log_format.txt),笔者在此简单复述和解释一下 :
- 若日志文件部分损坏,我们可以直接定位到下一个32KB的数据块,继续读接下来的数据,不至于整个日志文件读不出。
- 容易对数据进行切分,特别适合用于MapReduce。MapReduce需要在不读出文件内容的情况下对文件按chunk进行切分,典型的chunk长度是64MB,32KB对齐的数据块自然也是64MB对齐的。考虑到过长的write batch会跨chunk边界分布在的连续的块中,这里需要特殊处理一下。
- 处理大记录时不需要分配额外的缓冲区。这里不太明白,无论在写数据还是从重播日志,都需要将数据复制到MemTable中。(原文:We do not need to put extra buffering for large records)
坏处是这样做会造成空间浪费:
- 一个32KB定长数据块只存储一个小write batch,比如key/value都只有几字节。
- 没有压缩。
好在日志文件占空间较小,并且是在SSTable写入磁盘后就会被删除,所以造成的空间浪费并不严重。
保证日志文件正确无误非常十分重要。在系统崩溃或掉电后所有内存数据都会丢失(包括还在系统page cache中的数据),在Linux系统中,只有fsync()成功后的数据才保证写入磁盘不丢失。在响应用户读写请求的流程中,fsync()是一个耗时的操作。在传统机械硬盘和ext3文件系统环境下,开启fsync后顺序写32KB数据每秒仅能完成百次左右,这直接影响数据库的写入性能。
首先不是所有的数据库数据都重要到立即fsync()的程序(比如在桌面应用的本地数据就不值得)。LevelDB为用户提供了一个开关控制每个写操作是否需要进行fsync()。但到目前的版本为止,LevelDB都没有提供周期性触发fsync()的折衷方案(MySQL和MongoDB都可以使用这个方案,典型的触发周期是几百亳秒)。
另一方面,服务器应用中通常写入操作是多线程同时进行的。为了防止写日志在高并发下成为瓶颈,LevelDB将不同线程同时产生的日志数据聚合在一起写入文件并进行一次fsync(),而非对每个线程写的数据进行单独fsync()。这个做法称为group commit。这样一来,虽然每秒完成的fsync()数量无法突破硬件限制,但实际上每次写入的数据量包含更多数据,对用户来说这样也就可以提高写入性能。顺便提一句:这部分代码写得短小精悍,值得学习(db/db_impl.cc中的DBImpl::Write()方法,或见下文)。
图3:线程1已完成,线程2和线程3的将日志合并在一起写入文件,只需进行一次write()和fsync()
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Writer w(&mutex_); w.batch = my_batch; w.sync = options.sync; w.done = false; MutexLock l(&mutex_); writers_.push_back(&w); // 并发写的线程在这里排队 while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; } // 排在队首的线程负责将数据聚合起来写入日志。 // 只有排在队首的线程会进入这里。其它线程则等待被通知。 // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == NULL); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != NULL) { // NULL batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); // 将排在队首的几个batch聚合在一起 WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); // 写日志 if (status.ok() && options.sync) { status = logfile_->Sync(); } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); // 写MemTable } mutex_.Lock(); } if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); } // 通知这些在等待的线程,我们已经将日志写完了 while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // Notify new head of write queue if (!writers_.empty()) { writers_.front()->cv.Signal(); } return status; }
参考资料:
LevelDB官方性能测试报告