aboutsummaryrefslogtreecommitdiff
path: root/vendor_libs/test_vendor_lib/scripts/test_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'vendor_libs/test_vendor_lib/scripts/test_channel.py')
-rw-r--r--vendor_libs/test_vendor_lib/scripts/test_channel.py73
1 files changed, 19 insertions, 54 deletions
diff --git a/vendor_libs/test_vendor_lib/scripts/test_channel.py b/vendor_libs/test_vendor_lib/scripts/test_channel.py
index bca358c72..79b19b1bb 100644
--- a/vendor_libs/test_vendor_lib/scripts/test_channel.py
+++ b/vendor_libs/test_vendor_lib/scripts/test_channel.py
@@ -42,7 +42,6 @@ import socket
import string
import struct
import sys
-import time
DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6
@@ -74,7 +73,7 @@ class Connection(object):
self._socket.close()
def send(self, data):
- self._socket.sendall(data.encode())
+ self._socket.sendall(data)
def receive(self, size):
return self._socket.recv(size)
@@ -101,35 +100,37 @@ class TestChannel(object):
args_size = len(args)
self.lint_command(name, args, name_size, args_size)
encoded_name = chr(name_size) + name
- encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
+ encoded_args = chr(args_size) + ''.join(
+ chr(len(arg)) + arg for arg in args)
command = encoded_name + encoded_args
if self._closed:
return
self._connection.send(command)
if name != 'CLOSE_TEST_CHANNEL':
- print(self.receive_response().decode())
+ print self.receive_response()
def receive_response(self):
if self._closed:
- return b'Closed'
+ return
size_chars = self._connection.receive(4)
size_bytes = bytearray(size_chars)
if not size_chars:
- return b'No response, assuming that the connection is broken'
+ print 'No response, assuming that the connection is broken'
+ return False
response_size = 0
for i in range(0, len(size_chars) - 1):
- response_size |= (size_chars[i] << (8 * i))
+ response_size |= ord(size_chars[i]) << (8 * i)
response = self._connection.receive(response_size)
return response
def lint_command(self, name, args, name_size, args_size):
assert name_size == len(name) and args_size == len(args)
try:
- name.encode()
+ name.encode('utf-8')
for arg in args:
- arg.encode()
+ arg.encode('utf-8')
except UnicodeError:
- print('Unrecognized characters.')
+ print 'Unrecognized characters.'
raise
if name_size > 255 or args_size > 255:
raise ValueError # Size must be encodable in one octet.
@@ -220,43 +221,6 @@ class TestChannelShell(cmd.Cmd):
"""
self._test_channel.send_command('list', args.split())
- def do_set_timer_period(self, args):
- """Arguments: period_ms Set the timer to fire every period_ms milliseconds
- """
- self._test_channel.send_command('set_timer_period', args.split())
-
- def do_start_timer(self, args):
- """Arguments: None. Start the timer.
- """
- self._test_channel.send_command('start_timer', args.split())
-
- def do_stop_timer(self, args):
- """Arguments: None. Stop the timer.
- """
- self._test_channel.send_command('stop_timer', args.split())
-
- def do_wait(self, args):
- """Arguments: time in seconds (float).
- """
- sleep_time = float(args.split()[0])
- time.sleep(sleep_time)
-
- def do_reset(self, args):
- """Arguments: None.
-
- Resets the simulation.
- """
- self._test_channel.send_command('reset', [])
-
- def do_end(self, args):
- """Arguments: None.
-
- Ends the simulation and exits.
- """
- self._test_channel.send_command('END_SIMULATION', [])
- print('Goodbye.')
- return True
-
def do_quit(self, args):
"""Arguments: None.
@@ -264,7 +228,7 @@ class TestChannelShell(cmd.Cmd):
"""
self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
self._test_channel.close()
- print('Goodbye.')
+ print 'Goodbye.'
return True
def do_help(self, args):
@@ -300,23 +264,24 @@ class TestChannelShell(cmd.Cmd):
def main(argv):
if len(argv) != 2:
- print('Usage: python test_channel.py [port]')
+ print 'Usage: python test_channel.py [port]'
return
try:
port = int(argv[1])
except ValueError:
- print('Error parsing port.')
+ print 'Error parsing port.'
else:
try:
test_channel = TestChannel(port)
- except socket.error as e:
- print('Error connecting to socket: %s' % e)
+ except socket.error, e:
+ print 'Error connecting to socket: %s' % e
except:
- print('Error creating test channel (check argument).')
+ print 'Error creating test channel (check argument).'
else:
test_channel_shell = TestChannelShell(test_channel)
test_channel_shell.prompt = '$ '
- test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
+ test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':