| #!/usr/bin/env python |
| |
| # RS485 support |
| # |
| # This file is part of pySerial. https://github.com/pyserial/pyserial |
| # (C) 2015 Chris Liechti <[email protected]> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| """\ |
| The settings for RS485 are stored in a dedicated object that can be applied to |
| serial ports (where supported). |
| NOTE: Some implementations may only support a subset of the settings. |
| """ |
| |
| from __future__ import absolute_import |
| |
| import time |
| import serial |
| |
| |
| class RS485Settings(object): |
| def __init__( |
| self, |
| rts_level_for_tx=True, |
| rts_level_for_rx=False, |
| loopback=False, |
| delay_before_tx=None, |
| delay_before_rx=None): |
| self.rts_level_for_tx = rts_level_for_tx |
| self.rts_level_for_rx = rts_level_for_rx |
| self.loopback = loopback |
| self.delay_before_tx = delay_before_tx |
| self.delay_before_rx = delay_before_rx |
| |
| |
| class RS485(serial.Serial): |
| """\ |
| A subclass that replaces the write method with one that toggles RTS |
| according to the RS485 settings. |
| |
| NOTE: This may work unreliably on some serial ports (control signals not |
| synchronized or delayed compared to data). Using delays may be |
| unreliable (varying times, larger than expected) as the OS may not |
| support very fine grained delays (no smaller than in the order of |
| tens of milliseconds). |
| |
| NOTE: Some implementations support this natively. Better performance |
| can be expected when the native version is used. |
| |
| NOTE: The loopback property is ignored by this implementation. The actual |
| behavior depends on the used hardware. |
| |
| Usage: |
| |
| ser = RS485(...) |
| ser.rs485_mode = RS485Settings(...) |
| ser.write(b'hello') |
| """ |
| |
| def __init__(self, *args, **kwargs): |
| super(RS485, self).__init__(*args, **kwargs) |
| self._alternate_rs485_settings = None |
| |
| def write(self, b): |
| """Write to port, controlling RTS before and after transmitting.""" |
| if self._alternate_rs485_settings is not None: |
| # apply level for TX and optional delay |
| self.setRTS(self._alternate_rs485_settings.rts_level_for_tx) |
| if self._alternate_rs485_settings.delay_before_tx is not None: |
| time.sleep(self._alternate_rs485_settings.delay_before_tx) |
| # write and wait for data to be written |
| super(RS485, self).write(b) |
| super(RS485, self).flush() |
| # optional delay and apply level for RX |
| if self._alternate_rs485_settings.delay_before_rx is not None: |
| time.sleep(self._alternate_rs485_settings.delay_before_rx) |
| self.setRTS(self._alternate_rs485_settings.rts_level_for_rx) |
| else: |
| super(RS485, self).write(b) |
| |
| # redirect where the property stores the settings so that underlying Serial |
| # instance does not see them |
| @property |
| def rs485_mode(self): |
| """\ |
| Enable RS485 mode and apply new settings, set to None to disable. |
| See serial.rs485.RS485Settings for more info about the value. |
| """ |
| return self._alternate_rs485_settings |
| |
| @rs485_mode.setter |
| def rs485_mode(self, rs485_settings): |
| self._alternate_rs485_settings = rs485_settings |