| #! python |
| # |
| # Backend for Silicon Labs CP2110/4 HID-to-UART devices. |
| # |
| # This file is part of pySerial. https://github.com/pyserial/pyserial |
| # (C) 2001-2015 Chris Liechti <[email protected]> |
| # (C) 2019 Google LLC |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| # This backend implements support for HID-to-UART devices manufactured |
| # by Silicon Labs and marketed as CP2110 and CP2114. The |
| # implementation is (mostly) OS-independent and in userland. It relies |
| # on cython-hidapi (https://github.com/trezor/cython-hidapi). |
| |
| # The HID-to-UART protocol implemented by CP2110/4 is described in the |
| # AN434 document from Silicon Labs: |
| # https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf |
| |
| # TODO items: |
| |
| # - rtscts support is configured for hardware flow control, but the |
| # signaling is missing (AN434 suggests this is done through GPIO). |
| # - Cancelling reads and writes is not supported. |
| # - Baudrate validation is not implemented, as it depends on model and configuration. |
| |
| import struct |
| import threading |
| |
| try: |
| import urlparse |
| except ImportError: |
| import urllib.parse as urlparse |
| |
| try: |
| import Queue |
| except ImportError: |
| import queue as Queue |
| |
| import hid # hidapi |
| |
| import serial |
| from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout |
| |
| |
| # Report IDs and related constant |
| _REPORT_GETSET_UART_ENABLE = 0x41 |
| _DISABLE_UART = 0x00 |
| _ENABLE_UART = 0x01 |
| |
| _REPORT_SET_PURGE_FIFOS = 0x43 |
| _PURGE_TX_FIFO = 0x01 |
| _PURGE_RX_FIFO = 0x02 |
| |
| _REPORT_GETSET_UART_CONFIG = 0x50 |
| |
| _REPORT_SET_TRANSMIT_LINE_BREAK = 0x51 |
| _REPORT_SET_STOP_LINE_BREAK = 0x52 |
| |
| |
| class Serial(SerialBase): |
| # This is not quite correct. AN343 specifies that the minimum |
| # baudrate is different between CP2110 and CP2114, and it's halved |
| # when using non-8-bit symbols. |
| BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200, |
| 38400, 57600, 115200, 230400, 460800, 500000, 576000, |
| 921600, 1000000) |
| |
| def __init__(self, *args, **kwargs): |
| self._hid_handle = None |
| self._read_buffer = None |
| self._thread = None |
| super(Serial, self).__init__(*args, **kwargs) |
| |
| def open(self): |
| if self._port is None: |
| raise SerialException("Port must be configured before it can be used.") |
| if self.is_open: |
| raise SerialException("Port is already open.") |
| |
| self._read_buffer = Queue.Queue() |
| |
| self._hid_handle = hid.device() |
| try: |
| portpath = self.from_url(self.portstr) |
| self._hid_handle.open_path(portpath) |
| except OSError as msg: |
| raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) |
| |
| try: |
| self._reconfigure_port() |
| except: |
| try: |
| self._hid_handle.close() |
| except: |
| pass |
| self._hid_handle = None |
| raise |
| else: |
| self.is_open = True |
| self._thread = threading.Thread(target=self._hid_read_loop) |
| self._thread.setDaemon(True) |
| self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port)) |
| self._thread.start() |
| |
| def from_url(self, url): |
| parts = urlparse.urlsplit(url) |
| if parts.scheme != "cp2110": |
| raise SerialException( |
| 'expected a string in the forms ' |
| '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": ' |
| 'not starting with cp2110:// {{!r}}'.format(parts.scheme)) |
| if parts.netloc: # cp2100://BUS:DEVICE:ENDPOINT, for libusb |
| return parts.netloc.encode('utf-8') |
| return parts.path.encode('utf-8') |
| |
| def close(self): |
| self.is_open = False |
| if self._thread: |
| self._thread.join(1) # read timeout is 0.1 |
| self._thread = None |
| self._hid_handle.close() |
| self._hid_handle = None |
| |
| def _reconfigure_port(self): |
| parity_value = None |
| if self._parity == serial.PARITY_NONE: |
| parity_value = 0x00 |
| elif self._parity == serial.PARITY_ODD: |
| parity_value = 0x01 |
| elif self._parity == serial.PARITY_EVEN: |
| parity_value = 0x02 |
| elif self._parity == serial.PARITY_MARK: |
| parity_value = 0x03 |
| elif self._parity == serial.PARITY_SPACE: |
| parity_value = 0x04 |
| else: |
| raise ValueError('Invalid parity: {!r}'.format(self._parity)) |
| |
| if self.rtscts: |
| flow_control_value = 0x01 |
| else: |
| flow_control_value = 0x00 |
| |
| data_bits_value = None |
| if self._bytesize == 5: |
| data_bits_value = 0x00 |
| elif self._bytesize == 6: |
| data_bits_value = 0x01 |
| elif self._bytesize == 7: |
| data_bits_value = 0x02 |
| elif self._bytesize == 8: |
| data_bits_value = 0x03 |
| else: |
| raise ValueError('Invalid char len: {!r}'.format(self._bytesize)) |
| |
| stop_bits_value = None |
| if self._stopbits == serial.STOPBITS_ONE: |
| stop_bits_value = 0x00 |
| elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: |
| stop_bits_value = 0x01 |
| elif self._stopbits == serial.STOPBITS_TWO: |
| stop_bits_value = 0x01 |
| else: |
| raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits)) |
| |
| configuration_report = struct.pack( |
| '>BLBBBB', |
| _REPORT_GETSET_UART_CONFIG, |
| self._baudrate, |
| parity_value, |
| flow_control_value, |
| data_bits_value, |
| stop_bits_value) |
| |
| self._hid_handle.send_feature_report(configuration_report) |
| |
| self._hid_handle.send_feature_report( |
| bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART))) |
| self._update_break_state() |
| |
| @property |
| def in_waiting(self): |
| return self._read_buffer.qsize() |
| |
| def reset_input_buffer(self): |
| if not self.is_open: |
| raise PortNotOpenError() |
| self._hid_handle.send_feature_report( |
| bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO))) |
| # empty read buffer |
| while self._read_buffer.qsize(): |
| self._read_buffer.get(False) |
| |
| def reset_output_buffer(self): |
| if not self.is_open: |
| raise PortNotOpenError() |
| self._hid_handle.send_feature_report( |
| bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO))) |
| |
| def _update_break_state(self): |
| if not self._hid_handle: |
| raise PortNotOpenError() |
| |
| if self._break_state: |
| self._hid_handle.send_feature_report( |
| bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0))) |
| else: |
| # Note that while AN434 states "There are no data bytes in |
| # the payload other than the Report ID", either hidapi or |
| # Linux does not seem to send the report otherwise. |
| self._hid_handle.send_feature_report( |
| bytes((_REPORT_SET_STOP_LINE_BREAK, 0))) |
| |
| def read(self, size=1): |
| if not self.is_open: |
| raise PortNotOpenError() |
| |
| data = bytearray() |
| try: |
| timeout = Timeout(self._timeout) |
| while len(data) < size: |
| if self._thread is None: |
| raise SerialException('connection failed (reader thread died)') |
| buf = self._read_buffer.get(True, timeout.time_left()) |
| if buf is None: |
| return bytes(data) |
| data += buf |
| if timeout.expired(): |
| break |
| except Queue.Empty: # -> timeout |
| pass |
| return bytes(data) |
| |
| def write(self, data): |
| if not self.is_open: |
| raise PortNotOpenError() |
| data = to_bytes(data) |
| tx_len = len(data) |
| while tx_len > 0: |
| to_be_sent = min(tx_len, 0x3F) |
| report = to_bytes([to_be_sent]) + data[:to_be_sent] |
| self._hid_handle.write(report) |
| |
| data = data[to_be_sent:] |
| tx_len = len(data) |
| |
| def _hid_read_loop(self): |
| try: |
| while self.is_open: |
| data = self._hid_handle.read(64, timeout_ms=100) |
| if not data: |
| continue |
| data_len = data.pop(0) |
| assert data_len == len(data) |
| self._read_buffer.put(bytearray(data)) |
| finally: |
| self._thread = None |