diff options
Diffstat (limited to 'serial/threaded/__init__.py')
-rw-r--r-- | serial/threaded/__init__.py | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/serial/threaded/__init__.py b/serial/threaded/__init__.py new file mode 100644 index 0000000..b8940b6 --- /dev/null +++ b/serial/threaded/__init__.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +# +# Working with threading and pySerial +# +# This file is part of pySerial. https://github.com/pyserial/pyserial +# (C) 2015-2016 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause +"""\ +Support threading with serial ports. +""" +from __future__ import absolute_import + +import serial +import threading + + +class Protocol(object): + """\ + Protocol as used by the ReaderThread. This base class provides empty + implementations of all methods. + """ + + def connection_made(self, transport): + """Called when reader thread is started""" + + def data_received(self, data): + """Called with snippets received from the serial port""" + + def connection_lost(self, exc): + """\ + Called when the serial port is closed or the reader loop terminated + otherwise. + """ + if isinstance(exc, Exception): + raise exc + + +class Packetizer(Protocol): + """ + Read binary packets from serial port. Packets are expected to be terminated + with a TERMINATOR byte (null byte by default). + + The class also keeps track of the transport. + """ + + TERMINATOR = b'\0' + + def __init__(self): + self.buffer = bytearray() + self.transport = None + + def connection_made(self, transport): + """Store transport""" + self.transport = transport + + def connection_lost(self, exc): + """Forget transport""" + self.transport = None + super(Packetizer, self).connection_lost(exc) + + def data_received(self, data): + """Buffer received data, find TERMINATOR, call handle_packet""" + self.buffer.extend(data) + while self.TERMINATOR in self.buffer: + packet, self.buffer = self.buffer.split(self.TERMINATOR, 1) + self.handle_packet(packet) + + def handle_packet(self, packet): + """Process packets - to be overridden by subclassing""" + raise NotImplementedError('please implement functionality in handle_packet') + + +class FramedPacket(Protocol): + """ + Read binary packets. Packets are expected to have a start and stop marker. + + The class also keeps track of the transport. + """ + + START = b'(' + STOP = b')' + + def __init__(self): + self.packet = bytearray() + self.in_packet = False + self.transport = None + + def connection_made(self, transport): + """Store transport""" + self.transport = transport + + def connection_lost(self, exc): + """Forget transport""" + self.transport = None + self.in_packet = False + del self.packet[:] + super(FramedPacket, self).connection_lost(exc) + + def data_received(self, data): + """Find data enclosed in START/STOP, call handle_packet""" + for byte in serial.iterbytes(data): + if byte == self.START: + self.in_packet = True + elif byte == self.STOP: + self.in_packet = False + self.handle_packet(bytes(self.packet)) # make read-only copy + del self.packet[:] + elif self.in_packet: + self.packet.extend(byte) + else: + self.handle_out_of_packet_data(byte) + + def handle_packet(self, packet): + """Process packets - to be overridden by subclassing""" + raise NotImplementedError('please implement functionality in handle_packet') + + def handle_out_of_packet_data(self, data): + """Process data that is received outside of packets""" + pass + + +class LineReader(Packetizer): + """ + Read and write (Unicode) lines from/to serial port. + The encoding is applied. + """ + + TERMINATOR = b'\r\n' + ENCODING = 'utf-8' + UNICODE_HANDLING = 'replace' + + def handle_packet(self, packet): + self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING)) + + def handle_line(self, line): + """Process one line - to be overridden by subclassing""" + raise NotImplementedError('please implement functionality in handle_line') + + def write_line(self, text): + """ + Write text to the transport. ``text`` is a Unicode string and the encoding + is applied before sending ans also the newline is append. + """ + # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call + self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR) + + +class ReaderThread(threading.Thread): + """\ + Implement a serial port read loop and dispatch to a Protocol instance (like + the asyncio.Protocol) but do it with threads. + + Calls to close() will close the serial port but it is also possible to just + stop() this thread and continue the serial port instance otherwise. + """ + + def __init__(self, serial_instance, protocol_factory): + """\ + Initialize thread. + + Note that the serial_instance' timeout is set to one second! + Other settings are not changed. + """ + super(ReaderThread, self).__init__() + self.daemon = True + self.serial = serial_instance + self.protocol_factory = protocol_factory + self.alive = True + self._lock = threading.Lock() + self._connection_made = threading.Event() + self.protocol = None + + def stop(self): + """Stop the reader thread""" + self.alive = False + if hasattr(self.serial, 'cancel_read'): + self.serial.cancel_read() + self.join(2) + + def run(self): + """Reader loop""" + if not hasattr(self.serial, 'cancel_read'): + self.serial.timeout = 1 + self.protocol = self.protocol_factory() + try: + self.protocol.connection_made(self) + except Exception as e: + self.alive = False + self.protocol.connection_lost(e) + self._connection_made.set() + return + error = None + self._connection_made.set() + while self.alive and self.serial.is_open: + try: + # read all that is there or wait for one byte (blocking) + data = self.serial.read(self.serial.in_waiting or 1) + except serial.SerialException as e: + # probably some I/O problem such as disconnected USB serial + # adapters -> exit + error = e + break + else: + if data: + # make a separated try-except for called user code + try: + self.protocol.data_received(data) + except Exception as e: + error = e + break + self.alive = False + self.protocol.connection_lost(error) + self.protocol = None + + def write(self, data): + """Thread safe writing (uses lock)""" + with self._lock: + return self.serial.write(data) + + def close(self): + """Close the serial port and exit reader thread (uses lock)""" + # use the lock to let other threads finish writing + with self._lock: + # first stop reading, so that closing can be done on idle port + self.stop() + self.serial.close() + + def connect(self): + """ + Wait until connection is set up and return the transport and protocol + instances. + """ + if self.alive: + self._connection_made.wait() + if not self.alive: + raise RuntimeError('connection_lost already called') + return (self, self.protocol) + else: + raise RuntimeError('already stopped') + + # - - context manager, returns protocol + + def __enter__(self): + """\ + Enter context handler. May raise RuntimeError in case the connection + could not be created. + """ + self.start() + self._connection_made.wait() + if not self.alive: + raise RuntimeError('connection_lost already called') + return self.protocol + + def __exit__(self, exc_type, exc_val, exc_tb): + """Leave context: close port""" + self.close() + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +# test +if __name__ == '__main__': + # pylint: disable=wrong-import-position + import sys + import time + import traceback + + #~ PORT = 'spy:///dev/ttyUSB0' + PORT = 'loop://' + + class PrintLines(LineReader): + def connection_made(self, transport): + super(PrintLines, self).connection_made(transport) + sys.stdout.write('port opened\n') + self.write_line('hello world') + + def handle_line(self, data): + sys.stdout.write('line received: {!r}\n'.format(data)) + + def connection_lost(self, exc): + if exc: + traceback.print_exc(exc) + sys.stdout.write('port closed\n') + + ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) + with ReaderThread(ser, PrintLines) as protocol: + protocol.write_line('hello') + time.sleep(2) + + # alternative usage + ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) + t = ReaderThread(ser, PrintLines) + t.start() + transport, protocol = t.connect() + protocol.write_line('hello') + time.sleep(2) + t.close() |