diff options
Diffstat (limited to 'lib/python2.7/test/lock_tests.py')
-rw-r--r-- | lib/python2.7/test/lock_tests.py | 546 |
1 files changed, 0 insertions, 546 deletions
diff --git a/lib/python2.7/test/lock_tests.py b/lib/python2.7/test/lock_tests.py deleted file mode 100644 index 966f9bd..0000000 --- a/lib/python2.7/test/lock_tests.py +++ /dev/null @@ -1,546 +0,0 @@ -""" -Various tests for synchronization primitives. -""" - -import sys -import time -from thread import start_new_thread, get_ident -import threading -import unittest - -from test import test_support as support - - -def _wait(): - # A crude wait/yield function not relying on synchronization primitives. - time.sleep(0.01) - -class Bunch(object): - """ - A bunch of threads. - """ - def __init__(self, f, n, wait_before_exit=False): - """ - Construct a bunch of `n` threads running the same function `f`. - If `wait_before_exit` is True, the threads won't terminate until - do_finish() is called. - """ - self.f = f - self.n = n - self.started = [] - self.finished = [] - self._can_exit = not wait_before_exit - def task(): - tid = get_ident() - self.started.append(tid) - try: - f() - finally: - self.finished.append(tid) - while not self._can_exit: - _wait() - for i in range(n): - start_new_thread(task, ()) - - def wait_for_started(self): - while len(self.started) < self.n: - _wait() - - def wait_for_finished(self): - while len(self.finished) < self.n: - _wait() - - def do_finish(self): - self._can_exit = True - - -class BaseTestCase(unittest.TestCase): - def setUp(self): - self._threads = support.threading_setup() - - def tearDown(self): - support.threading_cleanup(*self._threads) - support.reap_children() - - -class BaseLockTests(BaseTestCase): - """ - Tests for both recursive and non-recursive locks. - """ - - def test_constructor(self): - lock = self.locktype() - del lock - - def test_acquire_destroy(self): - lock = self.locktype() - lock.acquire() - del lock - - def test_acquire_release(self): - lock = self.locktype() - lock.acquire() - lock.release() - del lock - - def test_try_acquire(self): - lock = self.locktype() - self.assertTrue(lock.acquire(False)) - lock.release() - - def test_try_acquire_contended(self): - lock = self.locktype() - lock.acquire() - result = [] - def f(): - result.append(lock.acquire(False)) - Bunch(f, 1).wait_for_finished() - self.assertFalse(result[0]) - lock.release() - - def test_acquire_contended(self): - lock = self.locktype() - lock.acquire() - N = 5 - def f(): - lock.acquire() - lock.release() - - b = Bunch(f, N) - b.wait_for_started() - _wait() - self.assertEqual(len(b.finished), 0) - lock.release() - b.wait_for_finished() - self.assertEqual(len(b.finished), N) - - def test_with(self): - lock = self.locktype() - def f(): - lock.acquire() - lock.release() - def _with(err=None): - with lock: - if err is not None: - raise err - _with() - # Check the lock is unacquired - Bunch(f, 1).wait_for_finished() - self.assertRaises(TypeError, _with, TypeError) - # Check the lock is unacquired - Bunch(f, 1).wait_for_finished() - - def test_thread_leak(self): - # The lock shouldn't leak a Thread instance when used from a foreign - # (non-threading) thread. - lock = self.locktype() - def f(): - lock.acquire() - lock.release() - n = len(threading.enumerate()) - # We run many threads in the hope that existing threads ids won't - # be recycled. - Bunch(f, 15).wait_for_finished() - self.assertEqual(n, len(threading.enumerate())) - - -class LockTests(BaseLockTests): - """ - Tests for non-recursive, weak locks - (which can be acquired and released from different threads). - """ - def test_reacquire(self): - # Lock needs to be released before re-acquiring. - lock = self.locktype() - phase = [] - def f(): - lock.acquire() - phase.append(None) - lock.acquire() - phase.append(None) - start_new_thread(f, ()) - while len(phase) == 0: - _wait() - _wait() - self.assertEqual(len(phase), 1) - lock.release() - while len(phase) == 1: - _wait() - self.assertEqual(len(phase), 2) - - def test_different_thread(self): - # Lock can be released from a different thread. - lock = self.locktype() - lock.acquire() - def f(): - lock.release() - b = Bunch(f, 1) - b.wait_for_finished() - lock.acquire() - lock.release() - - -class RLockTests(BaseLockTests): - """ - Tests for recursive locks. - """ - def test_reacquire(self): - lock = self.locktype() - lock.acquire() - lock.acquire() - lock.release() - lock.acquire() - lock.release() - lock.release() - - def test_release_unacquired(self): - # Cannot release an unacquired lock - lock = self.locktype() - self.assertRaises(RuntimeError, lock.release) - lock.acquire() - lock.acquire() - lock.release() - lock.acquire() - lock.release() - lock.release() - self.assertRaises(RuntimeError, lock.release) - - def test_different_thread(self): - # Cannot release from a different thread - lock = self.locktype() - def f(): - lock.acquire() - b = Bunch(f, 1, True) - try: - self.assertRaises(RuntimeError, lock.release) - finally: - b.do_finish() - - def test__is_owned(self): - lock = self.locktype() - self.assertFalse(lock._is_owned()) - lock.acquire() - self.assertTrue(lock._is_owned()) - lock.acquire() - self.assertTrue(lock._is_owned()) - result = [] - def f(): - result.append(lock._is_owned()) - Bunch(f, 1).wait_for_finished() - self.assertFalse(result[0]) - lock.release() - self.assertTrue(lock._is_owned()) - lock.release() - self.assertFalse(lock._is_owned()) - - -class EventTests(BaseTestCase): - """ - Tests for Event objects. - """ - - def test_is_set(self): - evt = self.eventtype() - self.assertFalse(evt.is_set()) - evt.set() - self.assertTrue(evt.is_set()) - evt.set() - self.assertTrue(evt.is_set()) - evt.clear() - self.assertFalse(evt.is_set()) - evt.clear() - self.assertFalse(evt.is_set()) - - def _check_notify(self, evt): - # All threads get notified - N = 5 - results1 = [] - results2 = [] - def f(): - results1.append(evt.wait()) - results2.append(evt.wait()) - b = Bunch(f, N) - b.wait_for_started() - _wait() - self.assertEqual(len(results1), 0) - evt.set() - b.wait_for_finished() - self.assertEqual(results1, [True] * N) - self.assertEqual(results2, [True] * N) - - def test_notify(self): - evt = self.eventtype() - self._check_notify(evt) - # Another time, after an explicit clear() - evt.set() - evt.clear() - self._check_notify(evt) - - def test_timeout(self): - evt = self.eventtype() - results1 = [] - results2 = [] - N = 5 - def f(): - results1.append(evt.wait(0.0)) - t1 = time.time() - r = evt.wait(0.2) - t2 = time.time() - results2.append((r, t2 - t1)) - Bunch(f, N).wait_for_finished() - self.assertEqual(results1, [False] * N) - for r, dt in results2: - self.assertFalse(r) - self.assertTrue(dt >= 0.2, dt) - # The event is set - results1 = [] - results2 = [] - evt.set() - Bunch(f, N).wait_for_finished() - self.assertEqual(results1, [True] * N) - for r, dt in results2: - self.assertTrue(r) - - -class ConditionTests(BaseTestCase): - """ - Tests for condition variables. - """ - - def test_acquire(self): - cond = self.condtype() - # Be default we have an RLock: the condition can be acquired multiple - # times. - cond.acquire() - cond.acquire() - cond.release() - cond.release() - lock = threading.Lock() - cond = self.condtype(lock) - cond.acquire() - self.assertFalse(lock.acquire(False)) - cond.release() - self.assertTrue(lock.acquire(False)) - self.assertFalse(cond.acquire(False)) - lock.release() - with cond: - self.assertFalse(lock.acquire(False)) - - def test_unacquired_wait(self): - cond = self.condtype() - self.assertRaises(RuntimeError, cond.wait) - - def test_unacquired_notify(self): - cond = self.condtype() - self.assertRaises(RuntimeError, cond.notify) - - def _check_notify(self, cond): - N = 5 - results1 = [] - results2 = [] - phase_num = 0 - def f(): - cond.acquire() - cond.wait() - cond.release() - results1.append(phase_num) - cond.acquire() - cond.wait() - cond.release() - results2.append(phase_num) - b = Bunch(f, N) - b.wait_for_started() - _wait() - self.assertEqual(results1, []) - # Notify 3 threads at first - cond.acquire() - cond.notify(3) - _wait() - phase_num = 1 - cond.release() - while len(results1) < 3: - _wait() - self.assertEqual(results1, [1] * 3) - self.assertEqual(results2, []) - # Notify 5 threads: they might be in their first or second wait - cond.acquire() - cond.notify(5) - _wait() - phase_num = 2 - cond.release() - while len(results1) + len(results2) < 8: - _wait() - self.assertEqual(results1, [1] * 3 + [2] * 2) - self.assertEqual(results2, [2] * 3) - # Notify all threads: they are all in their second wait - cond.acquire() - cond.notify_all() - _wait() - phase_num = 3 - cond.release() - while len(results2) < 5: - _wait() - self.assertEqual(results1, [1] * 3 + [2] * 2) - self.assertEqual(results2, [2] * 3 + [3] * 2) - b.wait_for_finished() - - def test_notify(self): - cond = self.condtype() - self._check_notify(cond) - # A second time, to check internal state is still ok. - self._check_notify(cond) - - def test_timeout(self): - cond = self.condtype() - results = [] - N = 5 - def f(): - cond.acquire() - t1 = time.time() - cond.wait(0.2) - t2 = time.time() - cond.release() - results.append(t2 - t1) - Bunch(f, N).wait_for_finished() - self.assertEqual(len(results), 5) - for dt in results: - self.assertTrue(dt >= 0.2, dt) - - -class BaseSemaphoreTests(BaseTestCase): - """ - Common tests for {bounded, unbounded} semaphore objects. - """ - - def test_constructor(self): - self.assertRaises(ValueError, self.semtype, value = -1) - self.assertRaises(ValueError, self.semtype, value = -sys.maxint) - - def test_acquire(self): - sem = self.semtype(1) - sem.acquire() - sem.release() - sem = self.semtype(2) - sem.acquire() - sem.acquire() - sem.release() - sem.release() - - def test_acquire_destroy(self): - sem = self.semtype() - sem.acquire() - del sem - - def test_acquire_contended(self): - sem = self.semtype(7) - sem.acquire() - N = 10 - results1 = [] - results2 = [] - phase_num = 0 - def f(): - sem.acquire() - results1.append(phase_num) - sem.acquire() - results2.append(phase_num) - b = Bunch(f, 10) - b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() - self.assertEqual(results1 + results2, [0] * 6) - phase_num = 1 - for i in range(7): - sem.release() - while len(results1) + len(results2) < 13: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) - phase_num = 2 - for i in range(6): - sem.release() - while len(results1) + len(results2) < 19: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) - # The semaphore is still locked - self.assertFalse(sem.acquire(False)) - # Final release, to let the last thread finish - sem.release() - b.wait_for_finished() - - def test_try_acquire(self): - sem = self.semtype(2) - self.assertTrue(sem.acquire(False)) - self.assertTrue(sem.acquire(False)) - self.assertFalse(sem.acquire(False)) - sem.release() - self.assertTrue(sem.acquire(False)) - - def test_try_acquire_contended(self): - sem = self.semtype(4) - sem.acquire() - results = [] - def f(): - results.append(sem.acquire(False)) - results.append(sem.acquire(False)) - Bunch(f, 5).wait_for_finished() - # There can be a thread switch between acquiring the semaphore and - # appending the result, therefore results will not necessarily be - # ordered. - self.assertEqual(sorted(results), [False] * 7 + [True] * 3 ) - - def test_default_value(self): - # The default initial value is 1. - sem = self.semtype() - sem.acquire() - def f(): - sem.acquire() - sem.release() - b = Bunch(f, 1) - b.wait_for_started() - _wait() - self.assertFalse(b.finished) - sem.release() - b.wait_for_finished() - - def test_with(self): - sem = self.semtype(2) - def _with(err=None): - with sem: - self.assertTrue(sem.acquire(False)) - sem.release() - with sem: - self.assertFalse(sem.acquire(False)) - if err: - raise err - _with() - self.assertTrue(sem.acquire(False)) - sem.release() - self.assertRaises(TypeError, _with, TypeError) - self.assertTrue(sem.acquire(False)) - sem.release() - -class SemaphoreTests(BaseSemaphoreTests): - """ - Tests for unbounded semaphores. - """ - - def test_release_unacquired(self): - # Unbounded releases are allowed and increment the semaphore's value - sem = self.semtype(1) - sem.release() - sem.acquire() - sem.acquire() - sem.release() - - -class BoundedSemaphoreTests(BaseSemaphoreTests): - """ - Tests for bounded semaphores. - """ - - def test_release_unacquired(self): - # Cannot go past the initial value - sem = self.semtype() - self.assertRaises(ValueError, sem.release) - sem.acquire() - sem.release() - self.assertRaises(ValueError, sem.release) |