diff options
Diffstat (limited to 'lib/python2.7/bsddb/test/test_all.py')
-rw-r--r-- | lib/python2.7/bsddb/test/test_all.py | 623 |
1 files changed, 0 insertions, 623 deletions
diff --git a/lib/python2.7/bsddb/test/test_all.py b/lib/python2.7/bsddb/test/test_all.py deleted file mode 100644 index caef1ac..0000000 --- a/lib/python2.7/bsddb/test/test_all.py +++ /dev/null @@ -1,623 +0,0 @@ -"""Run all test cases. -""" - -import sys -import os -import unittest -try: - # For Pythons w/distutils pybsddb - import bsddb3 as bsddb -except ImportError: - # For Python 2.3 - import bsddb - - -if sys.version_info[0] >= 3 : - charset = "iso8859-1" # Full 8 bit - - class logcursor_py3k(object) : - def __init__(self, env) : - self._logcursor = env.log_cursor() - - def __getattr__(self, v) : - return getattr(self._logcursor, v) - - def __next__(self) : - v = getattr(self._logcursor, "next")() - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - next = __next__ - - def first(self) : - v = self._logcursor.first() - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - def last(self) : - v = self._logcursor.last() - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - def prev(self) : - v = self._logcursor.prev() - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - def current(self) : - v = self._logcursor.current() - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - def set(self, lsn) : - v = self._logcursor.set(lsn) - if v is not None : - v = (v[0], v[1].decode(charset)) - return v - - class cursor_py3k(object) : - def __init__(self, db, *args, **kwargs) : - self._dbcursor = db.cursor(*args, **kwargs) - - def __getattr__(self, v) : - return getattr(self._dbcursor, v) - - def _fix(self, v) : - if v is None : return None - key, value = v - if isinstance(key, bytes) : - key = key.decode(charset) - return (key, value.decode(charset)) - - def __next__(self) : - v = getattr(self._dbcursor, "next")() - return self._fix(v) - - next = __next__ - - def previous(self) : - v = self._dbcursor.previous() - return self._fix(v) - - def last(self) : - v = self._dbcursor.last() - return self._fix(v) - - def set(self, k) : - if isinstance(k, str) : - k = bytes(k, charset) - v = self._dbcursor.set(k) - return self._fix(v) - - def set_recno(self, num) : - v = self._dbcursor.set_recno(num) - return self._fix(v) - - def set_range(self, k, dlen=-1, doff=-1) : - if isinstance(k, str) : - k = bytes(k, charset) - v = self._dbcursor.set_range(k, dlen=dlen, doff=doff) - return self._fix(v) - - def dup(self, flags=0) : - cursor = self._dbcursor.dup(flags) - return dup_cursor_py3k(cursor) - - def next_dup(self) : - v = self._dbcursor.next_dup() - return self._fix(v) - - def next_nodup(self) : - v = self._dbcursor.next_nodup() - return self._fix(v) - - def put(self, key, data, flags=0, dlen=-1, doff=-1) : - if isinstance(key, str) : - key = bytes(key, charset) - if isinstance(data, str) : - value = bytes(data, charset) - return self._dbcursor.put(key, data, flags=flags, dlen=dlen, - doff=doff) - - def current(self, flags=0, dlen=-1, doff=-1) : - v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff) - return self._fix(v) - - def first(self) : - v = self._dbcursor.first() - return self._fix(v) - - def pget(self, key=None, data=None, flags=0) : - # Incorrect because key can be a bare number, - # but enough to pass testsuite - if isinstance(key, int) and (data is None) and (flags == 0) : - flags = key - key = None - if isinstance(key, str) : - key = bytes(key, charset) - if isinstance(data, int) and (flags==0) : - flags = data - data = None - if isinstance(data, str) : - data = bytes(data, charset) - v=self._dbcursor.pget(key=key, data=data, flags=flags) - if v is not None : - v1, v2, v3 = v - if isinstance(v1, bytes) : - v1 = v1.decode(charset) - if isinstance(v2, bytes) : - v2 = v2.decode(charset) - - v = (v1, v2, v3.decode(charset)) - - return v - - def join_item(self) : - v = self._dbcursor.join_item() - if v is not None : - v = v.decode(charset) - return v - - def get(self, *args, **kwargs) : - l = len(args) - if l == 2 : - k, f = args - if isinstance(k, str) : - k = bytes(k, "iso8859-1") - args = (k, f) - elif l == 3 : - k, d, f = args - if isinstance(k, str) : - k = bytes(k, charset) - if isinstance(d, str) : - d = bytes(d, charset) - args =(k, d, f) - - v = self._dbcursor.get(*args, **kwargs) - if v is not None : - k, v = v - if isinstance(k, bytes) : - k = k.decode(charset) - v = (k, v.decode(charset)) - return v - - def get_both(self, key, value) : - if isinstance(key, str) : - key = bytes(key, charset) - if isinstance(value, str) : - value = bytes(value, charset) - v=self._dbcursor.get_both(key, value) - return self._fix(v) - - class dup_cursor_py3k(cursor_py3k) : - def __init__(self, dbcursor) : - self._dbcursor = dbcursor - - class DB_py3k(object) : - def __init__(self, *args, **kwargs) : - args2=[] - for i in args : - if isinstance(i, DBEnv_py3k) : - i = i._dbenv - args2.append(i) - args = tuple(args2) - for k, v in kwargs.items() : - if isinstance(v, DBEnv_py3k) : - kwargs[k] = v._dbenv - - self._db = bsddb._db.DB_orig(*args, **kwargs) - - def __contains__(self, k) : - if isinstance(k, str) : - k = bytes(k, charset) - return getattr(self._db, "has_key")(k) - - def __getitem__(self, k) : - if isinstance(k, str) : - k = bytes(k, charset) - v = self._db[k] - if v is not None : - v = v.decode(charset) - return v - - def __setitem__(self, k, v) : - if isinstance(k, str) : - k = bytes(k, charset) - if isinstance(v, str) : - v = bytes(v, charset) - self._db[k] = v - - def __delitem__(self, k) : - if isinstance(k, str) : - k = bytes(k, charset) - del self._db[k] - - def __getattr__(self, v) : - return getattr(self._db, v) - - def __len__(self) : - return len(self._db) - - def has_key(self, k, txn=None) : - if isinstance(k, str) : - k = bytes(k, charset) - return self._db.has_key(k, txn=txn) - - def set_re_delim(self, c) : - if isinstance(c, str) : # We can use a numeric value byte too - c = bytes(c, charset) - return self._db.set_re_delim(c) - - def set_re_pad(self, c) : - if isinstance(c, str) : # We can use a numeric value byte too - c = bytes(c, charset) - return self._db.set_re_pad(c) - - def get_re_source(self) : - source = self._db.get_re_source() - return source.decode(charset) - - def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) : - if isinstance(key, str) : - key = bytes(key, charset) - if isinstance(data, str) : - value = bytes(data, charset) - return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen, - doff=doff) - - def append(self, value, txn=None) : - if isinstance(value, str) : - value = bytes(value, charset) - return self._db.append(value, txn=txn) - - def get_size(self, key) : - if isinstance(key, str) : - key = bytes(key, charset) - return self._db.get_size(key) - - def exists(self, key, *args, **kwargs) : - if isinstance(key, str) : - key = bytes(key, charset) - return self._db.exists(key, *args, **kwargs) - - def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) : - if isinstance(key, str) : - key = bytes(key, charset) - if default != "MagicCookie" : # Magic for 'test_get_none.py' - v=self._db.get(key, default=default, txn=txn, flags=flags, - dlen=dlen, doff=doff) - else : - v=self._db.get(key, txn=txn, flags=flags, - dlen=dlen, doff=doff) - if (v is not None) and isinstance(v, bytes) : - v = v.decode(charset) - return v - - def pget(self, key, txn=None) : - if isinstance(key, str) : - key = bytes(key, charset) - v=self._db.pget(key, txn=txn) - if v is not None : - v1, v2 = v - if isinstance(v1, bytes) : - v1 = v1.decode(charset) - - v = (v1, v2.decode(charset)) - return v - - def get_both(self, key, value, txn=None, flags=0) : - if isinstance(key, str) : - key = bytes(key, charset) - if isinstance(value, str) : - value = bytes(value, charset) - v=self._db.get_both(key, value, txn=txn, flags=flags) - if v is not None : - v = v.decode(charset) - return v - - def delete(self, key, txn=None) : - if isinstance(key, str) : - key = bytes(key, charset) - return self._db.delete(key, txn=txn) - - def keys(self) : - k = self._db.keys() - if len(k) and isinstance(k[0], bytes) : - return [i.decode(charset) for i in self._db.keys()] - else : - return k - - def items(self) : - data = self._db.items() - if not len(data) : return data - data2 = [] - for k, v in data : - if isinstance(k, bytes) : - k = k.decode(charset) - data2.append((k, v.decode(charset))) - return data2 - - def associate(self, secondarydb, callback, flags=0, txn=None) : - class associate_callback(object) : - def __init__(self, callback) : - self._callback = callback - - def callback(self, key, data) : - if isinstance(key, str) : - key = key.decode(charset) - data = data.decode(charset) - key = self._callback(key, data) - if (key != bsddb._db.DB_DONOTINDEX) : - if isinstance(key, str) : - key = bytes(key, charset) - elif isinstance(key, list) : - key2 = [] - for i in key : - if isinstance(i, str) : - i = bytes(i, charset) - key2.append(i) - key = key2 - return key - - return self._db.associate(secondarydb._db, - associate_callback(callback).callback, flags=flags, - txn=txn) - - def cursor(self, txn=None, flags=0) : - return cursor_py3k(self._db, txn=txn, flags=flags) - - def join(self, cursor_list) : - cursor_list = [i._dbcursor for i in cursor_list] - return dup_cursor_py3k(self._db.join(cursor_list)) - - class DBEnv_py3k(object) : - def __init__(self, *args, **kwargs) : - self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs) - - def __getattr__(self, v) : - return getattr(self._dbenv, v) - - def log_cursor(self, flags=0) : - return logcursor_py3k(self._dbenv) - - def get_lg_dir(self) : - return self._dbenv.get_lg_dir().decode(charset) - - def get_tmp_dir(self) : - return self._dbenv.get_tmp_dir().decode(charset) - - def get_data_dirs(self) : - return tuple( - (i.decode(charset) for i in self._dbenv.get_data_dirs())) - - class DBSequence_py3k(object) : - def __init__(self, db, *args, **kwargs) : - self._db=db - self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs) - - def __getattr__(self, v) : - return getattr(self._dbsequence, v) - - def open(self, key, *args, **kwargs) : - return self._dbsequence.open(bytes(key, charset), *args, **kwargs) - - def get_key(self) : - return self._dbsequence.get_key().decode(charset) - - def get_dbp(self) : - return self._db - - import string - string.letters=[chr(i) for i in xrange(65,91)] - - bsddb._db.DBEnv_orig = bsddb._db.DBEnv - bsddb._db.DB_orig = bsddb._db.DB - if bsddb.db.version() <= (4, 3) : - bsddb._db.DBSequence_orig = None - else : - bsddb._db.DBSequence_orig = bsddb._db.DBSequence - - def do_proxy_db_py3k(flag) : - flag2 = do_proxy_db_py3k.flag - do_proxy_db_py3k.flag = flag - if flag : - bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k - bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k - bsddb._db.DBSequence = DBSequence_py3k - else : - bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig - bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig - bsddb._db.DBSequence = bsddb._db.DBSequence_orig - return flag2 - - do_proxy_db_py3k.flag = False - do_proxy_db_py3k(True) - -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db, dbtables, dbutils, dbshelve, \ - hashopen, btopen, rnopen, dbobj -except ImportError: - # For Python 2.3 - from bsddb import db, dbtables, dbutils, dbshelve, \ - hashopen, btopen, rnopen, dbobj - -try: - from bsddb3 import test_support -except ImportError: - if sys.version_info[0] < 3 : - from test import test_support - else : - from test import support as test_support - - -try: - if sys.version_info[0] < 3 : - from threading import Thread, currentThread - del Thread, currentThread - else : - from threading import Thread, current_thread - del Thread, current_thread - have_threads = True -except ImportError: - have_threads = False - -verbose = 0 -if 'verbose' in sys.argv: - verbose = 1 - sys.argv.remove('verbose') - -if 'silent' in sys.argv: # take care of old flag, just in case - verbose = 0 - sys.argv.remove('silent') - - -def print_versions(): - print - print '-=' * 38 - print db.DB_VERSION_STRING - print 'bsddb.db.version(): %s' % (db.version(), ) - if db.version() >= (5, 0) : - print 'bsddb.db.full_version(): %s' %repr(db.full_version()) - print 'bsddb.db.__version__: %s' % db.__version__ - print 'bsddb.db.cvsid: %s' % db.cvsid - - # Workaround for allowing generating an EGGs as a ZIP files. - suffix="__" - print 'py module: %s' % getattr(bsddb, "__file"+suffix) - print 'extension module: %s' % getattr(bsddb, "__file"+suffix) - - print 'python version: %s' % sys.version - print 'My pid: %s' % os.getpid() - print '-=' * 38 - - -def get_new_path(name) : - get_new_path.mutex.acquire() - try : - import os - path=os.path.join(get_new_path.prefix, - name+"_"+str(os.getpid())+"_"+str(get_new_path.num)) - get_new_path.num+=1 - finally : - get_new_path.mutex.release() - return path - -def get_new_environment_path() : - path=get_new_path("environment") - import os - try: - os.makedirs(path,mode=0700) - except os.error: - test_support.rmtree(path) - os.makedirs(path) - return path - -def get_new_database_path() : - path=get_new_path("database") - import os - if os.path.exists(path) : - os.remove(path) - return path - - -# This path can be overriden via "set_test_path_prefix()". -import os, os.path -get_new_path.prefix=os.path.join(os.environ.get("TMPDIR", - os.path.join(os.sep,"tmp")), "z-Berkeley_DB") -get_new_path.num=0 - -def get_test_path_prefix() : - return get_new_path.prefix - -def set_test_path_prefix(path) : - get_new_path.prefix=path - -def remove_test_path_directory() : - test_support.rmtree(get_new_path.prefix) - -if have_threads : - import threading - get_new_path.mutex=threading.Lock() - del threading -else : - class Lock(object) : - def acquire(self) : - pass - def release(self) : - pass - get_new_path.mutex=Lock() - del Lock - - - -class PrintInfoFakeTest(unittest.TestCase): - def testPrintVersions(self): - print_versions() - - -# This little hack is for when this module is run as main and all the -# other modules import it so they will still be able to get the right -# verbose setting. It's confusing but it works. -if sys.version_info[0] < 3 : - import test_all - test_all.verbose = verbose -else : - import sys - print >>sys.stderr, "Work to do!" - - -def suite(module_prefix='', timing_check=None): - test_modules = [ - 'test_associate', - 'test_basics', - 'test_dbenv', - 'test_db', - 'test_compare', - 'test_compat', - 'test_cursor_pget_bug', - 'test_dbobj', - 'test_dbshelve', - 'test_dbtables', - 'test_distributed_transactions', - 'test_early_close', - 'test_fileid', - 'test_get_none', - 'test_join', - 'test_lock', - 'test_misc', - 'test_pickle', - 'test_queue', - 'test_recno', - 'test_replication', - 'test_sequence', - 'test_thread', - ] - - alltests = unittest.TestSuite() - for name in test_modules: - #module = __import__(name) - # Do it this way so that suite may be called externally via - # python's Lib/test/test_bsddb3. - module = __import__(module_prefix+name, globals(), locals(), name) - - alltests.addTest(module.test_suite()) - if timing_check: - alltests.addTest(unittest.makeSuite(timing_check)) - return alltests - - -def test_suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(PrintInfoFakeTest)) - return suite - - -if __name__ == '__main__': - print_versions() - unittest.main(defaultTest='suite') |