小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

leveldb研究9- 流程分析:打开数据库

Posted on 2012-03-20 16:02 小明 阅读(3427) 评论(0)  编辑  收藏 所属分类: 分布式计算
leveldb 是通过Open函数来打开/新建数据库。
static Status Open(const Options& options,
                     
const std::string& name,
                     DB
** dbptr);

其中options指定一些选项。
struct Options {
  
// -------------------
  
// 影响行为的参数

  
//comparator用于指定key的排列方式,默认按照字节排序
  const Comparator* comparator;

  
//如果不存在则创建
  
// Default: false
  bool create_if_missing;

  
// 如果存在则失败
  
// Default: false
  bool error_if_exists;

  
// 是否做严格的检查
  
// Default: false
  bool paranoid_checks;

  
// env: os 封装
  
// Default: Env::Default()
  Env* env;

  
// log file,默认和database相同路径
  
// Default: NULL
  Logger* info_log;

  
// -------------------
  
// 影响性能的参数

  
// 写缓冲大小,增加会提高写的性能,但是会增加启动的时间,因为有更多的数据需要恢复
  
//
  
// Default: 4MB
  size_t write_buffer_size;

  
// 最大打开的文件个数,用于TableCache
  
//
  
// Default: 1000
  int max_open_files;

  
// Control over blocks (user data is stored in a set of blocks, and
  
// a block is the unit of reading from disk).

  
// 指定Block cache,默认leveldb会自动创建8MB的internal cache
  
// Default: NULL
  Cache* block_cache;

  
//SST file中的Block size,为压缩之前的数据
  
//
  
// Default: 4K
  size_t block_size;

  
// SST file 中的restart pointer的间隔,参见SST的文件格式
  
//
  
// Default: 16
  int block_restart_interval;

  
// 压缩类型,默认为google的snappy压缩
  CompressionType compression;

  
// Create an Options object with default values for all fields.
  Options();
};

具体看看Open的实现:
<db/dbimpl.cc>
Status DB::Open(const Options& options, const std::string& dbname,
                DB
** dbptr) {
  
*dbptr = NULL;

  
//实例化对象:DBImpl
  DBImpl* impl = new DBImpl(options, dbname);
  
//加锁
  impl->mutex_.Lock();
  VersionEdit edit;
  
//从log中恢复数据,生成新的SST file
  Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
  if (s.ok()) { 
    
//创建新的log file
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile
* lfile;
    s 
= options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     
&lfile);
    
if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl
->logfile_ = lfile;
      impl
->logfile_number_ = new_log_number;
      impl
->log_ = new log::Writer(lfile);
      
//生成新的manifest文件
      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }
    
if (s.ok()) {
      
//删除失效文件
      impl->DeleteObsoleteFiles();
      
//进行compaction
      impl->MaybeScheduleCompaction();
    }
  }
  impl
->mutex_.Unlock();
  
if (s.ok()) {
    
*dbptr = impl;
  } 
else {
    delete impl;
  }
  
return s;
}

因为上次关闭数据库的时候,内存的数据可能并没有写入SST文件,所以要从*.log中读取记录,并写入新的SST文件。
<db/dbimpl.cc>
Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  
//创建folder
  env_->CreateDir(dbname_);
  assert(db_lock_ 
== NULL);
  
//生成LOCK文件并锁定
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  
if (!s.ok()) {
    
return s;
  }

  
if (!env_->FileExists(CurrentFileName(dbname_))) {
    
if (options_.create_if_missing) {
      
//新建database
      s = NewDB();
      
if (!s.ok()) {
        
return s;
      }
    } 
else {
      
return Status::InvalidArgument(
          dbname_, 
"does not exist (create_if_missing is false)");
    }
  } 
else {
    
if (options_.error_if_exists) {
      
return Status::InvalidArgument(
          dbname_, 
"exists (error_if_exists is true)");
    }
  }

  
//重建manifest信息
  s = versions_->Recover();
  
if (s.ok()) {
    SequenceNumber max_sequence(
0);

    
//得到上次的log file
    const uint64_t min_log = versions_->LogNumber();
    
const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector
<std::string> filenames;
    s 
= env_->GetChildren(dbname_, &filenames);
    
if (!s.ok()) {
      
return s;
    }
    uint64_t number;
    FileType type;
    std::vector
<uint64_t> logs;
    
for (size_t i = 0; i < filenames.size(); i++) {
      
if (ParseFileName(filenames[i], &number, &type)
          
&& type == kLogFile
          
&& ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }

    
// Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    
for (size_t i = 0; i < logs.size(); i++) {
      
//从*.log中恢复数据
      s = RecoverLogFile(logs[i], edit, &max_sequence);

      
// The previous incarnation may not have written any MANIFEST
      
// records after allocating this log number.  So we manually
      
// update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
    }

    
if (s.ok()) {
      
if (versions_->LastSequence() < max_sequence) {
        versions_
->SetLastSequence(max_sequence);
      }
    }
  }

  
return s;
}

继续看RecoverLogFile的实现:
<db/dbimpl.cc>
Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit
* edit,
                              SequenceNumber
* max_sequence) {
  
//LogReporter:出现坏数据的时候报告
   struct LogReporter : public log::Reader::Reporter {
    Env
* env;
    Logger
* info_log;
    
const char* fname;
    Status
* status;  // NULL if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, 
"%s%s: dropping %d bytes; %s",
          (
this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast
<int>(bytes), s.ToString().c_str());
      
if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  
//打开Log file用于顺序读取
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile
* file;
  Status status 
= env_->NewSequentialFile(fname, &file);
  
if (!status.ok()) {
    MaybeIgnoreError(
&status);
    
return status;
  }

  LogReporter reporter;
  reporter.env 
= env_;
  reporter.info_log 
= options_.info_log;
  reporter.fname 
= fname.c_str();
  reporter.status 
= (options_.paranoid_checks ? &status : NULL);
  
// log::Reader读取数据
  log::Reader reader(file, &reporter, true/*checksum*/,
                     
0/*initial_offset*/);
  Log(options_.info_log, 
"Recovering log #%llu",
      (unsigned 
long long) log_number);

  std::
string scratch;
  Slice record;
  WriteBatch batch;
  MemTable
* mem = NULL;
  
//遍历log file,读取记录
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
    
if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption(
"log record too small"));
      
continue;
    }
    WriteBatchInternal::SetContents(
&batch, record);

    
if (mem == NULL) {
      
//新建MemTable用于保存数据
      mem = new MemTable(internal_comparator_);
      mem
->Ref();
    }
    
//插入memtable
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(
&status);
    
if (!status.ok()) {
      
break;
    }
    
const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(
&batch) +
        WriteBatchInternal::Count(
&batch) - 1;
    
if (last_seq > *max_sequence) {
      
*max_sequence = last_seq;
    }

    
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      
//写入SST file:level 0
      status = WriteLevel0Table(mem, edit, NULL);
      
if (!status.ok()) {
        
break;
      }
      
//释放并删除memtable
      mem->Unref();
      mem 
= NULL;
    }
  }

  
if (status.ok() && mem != NULL) {
    status 
= WriteLevel0Table(mem, edit, NULL);
    
// Reflect errors immediately so that conditions like full
    
// file-systems cause the DB::Open() to fail.
  }

  
if (mem != NULL) mem->Unref();
  delete file;
  
return status;
}

至此完成SST file的写入。

接下来看看manifest文件的重建
mainfest的重建有两步,第一步是调用VersionSet::Recover函数恢复到上次的manifest,然后使用VersionSet::LogAndApply把新增的SST文件记录也写入manifest文件中。
<db/version_set.cc>

Status VersionSet::Recover() {
  
struct LogReporter : public log::Reader::Reporter {
    Status
* status;
    
virtual void Corruption(size_t bytes, const Status& s) {
      
if (this->status->ok()) *this->status = s;
    }
  };

  
// 读取CURRENT文件,获取最新的MANIFEST文件
  std::string current;
  Status s 
= ReadFileToString(env_, CurrentFileName(dbname_), &current);
  
if (!s.ok()) {
    
return s;
  }
  
if (current.empty() || current[current.size()-1!= '\n') {
    
return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() 
- 1);

  std::
string dscname = dbname_ + "/" + current;
  SequentialFile
* file;
  
//打开当前MANIFEST文件
  s = env_->NewSequentialFile(dscname, &file);
  
if (!s.ok()) {
    
return s;
  }

  
bool have_log_number = false;
  
bool have_prev_log_number = false;
  
bool have_next_file = false;
  
bool have_last_sequence = false;
  uint64_t next_file 
= 0;
  uint64_t last_sequence 
= 0;
  uint64_t log_number 
= 0;
  uint64_t prev_log_number 
= 0;
  Builder builder(
this, current_);

  {
    LogReporter reporter;
    reporter.status 
= &s;
    
//使用log::Reader读取log记录:VersionEdit
    log::Reader reader(file, &reporter, true/*checksum*/0/*initial_offset*/);
    Slice record;
    std::
string scratch;
    
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s 
= edit.DecodeFrom(record);
      
if (s.ok()) {
        
if (edit.has_comparator_ &&
            edit.comparator_ 
!= icmp_.user_comparator()->Name()) {
          s 
= Status::InvalidArgument(
              edit.comparator_ 
+ "does not match existing comparator ",
              icmp_.user_comparator()
->Name());
        }
      }

      
if (s.ok()) {
        
//应用Edit到VersionSet
        builder.Apply(&edit);
      }

      
if (edit.has_log_number_) {
        log_number 
= edit.log_number_;
        have_log_number 
= true;
      }

      
if (edit.has_prev_log_number_) {
        prev_log_number 
= edit.prev_log_number_;
        have_prev_log_number 
= true;
      }

      
if (edit.has_next_file_number_) {
        next_file 
= edit.next_file_number_;
        have_next_file 
= true;
      }

      
if (edit.has_last_sequence_) {
        last_sequence 
= edit.last_sequence_;
        have_last_sequence 
= true;
      }
    }
  }
  delete file;
  file 
= NULL;

  
if (s.ok()) {
    
if (!have_next_file) {
      s 
= Status::Corruption("no meta-nextfile entry in descriptor");
    } 
else if (!have_log_number) {
      s 
= Status::Corruption("no meta-lognumber entry in descriptor");
    } 
else if (!have_last_sequence) {
      s 
= Status::Corruption("no last-sequence-number entry in descriptor");
    }

    
if (!have_prev_log_number) {
      prev_log_number 
= 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  
if (s.ok()) { //生成新的version,并设为current version
    Version* v = new Version(this);
    builder.SaveTo(v);
    
// Install recovered version
    Finalize(v);
    AppendVersion(v);
    manifest_file_number_ 
= next_file;
    next_file_number_ 
= next_file + 1;
    last_sequence_ 
= last_sequence;
    log_number_ 
= log_number;
    prev_log_number_ 
= prev_log_number;
  }

  
return s;
}

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  
if (edit->has_log_number_) {
    assert(edit
->log_number_ >= log_number_);
    assert(edit
->log_number_ < next_file_number_);
  } 
else {
    edit
->SetLogNumber(log_number_);
  }

  
if (!edit->has_prev_log_number_) {
    edit
->SetPrevLogNumber(prev_log_number_);
  }

  edit
->SetNextFile(next_file_number_);
  edit
->SetLastSequence(last_sequence_);

  
//使用VersionEdit创建新的Version
  Version* v = new Version(this);
  {
    Builder builder(
this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);

  
// Initialize new descriptor log file if necessary by creating
  
// a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  
//创建新的manifest文件
  if (descriptor_log_ == NULL) {
    
// No reason to unlock *mu here since we only hit this path in the
    
// first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file 
= DescriptorFileName(dbname_, manifest_file_number_);
    edit
->SetNextFile(next_file_number_);
    s 
= env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    
if (s.ok()) {
      descriptor_log_ 
= new log::Writer(descriptor_file_);
      s 
= WriteSnapshot(descriptor_log_);
    }
  }

  
// Unlock during expensive MANIFEST log write
  {
    mu
->Unlock();

    
// 写入manifest log
    if (s.ok()) {
      std::
string record;
      edit
->EncodeTo(&record);
      s 
= descriptor_log_->AddRecord(record);
      
if (s.ok()) {
        s 
= descriptor_file_->Sync();
      }
    }

    
// If we just created a new descriptor file, install it by writing a
    
// new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s 
= SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu
->Lock();
  }

  
// 设置新的version
  if (s.ok()) {
    AppendVersion(v);
    log_number_ 
= edit->log_number_;
    prev_log_number_ 
= edit->prev_log_number_;
  } 
else {
    delete v;
    
if (!new_manifest_file.empty()) {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ 
= NULL;
      descriptor_file_ 
= NULL;
      env_
->DeleteFile(new_manifest_file);
    }
  }

  
return s;
}







只有注册用户登录后才能发表评论。


网站导航: