aboutsummaryrefslogtreecommitdiff
path: root/catapult/devil/devil/android/device_denylist.py
blob: 88b5969cbaa01d3e24a2b9a1a6bd51d43c1b7997 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import logging
import os
import threading
import time

logger = logging.getLogger(__name__)


class Denylist(object):
  def __init__(self, path):
    self._denylist_lock = threading.RLock()
    self._path = path

  def Read(self):
    """Reads the denylist from the denylist file.

    Returns:
      A dict containing bad devices.
    """
    with self._denylist_lock:
      denylist = dict()
      if not os.path.exists(self._path):
        return denylist

      try:
        with open(self._path, 'r') as f:
          denylist = json.load(f)
      except (IOError, ValueError) as e:
        logger.warning('Unable to read denylist: %s', str(e))
        os.remove(self._path)

      if not isinstance(denylist, dict):
        logger.warning('Ignoring %s: %s (a dict was expected instead)',
                       self._path, denylist)
        denylist = dict()

      return denylist

  def Write(self, denylist):
    """Writes the provided denylist to the denylist file.

    Args:
      denylist: list of bad devices to write to the denylist file.
    """
    with self._denylist_lock:
      with open(self._path, 'w') as f:
        json.dump(denylist, f)

  def Extend(self, devices, reason='unknown'):
    """Adds devices to denylist file.

    Args:
      devices: list of bad devices to be added to the denylist file.
      reason: string specifying the reason for denylist (eg: 'unauthorized')
    """
    timestamp = time.time()
    event_info = {
        'timestamp': timestamp,
        'reason': reason,
    }
    device_dicts = {device: event_info for device in devices}
    logger.info('Adding %s to denylist %s for reason: %s', ','.join(devices),
                self._path, reason)
    with self._denylist_lock:
      denylist = self.Read()
      denylist.update(device_dicts)
      self.Write(denylist)

  def Reset(self):
    """Erases the denylist file if it exists."""
    logger.info('Resetting denylist %s', self._path)
    with self._denylist_lock:
      if os.path.exists(self._path):
        os.remove(self._path)