diff options
Diffstat (limited to 'vendor_libs/test_vendor_lib/scripts/test_channel.py')
-rw-r--r-- | vendor_libs/test_vendor_lib/scripts/test_channel.py | 73 |
1 files changed, 19 insertions, 54 deletions
diff --git a/vendor_libs/test_vendor_lib/scripts/test_channel.py b/vendor_libs/test_vendor_lib/scripts/test_channel.py index bca358c72..79b19b1bb 100644 --- a/vendor_libs/test_vendor_lib/scripts/test_channel.py +++ b/vendor_libs/test_vendor_lib/scripts/test_channel.py @@ -42,7 +42,6 @@ import socket import string import struct import sys -import time DEVICE_NAME_LENGTH = 6 DEVICE_ADDRESS_LENGTH = 6 @@ -74,7 +73,7 @@ class Connection(object): self._socket.close() def send(self, data): - self._socket.sendall(data.encode()) + self._socket.sendall(data) def receive(self, size): return self._socket.recv(size) @@ -101,35 +100,37 @@ class TestChannel(object): args_size = len(args) self.lint_command(name, args, name_size, args_size) encoded_name = chr(name_size) + name - encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args) + encoded_args = chr(args_size) + ''.join( + chr(len(arg)) + arg for arg in args) command = encoded_name + encoded_args if self._closed: return self._connection.send(command) if name != 'CLOSE_TEST_CHANNEL': - print(self.receive_response().decode()) + print self.receive_response() def receive_response(self): if self._closed: - return b'Closed' + return size_chars = self._connection.receive(4) size_bytes = bytearray(size_chars) if not size_chars: - return b'No response, assuming that the connection is broken' + print 'No response, assuming that the connection is broken' + return False response_size = 0 for i in range(0, len(size_chars) - 1): - response_size |= (size_chars[i] << (8 * i)) + response_size |= ord(size_chars[i]) << (8 * i) response = self._connection.receive(response_size) return response def lint_command(self, name, args, name_size, args_size): assert name_size == len(name) and args_size == len(args) try: - name.encode() + name.encode('utf-8') for arg in args: - arg.encode() + arg.encode('utf-8') except UnicodeError: - print('Unrecognized characters.') + print 'Unrecognized characters.' raise if name_size > 255 or args_size > 255: raise ValueError # Size must be encodable in one octet. @@ -220,43 +221,6 @@ class TestChannelShell(cmd.Cmd): """ self._test_channel.send_command('list', args.split()) - def do_set_timer_period(self, args): - """Arguments: period_ms Set the timer to fire every period_ms milliseconds - """ - self._test_channel.send_command('set_timer_period', args.split()) - - def do_start_timer(self, args): - """Arguments: None. Start the timer. - """ - self._test_channel.send_command('start_timer', args.split()) - - def do_stop_timer(self, args): - """Arguments: None. Stop the timer. - """ - self._test_channel.send_command('stop_timer', args.split()) - - def do_wait(self, args): - """Arguments: time in seconds (float). - """ - sleep_time = float(args.split()[0]) - time.sleep(sleep_time) - - def do_reset(self, args): - """Arguments: None. - - Resets the simulation. - """ - self._test_channel.send_command('reset', []) - - def do_end(self, args): - """Arguments: None. - - Ends the simulation and exits. - """ - self._test_channel.send_command('END_SIMULATION', []) - print('Goodbye.') - return True - def do_quit(self, args): """Arguments: None. @@ -264,7 +228,7 @@ class TestChannelShell(cmd.Cmd): """ self._test_channel.send_command('CLOSE_TEST_CHANNEL', []) self._test_channel.close() - print('Goodbye.') + print 'Goodbye.' return True def do_help(self, args): @@ -300,23 +264,24 @@ class TestChannelShell(cmd.Cmd): def main(argv): if len(argv) != 2: - print('Usage: python test_channel.py [port]') + print 'Usage: python test_channel.py [port]' return try: port = int(argv[1]) except ValueError: - print('Error parsing port.') + print 'Error parsing port.' else: try: test_channel = TestChannel(port) - except socket.error as e: - print('Error connecting to socket: %s' % e) + except socket.error, e: + print 'Error connecting to socket: %s' % e except: - print('Error creating test channel (check argument).') + print 'Error creating test channel (check argument).' else: test_channel_shell = TestChannelShell(test_channel) test_channel_shell.prompt = '$ ' - test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.') + test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': |