aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsanjay@google.com <sanjay@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2012-03-09 00:27:49 +0000
committersanjay@google.com <sanjay@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2012-03-09 00:27:49 +0000
commit13daa9f29c999ee40a257ee0775abee2c78a0ad9 (patch)
treebeacf85da681b45003f9149562c4c6b85e893eba
parent3366031b7b6638f4c1d4c80a501b12e0c83a92e4 (diff)
downloadsrc-13daa9f29c999ee40a257ee0775abee2c78a0ad9.tar.gz
added group commit; drastically speeds up mult-threaded synchronous write workloads
git-svn-id: http://leveldb.googlecode.com/svn/trunk@60 62dab493-f737-651d-591e-8d6aee1b9529
-rw-r--r--db/db_impl.cc143
-rw-r--r--db/db_impl.h18
-rw-r--r--db/write_batch.cc17
-rw-r--r--db/write_batch_internal.h2
-rw-r--r--db/write_batch_test.cc31
5 files changed, 162 insertions, 49 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7b268ea..dde3711 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -35,6 +35,17 @@
namespace leveldb {
+// Information kept for every waiting writer
+struct DBImpl::Writer {
+ Status status;
+ WriteBatch* batch;
+ bool sync;
+ bool done;
+ port::CondVar cv;
+
+ explicit Writer(port::Mutex* mu) : cv(mu) { }
+};
+
struct DBImpl::CompactionState {
Compaction* const compaction;
@@ -113,8 +124,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
- logger_(NULL),
- logger_cv_(&mutex_),
+ tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
mem_->Ref();
@@ -144,6 +154,7 @@ DBImpl::~DBImpl() {
delete versions_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
+ delete tmp_batch_;
delete log_;
delete logfile_;
delete table_cache_;
@@ -554,13 +565,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
}
Status DBImpl::TEST_CompactMemTable() {
- MutexLock l(&mutex_);
- LoggerId self;
- AcquireLoggingResponsibility(&self);
- Status s = MakeRoomForWrite(true /* force compaction */);
- ReleaseLoggingResponsibility(&self);
+ // NULL batch means just wait for earlier writes to be done
+ Status s = Write(WriteOptions(), NULL);
if (s.ok()) {
// Wait until the compaction completes
+ MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
@@ -1094,38 +1103,35 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
-// There is at most one thread that is the current logger. This call
-// waits until preceding logger(s) have finished and becomes the
-// current logger.
-void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
- while (logger_ != NULL) {
- logger_cv_.Wait();
- }
- logger_ = self;
-}
+Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
+ Writer w(&mutex_);
+ w.batch = my_batch;
+ w.sync = options.sync;
+ w.done = false;
-void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
- assert(logger_ == self);
- logger_ = NULL;
- logger_cv_.SignalAll();
-}
-
-Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
- Status status;
MutexLock l(&mutex_);
- LoggerId self;
- AcquireLoggingResponsibility(&self);
- status = MakeRoomForWrite(false); // May temporarily release lock and wait
+ writers_.push_back(&w);
+ while (!w.done && &w != writers_.front()) {
+ w.cv.Wait();
+ }
+ if (w.done) {
+ return w.status;
+ }
+
+ // May temporarily unlock and wait.
+ Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
- if (status.ok()) {
+ Writer* last_writer = &w;
+ if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
+ WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
- // Add to log and apply to memtable. We can release the lock during
- // this phase since the "logger_" flag protects against concurrent
- // loggers and concurrent writes into mem_.
+ // Add to log and apply to memtable. We can release the lock
+ // during this phase since &w is currently responsible for logging
+ // and protects against concurrent loggers and concurrent writes
+ // into mem_.
{
- assert(logger_ == &self);
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) {
@@ -1135,20 +1141,85 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
- assert(logger_ == &self);
}
+ if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
- ReleaseLoggingResponsibility(&self);
+
+ while (true) {
+ Writer* ready = writers_.front();
+ writers_.pop_front();
+ if (ready != &w) {
+ ready->status = status;
+ ready->done = true;
+ ready->cv.Signal();
+ }
+ if (ready == last_writer) break;
+ }
+
+ // Notify new head of write queue
+ if (!writers_.empty()) {
+ writers_.front()->cv.Signal();
+ }
+
return status;
}
+// REQUIRES: Writer list must be non-empty
+// REQUIRES: First writer must have a non-NULL batch
+WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
+ assert(!writers_.empty());
+ Writer* first = writers_.front();
+ WriteBatch* result = first->batch;
+ assert(result != NULL);
+
+ size_t size = WriteBatchInternal::ByteSize(first->batch);
+
+ // Allow the group to grow up to a maximum size, but if the
+ // original write is small, limit the growth so we do not slow
+ // down the small write too much.
+ size_t max_size = 1 << 20;
+ if (size <= (128<<10)) {
+ max_size = size + (128<<10);
+ }
+
+ *last_writer = first;
+ std::deque<Writer*>::iterator iter = writers_.begin();
+ ++iter; // Advance past "first"
+ for (; iter != writers_.end(); ++iter) {
+ Writer* w = *iter;
+ if (w->sync && !first->sync) {
+ // Do not include a sync write into a batch handled by a non-sync write.
+ break;
+ }
+
+ if (w->batch != NULL) {
+ size += WriteBatchInternal::ByteSize(w->batch);
+ if (size > max_size) {
+ // Do not make batch too big
+ break;
+ }
+
+ // Append to *reuslt
+ if (result == first->batch) {
+ // Switch to temporary batch instead of disturbing caller's batch
+ result = tmp_batch_;
+ assert(WriteBatchInternal::Count(result) == 0);
+ WriteBatchInternal::Append(result, first->batch);
+ }
+ WriteBatchInternal::Append(result, w->batch);
+ }
+ *last_writer = w;
+ }
+ return result;
+}
+
// REQUIRES: mutex_ is held
-// REQUIRES: this thread is the current logger
+// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
- assert(logger_ != NULL);
+ assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
diff --git a/db/db_impl.h b/db/db_impl.h
index fc40d1e..e665c0e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
+#include <deque>
#include <set>
#include "db/dbformat.h"
#include "db/log_writer.h"
@@ -59,6 +60,8 @@ class DBImpl : public DB {
private:
friend class DB;
+ struct CompactionState;
+ struct Writer;
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
@@ -85,14 +88,8 @@ class DBImpl : public DB {
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
- // Only thread is allowed to log at a time.
- struct LoggerId { }; // Opaque identifier for logging thread
- void AcquireLoggingResponsibility(LoggerId* self);
- void ReleaseLoggingResponsibility(LoggerId* self);
-
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
-
- struct CompactionState;
+ WriteBatch* BuildBatchGroup(Writer** last_writer);
void MaybeScheduleCompaction();
static void BGWork(void* db);
@@ -129,8 +126,11 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
- LoggerId* logger_; // NULL, or the id of the current logging thread
- port::CondVar logger_cv_; // For threads waiting to log
+
+ // Queue of writers.
+ std::deque<Writer*> writers_;
+ WriteBatch* tmp_batch_;
+
SnapshotList snapshots_;
// Set of table files to protect from deletion because they are
diff --git a/db/write_batch.cc b/db/write_batch.cc
index a0e812f..33f4a42 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -23,6 +23,9 @@
namespace leveldb {
+// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
+static const size_t kHeader = 12;
+
WriteBatch::WriteBatch() {
Clear();
}
@@ -33,16 +36,16 @@ WriteBatch::Handler::~Handler() { }
void WriteBatch::Clear() {
rep_.clear();
- rep_.resize(12);
+ rep_.resize(kHeader);
}
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
- if (input.size() < 12) {
+ if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
- input.remove_prefix(12);
+ input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
@@ -131,8 +134,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b,
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
- assert(contents.size() >= 12);
+ assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());
}
+void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
+ SetCount(dst, Count(dst) + Count(src));
+ assert(src->rep_.size() >= kHeader);
+ dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
+}
+
} // namespace leveldb
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 49aeb84..4423a7f 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -39,6 +39,8 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
+
+ static void Append(WriteBatch* dst, const WriteBatch* src);
};
} // namespace leveldb
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index 1ee6d7b..9064e3d 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -18,6 +18,7 @@ static std::string PrintContents(WriteBatch* b) {
mem->Ref();
std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem);
+ int count = 0;
Iterator* iter = mem->NewIterator();
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey;
@@ -29,11 +30,13 @@ static std::string PrintContents(WriteBatch* b) {
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
+ count++;
break;
case kTypeDeletion:
state.append("Delete(");
state.append(ikey.user_key.ToString());
state.append(")");
+ count++;
break;
}
state.append("@");
@@ -42,6 +45,8 @@ static std::string PrintContents(WriteBatch* b) {
delete iter;
if (!s.ok()) {
state.append("ParseError()");
+ } else if (count != WriteBatchInternal::Count(b)) {
+ state.append("CountMismatch()");
}
mem->Unref();
return state;
@@ -82,6 +87,32 @@ TEST(WriteBatchTest, Corruption) {
PrintContents(&batch));
}
+TEST(WriteBatchTest, Append) {
+ WriteBatch b1, b2;
+ WriteBatchInternal::SetSequence(&b1, 200);
+ WriteBatchInternal::SetSequence(&b2, 300);
+ WriteBatchInternal::Append(&b1, &b2);
+ ASSERT_EQ("",
+ PrintContents(&b1));
+ b2.Put("a", "va");
+ WriteBatchInternal::Append(&b1, &b2);
+ ASSERT_EQ("Put(a, va)@200",
+ PrintContents(&b1));
+ b2.Clear();
+ b2.Put("b", "vb");
+ WriteBatchInternal::Append(&b1, &b2);
+ ASSERT_EQ("Put(a, va)@200"
+ "Put(b, vb)@201",
+ PrintContents(&b1));
+ b2.Delete("foo");
+ WriteBatchInternal::Append(&b1, &b2);
+ ASSERT_EQ("Put(a, va)@200"
+ "Put(b, vb)@202"
+ "Put(b, vb)@201"
+ "Delete(foo)@203",
+ PrintContents(&b1));
+}
+
} // namespace leveldb
int main(int argc, char** argv) {