diff options
Diffstat (limited to 'catapult/common/py_utils/py_utils/ts_proxy_server.py')
-rw-r--r-- | catapult/common/py_utils/py_utils/ts_proxy_server.py | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/catapult/common/py_utils/py_utils/ts_proxy_server.py b/catapult/common/py_utils/py_utils/ts_proxy_server.py new file mode 100644 index 00000000..b71d143d --- /dev/null +++ b/catapult/common/py_utils/py_utils/ts_proxy_server.py @@ -0,0 +1,222 @@ +# Copyright 2016 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Start and stop tsproxy.""" + +import logging +import os +import re +import signal +import subprocess +import sys +import time + +try: + import fcntl +except ImportError: + fcntl = None + +import py_utils +from py_utils import retry_util +from py_utils import atexit_with_log + +_TSPROXY_PATH = os.path.join( + py_utils.GetCatapultDir(), 'third_party', 'tsproxy', 'tsproxy.py') + +class TsProxyServerError(Exception): + """Catch-all exception for tsProxy Server.""" + pass + +def ParseTsProxyPortFromOutput(output_line): + port_re = re.compile( + r'Started Socks5 proxy server on ' + r'(?P<host>[^:]*):' + r'(?P<port>\d+)') + m = port_re.match(output_line) + if m: + return int(m.group('port')) + + +class TsProxyServer(object): + """Start and stop tsproxy. + + TsProxy provides basic latency, download and upload traffic shaping. This + class provides a programming API to the tsproxy script in + catapult/third_party/tsproxy/tsproxy.py + + This class can be used as a context manager. + """ + + def __init__(self, host_ip=None, http_port=None, https_port=None): + """ + Initialize TsProxyServer. + + Args: + + host_ip: A string of the host ip address. + http_port: A decimal of the port used for http traffic. + https_port: a decimal of the port used for https traffic. + + """ + self._proc = None + self._port = None + self._is_running = False + self._host_ip = host_ip + assert bool(http_port) == bool(https_port) + self._http_port = http_port + self._https_port = https_port + self._non_blocking = False + self._rtt = None + self._inbkps = None + self._outkbps = None + + @property + def port(self): + return self._port + + @retry_util.RetryOnException(TsProxyServerError, retries=3) + def StartServer(self, timeout=10, retries=None): + """Start TsProxy server and verify that it started.""" + del retries # Handled by decorator. + cmd_line = [sys.executable, _TSPROXY_PATH] + # Use port 0 so tsproxy picks a random available port. + cmd_line.extend(['--port=0']) + if self._host_ip: + cmd_line.append('--desthost=%s' % self._host_ip) + if self._http_port: + cmd_line.append( + '--mapports=443:%s,*:%s' % (self._https_port, self._http_port)) + logging.info('Tsproxy commandline: %s', cmd_line) + self._proc = subprocess.Popen( + cmd_line, stdout=subprocess.PIPE, stdin=subprocess.PIPE, + stderr=subprocess.PIPE, bufsize=1) + self._non_blocking = False + if fcntl: + logging.info('fcntl is supported, trying to set ' + 'non blocking I/O for the ts_proxy process') + fd = self._proc.stdout.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + self._non_blocking = True + + atexit_with_log.Register(self.StopServer) + try: + py_utils.WaitFor(self._IsStarted, timeout) + logging.info('TsProxy port: %s', self._port) + self._is_running = True + except py_utils.TimeoutException: + err = self.StopServer() + if err: + logging.error('Error stopping WPR server:\n%s', err) + raise TsProxyServerError( + 'Error starting tsproxy: timed out after %s seconds' % timeout) + + def _IsStarted(self): + assert not self._is_running + assert self._proc + if self._proc.poll() is not None: + return False + self._proc.stdout.flush() + output_line = self._ReadLineTsProxyStdout(timeout=5) + logging.debug('TsProxy output: %s', output_line) + self._port = ParseTsProxyPortFromOutput(output_line) + return self._port != None + + def _ReadLineTsProxyStdout(self, timeout): + def ReadSingleLine(): + try: + return self._proc.stdout.readline().strip() + except IOError: + # Add a sleep to avoid trying to read self._proc.stdout too often. + if self._non_blocking: + time.sleep(0.5) + return None + return py_utils.WaitFor(ReadSingleLine, timeout) + + @retry_util.RetryOnException(TsProxyServerError, retries=3) + def _IssueCommand(self, command_string, timeout, retries=None): + del retries # handled by the decorator + logging.info('Issuing command to ts_proxy_server: %s', command_string) + command_output = [] + self._proc.stdin.write('%s\n' % command_string) + def CommandStatusIsRead(): + self._proc.stdin.flush() + self._proc.stdout.flush() + command_output.append(self._ReadLineTsProxyStdout(timeout)) + return command_output[-1] == 'OK' or command_output[-1] == 'ERROR' + + py_utils.WaitFor(CommandStatusIsRead, timeout) + + success = 'OK' in command_output + logging.log(logging.DEBUG if success else logging.ERROR, + 'TsProxy output:\n%s', '\n'.join(command_output)) + if not success: + raise TsProxyServerError('Failed to execute command: %s', command_string) + + def UpdateOutboundPorts(self, http_port, https_port, timeout=5): + assert http_port and https_port + assert http_port != https_port + assert isinstance(http_port, int) and isinstance(https_port, int) + assert 1 <= http_port <= 65535 + assert 1 <= https_port <= 65535 + self._IssueCommand('set mapports 443:%i,*:%i' % (https_port, http_port), + timeout) + + def UpdateTrafficSettings( + self, round_trip_latency_ms=None, + download_bandwidth_kbps=None, upload_bandwidth_kbps=None, timeout=20): + """Update traffic settings of the proxy server. + + Notes that this method only updates the specified parameter. + """ + # Memorize the traffic settings & only execute the command if the traffic + # settings are different. + if round_trip_latency_ms is not None and self._rtt != round_trip_latency_ms: + self._IssueCommand('set rtt %s' % round_trip_latency_ms, timeout) + self._rtt = round_trip_latency_ms + + if (download_bandwidth_kbps is not None and + self._inbkps != download_bandwidth_kbps): + self._IssueCommand('set inkbps %s' % download_bandwidth_kbps, timeout) + self._inbkps = download_bandwidth_kbps + + if (upload_bandwidth_kbps is not None and + self._outkbps != upload_bandwidth_kbps): + self._IssueCommand('set outkbps %s' % upload_bandwidth_kbps, timeout) + self._outkbps = upload_bandwidth_kbps + + def StopServer(self): + """Stop TsProxy Server.""" + if not self._is_running: + logging.debug('Attempting to stop TsProxy server that is not running.') + return + if not self._proc: + return + try: + py_utils.WaitFor(lambda: self._proc.poll() is not None, 10) + except py_utils.TimeoutException: + try: + # Use a SIGNINT so that it can do graceful cleanup + self._proc.send_signal(signal.SIGINT) + except ValueError: + logging.warning('Unable to stop ts_proxy_server gracefully.\n') + self._proc.terminate() + _, err = self._proc.communicate() + + self._proc = None + self._port = None + self._is_running = False + self._rtt = None + self._inbkps = None + self._outkbps = None + return err + + def __enter__(self): + """Add support for with-statement.""" + self.StartServer() + return self + + def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb): + """Add support for with-statement.""" + self.StopServer() |