leveldb使用SSTable格式来保存数据。
格式为:(当前没有META BLOCK)
SSTABLE = |DATA BLOCK1|DATA BLOCK2|...|DATA BLOCK N|META BLOCK1|...|META BLOCK N|META INDEX BLOCK|DATA INDEX BLOCK|Footer|
DATA BLOCK = |KeyValues|Restart arrays|array size|Compress Type(1字节)|CRC(4字节)|   (Compress Type与CRC构成块尾部的trailer)
Footer(定长) = |META INDEX BLOCK handle(offset, size)|DATA INDEX BLOCK handle(offset, size)|Magic Number|
比较细节的地方是数据块的压缩:块内的key使用了前缀压缩法;整个块在写入文件时还可以选择用Snappy压缩(如果压缩后节省不到12.5%,则保存未压缩的原始数据)。
下面看看具体的实现。
// builder.cc
//
// BuildTable: writes every entry produced by *iter into a new table file
// named TableFileName(dbname, meta->number) and fills in *meta.
//   dbname:      database name; used to derive the table file name.
//   env:         OS interface used to create the file (and to delete it when
//                the build does not succeed).
//   options:     passed through to the TableBuilder.
//   table_cache: used to re-open the finished table as a sanity check.
//   iter:        source of the entries (e.g. an iterator over a MemTable);
//                entries are added to the table in iteration order.
//   meta:        in:  meta->number selects the file name.
//                out: file_size, smallest and largest key of the table.
// Returns the first error seen while creating, building, syncing, closing or
// verifying the table, or the input iterator's error. Unless the result is OK
// and a non-empty table was produced, env->DeleteFile(fname) is called.
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  // Table file name derived from dbname and the file number.
  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    WritableFile* file;
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }

    // TableBuilder formats the entries into blocks and writes them to |file|.
    TableBuilder* builder = new TableBuilder(options, file);
    // The first key is the smallest; every later key overwrites |largest|.
    meta->smallest.DecodeFrom(iter->key());
    for (; iter->Valid(); iter->Next()) {
      Slice key = iter->key();
      meta->largest.DecodeFrom(key);
      builder->Add(key, iter->value());
    }

    // Only complete the table if the whole input was read. If the iterator
    // stopped because of an error, the file is deleted below anyway, so
    // abandon the builder instead of writing the index blocks and footer.
    s = iter->status();
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    delete builder;

    // Finish and check for file errors: sync, then close the file.
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }
    delete file;
    file = NULL;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              meta->number,
                                              meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->DeleteFile(fname);
  }
  return s;
}
} // namespace leveldb
我们来看看TableBuilder类,主要的实现细节都在这个类中。
TableBuilder中含有一个指向Rep结构的指针(rep_),Rep用于保存builder的状态和数据。为什么不在TableBuilder头文件中直接定义这些成员变量?主要是不想把过多的实现细节暴露给使用者(即常说的Pimpl手法),这是一个很好的做法。
// Internal state of a TableBuilder. It is defined here in the .cc file and
// reached through TableBuilder's rep_ pointer, so these details are not
// exposed to users of TableBuilder.
struct TableBuilder::Rep {
Options options;
// Separate copy of the options used by index_block; the constructor below
// sets its block_restart_interval to 1.
Options index_block_options;
WritableFile* file; // sstable file being written
uint64_t offset; // bytes appended to file so far (advanced by WriteBlock and Finish)
Status status; // result of the most recent file operation (set by WriteBlock, Flush and Finish)
BlockBuilder data_block; // data block currently being filled by Add()
BlockBuilder index_block; // index block: one entry (separator key -> BlockHandle) per flushed data block
std::string last_key;// last key passed to Add(); used for the ordering check and to build index keys
int64_t num_entries; // number of entries passed to Add() so far
bool closed; // set to true by Finish(); Add() asserts it is still false
// True after a data block has been flushed but its index entry has not been
// added yet. The entry is deferred until the next Add() (or Finish()) so the
// index key can be shortened using the following key.
bool pending_index_entry;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output; // scratch buffer for Snappy output in WriteBlock(); cleared after each block
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
// The BlockBuilders are handed pointers to the Options members above, which
// are declared (and therefore initialized) before them.
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
pending_index_entry(false) {
// With a restart interval of 1 every index entry is a restart point, so
// index keys are stored in full (no prefix sharing between them).
index_block_options.block_restart_interval = 1;
}
};
新加一条记录:
//增加一条数据记录
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
//检查是不是顺序添加
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) { //是否生成新的index block
//检查当前是否是一个新的BLOCK
assert(r->data_block.empty());
//根据当前的key和上一个DATA BLOCK的最后一个主键生成最短的索引
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
//增加新的INDEX BLOCK,但不立即写入
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
//检查是否已经达到BLOCK SIZE,默认4K
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
//写一个DATA BLOCK
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
}
// Serializes |block|, optionally compresses it, and appends it to the file.
// On-disk layout: block contents | type (1 byte) | masked crc32c (4 bytes).
// The CRC covers the stored contents plus the type byte.
// |handle| receives the offset and size of the contents (excluding the
// trailer). The block builder is reset afterwards.
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  assert(ok());
  Rep* const state = rep_;
  Slice raw = block->Finish();

  CompressionType type = state->options.compression;
  Slice block_contents;
  if (type == kNoCompression) {
    block_contents = raw;
  } else if (type == kSnappyCompression) {
    std::string* const compressed = &state->compressed_output;
    // Keep the compressed form only if Snappy is available and it saves at
    // least 12.5% of the raw size.
    const bool worth_it =
        port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
        compressed->size() < raw.size() - (raw.size() / 8u);
    if (worth_it) {
      block_contents = *compressed;
    } else {
      // Snappy not supported, or compressed less than 12.5%, so just
      // store uncompressed form
      block_contents = raw;
      type = kNoCompression;
    }
  }

  handle->set_offset(state->offset);
  handle->set_size(block_contents.size());

  state->status = state->file->Append(block_contents);
  if (state->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    uint32_t checksum =
        crc32c::Value(block_contents.data(), block_contents.size());
    checksum = crc32c::Extend(checksum, trailer, 1);  // include the type byte
    EncodeFixed32(trailer + 1, crc32c::Mask(checksum));
    state->status = state->file->Append(Slice(trailer, kBlockTrailerSize));
    if (state->status.ok()) {
      state->offset += block_contents.size() + kBlockTrailerSize;
    }
  }

  state->compressed_output.clear();
  block->Reset();
}
完成文件的写入:
// Completes the table: flushes the last data block, then writes the
// (currently empty) meta-index block, the index block and the footer.
// Marks the builder closed and returns the resulting status.
Status TableBuilder::Finish() {
  Rep* const state = rep_;
  Flush();
  assert(!state->closed);
  state->closed = true;

  BlockHandle metaindex_handle;
  BlockHandle index_handle;

  // Meta-index block (no meta blocks are written yet).
  if (ok()) {
    BlockBuilder metaindex(&state->options);
    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&metaindex, &metaindex_handle);
  }

  // Index block, including the deferred entry for the last data block.
  if (ok()) {
    if (state->pending_index_entry) {
      // No following key exists, so shorten last_key to a short successor.
      state->options.comparator->FindShortSuccessor(&state->last_key);
      std::string encoded_handle;
      state->pending_handle.EncodeTo(&encoded_handle);
      state->index_block.Add(state->last_key, Slice(encoded_handle));
      state->pending_index_entry = false;
    }
    WriteBlock(&state->index_block, &index_handle);
  }

  // Footer: records where the meta-index and index blocks live.
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_handle);
    footer.set_index_handle(index_handle);
    std::string encoded_footer;
    footer.EncodeTo(&encoded_footer);
    state->status = state->file->Append(encoded_footer);
    if (state->status.ok()) {
      state->offset += encoded_footer.size();
    }
  }
  return state->status;
}
这里有两个类BlockBuilder和BlockHandle:BlockBuilder负责把数据按照一定格式进行序列化,而BlockHandle记录一个BLOCK在文件中的offset和size,可以理解为指向文件中某个BLOCK的指针。
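我们看看BlockBuilder的实现。leveldb在这里实现了前缀压缩法:由于一个BLOCK中的key是有序的,相邻两个key通常有较长的公共前缀,所以每条记录采用<shared_size><non_shared_size><value_size><non_shared_data><value_data>的格式,只保存与上一个key不同的部分,从而节省空间。每隔block_restart_interval条记录会设置一个restart点,restart点上的记录不共享前缀(shared_size为0),保存完整的key。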
其中size采用了变长格式(varint),这是针对小整数做的优化:每个字节的低7位存放数据,最高位用来指示后面是否还有字节,最高位为0表示这是最后一个字节。因此一个4字节(32位)的整数需要1~5个字节来表示,小于128(即不超过0x7F)的数只需要一个字节。
varint32格式的具体实现可以参考相关文章,或直接阅读leveldb中PutVarint32的代码。
// Finishes the block: appends the restart array (one fixed32 offset per
// restart point, a new one every options_->block_restart_interval entries)
// followed by the number of restart points, also as fixed32.
// Returns a Slice over buffer_, which stays valid until buffer_ changes.
Slice BlockBuilder::Finish() {
  const size_t restart_count = restarts_.size();
  for (size_t idx = 0; idx != restart_count; ++idx) {
    PutFixed32(&buffer_, restarts_[idx]);
  }
  PutFixed32(&buffer_, restart_count);
  finished_ = true;
  return Slice(buffer_);
}
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= options_->block_restart_interval);
assert(buffer_.empty() // No values yet?
|| options_->comparator->Compare(key, last_key_piece) > 0);
size_t shared = 0;
//counter_内部计数器,用于记录当前restart后的个数
if (counter_ < options_->block_restart_interval) {
//看看当前的key和上一个有多少相同的bytes
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared;
// 写入 "<shared><non_shared><value_size>" to 缓冲
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());
// 写入 non_shared data和value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// 设置 last_key_ 等于 当前的key
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}