aboutsummaryrefslogtreecommitdiff
path: root/catapult/common/py_utils/py_utils/__init__.py
blob: fba0897c9939564031478b03228217634fbc32cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python

# Copyright (c) 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import functools
import inspect
import os
import sys
import time
import platform


def GetCatapultDir():
  return os.path.normpath(
      os.path.join(os.path.dirname(__file__), '..', '..', '..'))


def IsRunningOnCrosDevice():
  """Returns True if we're on a ChromeOS device."""
  lsb_release = '/etc/lsb-release'
  if sys.platform.startswith('linux') and os.path.exists(lsb_release):
    with open(lsb_release, 'r') as f:
      res = f.read()
      if res.count('CHROMEOS_RELEASE_NAME'):
        return True
  return False


def GetHostOsName():
  if IsRunningOnCrosDevice():
    return 'chromeos'
  elif sys.platform.startswith('linux'):
    return 'linux'
  elif sys.platform == 'darwin':
    return 'mac'
  elif sys.platform == 'win32':
    return 'win'


def GetHostArchName():
  return platform.machine()


def _ExecutableExtensions():
  # pathext is, e.g. '.com;.exe;.bat;.cmd'
  exts = os.getenv('PATHEXT').split(';') #e.g. ['.com','.exe','.bat','.cmd']
  return [x[1:].upper() for x in exts] #e.g. ['COM','EXE','BAT','CMD']


def IsExecutable(path):
  if os.path.isfile(path):
    if hasattr(os, 'name') and os.name == 'nt':
      return path.split('.')[-1].upper() in _ExecutableExtensions()
    else:
      return os.access(path, os.X_OK)
  else:
    return False


def _AddDirToPythonPath(*path_parts):
  path = os.path.abspath(os.path.join(*path_parts))
  if os.path.isdir(path) and path not in sys.path:
    # Some callsite that use telemetry assumes that sys.path[0] is the directory
    # containing the script, so we add these extra paths to right after it.
    sys.path.insert(1, path)

_AddDirToPythonPath(os.path.join(GetCatapultDir(), 'devil'))
_AddDirToPythonPath(os.path.join(GetCatapultDir(), 'dependency_manager'))
_AddDirToPythonPath(os.path.join(GetCatapultDir(), 'third_party', 'mock'))
# mox3 is needed for pyfakefs usage, but not for pylint.
_AddDirToPythonPath(os.path.join(GetCatapultDir(), 'third_party', 'mox3'))
_AddDirToPythonPath(
    os.path.join(GetCatapultDir(), 'third_party', 'pyfakefs'))

from devil.utils import timeout_retry
from devil.utils import reraiser_thread


# Decorator that adds timeout functionality to a function.
def Timeout(default_timeout):
  return lambda func: TimeoutDeco(func, default_timeout)

# Note: Even though the "timeout" keyword argument is the only
# keyword argument that will need to be given to the decorated function,
# we still have to use the **kwargs syntax, because we have to use
# the *args syntax here before (since the decorator decorates functions
# with different numbers of positional arguments) and Python doesn't allow
# a single named keyword argument after *args.
# (e.g., 'def foo(*args, bar=42):' is a syntax error)

def TimeoutDeco(func, default_timeout):
  @functools.wraps(func)
  def RunWithTimeout(*args, **kwargs):
    if 'timeout' in kwargs:
      timeout = kwargs['timeout']
    else:
      timeout = default_timeout
    try:
      return timeout_retry.Run(func, timeout, 0, args=args)
    except reraiser_thread.TimeoutError:
      print '%s timed out.' % func.__name__
      return False
  return RunWithTimeout


MIN_POLL_INTERVAL_IN_SECONDS = 0.1
MAX_POLL_INTERVAL_IN_SECONDS = 5
OUTPUT_INTERVAL_IN_SECONDS = 300

def WaitFor(condition, timeout):
  """Waits for up to |timeout| secs for the function |condition| to return True.

  Polling frequency is (elapsed_time / 10), with a min of .1s and max of 5s.

  Returns:
    Result of |condition| function (if present).
  """
  def GetConditionString():
    if condition.__name__ == '<lambda>':
      try:
        return inspect.getsource(condition).strip()
      except IOError:
        pass
    return condition.__name__

  # Do an initial check to see if its true.
  res = condition()
  if res:
    return res
  start_time = time.time()
  last_output_time = start_time
  elapsed_time = time.time() - start_time
  while elapsed_time < timeout:
    res = condition()
    if res:
      return res
    now = time.time()
    elapsed_time = now - start_time
    last_output_elapsed_time = now - last_output_time
    if last_output_elapsed_time > OUTPUT_INTERVAL_IN_SECONDS:
      last_output_time = time.time()
    poll_interval = min(max(elapsed_time / 10., MIN_POLL_INTERVAL_IN_SECONDS),
                        MAX_POLL_INTERVAL_IN_SECONDS)
    time.sleep(poll_interval)
  raise TimeoutException('Timed out while waiting %ds for %s.' %
                         (timeout, GetConditionString()))

class TimeoutException(Exception):
  """The operation failed to complete because of a timeout.

  It is possible that waiting for a longer period of time would result in a
  successful operation.
  """
  pass