| #! python |
| # |
| # Base class and support functions used by various backends. |
| # |
| # This file is part of pySerial. https://github.com/pyserial/pyserial |
| # (C) 2001-2020 Chris Liechti <[email protected]> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| from __future__ import absolute_import |
| |
| import io |
| import time |
| |
| # ``memoryview`` was introduced in Python 2.7 and ``bytes(some_memoryview)`` |
| # isn't returning the contents (very unfortunate). Therefore we need special |
| # cases and test for it. Ensure that there is a ``memoryview`` object for older |
| # Python versions. This is easier than making every test dependent on its |
| # existence. |
| try: |
| memoryview |
| except (NameError, AttributeError): |
| # implementation does not matter as we do not really use it. |
| # it just must not inherit from something else we might care for. |
| class memoryview(object): # pylint: disable=redefined-builtin,invalid-name |
| pass |
| |
| try: |
| unicode |
| except (NameError, AttributeError): |
| unicode = str # for Python 3, pylint: disable=redefined-builtin,invalid-name |
| |
| try: |
| basestring |
| except (NameError, AttributeError): |
| basestring = (str,) # for Python 3, pylint: disable=redefined-builtin,invalid-name |
| |
| |
| # "for byte in data" fails for python3 as it returns ints instead of bytes |
| def iterbytes(b): |
| """Iterate over bytes, returning bytes instead of ints (python3)""" |
| if isinstance(b, memoryview): |
| b = b.tobytes() |
| i = 0 |
| while True: |
| a = b[i:i + 1] |
| i += 1 |
| if a: |
| yield a |
| else: |
| break |
| |
| |
| # all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11' |
| # so a simple ``bytes(sequence)`` doesn't work for all versions |
| def to_bytes(seq): |
| """convert a sequence to a bytes type""" |
| if isinstance(seq, bytes): |
| return seq |
| elif isinstance(seq, bytearray): |
| return bytes(seq) |
| elif isinstance(seq, memoryview): |
| return seq.tobytes() |
| elif isinstance(seq, unicode): |
| raise TypeError('unicode strings are not supported, please encode to bytes: {!r}'.format(seq)) |
| else: |
| # handle list of integers and bytes (one or more items) for Python 2 and 3 |
| return bytes(bytearray(seq)) |
| |
| |
| # create control bytes |
| XON = to_bytes([17]) |
| XOFF = to_bytes([19]) |
| |
| CR = to_bytes([13]) |
| LF = to_bytes([10]) |
| |
| |
| PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S' |
| STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2) |
| FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8) |
| |
| PARITY_NAMES = { |
| PARITY_NONE: 'None', |
| PARITY_EVEN: 'Even', |
| PARITY_ODD: 'Odd', |
| PARITY_MARK: 'Mark', |
| PARITY_SPACE: 'Space', |
| } |
| |
| |
| class SerialException(IOError): |
| """Base class for serial port related exceptions.""" |
| |
| |
| class SerialTimeoutException(SerialException): |
| """Write timeouts give an exception""" |
| |
| |
| class PortNotOpenError(SerialException): |
| """Port is not open""" |
| def __init__(self): |
| super(PortNotOpenError, self).__init__('Attempting to use a port that is not open') |
| |
| |
| class Timeout(object): |
| """\ |
| Abstraction for timeout operations. Using time.monotonic() if available |
| or time.time() in all other cases. |
| |
| The class can also be initialized with 0 or None, in order to support |
| non-blocking and fully blocking I/O operations. The attributes |
| is_non_blocking and is_infinite are set accordingly. |
| """ |
| if hasattr(time, 'monotonic'): |
| # Timeout implementation with time.monotonic(). This function is only |
| # supported by Python 3.3 and above. It returns a time in seconds |
| # (float) just as time.time(), but is not affected by system clock |
| # adjustments. |
| TIME = time.monotonic |
| else: |
| # Timeout implementation with time.time(). This is compatible with all |
| # Python versions but has issues if the clock is adjusted while the |
| # timeout is running. |
| TIME = time.time |
| |
| def __init__(self, duration): |
| """Initialize a timeout with given duration""" |
| self.is_infinite = (duration is None) |
| self.is_non_blocking = (duration == 0) |
| self.duration = duration |
| if duration is not None: |
| self.target_time = self.TIME() + duration |
| else: |
| self.target_time = None |
| |
| def expired(self): |
| """Return a boolean, telling if the timeout has expired""" |
| return self.target_time is not None and self.time_left() <= 0 |
| |
| def time_left(self): |
| """Return how many seconds are left until the timeout expires""" |
| if self.is_non_blocking: |
| return 0 |
| elif self.is_infinite: |
| return None |
| else: |
| delta = self.target_time - self.TIME() |
| if delta > self.duration: |
| # clock jumped, recalculate |
| self.target_time = self.TIME() + self.duration |
| return self.duration |
| else: |
| return max(0, delta) |
| |
| def restart(self, duration): |
| """\ |
| Restart a timeout, only supported if a timeout was already set up |
| before. |
| """ |
| self.duration = duration |
| self.target_time = self.TIME() + duration |
| |
| |
| class SerialBase(io.RawIOBase): |
| """\ |
| Serial port base class. Provides __init__ function and properties to |
| get/set port settings. |
| """ |
| |
| # default values, may be overridden in subclasses that do not support all values |
| BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, |
| 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, |
| 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, |
| 3000000, 3500000, 4000000) |
| BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS) |
| PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE) |
| STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO) |
| |
| def __init__(self, |
| port=None, |
| baudrate=9600, |
| bytesize=EIGHTBITS, |
| parity=PARITY_NONE, |
| stopbits=STOPBITS_ONE, |
| timeout=None, |
| xonxoff=False, |
| rtscts=False, |
| write_timeout=None, |
| dsrdtr=False, |
| inter_byte_timeout=None, |
| exclusive=None, |
| **kwargs): |
| """\ |
| Initialize comm port object. If a "port" is given, then the port will be |
| opened immediately. Otherwise a Serial port object in closed state |
| is returned. |
| """ |
| |
| self.is_open = False |
| self.portstr = None |
| self.name = None |
| # correct values are assigned below through properties |
| self._port = None |
| self._baudrate = None |
| self._bytesize = None |
| self._parity = None |
| self._stopbits = None |
| self._timeout = None |
| self._write_timeout = None |
| self._xonxoff = None |
| self._rtscts = None |
| self._dsrdtr = None |
| self._inter_byte_timeout = None |
| self._rs485_mode = None # disabled by default |
| self._rts_state = True |
| self._dtr_state = True |
| self._break_state = False |
| self._exclusive = None |
| |
| # assign values using get/set methods using the properties feature |
| self.port = port |
| self.baudrate = baudrate |
| self.bytesize = bytesize |
| self.parity = parity |
| self.stopbits = stopbits |
| self.timeout = timeout |
| self.write_timeout = write_timeout |
| self.xonxoff = xonxoff |
| self.rtscts = rtscts |
| self.dsrdtr = dsrdtr |
| self.inter_byte_timeout = inter_byte_timeout |
| self.exclusive = exclusive |
| |
| # watch for backward compatible kwargs |
| if 'writeTimeout' in kwargs: |
| self.write_timeout = kwargs.pop('writeTimeout') |
| if 'interCharTimeout' in kwargs: |
| self.inter_byte_timeout = kwargs.pop('interCharTimeout') |
| if kwargs: |
| raise ValueError('unexpected keyword arguments: {!r}'.format(kwargs)) |
| |
| if port is not None: |
| self.open() |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| |
| # to be implemented by subclasses: |
| # def open(self): |
| # def close(self): |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| |
| @property |
| def port(self): |
| """\ |
| Get the current port setting. The value that was passed on init or using |
| setPort() is passed back. |
| """ |
| return self._port |
| |
| @port.setter |
| def port(self, port): |
| """\ |
| Change the port. |
| """ |
| if port is not None and not isinstance(port, basestring): |
| raise ValueError('"port" must be None or a string, not {}'.format(type(port))) |
| was_open = self.is_open |
| if was_open: |
| self.close() |
| self.portstr = port |
| self._port = port |
| self.name = self.portstr |
| if was_open: |
| self.open() |
| |
| @property |
| def baudrate(self): |
| """Get the current baud rate setting.""" |
| return self._baudrate |
| |
| @baudrate.setter |
| def baudrate(self, baudrate): |
| """\ |
| Change baud rate. It raises a ValueError if the port is open and the |
| baud rate is not possible. If the port is closed, then the value is |
| accepted and the exception is raised when the port is opened. |
| """ |
| try: |
| b = int(baudrate) |
| except TypeError: |
| raise ValueError("Not a valid baudrate: {!r}".format(baudrate)) |
| else: |
| if b < 0: |
| raise ValueError("Not a valid baudrate: {!r}".format(baudrate)) |
| self._baudrate = b |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def bytesize(self): |
| """Get the current byte size setting.""" |
| return self._bytesize |
| |
| @bytesize.setter |
| def bytesize(self, bytesize): |
| """Change byte size.""" |
| if bytesize not in self.BYTESIZES: |
| raise ValueError("Not a valid byte size: {!r}".format(bytesize)) |
| self._bytesize = bytesize |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def exclusive(self): |
| """Get the current exclusive access setting.""" |
| return self._exclusive |
| |
| @exclusive.setter |
| def exclusive(self, exclusive): |
| """Change the exclusive access setting.""" |
| self._exclusive = exclusive |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def parity(self): |
| """Get the current parity setting.""" |
| return self._parity |
| |
| @parity.setter |
| def parity(self, parity): |
| """Change parity setting.""" |
| if parity not in self.PARITIES: |
| raise ValueError("Not a valid parity: {!r}".format(parity)) |
| self._parity = parity |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def stopbits(self): |
| """Get the current stop bits setting.""" |
| return self._stopbits |
| |
| @stopbits.setter |
| def stopbits(self, stopbits): |
| """Change stop bits size.""" |
| if stopbits not in self.STOPBITS: |
| raise ValueError("Not a valid stop bit size: {!r}".format(stopbits)) |
| self._stopbits = stopbits |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def timeout(self): |
| """Get the current timeout setting.""" |
| return self._timeout |
| |
| @timeout.setter |
| def timeout(self, timeout): |
| """Change timeout setting.""" |
| if timeout is not None: |
| try: |
| timeout + 1 # test if it's a number, will throw a TypeError if not... |
| except TypeError: |
| raise ValueError("Not a valid timeout: {!r}".format(timeout)) |
| if timeout < 0: |
| raise ValueError("Not a valid timeout: {!r}".format(timeout)) |
| self._timeout = timeout |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def write_timeout(self): |
| """Get the current timeout setting.""" |
| return self._write_timeout |
| |
| @write_timeout.setter |
| def write_timeout(self, timeout): |
| """Change timeout setting.""" |
| if timeout is not None: |
| if timeout < 0: |
| raise ValueError("Not a valid timeout: {!r}".format(timeout)) |
| try: |
| timeout + 1 # test if it's a number, will throw a TypeError if not... |
| except TypeError: |
| raise ValueError("Not a valid timeout: {!r}".format(timeout)) |
| |
| self._write_timeout = timeout |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def inter_byte_timeout(self): |
| """Get the current inter-character timeout setting.""" |
| return self._inter_byte_timeout |
| |
| @inter_byte_timeout.setter |
| def inter_byte_timeout(self, ic_timeout): |
| """Change inter-byte timeout setting.""" |
| if ic_timeout is not None: |
| if ic_timeout < 0: |
| raise ValueError("Not a valid timeout: {!r}".format(ic_timeout)) |
| try: |
| ic_timeout + 1 # test if it's a number, will throw a TypeError if not... |
| except TypeError: |
| raise ValueError("Not a valid timeout: {!r}".format(ic_timeout)) |
| |
| self._inter_byte_timeout = ic_timeout |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def xonxoff(self): |
| """Get the current XON/XOFF setting.""" |
| return self._xonxoff |
| |
| @xonxoff.setter |
| def xonxoff(self, xonxoff): |
| """Change XON/XOFF setting.""" |
| self._xonxoff = xonxoff |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def rtscts(self): |
| """Get the current RTS/CTS flow control setting.""" |
| return self._rtscts |
| |
| @rtscts.setter |
| def rtscts(self, rtscts): |
| """Change RTS/CTS flow control setting.""" |
| self._rtscts = rtscts |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def dsrdtr(self): |
| """Get the current DSR/DTR flow control setting.""" |
| return self._dsrdtr |
| |
| @dsrdtr.setter |
| def dsrdtr(self, dsrdtr=None): |
| """Change DsrDtr flow control setting.""" |
| if dsrdtr is None: |
| # if not set, keep backwards compatibility and follow rtscts setting |
| self._dsrdtr = self._rtscts |
| else: |
| # if defined independently, follow its value |
| self._dsrdtr = dsrdtr |
| if self.is_open: |
| self._reconfigure_port() |
| |
| @property |
| def rts(self): |
| return self._rts_state |
| |
| @rts.setter |
| def rts(self, value): |
| self._rts_state = value |
| if self.is_open: |
| self._update_rts_state() |
| |
| @property |
| def dtr(self): |
| return self._dtr_state |
| |
| @dtr.setter |
| def dtr(self, value): |
| self._dtr_state = value |
| if self.is_open: |
| self._update_dtr_state() |
| |
| @property |
| def break_condition(self): |
| return self._break_state |
| |
| @break_condition.setter |
| def break_condition(self, value): |
| self._break_state = value |
| if self.is_open: |
| self._update_break_state() |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| # functions useful for RS-485 adapters |
| |
| @property |
| def rs485_mode(self): |
| """\ |
| Enable RS485 mode and apply new settings, set to None to disable. |
| See serial.rs485.RS485Settings for more info about the value. |
| """ |
| return self._rs485_mode |
| |
| @rs485_mode.setter |
| def rs485_mode(self, rs485_settings): |
| self._rs485_mode = rs485_settings |
| if self.is_open: |
| self._reconfigure_port() |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| |
| _SAVED_SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff', |
| 'dsrdtr', 'rtscts', 'timeout', 'write_timeout', |
| 'inter_byte_timeout') |
| |
| def get_settings(self): |
| """\ |
| Get current port settings as a dictionary. For use with |
| apply_settings(). |
| """ |
| return dict([(key, getattr(self, '_' + key)) for key in self._SAVED_SETTINGS]) |
| |
| def apply_settings(self, d): |
| """\ |
| Apply stored settings from a dictionary returned from |
| get_settings(). It's allowed to delete keys from the dictionary. These |
| values will simply left unchanged. |
| """ |
| for key in self._SAVED_SETTINGS: |
| if key in d and d[key] != getattr(self, '_' + key): # check against internal "_" value |
| setattr(self, key, d[key]) # set non "_" value to use properties write function |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| |
| def __repr__(self): |
| """String representation of the current port settings and its state.""" |
| return '{name}<id=0x{id:x}, open={p.is_open}>(port={p.portstr!r}, ' \ |
| 'baudrate={p.baudrate!r}, bytesize={p.bytesize!r}, parity={p.parity!r}, ' \ |
| 'stopbits={p.stopbits!r}, timeout={p.timeout!r}, xonxoff={p.xonxoff!r}, ' \ |
| 'rtscts={p.rtscts!r}, dsrdtr={p.dsrdtr!r})'.format( |
| name=self.__class__.__name__, id=id(self), p=self) |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| # compatibility with io library |
| # pylint: disable=invalid-name,missing-docstring |
| |
| def readable(self): |
| return True |
| |
| def writable(self): |
| return True |
| |
| def seekable(self): |
| return False |
| |
| def readinto(self, b): |
| data = self.read(len(b)) |
| n = len(data) |
| try: |
| b[:n] = data |
| except TypeError as err: |
| import array |
| if not isinstance(b, array.array): |
| raise err |
| b[:n] = array.array('b', data) |
| return n |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| # context manager |
| |
| def __enter__(self): |
| if self._port is not None and not self.is_open: |
| self.open() |
| return self |
| |
| def __exit__(self, *args, **kwargs): |
| self.close() |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| |
| def send_break(self, duration=0.25): |
| """\ |
| Send break condition. Timed, returns to idle state after given |
| duration. |
| """ |
| if not self.is_open: |
| raise PortNotOpenError() |
| self.break_condition = True |
| time.sleep(duration) |
| self.break_condition = False |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| # backwards compatibility / deprecated functions |
| |
| def flushInput(self): |
| self.reset_input_buffer() |
| |
| def flushOutput(self): |
| self.reset_output_buffer() |
| |
| def inWaiting(self): |
| return self.in_waiting |
| |
| def sendBreak(self, duration=0.25): |
| self.send_break(duration) |
| |
| def setRTS(self, value=1): |
| self.rts = value |
| |
| def setDTR(self, value=1): |
| self.dtr = value |
| |
| def getCTS(self): |
| return self.cts |
| |
| def getDSR(self): |
| return self.dsr |
| |
| def getRI(self): |
| return self.ri |
| |
| def getCD(self): |
| return self.cd |
| |
| def setPort(self, port): |
| self.port = port |
| |
| @property |
| def writeTimeout(self): |
| return self.write_timeout |
| |
| @writeTimeout.setter |
| def writeTimeout(self, timeout): |
| self.write_timeout = timeout |
| |
| @property |
| def interCharTimeout(self): |
| return self.inter_byte_timeout |
| |
| @interCharTimeout.setter |
| def interCharTimeout(self, interCharTimeout): |
| self.inter_byte_timeout = interCharTimeout |
| |
| def getSettingsDict(self): |
| return self.get_settings() |
| |
| def applySettingsDict(self, d): |
| self.apply_settings(d) |
| |
| def isOpen(self): |
| return self.is_open |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - |
| # additional functionality |
| |
| def read_all(self): |
| """\ |
| Read all bytes currently available in the buffer of the OS. |
| """ |
| return self.read(self.in_waiting) |
| |
| def read_until(self, expected=LF, size=None): |
| """\ |
| Read until an expected sequence is found (line feed by default), the size |
| is exceeded or until timeout occurs. |
| """ |
| lenterm = len(expected) |
| line = bytearray() |
| timeout = Timeout(self._timeout) |
| while True: |
| c = self.read(1) |
| if c: |
| line += c |
| if line[-lenterm:] == expected: |
| break |
| if size is not None and len(line) >= size: |
| break |
| else: |
| break |
| if timeout.expired(): |
| break |
| return bytes(line) |
| |
| def iread_until(self, *args, **kwargs): |
| """\ |
| Read lines, implemented as generator. It will raise StopIteration on |
| timeout (empty read). |
| """ |
| while True: |
| line = self.read_until(*args, **kwargs) |
| if not line: |
| break |
| yield line |
| |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - - |
| if __name__ == '__main__': |
| import sys |
| s = SerialBase() |
| sys.stdout.write('port name: {}\n'.format(s.name)) |
| sys.stdout.write('baud rates: {}\n'.format(s.BAUDRATES)) |
| sys.stdout.write('byte sizes: {}\n'.format(s.BYTESIZES)) |
| sys.stdout.write('parities: {}\n'.format(s.PARITIES)) |
| sys.stdout.write('stop bits: {}\n'.format(s.STOPBITS)) |
| sys.stdout.write('{}\n'.format(s)) |