leveldb读数据
先看看ReadOptions有哪些参数可以指定:
// Options that control read operations.
struct ReadOptions {
  // Builds an options object with every field at its documented default:
  // no checksum verification, cache filling enabled, no explicit snapshot.
  ReadOptions()
      : verify_checksums(false),
        fill_cache(true),
        snapshot(NULL) {
  }

  // Whether checksums should be verified for this read.
  // Default: false
  bool verify_checksums;

  // Whether the result of this read should be placed into the cache.
  // Default: true
  bool fill_cache;

  // Snapshot to read from. When NULL, the read uses the current version
  // instead of a pinned snapshot.
  // Default: NULL
  const Snapshot* snapshot;
};
下面看看读取的详细过程:
查询顺序:先查询memtable => 再查询previous memtable(imm_,即不可变的memtable)=> 最后查询磁盘文件(经由table cache缓冲)
// Looks up `key` and stores the result in `*value`.
//
// The search order is: the active memtable (mem_), then the previous
// memtable (imm_) if one exists, and finally the files of the current
// version. The sequence number used for the lookup comes from
// options.snapshot when it is set, otherwise from the latest sequence
// known to versions_.
//
// mutex_ is held on entry to the body (via MutexLock), released while the
// memtables and files are being read, and re-acquired before the statistics
// update and the Unref calls.
Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status result;
  MutexLock guard(&mutex_);

  // Choose the sequence number this read is performed at.
  SequenceNumber read_seq;
  if (options.snapshot == NULL) {
    read_seq = versions_->LastSequence();
  } else {
    read_seq =
        reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  }

  // Take references on everything that will be read after the mutex is
  // dropped, so those objects stay referenced for the whole lookup.
  MemTable* active_mem = mem_;
  MemTable* previous_mem = imm_;
  Version* version = versions_->current();
  active_mem->Ref();
  if (previous_mem != NULL) previous_mem->Ref();
  version->Ref();

  Version::GetStats stats;
  bool searched_files = false;

  // Unlock while reading from files and memtables.
  {
    mutex_.Unlock();
    LookupKey lkey(key, read_seq);

    // 1) active memtable, 2) previous memtable (imm_), 3) files.
    bool answered = active_mem->Get(lkey, value, &result);
    if (!answered && previous_mem != NULL) {
      answered = previous_mem->Get(lkey, value, &result);
    }
    if (!answered) {
      result = version->Get(options, lkey, value, &stats);
      searched_files = true;
    }
    mutex_.Lock();
  }

  // Only a file lookup produces stats. When UpdateStats reports true a
  // compaction may be needed (presumably related to a file's allowed-seek
  // budget -- confirm against Version::UpdateStats).
  if (searched_files) {
    if (version->UpdateStats(stats)) {
      MaybeScheduleCompaction();
    }
  }

  active_mem->Unref();
  if (previous_mem != NULL) previous_mem->Unref();
  version->Unref();
  return result;
}
重点来看看从version中读取:
// Searches the files of this version for the key described by `k`.
//
// Levels are visited from 0 upward and the search stops at the first file
// for which GetValue reports completion, or at the first iterator error.
// If no file settles the lookup, a NotFound status with an empty message is
// returned.
//
// `stats` is reset on entry. If more than one file ends up being consulted,
// stats->seek_file / stats->seek_file_level record the first file that was
// read (and its level); otherwise they stay NULL / -1.
Status Version::Get(const ReadOptions& options,
                    const LookupKey& k,
                    std::string* value,
                    GetStats* stats) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();
  const Comparator* ucmp = vset_->icmp_.user_comparator();
  Status s;

  stats->seek_file = NULL;
  stats->seek_file_level = -1;
  FileMetaData* prev_file = NULL;
  int prev_file_level = -1;

  // Storage that the per-level candidate pointer may refer to; both must
  // outlive the level loop body because `candidates` can point into them.
  std::vector<FileMetaData*> level0_hits;
  FileMetaData* single_hit;

  for (int level = 0; level < config::kNumLevels; level++) {
    size_t candidate_count = files_[level].size();
    // Nothing stored at this level: move on to the next one.
    if (candidate_count == 0) continue;

    // Work out which files of this level have to be searched.
    FileMetaData* const* candidates = &files_[level][0];
    if (level != 0) {
      // Locate the one candidate file with FindFile (the original notes
      // describe this as a binary search; a key can belong to at most one
      // file on these levels).
      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
      bool usable = false;
      if (index < candidate_count) {
        single_hit = candidates[index];
        // If user_key sorts before the file's smallest key, all of the
        // file is past any data for user_key.
        usable =
            ucmp->Compare(user_key, single_hit->smallest.user_key()) >= 0;
      }
      if (usable) {
        candidates = &single_hit;
        candidate_count = 1;
      } else {
        candidates = NULL;
        candidate_count = 0;
      }
    } else {
      // Level 0 is special: key ranges of its files may overlap, so every
      // file whose [smallest, largest] range covers user_key is searched.
      level0_hits.reserve(candidate_count);
      for (uint32_t i = 0; i < candidate_count; i++) {
        FileMetaData* f = candidates[i];
        if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) continue;
        if (ucmp->Compare(user_key, f->largest.user_key()) > 0) continue;
        level0_hits.push_back(f);
      }
      if (level0_hits.empty()) continue;

      // Order the overlapping files with NewestFirst before searching.
      std::sort(level0_hits.begin(), level0_hits.end(), NewestFirst);
      candidates = &level0_hits[0];
      candidate_count = level0_hits.size();
    }

    // Search each selected file of this level in order.
    for (uint32_t i = 0; i < candidate_count; ++i) {
      // A file was already read before this one, so this is at least the
      // second read. Record only the first file read as the seek file.
      if (prev_file != NULL && stats->seek_file == NULL) {
        stats->seek_file = prev_file;
        stats->seek_file_level = prev_file_level;
      }

      FileMetaData* f = candidates[i];
      prev_file = f;
      prev_file_level = level;

      // Obtain an iterator over the file through the table cache.
      Iterator* iter = vset_->table_cache_->NewIterator(
          options,
          f->number,
          f->file_size);
      iter->Seek(ikey);
      const bool done = GetValue(ucmp, iter, user_key, value, &s);

      // An iterator error takes precedence over whatever GetValue stored.
      const bool iter_failed = !iter->status().ok();
      if (iter_failed) {
        s = iter->status();
      }
      delete iter;
      if (iter_failed || done) {
        return s;
      }
    }
  }

  return Status::NotFound(Slice());  // Use an empty error message for speed
}
继续跟踪:TableCache
// Returns an iterator over the table file identified by `file_number`.
//
// The table is looked up in cache_ under a key made from the fixed-width
// encoding of `file_number`. On a miss the file is opened, wrapped in a
// Table, and inserted into the cache. If opening fails, an error iterator
// carrying the failure status is returned and nothing is cached.
//
// If `tableptr` is non-NULL it is set to NULL up front and, on success, to
// the Table object backing the returned iterator.
Iterator* TableCache::NewIterator(const ReadOptions& options,
                                  uint64_t file_number,
                                  uint64_t file_size,
                                  Table** tableptr) {
  if (tableptr != NULL) {
    *tableptr = NULL;
  }

  // Cache key: the file number in fixed 64-bit encoding.
  char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, sizeof(buf));

  // Look the table up in the cache first.
  Cache::Handle* handle = cache_->Lookup(key);
  if (handle == NULL) {
    // Cache miss: load the file and open it as a table.
    std::string fname = TableFileName(dbname_, file_number);
    RandomAccessFile* file = NULL;
    Table* table = NULL;
    Status s = env_->NewRandomAccessFile(fname, &file);
    if (s.ok()) {
      s = Table::Open(*options_, file, file_size, &table);
    }

    if (!s.ok()) {
      assert(table == NULL);
      delete file;
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
      return NewErrorIterator(s);
    }

    // Insert the opened table and its file into the cache. DeleteEntry is
    // passed as the entry's deleter.
    TableAndFile* tf = new TableAndFile;
    tf->file = file;
    tf->table = table;
    handle = cache_->Insert(key, tf, 1, &DeleteEntry);
  }

  Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;

  // Build the iterator from the Table object. UnrefEntry is registered as a
  // cleanup with cache_ and handle (presumably releasing the cache handle
  // when the iterator is destroyed -- confirm against UnrefEntry).
  Iterator* result = table->NewIterator(options);
  result->RegisterCleanup(&UnrefEntry, cache_, handle);
  if (tableptr != NULL) {
    *tableptr = table;
  }
  return result;
}