我们知道,leveldb在写数据的时候,除了log文件,都要在内存中写一份。
先看看SkipList【跳表】这个数据结构:
SkipList有如下特点:
1. 本质上一个排序好的链表
2. 分层,上层节点比下层的少,更具有跳跃性
3. 查询的复杂度是O(logn)
SkipList跟红黑树等还是比较容易实现和理解的,主要长处是比较容易实现Lock free和遍历。
我们来看看leveldb的实现
插入:
//插入一个新的key
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
//查找插入节点,prev为各层的前置节点
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key));
//生成随机高度
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//设置当前最大高度
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
//生成新节点
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
//设置新节点的各层的下一跳
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
//设置前节点的next为当前节点,完成插入
prev[i]->SetNext(i, x);
}
}
查询:
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
Node* x = head_;
int level = GetMaxHeight() - 1; //从高层开始查找,依次到0 level
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) { //比next key 要大
// Keep searching in this list
x = next;
} else { //比next key小,查找下一层
//标记当前level的前置节点
if (prev != NULL) prev[level] = x;
if (level == 0) {
return next;
} else {
level--;
}
}
}
}
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, NULL);
if (x != NULL && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
接着我们看看leveldb MemTable的实现,很简单了,基本是对SkipList访问一个封装
<db/memtable.h>
class MemTable {
public:
explicit MemTable(const InternalKeyComparator& comparator);
//增加引用计数
void Ref() { ++refs_; }
//减少引用计数
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
//内存使用量
size_t ApproximateMemoryUsage();
//遍历操作
Iterator* NewIterator();
//插入
void Add(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
//查询
bool Get(const LookupKey& key, std::string* value, Status* s);
private:
~MemTable(); // Private since only Unref() should be used to delete it
//key compartor,用于排序
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
int refs_; //引用计数
Arena arena_; //内存分配器
Table table_; //数据存放SkipList
// No copying allowed
MemTable(const MemTable&);
void operator=(const MemTable&);
};
先看看插入
<db/memtable.cc>
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& value) {
//数据结构:
//1.internal key size : Varint32 (length of 2+3)
//2.key data
//3.SequenceNumber+Key type: 8 bytes
//4 value size: Varint32
//5 value data
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == encoded_len);
table_.Insert(buf);
}
查询
<db/memtable.cc>
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}