小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

leveldb研究11- 流程分析:读数据

Posted on 2012-03-21 17:30 小明 阅读(2724) 评论(0)  编辑  收藏 所属分类: 分布式计算
leveldb读数据

先看看ReadOptions有哪些参数可以指定:
// Options that control read operations
struct ReadOptions {
  
// 是否检查checksum
  
// Default: false
  bool verify_checksums;

  
// 是否将此次结果放入cache
  
// Default: true
  bool fill_cache;

  
//是否指定snapshot,否则读取当前版本
  
// Default: NULL
  const Snapshot* snapshot;

  ReadOptions()
      : verify_checksums(
false),
        fill_cache(
true),
        snapshot(NULL) {
  }
};

下面看看读取的详细过程:
查询memtable=>查询previous memtable(imm_)=>查询文件(缓冲)

Status DBImpl::Get(const ReadOptions& options,
                   
const Slice& key,
                   std::
string* value) {
  Status s;
  MutexLock l(
&mutex_);
  SequenceNumber snapshot;
  
//设置snapshot
  if (options.snapshot != NULL) {
    snapshot 
= reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } 
else {
    snapshot 
= versions_->LastSequence();
  }

  MemTable
* mem = mem_;
  MemTable
* imm = imm_;
  Version
* current = versions_->current();
  mem
->Ref();
  
if (imm != NULL) imm->Ref();
  current
->Ref();

  
bool have_stat_update = false;
  Version::GetStats stats;

  
// Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    LookupKey lkey(key, snapshot);
    
//先查询memtable
    if (mem->Get(lkey, value, &s)) {
      
// Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) { //然后查询previous memtable:imm_
      
// Done
    } else {
      
//从文件中读取
      s = current->Get(options, lkey, value, &stats);
      have_stat_update 
= true;
    }
    mutex_.Lock();
  }

  
//是否有文件需要被compaction,参见allowed_seek
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem
->Unref();
  
if (imm != NULL) imm->Unref();
  current
->Unref();
  
return s;
}


重点来看看从version中读取:
Status Version::Get(const ReadOptions& options,
                    
const LookupKey& k,
                    std::
string* value,
                    GetStats
* stats) {
  Slice ikey 
= k.internal_key();
  Slice user_key 
= k.user_key();
  
const Comparator* ucmp = vset_->icmp_.user_comparator();
  Status s;

  stats
->seek_file = NULL;
  stats
->seek_file_level = -1;
  FileMetaData
* last_file_read = NULL;
  
int last_file_read_level = -1;

  
//从level0向高层查找,如果再低级level中查到,则不再查询
  std::vector<FileMetaData*> tmp;
  FileMetaData
* tmp2;
  
for (int level = 0; level < config::kNumLevels; level++) {
    size_t num_files 
= files_[level].size();
    
//本层文件数为空,则返回
    if (num_files == 0continue;

    
// Get the list of files to search in this level
    FileMetaData* const* files = &files_[level][0];
    
if (level == 0) {
      
//level0特殊处理,因为key是重叠,所有符合条件的文件必须被查找
      tmp.reserve(num_files);
      
for (uint32_t i = 0; i < num_files; i++) {
        FileMetaData
* f = files[i];
        
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
            ucmp
->Compare(user_key, f->largest.user_key()) <= 0) {
          tmp.push_back(f);
        }
      }
      
if (tmp.empty()) continue;

      std::sort(tmp.begin(), tmp.end(), NewestFirst);
      files 
= &tmp[0];
      num_files 
= tmp.size();
    } 
else {
      
// 二分法查找,某个key只可能属于一个文件
      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
      
//没有查到
      if (index >= num_files) {
        files 
= NULL;
        num_files 
= 0;
      } 
else {
        tmp2 
= files[index];
        
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
          
// All of "tmp2" is past any data for user_key
          files = NULL;
          num_files 
= 0;
        } 
else {
          files 
= &tmp2;
          num_files 
= 1;
        }
      }
    }

    
for (uint32_t i = 0; i < num_files; ++i) { //遍历本层符合条件的文件
      if (last_file_read != NULL && stats->seek_file == NULL) {
        
//seek_file只记录第一个
        stats->seek_file = last_file_read;
        stats
->seek_file_level = last_file_read_level;
      }

      FileMetaData
* f = files[i];
      last_file_read 
= f;
      last_file_read_level 
= level;
      
      
//从table cache中读取
      Iterator* iter = vset_->table_cache_->NewIterator(
          options,
          f
->number,
          f
->file_size);
      iter
->Seek(ikey);
      
const bool done = GetValue(ucmp, iter, user_key, value, &s);
      
if (!iter->status().ok()) { //查找到
        s = iter->status();
        delete iter;
        
return s;
      } 
else {
        delete iter;
        
if (done) {
          
return s;
        }
      }
    }
  }

  
return Status::NotFound(Slice());  // Use an empty error message for speed
}

继续跟踪:TableCache

Iterator* TableCache::NewIterator(const ReadOptions& options,
                                  uint64_t file_number,
                                  uint64_t file_size,
                                  Table
** tableptr) {
  
if (tableptr != NULL) {
    
*tableptr = NULL;
  }

  
char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, 
sizeof(buf));

  
//从LRU cache中查找
  Cache::Handle* handle = cache_->Lookup(key);
  
if (handle == NULL) { 
    
/加载文件
    std::
string fname = TableFileName(dbname_, file_number);
    RandomAccessFile
* file = NULL;
    Table
* table = NULL;
    Status s 
= env_->NewRandomAccessFile(fname, &file);
    
if (s.ok()) {
      s 
= Table::Open(*options_, file, file_size, &table);
    }

    
if (!s.ok()) {
      assert(table 
== NULL);
      delete file;
      
// We do not cache error results so that if the error is transient,
      
// or somebody repairs the file, we recover automatically.
      return NewErrorIterator(s);
    }

    
//插入Cache
    TableAndFile* tf = new TableAndFile;
    tf
->file = file;
    tf
->table = table;
    handle 
= cache_->Insert(key, tf, 1&DeleteEntry);
  }

  Table
* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
  
//从Table对象中生成iterator
  Iterator* result = table->NewIterator(options);
  result
->RegisterCleanup(&UnrefEntry, cache_, handle);
  
if (tableptr != NULL) {
    
*tableptr = table;
  }
  
return result;
}



只有注册用户登录后才能发表评论。


网站导航: