aboutsummaryrefslogtreecommitdiff
path: root/serial/threaded/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'serial/threaded/__init__.py')
-rw-r--r--serial/threaded/__init__.py297
1 files changed, 297 insertions, 0 deletions
diff --git a/serial/threaded/__init__.py b/serial/threaded/__init__.py
new file mode 100644
index 0000000..b8940b6
--- /dev/null
+++ b/serial/threaded/__init__.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python3
+#
+# Working with threading and pySerial
+#
+# This file is part of pySerial. https://github.com/pyserial/pyserial
+# (C) 2015-2016 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+"""\
+Support threading with serial ports.
+"""
+from __future__ import absolute_import
+
+import serial
+import threading
+
+
+class Protocol(object):
+ """\
+ Protocol as used by the ReaderThread. This base class provides empty
+ implementations of all methods.
+ """
+
+ def connection_made(self, transport):
+ """Called when reader thread is started"""
+
+ def data_received(self, data):
+ """Called with snippets received from the serial port"""
+
+ def connection_lost(self, exc):
+ """\
+ Called when the serial port is closed or the reader loop terminated
+ otherwise.
+ """
+ if isinstance(exc, Exception):
+ raise exc
+
+
+class Packetizer(Protocol):
+ """
+ Read binary packets from serial port. Packets are expected to be terminated
+ with a TERMINATOR byte (null byte by default).
+
+ The class also keeps track of the transport.
+ """
+
+ TERMINATOR = b'\0'
+
+ def __init__(self):
+ self.buffer = bytearray()
+ self.transport = None
+
+ def connection_made(self, transport):
+ """Store transport"""
+ self.transport = transport
+
+ def connection_lost(self, exc):
+ """Forget transport"""
+ self.transport = None
+ super(Packetizer, self).connection_lost(exc)
+
+ def data_received(self, data):
+ """Buffer received data, find TERMINATOR, call handle_packet"""
+ self.buffer.extend(data)
+ while self.TERMINATOR in self.buffer:
+ packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
+ self.handle_packet(packet)
+
+ def handle_packet(self, packet):
+ """Process packets - to be overridden by subclassing"""
+ raise NotImplementedError('please implement functionality in handle_packet')
+
+
+class FramedPacket(Protocol):
+ """
+ Read binary packets. Packets are expected to have a start and stop marker.
+
+ The class also keeps track of the transport.
+ """
+
+ START = b'('
+ STOP = b')'
+
+ def __init__(self):
+ self.packet = bytearray()
+ self.in_packet = False
+ self.transport = None
+
+ def connection_made(self, transport):
+ """Store transport"""
+ self.transport = transport
+
+ def connection_lost(self, exc):
+ """Forget transport"""
+ self.transport = None
+ self.in_packet = False
+ del self.packet[:]
+ super(FramedPacket, self).connection_lost(exc)
+
+ def data_received(self, data):
+ """Find data enclosed in START/STOP, call handle_packet"""
+ for byte in serial.iterbytes(data):
+ if byte == self.START:
+ self.in_packet = True
+ elif byte == self.STOP:
+ self.in_packet = False
+ self.handle_packet(bytes(self.packet)) # make read-only copy
+ del self.packet[:]
+ elif self.in_packet:
+ self.packet.extend(byte)
+ else:
+ self.handle_out_of_packet_data(byte)
+
+ def handle_packet(self, packet):
+ """Process packets - to be overridden by subclassing"""
+ raise NotImplementedError('please implement functionality in handle_packet')
+
+ def handle_out_of_packet_data(self, data):
+ """Process data that is received outside of packets"""
+ pass
+
+
+class LineReader(Packetizer):
+ """
+ Read and write (Unicode) lines from/to serial port.
+ The encoding is applied.
+ """
+
+ TERMINATOR = b'\r\n'
+ ENCODING = 'utf-8'
+ UNICODE_HANDLING = 'replace'
+
+ def handle_packet(self, packet):
+ self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
+
+ def handle_line(self, line):
+ """Process one line - to be overridden by subclassing"""
+ raise NotImplementedError('please implement functionality in handle_line')
+
+ def write_line(self, text):
+ """
+ Write text to the transport. ``text`` is a Unicode string and the encoding
+ is applied before sending ans also the newline is append.
+ """
+ # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
+ self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
+
+
+class ReaderThread(threading.Thread):
+ """\
+ Implement a serial port read loop and dispatch to a Protocol instance (like
+ the asyncio.Protocol) but do it with threads.
+
+ Calls to close() will close the serial port but it is also possible to just
+ stop() this thread and continue the serial port instance otherwise.
+ """
+
+ def __init__(self, serial_instance, protocol_factory):
+ """\
+ Initialize thread.
+
+ Note that the serial_instance' timeout is set to one second!
+ Other settings are not changed.
+ """
+ super(ReaderThread, self).__init__()
+ self.daemon = True
+ self.serial = serial_instance
+ self.protocol_factory = protocol_factory
+ self.alive = True
+ self._lock = threading.Lock()
+ self._connection_made = threading.Event()
+ self.protocol = None
+
+ def stop(self):
+ """Stop the reader thread"""
+ self.alive = False
+ if hasattr(self.serial, 'cancel_read'):
+ self.serial.cancel_read()
+ self.join(2)
+
+ def run(self):
+ """Reader loop"""
+ if not hasattr(self.serial, 'cancel_read'):
+ self.serial.timeout = 1
+ self.protocol = self.protocol_factory()
+ try:
+ self.protocol.connection_made(self)
+ except Exception as e:
+ self.alive = False
+ self.protocol.connection_lost(e)
+ self._connection_made.set()
+ return
+ error = None
+ self._connection_made.set()
+ while self.alive and self.serial.is_open:
+ try:
+ # read all that is there or wait for one byte (blocking)
+ data = self.serial.read(self.serial.in_waiting or 1)
+ except serial.SerialException as e:
+ # probably some I/O problem such as disconnected USB serial
+ # adapters -> exit
+ error = e
+ break
+ else:
+ if data:
+ # make a separated try-except for called user code
+ try:
+ self.protocol.data_received(data)
+ except Exception as e:
+ error = e
+ break
+ self.alive = False
+ self.protocol.connection_lost(error)
+ self.protocol = None
+
+ def write(self, data):
+ """Thread safe writing (uses lock)"""
+ with self._lock:
+ return self.serial.write(data)
+
+ def close(self):
+ """Close the serial port and exit reader thread (uses lock)"""
+ # use the lock to let other threads finish writing
+ with self._lock:
+ # first stop reading, so that closing can be done on idle port
+ self.stop()
+ self.serial.close()
+
+ def connect(self):
+ """
+ Wait until connection is set up and return the transport and protocol
+ instances.
+ """
+ if self.alive:
+ self._connection_made.wait()
+ if not self.alive:
+ raise RuntimeError('connection_lost already called')
+ return (self, self.protocol)
+ else:
+ raise RuntimeError('already stopped')
+
+ # - - context manager, returns protocol
+
+ def __enter__(self):
+ """\
+ Enter context handler. May raise RuntimeError in case the connection
+ could not be created.
+ """
+ self.start()
+ self._connection_made.wait()
+ if not self.alive:
+ raise RuntimeError('connection_lost already called')
+ return self.protocol
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Leave context: close port"""
+ self.close()
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# test
+if __name__ == '__main__':
+ # pylint: disable=wrong-import-position
+ import sys
+ import time
+ import traceback
+
+ #~ PORT = 'spy:///dev/ttyUSB0'
+ PORT = 'loop://'
+
+ class PrintLines(LineReader):
+ def connection_made(self, transport):
+ super(PrintLines, self).connection_made(transport)
+ sys.stdout.write('port opened\n')
+ self.write_line('hello world')
+
+ def handle_line(self, data):
+ sys.stdout.write('line received: {!r}\n'.format(data))
+
+ def connection_lost(self, exc):
+ if exc:
+ traceback.print_exc(exc)
+ sys.stdout.write('port closed\n')
+
+ ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
+ with ReaderThread(ser, PrintLines) as protocol:
+ protocol.write_line('hello')
+ time.sleep(2)
+
+ # alternative usage
+ ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
+ t = ReaderThread(ser, PrintLines)
+ t.start()
+ transport, protocol = t.connect()
+ protocol.write_line('hello')
+ time.sleep(2)
+ t.close()