Posted on 2012-03-21 14:41
Overall, a leveldb write consists of two steps: first an append to the log, then an insert into the memtable.
The factors that affect write performance are:
1. write_buffer_size
2. kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger. Raising these two values improves write performance, but at the cost of read performance (see the excerpt below).
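For reference, write_buffer_size is a field of leveldb::Options (default 4MB), while the two level-0 triggers are compile-time constants defined in db/dbformat.h. In the era of the code this post covers they look roughly like this:
<db/dbformat.h>
namespace config {
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

// Soft limit on number of level-0 files.  We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;

// Maximum number of level-0 files.  We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
}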
Let's see what parameters WriteOptions lets us specify:
struct WriteOptions {
  // If sync == true, leveldb calls fsync() on the log file, which
  // slows down inserts but improves data safety.
  // Default: false
  bool sync;

  WriteOptions() : sync(false) {
  }
};
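A minimal usage sketch (the database path and buffer size here are illustrative, not from the post): open a DB with a larger write buffer and issue a synchronous Put:

#include "leveldb/db.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.write_buffer_size = 8 << 20;  // 8MB memtable instead of the 4MB default
leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);

leveldb::WriteOptions write_options;
write_options.sync = true;  // fsync() the log on every write: safer, slower
s = db->Put(write_options, "key", "value");
delete db;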
First, the key and value are packed into a WriteBatch:
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
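So Put is just a one-entry batch. Callers can also build a WriteBatch themselves; every operation in the batch then shares a single log append and is applied atomically. A short sketch (the key names are illustrative):

#include "leveldb/write_batch.h"

leveldb::WriteBatch batch;
batch.Delete("stale-key");
batch.Put("key1", "value1");
batch.Put("key2", "value2");
// One AddRecord() call and one sequence-number range for all three ops.
leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);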
Next comes the actual insert. Two locks are used here, mainly to improve concurrency by keeping the time the mutex is held short. The sequence is: check whether writing is allowed, append to the log, and finally insert into the memtable.
<db/db_impl.cc>
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Status status;
  // Acquire the main mutex
  MutexLock l(&mutex_);
  LoggerId self;
  // Acquire the right to write the log
  AcquireLoggingResponsibility(&self);
  // Check whether we may write now (may block on compaction)
  status = MakeRoomForWrite(false);  // May temporarily release lock and wait
  uint64_t last_sequence = versions_->LastSequence();
  if (status.ok()) {
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable. We can release the lock during
    // this phase since the "logger_" flag protects against concurrent
    // loggers and concurrent writes into mem_.
    {
      assert(logger_ == &self);
      mutex_.Unlock();
      // I/O: append the batch to the log
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      // Insert into the memtable
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      assert(logger_ == &self);
    }
    // Publish the new sequence number
    versions_->SetLastSequence(last_sequence);
  }
  // Release the log-writing right
  ReleaseLoggingResponsibility(&self);
  return status;
}
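Note that the "second lock" is not a mutex but the logger_ flag guarded by a condition variable. For context, AcquireLoggingResponsibility/ReleaseLoggingResponsibility in the same file amount to the following sketch (reconstructed from this era of the code; details may differ between releases):

// There is at most one current logger; a caller waits until the
// preceding logger has finished, then becomes the logger itself.
void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
  while (logger_ != NULL) {
    logger_cv_.Wait();
  }
  logger_ = self;
}

void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
  assert(logger_ == self);
  logger_ = NULL;
  logger_cv_.SignalAll();  // wake up any writer waiting to become logger
}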
Write throttling:
<db/db_impl.cc>
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(logger_ != NULL);
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // The number of level-0 files has reached kL0_SlowdownWritesTrigger:
      // sleep for 1ms so the compaction thread gets more CPU
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in the current memtable; we may write
      break;
    } else if (imm_ != NULL) {
      // The previous memtable (imm_) has not been compacted yet; wait
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // The number of level-0 files has reached kL0_StopWritesTrigger; wait
      Log(options_.info_log, "waiting\n");
      bg_cv_.Wait();
    } else {
      // Create a new memtable and logfile, and hand the current
      // memtable over to imm_
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      // Kick off a compaction to dump imm_
      MaybeScheduleCompaction();
    }
  }
  return s;
}
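The last branch ends by calling MaybeScheduleCompaction, which kicks the background thread only when it is not already running and there is real work to do (an imm_ to dump, a manual compaction, or a level that needs compacting). For context, a sketch of it from the same file (again, details may vary by release):

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (imm_ == NULL && manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}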