summaryrefslogtreecommitdiff
path: root/lib/python2.7/bsddb/test/test_all.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/bsddb/test/test_all.py')
-rw-r--r--lib/python2.7/bsddb/test/test_all.py623
1 files changed, 0 insertions, 623 deletions
diff --git a/lib/python2.7/bsddb/test/test_all.py b/lib/python2.7/bsddb/test/test_all.py
deleted file mode 100644
index caef1ac..0000000
--- a/lib/python2.7/bsddb/test/test_all.py
+++ /dev/null
@@ -1,623 +0,0 @@
-"""Run all test cases.
-"""
-
-import sys
-import os
-import unittest
-try:
- # For Pythons w/distutils pybsddb
- import bsddb3 as bsddb
-except ImportError:
- # For Python 2.3
- import bsddb
-
-
-if sys.version_info[0] >= 3 :
- charset = "iso8859-1" # Full 8 bit
-
- class logcursor_py3k(object) :
- def __init__(self, env) :
- self._logcursor = env.log_cursor()
-
- def __getattr__(self, v) :
- return getattr(self._logcursor, v)
-
- def __next__(self) :
- v = getattr(self._logcursor, "next")()
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- next = __next__
-
- def first(self) :
- v = self._logcursor.first()
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- def last(self) :
- v = self._logcursor.last()
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- def prev(self) :
- v = self._logcursor.prev()
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- def current(self) :
- v = self._logcursor.current()
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- def set(self, lsn) :
- v = self._logcursor.set(lsn)
- if v is not None :
- v = (v[0], v[1].decode(charset))
- return v
-
- class cursor_py3k(object) :
- def __init__(self, db, *args, **kwargs) :
- self._dbcursor = db.cursor(*args, **kwargs)
-
- def __getattr__(self, v) :
- return getattr(self._dbcursor, v)
-
- def _fix(self, v) :
- if v is None : return None
- key, value = v
- if isinstance(key, bytes) :
- key = key.decode(charset)
- return (key, value.decode(charset))
-
- def __next__(self) :
- v = getattr(self._dbcursor, "next")()
- return self._fix(v)
-
- next = __next__
-
- def previous(self) :
- v = self._dbcursor.previous()
- return self._fix(v)
-
- def last(self) :
- v = self._dbcursor.last()
- return self._fix(v)
-
- def set(self, k) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- v = self._dbcursor.set(k)
- return self._fix(v)
-
- def set_recno(self, num) :
- v = self._dbcursor.set_recno(num)
- return self._fix(v)
-
- def set_range(self, k, dlen=-1, doff=-1) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- v = self._dbcursor.set_range(k, dlen=dlen, doff=doff)
- return self._fix(v)
-
- def dup(self, flags=0) :
- cursor = self._dbcursor.dup(flags)
- return dup_cursor_py3k(cursor)
-
- def next_dup(self) :
- v = self._dbcursor.next_dup()
- return self._fix(v)
-
- def next_nodup(self) :
- v = self._dbcursor.next_nodup()
- return self._fix(v)
-
- def put(self, key, data, flags=0, dlen=-1, doff=-1) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- if isinstance(data, str) :
- value = bytes(data, charset)
- return self._dbcursor.put(key, data, flags=flags, dlen=dlen,
- doff=doff)
-
- def current(self, flags=0, dlen=-1, doff=-1) :
- v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
- return self._fix(v)
-
- def first(self) :
- v = self._dbcursor.first()
- return self._fix(v)
-
- def pget(self, key=None, data=None, flags=0) :
- # Incorrect because key can be a bare number,
- # but enough to pass testsuite
- if isinstance(key, int) and (data is None) and (flags == 0) :
- flags = key
- key = None
- if isinstance(key, str) :
- key = bytes(key, charset)
- if isinstance(data, int) and (flags==0) :
- flags = data
- data = None
- if isinstance(data, str) :
- data = bytes(data, charset)
- v=self._dbcursor.pget(key=key, data=data, flags=flags)
- if v is not None :
- v1, v2, v3 = v
- if isinstance(v1, bytes) :
- v1 = v1.decode(charset)
- if isinstance(v2, bytes) :
- v2 = v2.decode(charset)
-
- v = (v1, v2, v3.decode(charset))
-
- return v
-
- def join_item(self) :
- v = self._dbcursor.join_item()
- if v is not None :
- v = v.decode(charset)
- return v
-
- def get(self, *args, **kwargs) :
- l = len(args)
- if l == 2 :
- k, f = args
- if isinstance(k, str) :
- k = bytes(k, "iso8859-1")
- args = (k, f)
- elif l == 3 :
- k, d, f = args
- if isinstance(k, str) :
- k = bytes(k, charset)
- if isinstance(d, str) :
- d = bytes(d, charset)
- args =(k, d, f)
-
- v = self._dbcursor.get(*args, **kwargs)
- if v is not None :
- k, v = v
- if isinstance(k, bytes) :
- k = k.decode(charset)
- v = (k, v.decode(charset))
- return v
-
- def get_both(self, key, value) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- if isinstance(value, str) :
- value = bytes(value, charset)
- v=self._dbcursor.get_both(key, value)
- return self._fix(v)
-
- class dup_cursor_py3k(cursor_py3k) :
- def __init__(self, dbcursor) :
- self._dbcursor = dbcursor
-
- class DB_py3k(object) :
- def __init__(self, *args, **kwargs) :
- args2=[]
- for i in args :
- if isinstance(i, DBEnv_py3k) :
- i = i._dbenv
- args2.append(i)
- args = tuple(args2)
- for k, v in kwargs.items() :
- if isinstance(v, DBEnv_py3k) :
- kwargs[k] = v._dbenv
-
- self._db = bsddb._db.DB_orig(*args, **kwargs)
-
- def __contains__(self, k) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- return getattr(self._db, "has_key")(k)
-
- def __getitem__(self, k) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- v = self._db[k]
- if v is not None :
- v = v.decode(charset)
- return v
-
- def __setitem__(self, k, v) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- if isinstance(v, str) :
- v = bytes(v, charset)
- self._db[k] = v
-
- def __delitem__(self, k) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- del self._db[k]
-
- def __getattr__(self, v) :
- return getattr(self._db, v)
-
- def __len__(self) :
- return len(self._db)
-
- def has_key(self, k, txn=None) :
- if isinstance(k, str) :
- k = bytes(k, charset)
- return self._db.has_key(k, txn=txn)
-
- def set_re_delim(self, c) :
- if isinstance(c, str) : # We can use a numeric value byte too
- c = bytes(c, charset)
- return self._db.set_re_delim(c)
-
- def set_re_pad(self, c) :
- if isinstance(c, str) : # We can use a numeric value byte too
- c = bytes(c, charset)
- return self._db.set_re_pad(c)
-
- def get_re_source(self) :
- source = self._db.get_re_source()
- return source.decode(charset)
-
- def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- if isinstance(data, str) :
- value = bytes(data, charset)
- return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen,
- doff=doff)
-
- def append(self, value, txn=None) :
- if isinstance(value, str) :
- value = bytes(value, charset)
- return self._db.append(value, txn=txn)
-
- def get_size(self, key) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- return self._db.get_size(key)
-
- def exists(self, key, *args, **kwargs) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- return self._db.exists(key, *args, **kwargs)
-
- def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- if default != "MagicCookie" : # Magic for 'test_get_none.py'
- v=self._db.get(key, default=default, txn=txn, flags=flags,
- dlen=dlen, doff=doff)
- else :
- v=self._db.get(key, txn=txn, flags=flags,
- dlen=dlen, doff=doff)
- if (v is not None) and isinstance(v, bytes) :
- v = v.decode(charset)
- return v
-
- def pget(self, key, txn=None) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- v=self._db.pget(key, txn=txn)
- if v is not None :
- v1, v2 = v
- if isinstance(v1, bytes) :
- v1 = v1.decode(charset)
-
- v = (v1, v2.decode(charset))
- return v
-
- def get_both(self, key, value, txn=None, flags=0) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- if isinstance(value, str) :
- value = bytes(value, charset)
- v=self._db.get_both(key, value, txn=txn, flags=flags)
- if v is not None :
- v = v.decode(charset)
- return v
-
- def delete(self, key, txn=None) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- return self._db.delete(key, txn=txn)
-
- def keys(self) :
- k = self._db.keys()
- if len(k) and isinstance(k[0], bytes) :
- return [i.decode(charset) for i in self._db.keys()]
- else :
- return k
-
- def items(self) :
- data = self._db.items()
- if not len(data) : return data
- data2 = []
- for k, v in data :
- if isinstance(k, bytes) :
- k = k.decode(charset)
- data2.append((k, v.decode(charset)))
- return data2
-
- def associate(self, secondarydb, callback, flags=0, txn=None) :
- class associate_callback(object) :
- def __init__(self, callback) :
- self._callback = callback
-
- def callback(self, key, data) :
- if isinstance(key, str) :
- key = key.decode(charset)
- data = data.decode(charset)
- key = self._callback(key, data)
- if (key != bsddb._db.DB_DONOTINDEX) :
- if isinstance(key, str) :
- key = bytes(key, charset)
- elif isinstance(key, list) :
- key2 = []
- for i in key :
- if isinstance(i, str) :
- i = bytes(i, charset)
- key2.append(i)
- key = key2
- return key
-
- return self._db.associate(secondarydb._db,
- associate_callback(callback).callback, flags=flags,
- txn=txn)
-
- def cursor(self, txn=None, flags=0) :
- return cursor_py3k(self._db, txn=txn, flags=flags)
-
- def join(self, cursor_list) :
- cursor_list = [i._dbcursor for i in cursor_list]
- return dup_cursor_py3k(self._db.join(cursor_list))
-
- class DBEnv_py3k(object) :
- def __init__(self, *args, **kwargs) :
- self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)
-
- def __getattr__(self, v) :
- return getattr(self._dbenv, v)
-
- def log_cursor(self, flags=0) :
- return logcursor_py3k(self._dbenv)
-
- def get_lg_dir(self) :
- return self._dbenv.get_lg_dir().decode(charset)
-
- def get_tmp_dir(self) :
- return self._dbenv.get_tmp_dir().decode(charset)
-
- def get_data_dirs(self) :
- return tuple(
- (i.decode(charset) for i in self._dbenv.get_data_dirs()))
-
- class DBSequence_py3k(object) :
- def __init__(self, db, *args, **kwargs) :
- self._db=db
- self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)
-
- def __getattr__(self, v) :
- return getattr(self._dbsequence, v)
-
- def open(self, key, *args, **kwargs) :
- return self._dbsequence.open(bytes(key, charset), *args, **kwargs)
-
- def get_key(self) :
- return self._dbsequence.get_key().decode(charset)
-
- def get_dbp(self) :
- return self._db
-
- import string
- string.letters=[chr(i) for i in xrange(65,91)]
-
- bsddb._db.DBEnv_orig = bsddb._db.DBEnv
- bsddb._db.DB_orig = bsddb._db.DB
- if bsddb.db.version() <= (4, 3) :
- bsddb._db.DBSequence_orig = None
- else :
- bsddb._db.DBSequence_orig = bsddb._db.DBSequence
-
- def do_proxy_db_py3k(flag) :
- flag2 = do_proxy_db_py3k.flag
- do_proxy_db_py3k.flag = flag
- if flag :
- bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
- bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
- bsddb._db.DBSequence = DBSequence_py3k
- else :
- bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
- bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
- bsddb._db.DBSequence = bsddb._db.DBSequence_orig
- return flag2
-
- do_proxy_db_py3k.flag = False
- do_proxy_db_py3k(True)
-
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db, dbtables, dbutils, dbshelve, \
- hashopen, btopen, rnopen, dbobj
-except ImportError:
- # For Python 2.3
- from bsddb import db, dbtables, dbutils, dbshelve, \
- hashopen, btopen, rnopen, dbobj
-
-try:
- from bsddb3 import test_support
-except ImportError:
- if sys.version_info[0] < 3 :
- from test import test_support
- else :
- from test import support as test_support
-
-
-try:
- if sys.version_info[0] < 3 :
- from threading import Thread, currentThread
- del Thread, currentThread
- else :
- from threading import Thread, current_thread
- del Thread, current_thread
- have_threads = True
-except ImportError:
- have_threads = False
-
-verbose = 0
-if 'verbose' in sys.argv:
- verbose = 1
- sys.argv.remove('verbose')
-
-if 'silent' in sys.argv: # take care of old flag, just in case
- verbose = 0
- sys.argv.remove('silent')
-
-
-def print_versions():
- print
- print '-=' * 38
- print db.DB_VERSION_STRING
- print 'bsddb.db.version(): %s' % (db.version(), )
- if db.version() >= (5, 0) :
- print 'bsddb.db.full_version(): %s' %repr(db.full_version())
- print 'bsddb.db.__version__: %s' % db.__version__
- print 'bsddb.db.cvsid: %s' % db.cvsid
-
- # Workaround for allowing generating an EGGs as a ZIP files.
- suffix="__"
- print 'py module: %s' % getattr(bsddb, "__file"+suffix)
- print 'extension module: %s' % getattr(bsddb, "__file"+suffix)
-
- print 'python version: %s' % sys.version
- print 'My pid: %s' % os.getpid()
- print '-=' * 38
-
-
-def get_new_path(name) :
- get_new_path.mutex.acquire()
- try :
- import os
- path=os.path.join(get_new_path.prefix,
- name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
- get_new_path.num+=1
- finally :
- get_new_path.mutex.release()
- return path
-
-def get_new_environment_path() :
- path=get_new_path("environment")
- import os
- try:
- os.makedirs(path,mode=0700)
- except os.error:
- test_support.rmtree(path)
- os.makedirs(path)
- return path
-
-def get_new_database_path() :
- path=get_new_path("database")
- import os
- if os.path.exists(path) :
- os.remove(path)
- return path
-
-
-# This path can be overriden via "set_test_path_prefix()".
-import os, os.path
-get_new_path.prefix=os.path.join(os.environ.get("TMPDIR",
- os.path.join(os.sep,"tmp")), "z-Berkeley_DB")
-get_new_path.num=0
-
-def get_test_path_prefix() :
- return get_new_path.prefix
-
-def set_test_path_prefix(path) :
- get_new_path.prefix=path
-
-def remove_test_path_directory() :
- test_support.rmtree(get_new_path.prefix)
-
-if have_threads :
- import threading
- get_new_path.mutex=threading.Lock()
- del threading
-else :
- class Lock(object) :
- def acquire(self) :
- pass
- def release(self) :
- pass
- get_new_path.mutex=Lock()
- del Lock
-
-
-
-class PrintInfoFakeTest(unittest.TestCase):
- def testPrintVersions(self):
- print_versions()
-
-
-# This little hack is for when this module is run as main and all the
-# other modules import it so they will still be able to get the right
-# verbose setting. It's confusing but it works.
-if sys.version_info[0] < 3 :
- import test_all
- test_all.verbose = verbose
-else :
- import sys
- print >>sys.stderr, "Work to do!"
-
-
-def suite(module_prefix='', timing_check=None):
- test_modules = [
- 'test_associate',
- 'test_basics',
- 'test_dbenv',
- 'test_db',
- 'test_compare',
- 'test_compat',
- 'test_cursor_pget_bug',
- 'test_dbobj',
- 'test_dbshelve',
- 'test_dbtables',
- 'test_distributed_transactions',
- 'test_early_close',
- 'test_fileid',
- 'test_get_none',
- 'test_join',
- 'test_lock',
- 'test_misc',
- 'test_pickle',
- 'test_queue',
- 'test_recno',
- 'test_replication',
- 'test_sequence',
- 'test_thread',
- ]
-
- alltests = unittest.TestSuite()
- for name in test_modules:
- #module = __import__(name)
- # Do it this way so that suite may be called externally via
- # python's Lib/test/test_bsddb3.
- module = __import__(module_prefix+name, globals(), locals(), name)
-
- alltests.addTest(module.test_suite())
- if timing_check:
- alltests.addTest(unittest.makeSuite(timing_check))
- return alltests
-
-
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
- return suite
-
-
-if __name__ == '__main__':
- print_versions()
- unittest.main(defaultTest='suite')