小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

leveldb研究10- 流程分析:写数据

Posted on 2012-03-21 14:41 小明 阅读(3403) 评论(0)  编辑  收藏 所属分类: 分布式计算
总体来说,leveldb的写操作有两个步骤,首先是针对log的append操作,然后是对memtable的插入操作。

影响写性能的因素有:
1. write_buffer_size
2. kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger.提高这两个值,能够增加写的性能,但是降低读的性能

看看WriteOptions有哪些参数可以指定
struct WriteOptions {
  
//设置sync=true,leveldb会调用fsync(),这会降低插入性能
  
//同时会增加数据的安全性 
  
//Default: false
  bool sync;

  WriteOptions()
      : sync(
false) {
  }
};


首先把Key,value转成WriteBatch
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  
return Write(opt, &batch);
}

接下来就是真正的插入了
这里使用了两把锁,主要是想提高并发能力,减少上锁的时间。
首先是检查是否可写,然后append log,最后是插入memtable
<db/dbimpl.cc>

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Status status;
  
//加锁
  MutexLock l(&mutex_);
  LoggerId self;
  
//拿到写log的权利
  AcquireLoggingResponsibility(&self);
  
//检查是否可写
  status = MakeRoomForWrite(false);  // May temporarily release lock and wait
  uint64_t last_sequence = versions_->LastSequence();
  
if (status.ok()) {
    WriteBatchInternal::SetSequence(updates, last_sequence 
+ 1);
    last_sequence 
+= WriteBatchInternal::Count(updates);

    
// Add to log and apply to memtable.  We can release the lock during
    
// this phase since the "logger_" flag protects against concurrent
    
// loggers and concurrent writes into mem_.
    {
      assert(logger_ 
== &self);
      mutex_.Unlock();
      
//IO操作:写入LOG
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      
if (status.ok() && options.sync) {
        status 
= logfile_->Sync();
      }
      
//插入memtable
      if (status.ok()) {
        status 
= WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      assert(logger_ 
== &self);
    }
    
//设置新的seqence number
    versions_->SetLastSequence(last_sequence);
  }
  
//释放写LOG锁
  ReleaseLoggingResponsibility(&self);
  
return status;
}

写流量控制:
<db/dbimpl.cc>
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(logger_ 
!= NULL);
  
bool allow_delay = !force;
  Status s;
  
while (true) {
    
if (!bg_error_.ok()) {
      
// Yield previous error
      s = bg_error_;
      
break;
    } 
else if ( 
        allow_delay 
&&
        versions_
->NumLevelFiles(0>= config::kL0_SlowdownWritesTrigger) {
      mutex_.Unlock();
      
//如果level0的文件大于kL0_SlowdownWritesTrigger阈值,则sleep 1s,这样给compaction更多的CPU
      env_->SleepForMicroseconds(1000);
      allow_delay 
= false;  // Do not delay a single write more than once
      mutex_.Lock();
    } 
else if (!force &&
               (mem_
->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      
//可写
      break;
    } 
else if (imm_ != NULL) {
      
// imm_:之前的memtable 没有被compaction,需要等待
      bg_cv_.Wait();
    } 
else if (versions_->NumLevelFiles(0>= config::kL0_StopWritesTrigger) {
      
// level0文件个数大于kL0_StopWritesTrigger,需要等待
      Log(options_.info_log, "waiting\n");
      bg_cv_.Wait();
    } 
else {
      
//生成新的额memtable和logfile,把当前memtable传给imm_
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number 
= versions_->NewFileNumber();
      WritableFile
* lfile = NULL;
      s 
= env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      
if (!s.ok()) {
        
break;
      }
      delete log_;
      delete logfile_;
      logfile_ 
= lfile;
      logfile_number_ 
= new_log_number;
      log_ 
= new log::Writer(lfile);
      imm_ 
= mem_;
      has_imm_.Release_Store(imm_);
      mem_ 
= new MemTable(internal_comparator_);
      mem_
->Ref();
      force 
= false;   // Do not force another compaction if have room
      // 发起compaction,dump imm_
      MaybeScheduleCompaction();
    }
  }
  
return s;
}


只有注册用户登录后才能发表评论。


网站导航: