aboutsummaryrefslogtreecommitdiff
path: root/serial/tools/miniterm.py
diff options
context:
space:
mode:
Diffstat (limited to 'serial/tools/miniterm.py')
-rw-r--r--serial/tools/miniterm.py355
1 files changed, 210 insertions, 145 deletions
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 14182f0..3b8d5d2 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -3,10 +3,12 @@
# Very simple serial terminal
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
-# (C)2002-2015 Chris Liechti <cliechti@gmx.net>
+# (C)2002-2017 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
+from __future__ import absolute_import
+
import codecs
import os
import sys
@@ -275,12 +277,12 @@ class DebugIO(Transform):
"""Print what is sent and received"""
def rx(self, text):
- sys.stderr.write(' [RX:{}] '.format(repr(text)))
+ sys.stderr.write(' [RX:{!r}] '.format(text))
sys.stderr.flush()
return text
def tx(self, text):
- sys.stderr.write(' [TX:{}] '.format(repr(text)))
+ sys.stderr.write(' [TX:{!r}] '.format(text))
sys.stderr.flush()
return text
@@ -315,7 +317,7 @@ def ask_for_port():
sys.stderr.write('\n--- Available ports:\n')
ports = []
for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
- sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc))
+ sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
ports.append(port)
while True:
port = raw_input('--- Enter port index or full name: ')
@@ -347,8 +349,8 @@ class Miniterm(object):
self.eol = eol
self.filters = filters
self.update_transformations()
- self.exit_character = 0x1d # GS/CTRL+]
- self.menu_character = 0x14 # Menu: CTRL+T
+ self.exit_character = unichr(0x1d) # GS/CTRL+]
+ self.menu_character = unichr(0x14) # Menu: CTRL+T
self.alive = None
self._reader_alive = None
self.receiver_thread = None
@@ -502,25 +504,7 @@ class Miniterm(object):
if self.echo:
self.console.write(c)
elif c == '\x15': # CTRL+U -> upload file
- sys.stderr.write('\n--- File to upload: ')
- sys.stderr.flush()
- with self.console:
- filename = sys.stdin.readline().rstrip('\r\n')
- if filename:
- try:
- with open(filename, 'rb') as f:
- sys.stderr.write('--- Sending file {} ---\n'.format(filename))
- while True:
- block = f.read(1024)
- if not block:
- break
- self.serial.write(block)
- # Wait for output buffer to drain.
- self.serial.flush()
- sys.stderr.write('.') # Progress indicator.
- sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
- except IOError as e:
- sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+ self.upload_file()
elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
sys.stderr.write(self.get_help_text())
elif c == '\x12': # CTRL+R -> Toggle RTS
@@ -536,24 +520,9 @@ class Miniterm(object):
self.echo = not self.echo
sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
elif c == '\x06': # CTRL+F -> edit filters
- sys.stderr.write('\n--- Available Filters:\n')
- sys.stderr.write('\n'.join(
- '--- {:<10} = {.__doc__}'.format(k, v)
- for k, v in sorted(TRANSFORMATIONS.items())))
- sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
- with self.console:
- new_filters = sys.stdin.readline().lower().split()
- if new_filters:
- for f in new_filters:
- if f not in TRANSFORMATIONS:
- sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
- break
- else:
- self.filters = new_filters
- self.update_transformations()
- sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+ self.change_filter()
elif c == '\x0c': # CTRL+L -> EOL mode
- modes = list(EOL_TRANSFORMATIONS) # keys
+ modes = list(EOL_TRANSFORMATIONS) # keys
eol = modes.index(self.eol) + 1
if eol >= len(modes):
eol = 0
@@ -561,63 +530,17 @@ class Miniterm(object):
sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
self.update_transformations()
elif c == '\x01': # CTRL+A -> set encoding
- sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
- with self.console:
- new_encoding = sys.stdin.readline().strip()
- if new_encoding:
- try:
- codecs.lookup(new_encoding)
- except LookupError:
- sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
- else:
- self.set_rx_encoding(new_encoding)
- self.set_tx_encoding(new_encoding)
- sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
- sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+ self.change_encoding()
elif c == '\x09': # CTRL+I -> info
self.dump_port_settings()
#~ elif c == '\x01': # CTRL+A -> cycle escape mode
#~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode
elif c in 'pP': # P -> change port
- with self.console:
- try:
- port = ask_for_port()
- except KeyboardInterrupt:
- port = None
- if port and port != self.serial.port:
- # reader thread needs to be shut down
- self._stop_reader()
- # save settings
- settings = self.serial.getSettingsDict()
- try:
- new_serial = serial.serial_for_url(port, do_not_open=True)
- # restore settings and open
- new_serial.applySettingsDict(settings)
- new_serial.rts = self.serial.rts
- new_serial.dtr = self.serial.dtr
- new_serial.open()
- new_serial.break_condition = self.serial.break_condition
- except Exception as e:
- sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
- new_serial.close()
- else:
- self.serial.close()
- self.serial = new_serial
- sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
- # and restart the reader thread
- self._start_reader()
+ self.change_port()
+ elif c in 'sS': # S -> suspend / open port temporarily
+ self.suspend_port()
elif c in 'bB': # B -> change baudrate
- sys.stderr.write('\n--- Baudrate: ')
- sys.stderr.flush()
- with self.console:
- backup = self.serial.baudrate
- try:
- self.serial.baudrate = int(sys.stdin.readline().strip())
- except ValueError as e:
- sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
- self.serial.baudrate = backup
- else:
- self.dump_port_settings()
+ self.change_baudrate()
elif c == '8': # 8 -> change to 8 bits
self.serial.bytesize = serial.EIGHTBITS
self.dump_port_settings()
@@ -657,6 +580,138 @@ class Miniterm(object):
else:
sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))
+ def upload_file(self):
+ """Ask user for filenname and send its contents"""
+ sys.stderr.write('\n--- File to upload: ')
+ sys.stderr.flush()
+ with self.console:
+ filename = sys.stdin.readline().rstrip('\r\n')
+ if filename:
+ try:
+ with open(filename, 'rb') as f:
+ sys.stderr.write('--- Sending file {} ---\n'.format(filename))
+ while True:
+ block = f.read(1024)
+ if not block:
+ break
+ self.serial.write(block)
+ # Wait for output buffer to drain.
+ self.serial.flush()
+ sys.stderr.write('.') # Progress indicator.
+ sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
+ except IOError as e:
+ sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+
+ def change_filter(self):
+ """change the i/o transformations"""
+ sys.stderr.write('\n--- Available Filters:\n')
+ sys.stderr.write('\n'.join(
+ '--- {:<10} = {.__doc__}'.format(k, v)
+ for k, v in sorted(TRANSFORMATIONS.items())))
+ sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
+ with self.console:
+ new_filters = sys.stdin.readline().lower().split()
+ if new_filters:
+ for f in new_filters:
+ if f not in TRANSFORMATIONS:
+ sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
+ break
+ else:
+ self.filters = new_filters
+ self.update_transformations()
+ sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+
+ def change_encoding(self):
+ """change encoding on the serial port"""
+ sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
+ with self.console:
+ new_encoding = sys.stdin.readline().strip()
+ if new_encoding:
+ try:
+ codecs.lookup(new_encoding)
+ except LookupError:
+ sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
+ else:
+ self.set_rx_encoding(new_encoding)
+ self.set_tx_encoding(new_encoding)
+ sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
+ sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+
+ def change_baudrate(self):
+ """change the baudrate"""
+ sys.stderr.write('\n--- Baudrate: ')
+ sys.stderr.flush()
+ with self.console:
+ backup = self.serial.baudrate
+ try:
+ self.serial.baudrate = int(sys.stdin.readline().strip())
+ except ValueError as e:
+ sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
+ self.serial.baudrate = backup
+ else:
+ self.dump_port_settings()
+
+ def change_port(self):
+ """Have a conversation with the user to change the serial port"""
+ with self.console:
+ try:
+ port = ask_for_port()
+ except KeyboardInterrupt:
+ port = None
+ if port and port != self.serial.port:
+ # reader thread needs to be shut down
+ self._stop_reader()
+ # save settings
+ settings = self.serial.getSettingsDict()
+ try:
+ new_serial = serial.serial_for_url(port, do_not_open=True)
+ # restore settings and open
+ new_serial.applySettingsDict(settings)
+ new_serial.rts = self.serial.rts
+ new_serial.dtr = self.serial.dtr
+ new_serial.open()
+ new_serial.break_condition = self.serial.break_condition
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
+ new_serial.close()
+ else:
+ self.serial.close()
+ self.serial = new_serial
+ sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
+ # and restart the reader thread
+ self._start_reader()
+
+ def suspend_port(self):
+ """\
+ open port temporarily, allow reconnect, exit and port change to get
+ out of the loop
+ """
+ # reader thread needs to be shut down
+ self._stop_reader()
+ self.serial.close()
+ sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
+ do_change_port = False
+ while not self.serial.is_open:
+ sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
+ exit=key_description(self.exit_character)))
+ k = self.console.getkey()
+ if k == self.exit_character:
+ self.stop() # exit app
+ break
+ elif k in 'pP':
+ do_change_port = True
+ break
+ try:
+ self.serial.open()
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
+ if do_change_port:
+ self.change_port()
+ else:
+ # and restart the reader thread
+ self._start_reader()
+ sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
+
def get_help_text(self):
"""return the help text"""
# help text, starts with blank line!
@@ -707,123 +762,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
import argparse
parser = argparse.ArgumentParser(
- description="Miniterm - A simple terminal program for the serial port.")
+ description='Miniterm - A simple terminal program for the serial port.')
parser.add_argument(
- "port",
+ 'port',
nargs='?',
- help="serial port name ('-' to show port list)",
+ help='serial port name ("-" to show port list)',
default=default_port)
parser.add_argument(
- "baudrate",
+ 'baudrate',
nargs='?',
type=int,
- help="set baud rate, default: %(default)s",
+ help='set baud rate, default: %(default)s',
default=default_baudrate)
- group = parser.add_argument_group("port settings")
+ group = parser.add_argument_group('port settings')
group.add_argument(
- "--parity",
+ '--parity',
choices=['N', 'E', 'O', 'S', 'M'],
type=lambda c: c.upper(),
- help="set parity, one of {N E O S M}, default: N",
+ help='set parity, one of {N E O S M}, default: N',
default='N')
group.add_argument(
- "--rtscts",
- action="store_true",
- help="enable RTS/CTS flow control (default off)",
+ '--rtscts',
+ action='store_true',
+ help='enable RTS/CTS flow control (default off)',
default=False)
group.add_argument(
- "--xonxoff",
- action="store_true",
- help="enable software flow control (default off)",
+ '--xonxoff',
+ action='store_true',
+ help='enable software flow control (default off)',
default=False)
group.add_argument(
- "--rts",
+ '--rts',
type=int,
- help="set initial RTS line state (possible values: 0, 1)",
+ help='set initial RTS line state (possible values: 0, 1)',
default=default_rts)
group.add_argument(
- "--dtr",
+ '--dtr',
type=int,
- help="set initial DTR line state (possible values: 0, 1)",
+ help='set initial DTR line state (possible values: 0, 1)',
default=default_dtr)
group.add_argument(
- "--ask",
- action="store_true",
- help="ask again for port when open fails",
+ '--non-exclusive',
+ dest='exclusive',
+ action='store_false',
+ help='disable locking for native ports',
+ default=True)
+
+ group.add_argument(
+ '--ask',
+ action='store_true',
+ help='ask again for port when open fails',
default=False)
- group = parser.add_argument_group("data handling")
+ group = parser.add_argument_group('data handling')
group.add_argument(
- "-e", "--echo",
- action="store_true",
- help="enable local echo (default off)",
+ '-e', '--echo',
+ action='store_true',
+ help='enable local echo (default off)',
default=False)
group.add_argument(
- "--encoding",
- dest="serial_port_encoding",
- metavar="CODEC",
- help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s",
+ '--encoding',
+ dest='serial_port_encoding',
+ metavar='CODEC',
+ help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
default='UTF-8')
group.add_argument(
- "-f", "--filter",
- action="append",
- metavar="NAME",
- help="add text transformation",
+ '-f', '--filter',
+ action='append',
+ metavar='NAME',
+ help='add text transformation',
default=[])
group.add_argument(
- "--eol",
+ '--eol',
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
- help="end of line mode",
+ help='end of line mode',
default='CRLF')
group.add_argument(
- "--raw",
- action="store_true",
- help="Do no apply any encodings/transformations",
+ '--raw',
+ action='store_true',
+ help='Do no apply any encodings/transformations',
default=False)
- group = parser.add_argument_group("hotkeys")
+ group = parser.add_argument_group('hotkeys')
group.add_argument(
- "--exit-char",
+ '--exit-char',
type=int,
metavar='NUM',
- help="Unicode of special character that is used to exit the application, default: %(default)s",
+ help='Unicode of special character that is used to exit the application, default: %(default)s',
default=0x1d) # GS/CTRL+]
group.add_argument(
- "--menu-char",
+ '--menu-char',
type=int,
metavar='NUM',
- help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s",
+ help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
default=0x14) # Menu: CTRL+T
- group = parser.add_argument_group("diagnostics")
+ group = parser.add_argument_group('diagnostics')
group.add_argument(
- "-q", "--quiet",
- action="store_true",
- help="suppress non-error messages",
+ '-q', '--quiet',
+ action='store_true',
+ help='suppress non-error messages',
default=False)
group.add_argument(
- "--develop",
- action="store_true",
- help="show Python traceback on error",
+ '--develop',
+ action='store_true',
+ help='show Python traceback on error',
default=False)
args = parser.parse_args()
@@ -876,9 +938,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
serial_instance.rts = args.rts
+ if isinstance(serial_instance, serial.Serial):
+ serial_instance.exclusive = args.exclusive
+
serial_instance.open()
except serial.SerialException as e:
- sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e))
+ sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
if args.develop:
raise
if not args.ask:
@@ -914,7 +979,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
except KeyboardInterrupt:
pass
if not args.quiet:
- sys.stderr.write("\n--- exit ---\n")
+ sys.stderr.write('\n--- exit ---\n')
miniterm.join()
miniterm.close()