aboutsummaryrefslogtreecommitdiff
path: root/catapult/devil/devil/utils/reraiser_thread_unittest.py
blob: eb37456c62787239a5ca48251b16bd182f58d4b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for reraiser_thread.py."""

import threading
import unittest

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer


class TestException(Exception):
  pass


class TestReraiserThread(unittest.TestCase):
  """Tests for reraiser_thread.ReraiserThread."""

  def testNominal(self):
    result = [None, None]

    def f(a, b=None):
      result[0] = a
      result[1] = b

    thread = reraiser_thread.ReraiserThread(f, [1], {'b': 2})
    thread.start()
    thread.join()
    self.assertEqual(result[0], 1)
    self.assertEqual(result[1], 2)

  def testRaise(self):
    def f():
      raise TestException

    thread = reraiser_thread.ReraiserThread(f)
    thread.start()
    thread.join()
    with self.assertRaises(TestException):
      thread.ReraiseIfException()


class TestReraiserThreadGroup(unittest.TestCase):
  """Tests for reraiser_thread.ReraiserThreadGroup."""

  def testInit(self):
    ran = [False] * 5

    def f(i):
      ran[i] = True

    group = reraiser_thread.ReraiserThreadGroup(
        [reraiser_thread.ReraiserThread(f, args=[i]) for i in range(5)])
    group.StartAll()
    group.JoinAll()
    for v in ran:
      self.assertTrue(v)

  def testAdd(self):
    ran = [False] * 5

    def f(i):
      ran[i] = True

    group = reraiser_thread.ReraiserThreadGroup()
    for i in xrange(5):
      group.Add(reraiser_thread.ReraiserThread(f, args=[i]))
    group.StartAll()
    group.JoinAll()
    for v in ran:
      self.assertTrue(v)

  def testJoinRaise(self):
    def f():
      raise TestException

    group = reraiser_thread.ReraiserThreadGroup(
        [reraiser_thread.ReraiserThread(f) for _ in xrange(5)])
    group.StartAll()
    with self.assertRaises(TestException):
      group.JoinAll()

  def testJoinTimeout(self):
    def f():
      pass

    event = threading.Event()

    def g():
      event.wait()

    group = reraiser_thread.ReraiserThreadGroup(
        [reraiser_thread.ReraiserThread(g),
         reraiser_thread.ReraiserThread(f)])
    group.StartAll()
    with self.assertRaises(reraiser_thread.TimeoutError):
      group.JoinAll(watchdog_timer.WatchdogTimer(0.01))
    event.set()


class TestRunAsync(unittest.TestCase):
  """Tests for reraiser_thread.RunAsync."""

  def testNoArgs(self):
    results = reraiser_thread.RunAsync([])
    self.assertEqual([], results)

  def testOneArg(self):
    results = reraiser_thread.RunAsync([lambda: 1])
    self.assertEqual([1], results)

  def testTwoArgs(self):
    a, b = reraiser_thread.RunAsync((lambda: 1, lambda: 2))
    self.assertEqual(1, a)
    self.assertEqual(2, b)


if __name__ == '__main__':
  unittest.main()