aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsanjay@google.com <sanjay@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2012-01-25 23:07:20 +0000
committersanjay@google.com <sanjay@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2012-01-25 23:07:20 +0000
commite05bd5cade19e5de0f763f4f122eef9f35de3d9c (patch)
treefa5fbe9ca451a385cf615f772cbcfeeb7c90e87a
parentb14d5e1b0796330467b3d1535589270b1a173cc6 (diff)
downloadsrc-e05bd5cade19e5de0f763f4f122eef9f35de3d9c.tar.gz
fixed issues 66 (leaking files on disk error) and 68 (no sync of CURRENT file)
git-svn-id: http://leveldb.googlecode.com/svn/trunk@57 62dab493-f737-651d-591e-8d6aee1b9529
-rw-r--r--db/db_impl.cc20
-rw-r--r--db/db_test.cc63
-rw-r--r--db/filename.cc6
-rw-r--r--util/env.cc18
-rw-r--r--util/env_test.cc22
5 files changed, 95 insertions, 34 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index b4df80d..7b268ea 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -655,6 +655,8 @@ void DBImpl::BackgroundCompaction() {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
+ c->ReleaseInputs();
+ DeleteObsoleteFiles();
}
delete c;
@@ -672,6 +674,9 @@ void DBImpl::BackgroundCompaction() {
if (is_manual) {
ManualCompaction* m = manual_compaction_;
+ if (!status.ok()) {
+ m->done = true;
+ }
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
@@ -793,21 +798,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
- pending_outputs_.erase(out.number);
}
- compact->outputs.clear();
-
- Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
- if (s.ok()) {
- compact->compaction->ReleaseInputs();
- DeleteObsoleteFiles();
- } else {
- // Discard any files we may have created during this failed compaction
- for (size_t i = 0; i < compact->outputs.size(); i++) {
- env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
- }
- }
- return s;
+ return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
diff --git a/db/db_test.cc b/db/db_test.cc
index 5dc3b02..8318885 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -28,8 +28,12 @@ class SpecialEnv : public EnvWrapper {
// sstable Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_;
+ // Simulate no-space errors while this pointer is non-NULL.
+ port::AtomicPointer no_space_;
+
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL);
+ no_space_.Release_Store(NULL);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@@ -44,7 +48,14 @@ class SpecialEnv : public EnvWrapper {
base_(base) {
}
~SSTableFile() { delete base_; }
- Status Append(const Slice& data) { return base_->Append(data); }
+ Status Append(const Slice& data) {
+ if (env_->no_space_.Acquire_Load() != NULL) {
+ // Drop writes on the floor
+ return Status::OK();
+ } else {
+ return base_->Append(data);
+ }
+ }
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
@@ -239,6 +250,12 @@ class DBTest {
return result;
}
+ int CountFiles() {
+ std::vector<std::string> files;
+ env_->GetChildren(dbname_, &files);
+ return static_cast<int>(files.size());
+ }
+
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
@@ -1266,6 +1283,37 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}
+// Check that number of files does not grow when we are out of space
+TEST(DBTest, NoSpace) {
+ Options options;
+ options.env = env_;
+ Reopen(&options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ ASSERT_EQ("v1", Get("foo"));
+ Compact("a", "z");
+ const int num_files = CountFiles();
+ env_->no_space_.Release_Store(env_); // Force out-of-space errors
+ for (int i = 0; i < 10; i++) {
+ for (int level = 0; level < config::kNumLevels-1; level++) {
+ dbfull()->TEST_CompactRange(level, NULL, NULL);
+ }
+ }
+ env_->no_space_.Release_Store(NULL);
+ ASSERT_LT(CountFiles(), num_files + 5);
+}
+
+TEST(DBTest, FilesDeletedAfterCompaction) {
+ ASSERT_OK(Put("foo", "v2"));
+ Compact("a", "z");
+ const int num_files = CountFiles();
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put("foo", "v2"));
+ Compact("a", "z");
+ }
+ ASSERT_EQ(CountFiles(), num_files);
+}
+
// Multi-threaded test:
namespace {
@@ -1287,14 +1335,15 @@ struct MTThread {
static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
+ int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
- fprintf(stderr, "... starting thread %d\n", t->id);
- Random rnd(1000 + t->id);
+ fprintf(stderr, "... starting thread %d\n", id);
+ Random rnd(1000 + id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) {
- t->state->counter[t->id].Release_Store(reinterpret_cast<void*>(counter));
+ t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
@@ -1304,7 +1353,7 @@ static void MTThreadBody(void* arg) {
// Write values of the form <key, my id, counter>.
// We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
- key, t->id, static_cast<int>(counter));
+ key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
@@ -1325,8 +1374,8 @@ static void MTThreadBody(void* arg) {
}
counter++;
}
- t->state->thread_done[t->id].Release_Store(t);
- fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter));
+ t->state->thread_done[id].Release_Store(t);
+ fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}
} // namespace
diff --git a/db/filename.cc b/db/filename.cc
index 24fd140..3c4d49f 100644
--- a/db/filename.cc
+++ b/db/filename.cc
@@ -11,6 +11,10 @@
namespace leveldb {
+// A utility routine: write "data" to the named file and Sync() it.
+extern Status WriteStringToFileSync(Env* env, const Slice& data,
+ const std::string& fname);
+
static std::string MakeFileName(const std::string& name, uint64_t number,
const char* suffix) {
char buf[100];
@@ -122,7 +126,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
assert(contents.starts_with(dbname + "/"));
contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number);
- Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp);
+ Status s = WriteStringToFileSync(env, contents.ToString() + "\n", tmp);
if (s.ok()) {
s = env->RenameFile(tmp, CurrentFileName(dbname));
}
diff --git a/util/env.cc b/util/env.cc
index 594811b..c2600e9 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -33,14 +33,18 @@ void Log(Logger* info_log, const char* format, ...) {
}
}
-Status WriteStringToFile(Env* env, const Slice& data,
- const std::string& fname) {
+static Status DoWriteStringToFile(Env* env, const Slice& data,
+ const std::string& fname,
+ bool should_sync) {
WritableFile* file;
Status s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}
s = file->Append(data);
+ if (s.ok() && should_sync) {
+ s = file->Sync();
+ }
if (s.ok()) {
s = file->Close();
}
@@ -51,6 +55,16 @@ Status WriteStringToFile(Env* env, const Slice& data,
return s;
}
+Status WriteStringToFile(Env* env, const Slice& data,
+ const std::string& fname) {
+ return DoWriteStringToFile(env, data, fname, false);
+}
+
+Status WriteStringToFileSync(Env* env, const Slice& data,
+ const std::string& fname) {
+ return DoWriteStringToFile(env, data, fname, true);
+}
+
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
data->clear();
SequentialFile* file;
diff --git a/util/env_test.cc b/util/env_test.cc
index 3f8a8a2..b72cb44 100644
--- a/util/env_test.cc
+++ b/util/env_test.cc
@@ -22,29 +22,30 @@ class EnvPosixTest {
};
static void SetBool(void* ptr) {
- *(reinterpret_cast<bool*>(ptr)) = true;
+ reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
}
TEST(EnvPosixTest, RunImmediately) {
- bool called = false;
+ port::AtomicPointer called (NULL);
env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros);
- ASSERT_TRUE(called);
+ ASSERT_TRUE(called.NoBarrier_Load() != NULL);
}
TEST(EnvPosixTest, RunMany) {
- int last_id = 0;
+ port::AtomicPointer last_id (NULL);
struct CB {
- int* last_id_ptr; // Pointer to shared slot
- int id; // Order# for the execution of this callback
+ port::AtomicPointer* last_id_ptr; // Pointer to shared slot
+ uintptr_t id; // Order# for the execution of this callback
- CB(int* p, int i) : last_id_ptr(p), id(i) { }
+ CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }
static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v);
- ASSERT_EQ(cb->id-1, *cb->last_id_ptr);
- *cb->last_id_ptr = cb->id;
+ void* cur = cb->last_id_ptr->NoBarrier_Load();
+ ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
+ cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
}
};
@@ -59,7 +60,8 @@ TEST(EnvPosixTest, RunMany) {
env_->Schedule(&CB::Run, &cb4);
Env::Default()->SleepForMicroseconds(kDelayMicros);
- ASSERT_EQ(4, last_id);
+ void* cur = last_id.Acquire_Load();
+ ASSERT_EQ(4, reinterpret_cast<uintptr_t>(cur));
}
struct State {