aboutsummaryrefslogtreecommitdiff
path: root/catapult/devil/devil/android/device_denylist.py
diff options
context:
space:
mode:
Diffstat (limited to 'catapult/devil/devil/android/device_denylist.py')
-rw-r--r--catapult/devil/devil/android/device_denylist.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/catapult/devil/devil/android/device_denylist.py b/catapult/devil/devil/android/device_denylist.py
new file mode 100644
index 00000000..88b5969c
--- /dev/null
+++ b/catapult/devil/devil/android/device_denylist.py
@@ -0,0 +1,79 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import json
+import logging
+import os
+import threading
+import time
+
+logger = logging.getLogger(__name__)
+
+
+class Denylist(object):
+ def __init__(self, path):
+ self._denylist_lock = threading.RLock()
+ self._path = path
+
+ def Read(self):
+ """Reads the denylist from the denylist file.
+
+ Returns:
+ A dict containing bad devices.
+ """
+ with self._denylist_lock:
+ denylist = dict()
+ if not os.path.exists(self._path):
+ return denylist
+
+ try:
+ with open(self._path, 'r') as f:
+ denylist = json.load(f)
+ except (IOError, ValueError) as e:
+ logger.warning('Unable to read denylist: %s', str(e))
+ os.remove(self._path)
+
+ if not isinstance(denylist, dict):
+ logger.warning('Ignoring %s: %s (a dict was expected instead)',
+ self._path, denylist)
+ denylist = dict()
+
+ return denylist
+
+ def Write(self, denylist):
+ """Writes the provided denylist to the denylist file.
+
+ Args:
+ denylist: list of bad devices to write to the denylist file.
+ """
+ with self._denylist_lock:
+ with open(self._path, 'w') as f:
+ json.dump(denylist, f)
+
+ def Extend(self, devices, reason='unknown'):
+ """Adds devices to denylist file.
+
+ Args:
+ devices: list of bad devices to be added to the denylist file.
+ reason: string specifying the reason for denylist (eg: 'unauthorized')
+ """
+ timestamp = time.time()
+ event_info = {
+ 'timestamp': timestamp,
+ 'reason': reason,
+ }
+ device_dicts = {device: event_info for device in devices}
+ logger.info('Adding %s to denylist %s for reason: %s', ','.join(devices),
+ self._path, reason)
+ with self._denylist_lock:
+ denylist = self.Read()
+ denylist.update(device_dicts)
+ self.Write(denylist)
+
+ def Reset(self):
+ """Erases the denylist file if it exists."""
+ logger.info('Resetting denylist %s', self._path)
+ with self._denylist_lock:
+ if os.path.exists(self._path):
+ os.remove(self._path)