1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Start and stop tsproxy."""
import logging
import os
import re
import signal
import subprocess
import sys
import time
try:
import fcntl
except ImportError:
fcntl = None
import py_utils
from py_utils import retry_util
from py_utils import atexit_with_log
_TSPROXY_PATH = os.path.join(
py_utils.GetCatapultDir(), 'third_party', 'tsproxy', 'tsproxy.py')
class TsProxyServerError(Exception):
"""Catch-all exception for tsProxy Server."""
pass
def ParseTsProxyPortFromOutput(output_line):
port_re = re.compile(
r'Started Socks5 proxy server on '
r'(?P<host>[^:]*):'
r'(?P<port>\d+)')
m = port_re.match(output_line)
if m:
return int(m.group('port'))
class TsProxyServer(object):
"""Start and stop tsproxy.
TsProxy provides basic latency, download and upload traffic shaping. This
class provides a programming API to the tsproxy script in
catapult/third_party/tsproxy/tsproxy.py
This class can be used as a context manager.
"""
def __init__(self, host_ip=None, http_port=None, https_port=None):
"""
Initialize TsProxyServer.
Args:
host_ip: A string of the host ip address.
http_port: A decimal of the port used for http traffic.
https_port: a decimal of the port used for https traffic.
"""
self._proc = None
self._port = None
self._is_running = False
self._host_ip = host_ip
assert bool(http_port) == bool(https_port)
self._http_port = http_port
self._https_port = https_port
self._non_blocking = False
self._rtt = None
self._inbkps = None
self._outkbps = None
@property
def port(self):
return self._port
@retry_util.RetryOnException(TsProxyServerError, retries=3)
def StartServer(self, timeout=10, retries=None):
"""Start TsProxy server and verify that it started."""
del retries # Handled by decorator.
cmd_line = [sys.executable, _TSPROXY_PATH]
# Use port 0 so tsproxy picks a random available port.
cmd_line.extend(['--port=0'])
if self._host_ip:
cmd_line.append('--desthost=%s' % self._host_ip)
if self._http_port:
cmd_line.append(
'--mapports=443:%s,*:%s' % (self._https_port, self._http_port))
logging.info('Tsproxy commandline: %s', cmd_line)
self._proc = subprocess.Popen(
cmd_line, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=1)
self._non_blocking = False
if fcntl:
logging.info('fcntl is supported, trying to set '
'non blocking I/O for the ts_proxy process')
fd = self._proc.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self._non_blocking = True
atexit_with_log.Register(self.StopServer)
try:
py_utils.WaitFor(self._IsStarted, timeout)
logging.info('TsProxy port: %s', self._port)
self._is_running = True
except py_utils.TimeoutException:
err = self.StopServer()
if err:
logging.error('Error stopping WPR server:\n%s', err)
raise TsProxyServerError(
'Error starting tsproxy: timed out after %s seconds' % timeout)
def _IsStarted(self):
assert not self._is_running
assert self._proc
if self._proc.poll() is not None:
return False
self._proc.stdout.flush()
output_line = self._ReadLineTsProxyStdout(timeout=5)
logging.debug('TsProxy output: %s', output_line)
self._port = ParseTsProxyPortFromOutput(output_line)
return self._port != None
def _ReadLineTsProxyStdout(self, timeout):
def ReadSingleLine():
try:
return self._proc.stdout.readline().strip()
except IOError:
# Add a sleep to avoid trying to read self._proc.stdout too often.
if self._non_blocking:
time.sleep(0.5)
return None
return py_utils.WaitFor(ReadSingleLine, timeout)
@retry_util.RetryOnException(TsProxyServerError, retries=3)
def _IssueCommand(self, command_string, timeout, retries=None):
del retries # handled by the decorator
logging.info('Issuing command to ts_proxy_server: %s', command_string)
command_output = []
self._proc.stdin.write('%s\n' % command_string)
def CommandStatusIsRead():
self._proc.stdin.flush()
self._proc.stdout.flush()
command_output.append(self._ReadLineTsProxyStdout(timeout))
return command_output[-1] == 'OK' or command_output[-1] == 'ERROR'
py_utils.WaitFor(CommandStatusIsRead, timeout)
success = 'OK' in command_output
logging.log(logging.DEBUG if success else logging.ERROR,
'TsProxy output:\n%s', '\n'.join(command_output))
if not success:
raise TsProxyServerError('Failed to execute command: %s', command_string)
def UpdateOutboundPorts(self, http_port, https_port, timeout=5):
assert http_port and https_port
assert http_port != https_port
assert isinstance(http_port, int) and isinstance(https_port, int)
assert 1 <= http_port <= 65535
assert 1 <= https_port <= 65535
self._IssueCommand('set mapports 443:%i,*:%i' % (https_port, http_port),
timeout)
def UpdateTrafficSettings(
self, round_trip_latency_ms=None,
download_bandwidth_kbps=None, upload_bandwidth_kbps=None, timeout=20):
"""Update traffic settings of the proxy server.
Notes that this method only updates the specified parameter.
"""
# Memorize the traffic settings & only execute the command if the traffic
# settings are different.
if round_trip_latency_ms is not None and self._rtt != round_trip_latency_ms:
self._IssueCommand('set rtt %s' % round_trip_latency_ms, timeout)
self._rtt = round_trip_latency_ms
if (download_bandwidth_kbps is not None and
self._inbkps != download_bandwidth_kbps):
self._IssueCommand('set inkbps %s' % download_bandwidth_kbps, timeout)
self._inbkps = download_bandwidth_kbps
if (upload_bandwidth_kbps is not None and
self._outkbps != upload_bandwidth_kbps):
self._IssueCommand('set outkbps %s' % upload_bandwidth_kbps, timeout)
self._outkbps = upload_bandwidth_kbps
def StopServer(self):
"""Stop TsProxy Server."""
if not self._is_running:
logging.debug('Attempting to stop TsProxy server that is not running.')
return
if not self._proc:
return
try:
py_utils.WaitFor(lambda: self._proc.poll() is not None, 10)
except py_utils.TimeoutException:
# signal.SIGINT is not supported on Windows.
if not sys.platform.startswith('win'):
try:
# Use a SIGNINT so that it can do graceful cleanup
self._proc.send_signal(signal.SIGINT)
except ValueError:
logging.warning('Unable to stop ts_proxy_server gracefully.\n')
self._proc.terminate()
_, err = self._proc.communicate()
self._proc = None
self._port = None
self._is_running = False
self._rtt = None
self._inbkps = None
self._outkbps = None
return err
def __enter__(self):
"""Add support for with-statement."""
self.StartServer()
return self
def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb):
"""Add support for with-statement."""
self.StopServer()
|