小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理
我们知道,leveldb在写数据的时候,除了log文件,都要在内存中写一份。

先看看SkipList【跳表】这个数据结构:


SkipList有如下特点:
1. 本质上一个排序好的链表
2. 分层,上层节点比下层的少,更具有跳跃性
3. 查询的复杂度是O(logn)

SkipList跟红黑树等还是比较容易实现和理解的,主要长处是比较容易实现Lock free和遍历。
我们来看看leveldb的实现
插入:
//插入一个新的key
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
  
//查找插入节点,prev为各层的前置节点
  Node* prev[kMaxHeight];
  Node
* x = FindGreaterOrEqual(key, prev);

  
// Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key));

  
//生成随机高度
  int height = RandomHeight();
  
if (height > GetMaxHeight()) {
    
for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] 
= head_;
    }
    
//设置当前最大高度
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  
//生成新节点
  x = NewNode(key, height);
  
for (int i = 0; i < height; i++) {
    
//设置新节点的各层的下一跳
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    
//设置前节点的next为当前节点,完成插入
    prev[i]->SetNext(i, x);
  }
}

查询:
template<typename Key, class Comparator>
typename SkipList
<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    
const {
  Node
* x = head_;
  
int level = GetMaxHeight() - 1//从高层开始查找,依次到0 level
  while (true) {
    Node
* next = x->Next(level); 
    
if (KeyIsAfterNode(key, next)) { //比next key 要大
      
// Keep searching in this list
      x = next;
    } 
else { //比next key小,查找下一层
      
//标记当前level的前置节点
      if (prev != NULL) prev[level] = x;
      
if (level == 0) {
        
return next;
      } 
else {
        level
--;
      }
    }
  }
}

template
<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
  Node
* x = FindGreaterOrEqual(key, NULL);
  
if (x != NULL && Equal(key, x->key)) {
    
return true;
  } 
else {
    
return false;
  }
}


接着我们看看leveldb MemTable的实现,很简单了,基本是对SkipList访问一个封装
<db/memtable.h>
class MemTable {
 
public:
  
explicit MemTable(const InternalKeyComparator& comparator);

  
//增加引用计数
  void Ref() { ++refs_; }

  
//减少引用计数
  void Unref() {
    
--refs_;
    assert(refs_ 
>= 0);
    
if (refs_ <= 0) {
      delete 
this;
    }
  }

  
//内存使用量
  size_t ApproximateMemoryUsage();

  
//遍历操作
  Iterator* NewIterator();

  
//插入
  void Add(SequenceNumber seq, ValueType type,
           
const Slice& key,
           
const Slice& value);

  
//查询
  bool Get(const LookupKey& key, std::string* value, Status* s);

 
private:
  
~MemTable();  // Private since only Unref() should be used to delete it

  
//key compartor,用于排序
  struct KeyComparator {
    
const InternalKeyComparator comparator;
    
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
    
int operator()(const char* a, const char* b) const;
  };
  friend 
class MemTableIterator;
  friend 
class MemTableBackwardIterator;

  typedef SkipList
<const char*, KeyComparator> Table;

  KeyComparator comparator_;
  
int refs_; //引用计数
  Arena arena_; //内存分配器
  Table table_; //数据存放SkipList

  
// No copying allowed
  MemTable(const MemTable&);
  
void operator=(const MemTable&);
};

先看看插入
<db/memtable.cc>
void MemTable::Add(SequenceNumber s, ValueType type,
                   
const Slice& key,
                   
const Slice& value) {
  
//数据结构:
  
//1.internal key size : Varint32 (length of 2+3)
  
//2.key data
  
//3.SequenceNumber+Key type: 8 bytes
  
//4 value size: Varint32
  
//5 value data
  size_t key_size = key.size();
  size_t val_size 
= value.size();
  size_t internal_key_size 
= key_size + 8;
  
const size_t encoded_len =
      VarintLength(internal_key_size) 
+ internal_key_size +
      VarintLength(val_size) 
+ val_size;
  
char* buf = arena_.Allocate(encoded_len);
  
char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p 
+= key_size;
  EncodeFixed64(p, (s 
<< 8| type);
  p 
+= 8;
  p 
= EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((p 
+ val_size) - buf == encoded_len);
  table_.Insert(buf);
}

查询
<db/memtable.cc>
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey 
= key.memtable_key();
  Table::Iterator iter(
&table_);
  iter.Seek(memkey.data());
  
if (iter.Valid()) {
    
// entry format is:
    
//    klength  varint32
    
//    userkey  char[klength]
    
//    tag      uint64
    
//    vlength  varint32
    
//    value    char[vlength]
    
// Check that it belongs to same user key.  We do not check the
    
// sequence number since the Seek() call above should have skipped
    
// all entries with overly large sequence numbers.
    const char* entry = iter.key();
    uint32_t key_length;
    
const char* key_ptr = GetVarint32Ptr(entry, entry+5&key_length);
    
if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length 
- 8),
            key.user_key()) 
== 0) {
      
// Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      
switch (static_cast<ValueType>(tag & 0xff)) {
        
case kTypeValue: {
          Slice v 
= GetLengthPrefixedSlice(key_ptr + key_length);
          value
->assign(v.data(), v.size());
          
return true;
        }
        
case kTypeDeletion:
          
*= Status::NotFound(Slice());
          
return true;
      }
    }
  }
  
return false;
}

只有注册用户登录后才能发表评论。


网站导航: