diff options
Diffstat (limited to 'serial/urlhandler/protocol_cp2110.py')
-rw-r--r-- | serial/urlhandler/protocol_cp2110.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/serial/urlhandler/protocol_cp2110.py b/serial/urlhandler/protocol_cp2110.py new file mode 100644 index 0000000..44ad4eb --- /dev/null +++ b/serial/urlhandler/protocol_cp2110.py @@ -0,0 +1,258 @@ +#! python +# +# Backend for Silicon Labs CP2110/4 HID-to-UART devices. +# +# This file is part of pySerial. https://github.com/pyserial/pyserial +# (C) 2001-2015 Chris Liechti <cliechti@gmx.net> +# (C) 2019 Google LLC +# +# SPDX-License-Identifier: BSD-3-Clause + +# This backend implements support for HID-to-UART devices manufactured +# by Silicon Labs and marketed as CP2110 and CP2114. The +# implementation is (mostly) OS-independent and in userland. It relies +# on cython-hidapi (https://github.com/trezor/cython-hidapi). + +# The HID-to-UART protocol implemented by CP2110/4 is described in the +# AN434 document from Silicon Labs: +# https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf + +# TODO items: + +# - rtscts support is configured for hardware flow control, but the +# signaling is missing (AN434 suggests this is done through GPIO). +# - Cancelling reads and writes is not supported. +# - Baudrate validation is not implemented, as it depends on model and configuration. + +import struct +import threading + +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + +try: + import Queue +except ImportError: + import queue as Queue + +import hid # hidapi + +import serial +from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout + + +# Report IDs and related constant +_REPORT_GETSET_UART_ENABLE = 0x41 +_DISABLE_UART = 0x00 +_ENABLE_UART = 0x01 + +_REPORT_SET_PURGE_FIFOS = 0x43 +_PURGE_TX_FIFO = 0x01 +_PURGE_RX_FIFO = 0x02 + +_REPORT_GETSET_UART_CONFIG = 0x50 + +_REPORT_SET_TRANSMIT_LINE_BREAK = 0x51 +_REPORT_SET_STOP_LINE_BREAK = 0x52 + + +class Serial(SerialBase): + # This is not quite correct. AN343 specifies that the minimum + # baudrate is different between CP2110 and CP2114, and it's halved + # when using non-8-bit symbols. + BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200, + 38400, 57600, 115200, 230400, 460800, 500000, 576000, + 921600, 1000000) + + def __init__(self, *args, **kwargs): + self._hid_handle = None + self._read_buffer = None + self._thread = None + super(Serial, self).__init__(*args, **kwargs) + + def open(self): + if self._port is None: + raise SerialException("Port must be configured before it can be used.") + if self.is_open: + raise SerialException("Port is already open.") + + self._read_buffer = Queue.Queue() + + self._hid_handle = hid.device() + try: + portpath = self.from_url(self.portstr) + self._hid_handle.open_path(portpath) + except OSError as msg: + raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) + + try: + self._reconfigure_port() + except: + try: + self._hid_handle.close() + except: + pass + self._hid_handle = None + raise + else: + self.is_open = True + self._thread = threading.Thread(target=self._hid_read_loop) + self._thread.setDaemon(True) + self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port)) + self._thread.start() + + def from_url(self, url): + parts = urlparse.urlsplit(url) + if parts.scheme != "cp2110": + raise SerialException( + 'expected a string in the forms ' + '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": ' + 'not starting with cp2110:// {{!r}}'.format(parts.scheme)) + if parts.netloc: # cp2100://BUS:DEVICE:ENDPOINT, for libusb + return parts.netloc.encode('utf-8') + return parts.path.encode('utf-8') + + def close(self): + self.is_open = False + if self._thread: + self._thread.join(1) # read timeout is 0.1 + self._thread = None + self._hid_handle.close() + self._hid_handle = None + + def _reconfigure_port(self): + parity_value = None + if self._parity == serial.PARITY_NONE: + parity_value = 0x00 + elif self._parity == serial.PARITY_ODD: + parity_value = 0x01 + elif self._parity == serial.PARITY_EVEN: + parity_value = 0x02 + elif self._parity == serial.PARITY_MARK: + parity_value = 0x03 + elif self._parity == serial.PARITY_SPACE: + parity_value = 0x04 + else: + raise ValueError('Invalid parity: {!r}'.format(self._parity)) + + if self.rtscts: + flow_control_value = 0x01 + else: + flow_control_value = 0x00 + + data_bits_value = None + if self._bytesize == 5: + data_bits_value = 0x00 + elif self._bytesize == 6: + data_bits_value = 0x01 + elif self._bytesize == 7: + data_bits_value = 0x02 + elif self._bytesize == 8: + data_bits_value = 0x03 + else: + raise ValueError('Invalid char len: {!r}'.format(self._bytesize)) + + stop_bits_value = None + if self._stopbits == serial.STOPBITS_ONE: + stop_bits_value = 0x00 + elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: + stop_bits_value = 0x01 + elif self._stopbits == serial.STOPBITS_TWO: + stop_bits_value = 0x01 + else: + raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits)) + + configuration_report = struct.pack( + '>BLBBBB', + _REPORT_GETSET_UART_CONFIG, + self._baudrate, + parity_value, + flow_control_value, + data_bits_value, + stop_bits_value) + + self._hid_handle.send_feature_report(configuration_report) + + self._hid_handle.send_feature_report( + bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART))) + self._update_break_state() + + @property + def in_waiting(self): + return self._read_buffer.qsize() + + def reset_input_buffer(self): + if not self.is_open: + raise PortNotOpenError() + self._hid_handle.send_feature_report( + bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO))) + # empty read buffer + while self._read_buffer.qsize(): + self._read_buffer.get(False) + + def reset_output_buffer(self): + if not self.is_open: + raise PortNotOpenError() + self._hid_handle.send_feature_report( + bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO))) + + def _update_break_state(self): + if not self._hid_handle: + raise PortNotOpenError() + + if self._break_state: + self._hid_handle.send_feature_report( + bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0))) + else: + # Note that while AN434 states "There are no data bytes in + # the payload other than the Report ID", either hidapi or + # Linux does not seem to send the report otherwise. + self._hid_handle.send_feature_report( + bytes((_REPORT_SET_STOP_LINE_BREAK, 0))) + + def read(self, size=1): + if not self.is_open: + raise PortNotOpenError() + + data = bytearray() + try: + timeout = Timeout(self._timeout) + while len(data) < size: + if self._thread is None: + raise SerialException('connection failed (reader thread died)') + buf = self._read_buffer.get(True, timeout.time_left()) + if buf is None: + return bytes(data) + data += buf + if timeout.expired(): + break + except Queue.Empty: # -> timeout + pass + return bytes(data) + + def write(self, data): + if not self.is_open: + raise PortNotOpenError() + data = to_bytes(data) + tx_len = len(data) + while tx_len > 0: + to_be_sent = min(tx_len, 0x3F) + report = to_bytes([to_be_sent]) + data[:to_be_sent] + self._hid_handle.write(report) + + data = data[to_be_sent:] + tx_len = len(data) + + def _hid_read_loop(self): + try: + while self.is_open: + data = self._hid_handle.read(64, timeout_ms=100) + if not data: + continue + data_len = data.pop(0) + assert data_len == len(data) + self._read_buffer.put(bytearray(data)) + finally: + self._thread = None |