# path: root/catapult/common/py_utils/py_utils/lock_unittest.py
# blob: 2ba288bd14a77b5e9fc008c673a83a4b8b04a593 (plain)
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import multiprocessing
import os
import tempfile
import time
import unittest

from six.moves import range  # pylint: disable=redefined-builtin

from py_utils import lock


def _AppendTextToFile(file_name):
  """Appends one 'Start' + 10000 * '*' + 'End' record to a file under lock.

  The file is opened in append mode and an exclusive lock (lock.LOCK_EX) is
  acquired on it before anything is written.

  Args:
    file_name: path of the file to append to.
  """
  # One write() call per piece, in order: 'Start', 10000 single stars, 'End'.
  pieces = ['Start'] + ['*'] * 10000 + ['End']
  with open(file_name, 'a') as out:
    lock.AcquireFileLock(out, lock.LOCK_EX)
    # Sleep 100 ms while holding the lock to increase the chance that another
    # process tries to acquire the lock on the same file at the same time.
    time.sleep(0.1)
    for piece in pieces:
      out.write(piece)
    # NOTE(review): no explicit ReleaseFileLock call is made here; presumably
    # the lock is dropped when the file is closed at the end of this block --
    # confirm against py_utils.lock.


def _ReadFileWithSharedLockBlockingThenWrite(read_file, write_file):
  """Copies the content of read_file onto the end of write_file.

  read_file is read after acquiring a lock with lock.LOCK_SH (no LOCK_NB
  flag); write_file is appended to after acquiring a lock with lock.LOCK_EX.
  read_file stays open while write_file is being written.

  Args:
    read_file: path of the file to read.
    write_file: path of the file to append the content to.
  """
  with open(read_file, 'r') as source:
    lock.AcquireFileLock(source, lock.LOCK_SH)
    payload = source.read()
    with open(write_file, 'a') as sink:
      lock.AcquireFileLock(sink, lock.LOCK_EX)
      sink.write(payload)


def _ReadFileWithExclusiveLockNonBlocking(target_file, status_file):
  """Tries a non-blocking exclusive lock and records the outcome in a file.

  Opens target_file for reading and calls lock.AcquireFileLock on it with
  lock.LOCK_EX | lock.LOCK_NB. status_file is then overwritten with
  'LockException raised' if the call raised lock.LockException, or with
  'LockException was not raised' otherwise.

  Args:
    target_file: path of the file to try to lock.
    status_file: path of the file that receives the outcome message.
  """
  with open(target_file, 'r') as candidate:
    try:
      lock.AcquireFileLock(candidate, lock.LOCK_EX | lock.LOCK_NB)
    except lock.LockException:
      outcome = 'LockException raised'
    else:
      outcome = 'LockException was not raised'
    # The outcome is written while target_file is still open, so this process
    # reports its result through the status file rather than an exception.
    with open(status_file, 'w') as report:
      report.write(outcome)


class FileLockTest(unittest.TestCase):
  """Cross-process tests for the file locking helpers in py_utils.lock.

  Every test spawns child processes with multiprocessing so that the lock on
  self.temp_file_path is contended by separate processes rather than by
  threads of the test process.
  """

  @staticmethod
  def _CreateEmptyTempFile():
    """Creates an empty temporary file that outlives its handle.

    Returns:
      The path of the new file. The caller is responsible for removing it.
    """
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    return tf.name

  def setUp(self):
    # The file whose lock is contended in every test.
    self.temp_file_path = self._CreateEmptyTempFile()

  def tearDown(self):
    os.remove(self.temp_file_path)

  def testExclusiveLock(self):
    """Ten writers holding an exclusive lock must not interleave output."""
    processes = []
    for _ in range(10):
      p = multiprocessing.Process(
          target=_AppendTextToFile, args=(self.temp_file_path,))
      p.start()
      processes.append(p)
    for p in processes:
      p.join()

    # If the file lock works as expected, there should be 10 atomic writes of
    # 'Start***...***End' to the file in some order, which lead to the final
    # file content as below.
    expected_file_content = ''.join((['Start'] + ['*']*10000 + ['End']) * 10)
    with open(self.temp_file_path, 'r') as f:
      # Use assertTrue instead of assertEqual since the strings are big, hence
      # assertEqual's assertion failure will contain huge strings.
      self.assertTrue(expected_file_content == f.read())

  def testSharedLock(self):
    """Children can take a shared lock while the test also holds one."""
    temp_write_file = self._CreateEmptyTempFile()
    try:
      with open(self.temp_file_path, 'w') as f:
        f.write('0123456789')
      with open(self.temp_file_path, 'r') as f:
        # First, acquire a shared lock on temp_file_path
        lock.AcquireFileLock(f, lock.LOCK_SH)

        processes = []
        # Create 10 processes that also try to acquire shared lock from
        # temp_file_path then append temp_file_path's content to temp_write_file
        for _ in range(10):
          p = multiprocessing.Process(
              target=_ReadFileWithSharedLockBlockingThenWrite,
              args=(self.temp_file_path, temp_write_file))
          p.start()
          processes.append(p)
        # The children are joined while this process still holds its shared
        # lock, so they can only finish if shared locks do not exclude each
        # other.
        for p in processes:
          p.join()

      # temp_write_file should contain 10 copies of temp_file_path's content.
      with open(temp_write_file, 'r') as f:
        self.assertEqual('0123456789'*10, f.read())
    finally:
      os.remove(temp_write_file)

  def testNonBlockingLockAcquiring(self):
    """A non-blocking exclusive request fails while the lock is held."""
    temp_status_file = self._CreateEmptyTempFile()
    try:
      with open(self.temp_file_path, 'w') as f:
        lock.AcquireFileLock(f, lock.LOCK_EX)
        # The child runs to completion while this process holds the exclusive
        # lock, and reports what happened through temp_status_file.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
      with open(temp_status_file, 'r') as f:
        self.assertEqual('LockException raised', f.read())
    finally:
      os.remove(temp_status_file)

  def testUnlockBeforeClosingFile(self):
    """ReleaseFileLock frees the lock even though the file is still open."""
    temp_status_file = self._CreateEmptyTempFile()
    try:
      with open(self.temp_file_path, 'r') as f:
        lock.AcquireFileLock(f, lock.LOCK_SH)
        lock.ReleaseFileLock(f)
        # f is still open here; the child should nevertheless be able to take
        # an exclusive lock because the shared lock was released above.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
      with open(temp_status_file, 'r') as f:
        self.assertEqual('LockException was not raised', f.read())
    finally:
      os.remove(temp_status_file)

  def testContextualLock(self):
    """lock.FileLock holds the lock inside its block and frees it on exit."""
    temp_status_file = self._CreateEmptyTempFile()
    try:
      with open(self.temp_file_path, 'r') as f:
        with lock.FileLock(f, lock.LOCK_EX):
          # Within this block, a non-blocking exclusive lock request on
          # self.temp_file_path from another process should raise
          # LockException.
          p = multiprocessing.Process(
              target=_ReadFileWithExclusiveLockNonBlocking,
              args=(self.temp_file_path, temp_status_file))
          p.start()
          p.join()
          # Use a distinct name so the locked file object f is not shadowed
          # while the lock is still held.
          with open(temp_status_file, 'r') as status_f:
            self.assertEqual('LockException raised', status_f.read())

        # The same request made here, after the FileLock block has exited,
        # should not raise an exception.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
      with open(temp_status_file, 'r') as status_f:
        self.assertEqual('LockException was not raised', status_f.read())
    finally:
      os.remove(temp_status_file)