aboutsummaryrefslogtreecommitdiff
path: root/catapult/common/py_utils/py_utils/ts_proxy_server.py
blob: ffed090f158af86302b0266a9c84761891fca43d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Start and stop tsproxy."""

import logging
import os
import re
import signal
import subprocess
import sys
import time

try:
  import fcntl
except ImportError:
  fcntl = None

import py_utils
from py_utils import retry_util
from py_utils import atexit_with_log

# Path of the tsproxy.py script (third_party/tsproxy/tsproxy.py under the
# directory returned by py_utils.GetCatapultDir()). TsProxyServer runs this
# script with the current Python interpreter as a subprocess.
_TSPROXY_PATH = os.path.join(
    py_utils.GetCatapultDir(), 'third_party', 'tsproxy', 'tsproxy.py')

class TsProxyServerError(Exception):
  """Raised when tsproxy fails to start or rejects a command."""

def ParseTsProxyPortFromOutput(output_line):
  """Extract the listening port from a tsproxy startup announcement.

  Args:
    output_line: A single line of tsproxy output.

  Returns:
    The port as an int if the line starts with
    'Started Socks5 proxy server on <host>:<port>', otherwise None.
  """
  started_match = re.match(
      r'Started Socks5 proxy server on '
      r'(?P<host>[^:]*):'
      r'(?P<port>\d+)',
      output_line)
  if started_match is None:
    return None
  return int(started_match.group('port'))


class TsProxyServer(object):
  """Start and stop tsproxy.

  TsProxy provides basic latency, download and upload traffic shaping. This
  class provides a programming API to the tsproxy script in
  catapult/third_party/tsproxy/tsproxy.py

  This class can be used as a context manager.
  """

  def __init__(self, host_ip=None, http_port=None, https_port=None):
    """Initialize TsProxyServer.

    Args:
      host_ip: A string of the host ip address; passed to tsproxy as
          --desthost when set.
      http_port: The port used for http traffic.
      https_port: The port used for https traffic.

    http_port and https_port must either both be given or both be omitted.
    """
    self._proc = None
    self._port = None
    self._is_running = False
    self._host_ip = host_ip
    assert bool(http_port) == bool(https_port)
    self._http_port = http_port
    self._https_port = https_port
    self._non_blocking = False
    # Last traffic settings successfully sent to tsproxy; used to skip
    # redundant commands in UpdateTrafficSettings.
    self._rtt = None
    self._inbkps = None
    self._outkbps = None

  @property
  def port(self):
    """The port parsed from tsproxy's startup output, or None if not started."""
    return self._port

  @retry_util.RetryOnException(TsProxyServerError, retries=3)
  def StartServer(self, timeout=10, retries=None):
    """Start TsProxy server and verify that it started.

    Args:
      timeout: Seconds to wait for tsproxy to announce its listening port.
      retries: Consumed by the RetryOnException decorator; unused here.

    Raises:
      TsProxyServerError: if the port was not announced before the timeout.
    """
    del retries  # Handled by decorator.
    cmd_line = [sys.executable, _TSPROXY_PATH]
    # Use port 0 so tsproxy picks a random available port.
    cmd_line.extend(['--port=0'])
    if self._host_ip:
      cmd_line.append('--desthost=%s' % self._host_ip)
    if self._http_port:
      cmd_line.append(
          '--mapports=443:%s,*:%s' % (self._https_port, self._http_port))
    # Log the command line regardless of whether ports are mapped.
    logging.info('Tsproxy commandline: %s', cmd_line)
    self._proc = subprocess.Popen(
        cmd_line, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
        stderr=subprocess.PIPE, bufsize=1)
    self._non_blocking = False
    if fcntl:
      logging.info('fcntl is supported, trying to set '
                   'non blocking I/O for the ts_proxy process')
      fd = self._proc.stdout.fileno()
      fl = fcntl.fcntl(fd, fcntl.F_GETFL)
      fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
      self._non_blocking = True

    atexit_with_log.Register(self.StopServer)
    try:
      py_utils.WaitFor(self._IsStarted, timeout)
      logging.info('TsProxy port: %s', self._port)
      self._is_running = True
    except py_utils.TimeoutException:
      # StopServer() returns early while _is_running is False, so it cannot
      # be used here; end the subprocess directly so it is not leaked when
      # the decorator retries or the caller gives up.
      self._ForceStopProc()
      err = self._ReapProcAndResetState()
      if err:
        logging.error('TsProxy stderr output:\n%s', err)
      raise TsProxyServerError(
          'Error starting tsproxy: timed out after %s seconds' % timeout)

  def _IsStarted(self):
    """Read one line of tsproxy output and try to parse the port from it.

    Returns:
      True once a port has been parsed and stored in self._port; False if the
      subprocess has already exited or the line read was not the startup
      announcement.
    """
    assert not self._is_running
    assert self._proc
    if self._proc.poll() is not None:
      return False
    self._proc.stdout.flush()
    output_line = self._ReadLineTsProxyStdout(timeout=5)
    logging.debug('TsProxy output: %s', output_line)
    self._port = ParseTsProxyPortFromOutput(output_line)
    return self._port is not None

  def _ReadLineTsProxyStdout(self, timeout):
    """Return the next stripped line read from the tsproxy stdout pipe.

    The read is retried through py_utils.WaitFor for up to `timeout` seconds
    (presumably until a truthy line is returned -- confirm against
    py_utils.WaitFor).
    """
    def ReadSingleLine():
      try:
        return self._proc.stdout.readline().strip()
      except IOError:
        # Add a sleep to avoid trying to read self._proc.stdout too often.
        if self._non_blocking:
          time.sleep(0.5)
        return None
    return py_utils.WaitFor(ReadSingleLine, timeout)

  @retry_util.RetryOnException(TsProxyServerError, retries=3)
  def _IssueCommand(self, command_string, timeout, retries=None):
    """Send a command line to tsproxy over stdin and wait for its status.

    Output lines are collected until one of them is exactly 'OK' or 'ERROR'.

    Args:
      command_string: The command to send (a newline is appended).
      timeout: Seconds allowed for the status line to arrive.
      retries: Consumed by the RetryOnException decorator; unused here.

    Raises:
      TsProxyServerError: if tsproxy did not answer 'OK'.
    """
    del retries  # handled by the decorator
    logging.info('Issuing command to ts_proxy_server: %s', command_string)
    command_output = []
    self._proc.stdin.write('%s\n' % command_string)
    def CommandStatusIsRead():
      self._proc.stdin.flush()
      self._proc.stdout.flush()
      command_output.append(self._ReadLineTsProxyStdout(timeout))
      return command_output[-1] in ('OK', 'ERROR')

    py_utils.WaitFor(CommandStatusIsRead, timeout)

    success = 'OK' in command_output
    logging.log(logging.DEBUG if success else logging.ERROR,
                'TsProxy output:\n%s', '\n'.join(command_output))
    if not success:
      # Format the message here; passing the argument separately would leave
      # the '%s' placeholder unformatted in the exception text.
      raise TsProxyServerError(
          'Failed to execute command: %s' % command_string)

  def UpdateOutboundPorts(self, http_port, https_port, timeout=5):
    """Tell the running tsproxy to map traffic to new http/https ports.

    Args:
      http_port: Int in [1, 65535]; destination for all non-443 traffic.
      https_port: Int in [1, 65535], different from http_port; destination
          for port 443 traffic.
      timeout: Seconds allowed for tsproxy to acknowledge the command.
    """
    assert http_port and https_port
    assert http_port != https_port
    assert isinstance(http_port, int) and isinstance(https_port, int)
    assert 1 <= http_port <= 65535
    assert 1 <= https_port <= 65535
    self._IssueCommand('set mapports 443:%i,*:%i' % (https_port, http_port),
                       timeout)

  def UpdateTrafficSettings(
      self, round_trip_latency_ms=None,
      download_bandwidth_kbps=None, upload_bandwidth_kbps=None, timeout=20):
    """Update traffic settings of the proxy server.

    Note that this method only updates the specified parameters, and only
    those whose value differs from the last one sent.

    Args:
      round_trip_latency_ms: Value for tsproxy's 'rtt' setting, or None.
      download_bandwidth_kbps: Value for tsproxy's 'inkbps' setting, or None.
      upload_bandwidth_kbps: Value for tsproxy's 'outkbps' setting, or None.
      timeout: Seconds allowed for each individual command.
    """
    # Memorize the traffic settings & only execute the command if the traffic
    # settings are different.
    if round_trip_latency_ms is not None and self._rtt != round_trip_latency_ms:
      self._IssueCommand('set rtt %s' % round_trip_latency_ms, timeout)
      self._rtt = round_trip_latency_ms

    if (download_bandwidth_kbps is not None and
        self._inbkps != download_bandwidth_kbps):
      self._IssueCommand('set inkbps %s' % download_bandwidth_kbps, timeout)
      self._inbkps = download_bandwidth_kbps

    if (upload_bandwidth_kbps is not None and
        self._outkbps != upload_bandwidth_kbps):
      self._IssueCommand('set outkbps %s' % upload_bandwidth_kbps, timeout)
      self._outkbps = upload_bandwidth_kbps

  def _ForceStopProc(self):
    """Signal the subprocess to exit if it is still alive."""
    if self._proc.poll() is not None:
      return
    try:
      # signal.SIGINT is not supported on Windows.
      if not sys.platform.startswith('win'):
        try:
          # Use a SIGINT so that it can do graceful cleanup
          self._proc.send_signal(signal.SIGINT)
        except ValueError:
          logging.warning('Unable to stop ts_proxy_server gracefully.\n')
      self._proc.terminate()
    except OSError:
      # The process exited between the poll() above and the signal.
      pass

  def _ReapProcAndResetState(self):
    """Wait for the subprocess, forget it, and return its stderr output."""
    _, err = self._proc.communicate()

    self._proc = None
    self._port = None
    self._is_running = False
    self._rtt = None
    self._inbkps = None
    self._outkbps = None
    return err

  def StopServer(self):
    """Stop TsProxy Server.

    Asks tsproxy to exit via its 'exit' command; if that fails or times out,
    the subprocess is signalled instead. All cached state is reset afterwards.

    Returns:
      The stderr output collected from the subprocess, or None if the server
      was not running.
    """
    if not self._is_running:
      logging.debug('Attempting to stop TsProxy server that is not running.')
      return None
    if not self._proc:
      return None
    try:
      self._IssueCommand('exit', timeout=10)
      py_utils.WaitFor(lambda: self._proc.poll() is not None, 10)
    except (py_utils.TimeoutException, TsProxyServerError, IOError) as e:
      # A rejected command or a broken stdin pipe must not prevent cleanup:
      # fall back to signalling the process so it is always reaped and the
      # state below is always reset.
      logging.warning('Graceful tsproxy exit failed (%s); forcing stop.', e)
      self._ForceStopProc()
    return self._ReapProcAndResetState()

  def __enter__(self):
    """Add support for with-statement."""
    self.StartServer()
    return self

  def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb):
    """Add support for with-statement."""
    self.StopServer()