summaryrefslogtreecommitdiff
path: root/lib/python2.7/bsddb/test/test_associate.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/bsddb/test/test_associate.py')
-rw-r--r--lib/python2.7/bsddb/test/test_associate.py465
1 files changed, 0 insertions, 465 deletions
diff --git a/lib/python2.7/bsddb/test/test_associate.py b/lib/python2.7/bsddb/test/test_associate.py
deleted file mode 100644
index 7a49e11..0000000
--- a/lib/python2.7/bsddb/test/test_associate.py
+++ /dev/null
@@ -1,465 +0,0 @@
-"""
-TestCases for DB.associate.
-"""
-
-import sys, os, string
-import time
-from pprint import pprint
-
-import unittest
-from test_all import db, dbshelve, test_support, verbose, have_threads, \
- get_new_environment_path
-
-
-#----------------------------------------------------------------------
-
-
-musicdata = {
-1 : ("Bad English", "The Price Of Love", "Rock"),
-2 : ("DNA featuring Suzanne Vega", "Tom's Diner", "Rock"),
-3 : ("George Michael", "Praying For Time", "Rock"),
-4 : ("Gloria Estefan", "Here We Are", "Rock"),
-5 : ("Linda Ronstadt", "Don't Know Much", "Rock"),
-6 : ("Michael Bolton", "How Am I Supposed To Live Without You", "Blues"),
-7 : ("Paul Young", "Oh Girl", "Rock"),
-8 : ("Paula Abdul", "Opposites Attract", "Rock"),
-9 : ("Richard Marx", "Should've Known Better", "Rock"),
-10: ("Rod Stewart", "Forever Young", "Rock"),
-11: ("Roxette", "Dangerous", "Rock"),
-12: ("Sheena Easton", "The Lover In Me", "Rock"),
-13: ("Sinead O'Connor", "Nothing Compares 2 U", "Rock"),
-14: ("Stevie B.", "Because I Love You", "Rock"),
-15: ("Taylor Dayne", "Love Will Lead You Back", "Rock"),
-16: ("The Bangles", "Eternal Flame", "Rock"),
-17: ("Wilson Phillips", "Release Me", "Rock"),
-18: ("Billy Joel", "Blonde Over Blue", "Rock"),
-19: ("Billy Joel", "Famous Last Words", "Rock"),
-20: ("Billy Joel", "Lullabye (Goodnight, My Angel)", "Rock"),
-21: ("Billy Joel", "The River Of Dreams", "Rock"),
-22: ("Billy Joel", "Two Thousand Years", "Rock"),
-23: ("Janet Jackson", "Alright", "Rock"),
-24: ("Janet Jackson", "Black Cat", "Rock"),
-25: ("Janet Jackson", "Come Back To Me", "Rock"),
-26: ("Janet Jackson", "Escapade", "Rock"),
-27: ("Janet Jackson", "Love Will Never Do (Without You)", "Rock"),
-28: ("Janet Jackson", "Miss You Much", "Rock"),
-29: ("Janet Jackson", "Rhythm Nation", "Rock"),
-30: ("Janet Jackson", "State Of The World", "Rock"),
-31: ("Janet Jackson", "The Knowledge", "Rock"),
-32: ("Spyro Gyra", "End of Romanticism", "Jazz"),
-33: ("Spyro Gyra", "Heliopolis", "Jazz"),
-34: ("Spyro Gyra", "Jubilee", "Jazz"),
-35: ("Spyro Gyra", "Little Linda", "Jazz"),
-36: ("Spyro Gyra", "Morning Dance", "Jazz"),
-37: ("Spyro Gyra", "Song for Lorraine", "Jazz"),
-38: ("Yes", "Owner Of A Lonely Heart", "Rock"),
-39: ("Yes", "Rhythm Of Love", "Rock"),
-40: ("Cusco", "Dream Catcher", "New Age"),
-41: ("Cusco", "Geronimos Laughter", "New Age"),
-42: ("Cusco", "Ghost Dance", "New Age"),
-43: ("Blue Man Group", "Drumbone", "New Age"),
-44: ("Blue Man Group", "Endless Column", "New Age"),
-45: ("Blue Man Group", "Klein Mandelbrot", "New Age"),
-46: ("Kenny G", "Silhouette", "Jazz"),
-47: ("Sade", "Smooth Operator", "Jazz"),
-48: ("David Arkenstone", "Papillon (On The Wings Of The Butterfly)",
- "New Age"),
-49: ("David Arkenstone", "Stepping Stars", "New Age"),
-50: ("David Arkenstone", "Carnation Lily Lily Rose", "New Age"),
-51: ("David Lanz", "Behind The Waterfall", "New Age"),
-52: ("David Lanz", "Cristofori's Dream", "New Age"),
-53: ("David Lanz", "Heartsounds", "New Age"),
-54: ("David Lanz", "Leaves on the Seine", "New Age"),
-99: ("unknown artist", "Unnamed song", "Unknown"),
-}
-
-#----------------------------------------------------------------------
-
-class AssociateErrorTestCase(unittest.TestCase):
- def setUp(self):
- self.filename = self.__class__.__name__ + '.db'
- self.homeDir = get_new_environment_path()
- self.env = db.DBEnv()
- self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
-
- def tearDown(self):
- self.env.close()
- self.env = None
- test_support.rmtree(self.homeDir)
-
- def test00_associateDBError(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test00_associateDBError..." % \
- self.__class__.__name__
-
- dupDB = db.DB(self.env)
- dupDB.set_flags(db.DB_DUP)
- dupDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE)
-
- secDB = db.DB(self.env)
- secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
-
- # dupDB has been configured to allow duplicates, it can't
- # associate with a secondary. Berkeley DB will return an error.
- try:
- def f(a,b): return a+b
- dupDB.associate(secDB, f)
- except db.DBError:
- # good
- secDB.close()
- dupDB.close()
- else:
- secDB.close()
- dupDB.close()
- self.fail("DBError exception was expected")
-
-
-
-#----------------------------------------------------------------------
-
-
-class AssociateTestCase(unittest.TestCase):
- keytype = ''
- envFlags = 0
- dbFlags = 0
-
- def setUp(self):
- self.filename = self.__class__.__name__ + '.db'
- self.homeDir = get_new_environment_path()
- self.env = db.DBEnv()
- self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
- db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags)
-
- def tearDown(self):
- self.closeDB()
- self.env.close()
- self.env = None
- test_support.rmtree(self.homeDir)
-
- def addDataToDB(self, d, txn=None):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = "%02d" % key
- d.put(key, '|'.join(value), txn=txn)
-
- def createDB(self, txn=None):
- self.cur = None
- self.secDB = None
- self.primary = db.DB(self.env)
- self.primary.set_get_returns_none(2)
- self.primary.open(self.filename, "primary", self.dbtype,
- db.DB_CREATE | db.DB_THREAD | self.dbFlags, txn=txn)
-
- def closeDB(self):
- if self.cur:
- self.cur.close()
- self.cur = None
- if self.secDB:
- self.secDB.close()
- self.secDB = None
- self.primary.close()
- self.primary = None
-
- def getDB(self):
- return self.primary
-
-
- def _associateWithDB(self, getGenre):
- self.createDB()
-
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.set_get_returns_none(2)
- self.secDB.open(self.filename, "secondary", db.DB_BTREE,
- db.DB_CREATE | db.DB_THREAD | self.dbFlags)
- self.getDB().associate(self.secDB, getGenre)
-
- self.addDataToDB(self.getDB())
-
- self.finish_test(self.secDB)
-
- def test01_associateWithDB(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test01_associateWithDB..." % \
- self.__class__.__name__
-
- return self._associateWithDB(self.getGenre)
-
- def _associateAfterDB(self, getGenre) :
- self.createDB()
- self.addDataToDB(self.getDB())
-
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.open(self.filename, "secondary", db.DB_BTREE,
- db.DB_CREATE | db.DB_THREAD | self.dbFlags)
-
- # adding the DB_CREATE flag will cause it to index existing records
- self.getDB().associate(self.secDB, getGenre, db.DB_CREATE)
-
- self.finish_test(self.secDB)
-
- def test02_associateAfterDB(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test02_associateAfterDB..." % \
- self.__class__.__name__
-
- return self._associateAfterDB(self.getGenre)
-
- if db.version() >= (4, 6):
- def test03_associateWithDB(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test03_associateWithDB..." % \
- self.__class__.__name__
-
- return self._associateWithDB(self.getGenreList)
-
- def test04_associateAfterDB(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test04_associateAfterDB..." % \
- self.__class__.__name__
-
- return self._associateAfterDB(self.getGenreList)
-
-
- def finish_test(self, secDB, txn=None):
- # 'Blues' should not be in the secondary database
- vals = secDB.pget('Blues', txn=txn)
- self.assertEqual(vals, None, vals)
-
- vals = secDB.pget('Unknown', txn=txn)
- self.assertTrue(vals[0] == 99 or vals[0] == '99', vals)
- vals[1].index('Unknown')
- vals[1].index('Unnamed')
- vals[1].index('unknown')
-
- if verbose:
- print "Primary key traversal:"
- self.cur = self.getDB().cursor(txn)
- count = 0
- rec = self.cur.first()
- while rec is not None:
- if type(self.keytype) == type(''):
- self.assertTrue(int(rec[0])) # for primary db, key is a number
- else:
- self.assertTrue(rec[0] and type(rec[0]) == type(0))
- count = count + 1
- if verbose:
- print rec
- rec = getattr(self.cur, "next")()
- self.assertEqual(count, len(musicdata)) # all items accounted for
-
-
- if verbose:
- print "Secondary key traversal:"
- self.cur = secDB.cursor(txn)
- count = 0
-
- # test cursor pget
- vals = self.cur.pget('Unknown', flags=db.DB_LAST)
- self.assertTrue(vals[1] == 99 or vals[1] == '99', vals)
- self.assertEqual(vals[0], 'Unknown')
- vals[2].index('Unknown')
- vals[2].index('Unnamed')
- vals[2].index('unknown')
-
- vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH)
- self.assertEqual(vals, None, vals)
-
- rec = self.cur.first()
- self.assertEqual(rec[0], "Jazz")
- while rec is not None:
- count = count + 1
- if verbose:
- print rec
- rec = getattr(self.cur, "next")()
- # all items accounted for EXCEPT for 1 with "Blues" genre
- self.assertEqual(count, len(musicdata)-1)
-
- self.cur = None
-
- def getGenre(self, priKey, priData):
- self.assertEqual(type(priData), type(""))
- genre = priData.split('|')[2]
-
- if verbose:
- print 'getGenre key: %r data: %r' % (priKey, priData)
-
- if genre == 'Blues':
- return db.DB_DONOTINDEX
- else:
- return genre
-
- def getGenreList(self, priKey, PriData) :
- v = self.getGenre(priKey, PriData)
- if type(v) == type("") :
- v = [v]
- return v
-
-
-#----------------------------------------------------------------------
-
-
-class AssociateHashTestCase(AssociateTestCase):
- dbtype = db.DB_HASH
-
-class AssociateBTreeTestCase(AssociateTestCase):
- dbtype = db.DB_BTREE
-
-class AssociateRecnoTestCase(AssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-#----------------------------------------------------------------------
-
-class AssociateBTreeTxnTestCase(AssociateBTreeTestCase):
- envFlags = db.DB_INIT_TXN
- dbFlags = 0
-
- def txn_finish_test(self, sDB, txn):
- try:
- self.finish_test(sDB, txn=txn)
- finally:
- if self.cur:
- self.cur.close()
- self.cur = None
- if txn:
- txn.commit()
-
- def test13_associate_in_transaction(self):
- if verbose:
- print '\n', '-=' * 30
- print "Running %s.test13_associateAutoCommit..." % \
- self.__class__.__name__
-
- txn = self.env.txn_begin()
- try:
- self.createDB(txn=txn)
-
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.set_get_returns_none(2)
- self.secDB.open(self.filename, "secondary", db.DB_BTREE,
- db.DB_CREATE | db.DB_THREAD, txn=txn)
- self.getDB().associate(self.secDB, self.getGenre, txn=txn)
-
- self.addDataToDB(self.getDB(), txn=txn)
- except:
- txn.abort()
- raise
-
- self.txn_finish_test(self.secDB, txn=txn)
-
-
-#----------------------------------------------------------------------
-
-class ShelveAssociateTestCase(AssociateTestCase):
-
- def createDB(self):
- self.primary = dbshelve.open(self.filename,
- dbname="primary",
- dbenv=self.env,
- filetype=self.dbtype)
-
- def addDataToDB(self, d):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = "%02d" % key
- d.put(key, value) # save the value as is this time
-
-
- def getGenre(self, priKey, priData):
- self.assertEqual(type(priData), type(()))
- if verbose:
- print 'getGenre key: %r data: %r' % (priKey, priData)
- genre = priData[2]
- if genre == 'Blues':
- return db.DB_DONOTINDEX
- else:
- return genre
-
-
-class ShelveAssociateHashTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_HASH
-
-class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_BTREE
-
-class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-
-#----------------------------------------------------------------------
-
-class ThreadedAssociateTestCase(AssociateTestCase):
-
- def addDataToDB(self, d):
- t1 = Thread(target = self.writer1,
- args = (d, ))
- t2 = Thread(target = self.writer2,
- args = (d, ))
-
- t1.setDaemon(True)
- t2.setDaemon(True)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
-
- def writer1(self, d):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = "%02d" % key
- d.put(key, '|'.join(value))
-
- def writer2(self, d):
- for x in range(100, 600):
- key = 'z%2d' % x
- value = [key] * 4
- d.put(key, '|'.join(value))
-
-
-class ThreadedAssociateHashTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_HASH
-
-class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_BTREE
-
-class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-
-#----------------------------------------------------------------------
-
-def test_suite():
- suite = unittest.TestSuite()
-
- suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
-
- suite.addTest(unittest.makeSuite(AssociateHashTestCase))
- suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
-
- suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
-
- suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
-
- if have_threads:
- suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
-
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')