summaryrefslogtreecommitdiff
path: root/lib/python2.7/test/test_bsddb.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/test/test_bsddb.py')
-rwxr-xr-xlib/python2.7/test/test_bsddb.py357
1 files changed, 357 insertions, 0 deletions
diff --git a/lib/python2.7/test/test_bsddb.py b/lib/python2.7/test/test_bsddb.py
new file mode 100755
index 0000000..d1ee0a1
--- /dev/null
+++ b/lib/python2.7/test/test_bsddb.py
@@ -0,0 +1,357 @@
+#! /usr/bin/env python
+"""Test script for the bsddb C module by Roger E. Masse
+ Adapted to unittest format and expanded scope by Raymond Hettinger
+"""
+import os, sys
+import unittest
+from test import test_support
+
+# Skip test if _bsddb wasn't built.
+test_support.import_module('_bsddb')
+
+bsddb = test_support.import_module('bsddb', deprecated=True)
+# Just so we know it's imported:
+test_support.import_module('dbhash', deprecated=True)
+
+
+class TestBSDDB(unittest.TestCase):
+ openflag = 'c'
+
+ def setUp(self):
+ self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
+ self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
+ for k, v in self.d.iteritems():
+ self.f[k] = v
+
+ def tearDown(self):
+ self.f.sync()
+ self.f.close()
+ if self.fname is None:
+ return
+ try:
+ os.remove(self.fname)
+ except os.error:
+ pass
+
+ def test_getitem(self):
+ for k, v in self.d.iteritems():
+ self.assertEqual(self.f[k], v)
+
+ def test_len(self):
+ self.assertEqual(len(self.f), len(self.d))
+
+ def test_change(self):
+ self.f['r'] = 'discovered'
+ self.assertEqual(self.f['r'], 'discovered')
+ self.assertIn('r', self.f.keys())
+ self.assertIn('discovered', self.f.values())
+
+ def test_close_and_reopen(self):
+ if self.fname is None:
+ # if we're using an in-memory only db, we can't reopen it
+ # so finish here.
+ return
+ self.f.close()
+ self.f = self.openmethod[0](self.fname, 'w')
+ for k, v in self.d.iteritems():
+ self.assertEqual(self.f[k], v)
+
+ def assertSetEquals(self, seqn1, seqn2):
+ self.assertEqual(set(seqn1), set(seqn2))
+
+ def test_mapping_iteration_methods(self):
+ f = self.f
+ d = self.d
+ self.assertSetEquals(d, f)
+ self.assertSetEquals(d.keys(), f.keys())
+ self.assertSetEquals(d.values(), f.values())
+ self.assertSetEquals(d.items(), f.items())
+ self.assertSetEquals(d.iterkeys(), f.iterkeys())
+ self.assertSetEquals(d.itervalues(), f.itervalues())
+ self.assertSetEquals(d.iteritems(), f.iteritems())
+
+ def test_iter_while_modifying_values(self):
+ di = iter(self.d)
+ while 1:
+ try:
+ key = di.next()
+ self.d[key] = 'modified '+key
+ except StopIteration:
+ break
+
+ # it should behave the same as a dict. modifying values
+ # of existing keys should not break iteration. (adding
+ # or removing keys should)
+ loops_left = len(self.f)
+ fi = iter(self.f)
+ while 1:
+ try:
+ key = fi.next()
+ self.f[key] = 'modified '+key
+ loops_left -= 1
+ except StopIteration:
+ break
+ self.assertEqual(loops_left, 0)
+
+ self.test_mapping_iteration_methods()
+
+ def test_iter_abort_on_changed_size(self):
+ def DictIterAbort():
+ di = iter(self.d)
+ while 1:
+ try:
+ di.next()
+ self.d['newkey'] = 'SPAM'
+ except StopIteration:
+ break
+ self.assertRaises(RuntimeError, DictIterAbort)
+
+ def DbIterAbort():
+ fi = iter(self.f)
+ while 1:
+ try:
+ fi.next()
+ self.f['newkey'] = 'SPAM'
+ except StopIteration:
+ break
+ self.assertRaises(RuntimeError, DbIterAbort)
+
+ def test_iteritems_abort_on_changed_size(self):
+ def DictIteritemsAbort():
+ di = self.d.iteritems()
+ while 1:
+ try:
+ di.next()
+ self.d['newkey'] = 'SPAM'
+ except StopIteration:
+ break
+ self.assertRaises(RuntimeError, DictIteritemsAbort)
+
+ def DbIteritemsAbort():
+ fi = self.f.iteritems()
+ while 1:
+ try:
+ key, value = fi.next()
+ del self.f[key]
+ except StopIteration:
+ break
+ self.assertRaises(RuntimeError, DbIteritemsAbort)
+
+ def test_iteritems_while_modifying_values(self):
+ di = self.d.iteritems()
+ while 1:
+ try:
+ k, v = di.next()
+ self.d[k] = 'modified '+v
+ except StopIteration:
+ break
+
+ # it should behave the same as a dict. modifying values
+ # of existing keys should not break iteration. (adding
+ # or removing keys should)
+ loops_left = len(self.f)
+ fi = self.f.iteritems()
+ while 1:
+ try:
+ k, v = fi.next()
+ self.f[k] = 'modified '+v
+ loops_left -= 1
+ except StopIteration:
+ break
+ self.assertEqual(loops_left, 0)
+
+ self.test_mapping_iteration_methods()
+
+ def test_first_next_looping(self):
+ items = [self.f.first()]
+ for i in xrange(1, len(self.f)):
+ items.append(self.f.next())
+ self.assertSetEquals(items, self.d.items())
+
+ def test_previous_last_looping(self):
+ items = [self.f.last()]
+ for i in xrange(1, len(self.f)):
+ items.append(self.f.previous())
+ self.assertSetEquals(items, self.d.items())
+
+ def test_first_while_deleting(self):
+ # Test for bug 1725856
+ self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
+ for _ in self.d:
+ key = self.f.first()[0]
+ del self.f[key]
+ self.assertEqual([], self.f.items(), "expected empty db after test")
+
+ def test_last_while_deleting(self):
+ # Test for bug 1725856's evil twin
+ self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
+ for _ in self.d:
+ key = self.f.last()[0]
+ del self.f[key]
+ self.assertEqual([], self.f.items(), "expected empty db after test")
+
+ def test_set_location(self):
+ self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
+
+ def test_contains(self):
+ for k in self.d:
+ self.assertIn(k, self.f)
+ self.assertNotIn('not here', self.f)
+
+ def test_has_key(self):
+ for k in self.d:
+ self.assertTrue(self.f.has_key(k))
+ self.assertTrue(not self.f.has_key('not here'))
+
+ def test_clear(self):
+ self.f.clear()
+ self.assertEqual(len(self.f), 0)
+
+ def test__no_deadlock_first(self, debug=0):
+ # do this so that testers can see what function we're in in
+ # verbose mode when we deadlock.
+ sys.stdout.flush()
+
+ # in pybsddb's _DBWithCursor this causes an internal DBCursor
+ # object is created. Other test_ methods in this class could
+ # inadvertently cause the deadlock but an explicit test is needed.
+ if debug: print "A"
+ k,v = self.f.first()
+ if debug: print "B", k
+ self.f[k] = "deadlock. do not pass go. do not collect $200."
+ if debug: print "C"
+ # if the bsddb implementation leaves the DBCursor open during
+ # the database write and locking+threading support is enabled
+ # the cursor's read lock will deadlock the write lock request..
+
+ # test the iterator interface
+ if True:
+ if debug: print "D"
+ i = self.f.iteritems()
+ k,v = i.next()
+ if debug: print "E"
+ self.f[k] = "please don't deadlock"
+ if debug: print "F"
+ while 1:
+ try:
+ k,v = i.next()
+ except StopIteration:
+ break
+ if debug: print "F2"
+
+ i = iter(self.f)
+ if debug: print "G"
+ while i:
+ try:
+ if debug: print "H"
+ k = i.next()
+ if debug: print "I"
+ self.f[k] = "deadlocks-r-us"
+ if debug: print "J"
+ except StopIteration:
+ i = None
+ if debug: print "K"
+
+ # test the legacy cursor interface mixed with writes
+ self.assertIn(self.f.first()[0], self.d)
+ k = self.f.next()[0]
+ self.assertIn(k, self.d)
+ self.f[k] = "be gone with ye deadlocks"
+ self.assertTrue(self.f[k], "be gone with ye deadlocks")
+
+ def test_for_cursor_memleak(self):
+ # do the bsddb._DBWithCursor iterator internals leak cursors?
+ nc1 = len(self.f._cursor_refs)
+ # create iterator
+ i = self.f.iteritems()
+ nc2 = len(self.f._cursor_refs)
+ # use the iterator (should run to the first yield, creating the cursor)
+ k, v = i.next()
+ nc3 = len(self.f._cursor_refs)
+ # destroy the iterator; this should cause the weakref callback
+ # to remove the cursor object from self.f._cursor_refs
+ del i
+ nc4 = len(self.f._cursor_refs)
+
+ self.assertEqual(nc1, nc2)
+ self.assertEqual(nc1, nc4)
+ self.assertTrue(nc3 == nc1+1)
+
+ def test_popitem(self):
+ k, v = self.f.popitem()
+ self.assertIn(k, self.d)
+ self.assertIn(v, self.d.values())
+ self.assertNotIn(k, self.f)
+ self.assertEqual(len(self.d)-1, len(self.f))
+
+ def test_pop(self):
+ k = 'w'
+ v = self.f.pop(k)
+ self.assertEqual(v, self.d[k])
+ self.assertNotIn(k, self.f)
+ self.assertNotIn(v, self.f.values())
+ self.assertEqual(len(self.d)-1, len(self.f))
+
+ def test_get(self):
+ self.assertEqual(self.f.get('NotHere'), None)
+ self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
+ self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
+
+ def test_setdefault(self):
+ self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
+ self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
+
+ def test_update(self):
+ new = dict(y='life', u='of', i='brian')
+ self.f.update(new)
+ self.d.update(new)
+ for k, v in self.d.iteritems():
+ self.assertEqual(self.f[k], v)
+
+ def test_keyordering(self):
+ if self.openmethod[0] is not bsddb.btopen:
+ return
+ keys = self.d.keys()
+ keys.sort()
+ self.assertEqual(self.f.first()[0], keys[0])
+ self.assertEqual(self.f.next()[0], keys[1])
+ self.assertEqual(self.f.last()[0], keys[-1])
+ self.assertEqual(self.f.previous()[0], keys[-2])
+ self.assertEqual(list(self.f), keys)
+
+class TestBTree(TestBSDDB):
+ fname = test_support.TESTFN
+ openmethod = [bsddb.btopen]
+
+class TestBTree_InMemory(TestBSDDB):
+ fname = None
+ openmethod = [bsddb.btopen]
+
+class TestBTree_InMemory_Truncate(TestBSDDB):
+ fname = None
+ openflag = 'n'
+ openmethod = [bsddb.btopen]
+
+class TestHashTable(TestBSDDB):
+ fname = test_support.TESTFN
+ openmethod = [bsddb.hashopen]
+
+class TestHashTable_InMemory(TestBSDDB):
+ fname = None
+ openmethod = [bsddb.hashopen]
+
+## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
+## # appears broken... at least on
+## # Solaris Intel - rmasse 1/97
+
+def test_main(verbose=None):
+ test_support.run_unittest(
+ TestBTree,
+ TestHashTable,
+ TestBTree_InMemory,
+ TestHashTable_InMemory,
+ TestBTree_InMemory_Truncate,
+ )
+
+if __name__ == "__main__":
+ test_main(verbose=True)