aboutsummaryrefslogtreecommitdiff
path: root/catapult/common/py_utils/py_utils/ts_proxy_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'catapult/common/py_utils/py_utils/ts_proxy_server.py')
-rw-r--r--catapult/common/py_utils/py_utils/ts_proxy_server.py222
1 files changed, 222 insertions, 0 deletions
diff --git a/catapult/common/py_utils/py_utils/ts_proxy_server.py b/catapult/common/py_utils/py_utils/ts_proxy_server.py
new file mode 100644
index 00000000..b71d143d
--- /dev/null
+++ b/catapult/common/py_utils/py_utils/ts_proxy_server.py
@@ -0,0 +1,222 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Start and stop tsproxy."""
+
+import logging
+import os
+import re
+import signal
+import subprocess
+import sys
+import time
+
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
+
+import py_utils
+from py_utils import retry_util
+from py_utils import atexit_with_log
+
+_TSPROXY_PATH = os.path.join(
+ py_utils.GetCatapultDir(), 'third_party', 'tsproxy', 'tsproxy.py')
+
+class TsProxyServerError(Exception):
+ """Catch-all exception for tsProxy Server."""
+ pass
+
+def ParseTsProxyPortFromOutput(output_line):
+ port_re = re.compile(
+ r'Started Socks5 proxy server on '
+ r'(?P<host>[^:]*):'
+ r'(?P<port>\d+)')
+ m = port_re.match(output_line)
+ if m:
+ return int(m.group('port'))
+
+
+class TsProxyServer(object):
+ """Start and stop tsproxy.
+
+ TsProxy provides basic latency, download and upload traffic shaping. This
+ class provides a programming API to the tsproxy script in
+ catapult/third_party/tsproxy/tsproxy.py
+
+ This class can be used as a context manager.
+ """
+
+ def __init__(self, host_ip=None, http_port=None, https_port=None):
+ """
+ Initialize TsProxyServer.
+
+ Args:
+
+ host_ip: A string of the host ip address.
+ http_port: A decimal of the port used for http traffic.
+ https_port: a decimal of the port used for https traffic.
+
+ """
+ self._proc = None
+ self._port = None
+ self._is_running = False
+ self._host_ip = host_ip
+ assert bool(http_port) == bool(https_port)
+ self._http_port = http_port
+ self._https_port = https_port
+ self._non_blocking = False
+ self._rtt = None
+ self._inbkps = None
+ self._outkbps = None
+
+ @property
+ def port(self):
+ return self._port
+
+ @retry_util.RetryOnException(TsProxyServerError, retries=3)
+ def StartServer(self, timeout=10, retries=None):
+ """Start TsProxy server and verify that it started."""
+ del retries # Handled by decorator.
+ cmd_line = [sys.executable, _TSPROXY_PATH]
+ # Use port 0 so tsproxy picks a random available port.
+ cmd_line.extend(['--port=0'])
+ if self._host_ip:
+ cmd_line.append('--desthost=%s' % self._host_ip)
+ if self._http_port:
+ cmd_line.append(
+ '--mapports=443:%s,*:%s' % (self._https_port, self._http_port))
+ logging.info('Tsproxy commandline: %s', cmd_line)
+ self._proc = subprocess.Popen(
+ cmd_line, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE, bufsize=1)
+ self._non_blocking = False
+ if fcntl:
+ logging.info('fcntl is supported, trying to set '
+ 'non blocking I/O for the ts_proxy process')
+ fd = self._proc.stdout.fileno()
+ fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+ self._non_blocking = True
+
+ atexit_with_log.Register(self.StopServer)
+ try:
+ py_utils.WaitFor(self._IsStarted, timeout)
+ logging.info('TsProxy port: %s', self._port)
+ self._is_running = True
+ except py_utils.TimeoutException:
+ err = self.StopServer()
+ if err:
+ logging.error('Error stopping WPR server:\n%s', err)
+ raise TsProxyServerError(
+ 'Error starting tsproxy: timed out after %s seconds' % timeout)
+
+ def _IsStarted(self):
+ assert not self._is_running
+ assert self._proc
+ if self._proc.poll() is not None:
+ return False
+ self._proc.stdout.flush()
+ output_line = self._ReadLineTsProxyStdout(timeout=5)
+ logging.debug('TsProxy output: %s', output_line)
+ self._port = ParseTsProxyPortFromOutput(output_line)
+ return self._port != None
+
+ def _ReadLineTsProxyStdout(self, timeout):
+ def ReadSingleLine():
+ try:
+ return self._proc.stdout.readline().strip()
+ except IOError:
+ # Add a sleep to avoid trying to read self._proc.stdout too often.
+ if self._non_blocking:
+ time.sleep(0.5)
+ return None
+ return py_utils.WaitFor(ReadSingleLine, timeout)
+
+ @retry_util.RetryOnException(TsProxyServerError, retries=3)
+ def _IssueCommand(self, command_string, timeout, retries=None):
+ del retries # handled by the decorator
+ logging.info('Issuing command to ts_proxy_server: %s', command_string)
+ command_output = []
+ self._proc.stdin.write('%s\n' % command_string)
+ def CommandStatusIsRead():
+ self._proc.stdin.flush()
+ self._proc.stdout.flush()
+ command_output.append(self._ReadLineTsProxyStdout(timeout))
+ return command_output[-1] == 'OK' or command_output[-1] == 'ERROR'
+
+ py_utils.WaitFor(CommandStatusIsRead, timeout)
+
+ success = 'OK' in command_output
+ logging.log(logging.DEBUG if success else logging.ERROR,
+ 'TsProxy output:\n%s', '\n'.join(command_output))
+ if not success:
+ raise TsProxyServerError('Failed to execute command: %s', command_string)
+
+ def UpdateOutboundPorts(self, http_port, https_port, timeout=5):
+ assert http_port and https_port
+ assert http_port != https_port
+ assert isinstance(http_port, int) and isinstance(https_port, int)
+ assert 1 <= http_port <= 65535
+ assert 1 <= https_port <= 65535
+ self._IssueCommand('set mapports 443:%i,*:%i' % (https_port, http_port),
+ timeout)
+
+ def UpdateTrafficSettings(
+ self, round_trip_latency_ms=None,
+ download_bandwidth_kbps=None, upload_bandwidth_kbps=None, timeout=20):
+ """Update traffic settings of the proxy server.
+
+ Notes that this method only updates the specified parameter.
+ """
+ # Memorize the traffic settings & only execute the command if the traffic
+ # settings are different.
+ if round_trip_latency_ms is not None and self._rtt != round_trip_latency_ms:
+ self._IssueCommand('set rtt %s' % round_trip_latency_ms, timeout)
+ self._rtt = round_trip_latency_ms
+
+ if (download_bandwidth_kbps is not None and
+ self._inbkps != download_bandwidth_kbps):
+ self._IssueCommand('set inkbps %s' % download_bandwidth_kbps, timeout)
+ self._inbkps = download_bandwidth_kbps
+
+ if (upload_bandwidth_kbps is not None and
+ self._outkbps != upload_bandwidth_kbps):
+ self._IssueCommand('set outkbps %s' % upload_bandwidth_kbps, timeout)
+ self._outkbps = upload_bandwidth_kbps
+
+ def StopServer(self):
+ """Stop TsProxy Server."""
+ if not self._is_running:
+ logging.debug('Attempting to stop TsProxy server that is not running.')
+ return
+ if not self._proc:
+ return
+ try:
+ py_utils.WaitFor(lambda: self._proc.poll() is not None, 10)
+ except py_utils.TimeoutException:
+ try:
+ # Use a SIGNINT so that it can do graceful cleanup
+ self._proc.send_signal(signal.SIGINT)
+ except ValueError:
+ logging.warning('Unable to stop ts_proxy_server gracefully.\n')
+ self._proc.terminate()
+ _, err = self._proc.communicate()
+
+ self._proc = None
+ self._port = None
+ self._is_running = False
+ self._rtt = None
+ self._inbkps = None
+ self._outkbps = None
+ return err
+
+ def __enter__(self):
+ """Add support for with-statement."""
+ self.StartServer()
+ return self
+
+ def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb):
+ """Add support for with-statement."""
+ self.StopServer()