Chris Liechti | 242f8c4 | 2016-05-23 23:38:02 +0200 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # |
| 3 | # This file is part of pySerial - Cross platform serial port support for Python |
| 4 | # (C) 2016 Chris Liechti <cliechti@gmx.net> |
| 5 | # |
| 6 | # SPDX-License-Identifier: BSD-3-Clause |
| 7 | """\ |
| 8 | Test asyncio related functionality. |
| 9 | """ |
| 10 | |
Chris Liechti | c57d2bc | 2016-05-25 01:50:37 +0200 | [diff] [blame] | 11 | import os |
Chris Liechti | 242f8c4 | 2016-05-23 23:38:02 +0200 | [diff] [blame] | 12 | import unittest |
| 13 | import serial |
| 14 | |
| 15 | # on which port should the tests be performed: |
| 16 | PORT = '/dev/ttyUSB0' |
| 17 | |
try:
    import asyncio
    import serial.aio
except (ImportError, SyntaxError):
    # not compatible with python 2.x
    pass
else:

    @unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
    class Test_asyncio(unittest.TestCase):
        """Test asyncio related functionality.

        The test opens PORT, writes one line of text and expects to read the
        very same bytes back, so the port has to echo what is sent to it.
        """

        def setUp(self):
            # Use a private loop per test. Closing the loop returned by
            # asyncio.get_event_loop() would leave the process-wide default
            # loop closed for everything that runs after this test.
            self.loop = asyncio.new_event_loop()

        def tearDown(self):
            self.loop.close()

        def test_asyncio(self):
            """Send a line through the serial transport and read it back."""
            TEXT = b'hello world\n'
            received = []
            actions = []
            # the protocol callbacks below stop exactly the loop that this
            # test is running, not whatever loop happens to be the default
            loop = self.loop

            class Output(asyncio.Protocol):
                """Protocol that records events and sends TEXT once opened."""

                def connection_made(self, transport):
                    self.transport = transport
                    actions.append('open')
                    transport.serial.rts = False
                    transport.write(TEXT)

                def data_received(self, data):
                    received.append(data)
                    # the newline terminates TEXT: everything has arrived
                    if b'\n' in data:
                        self.transport.close()

                def connection_lost(self, exc):
                    actions.append('close')
                    # ends the run_forever() call of the test
                    loop.stop()

                def pause_writing(self):
                    actions.append('pause')
                    print(self.transport.get_write_buffer_size())

                def resume_writing(self):
                    actions.append('resume')
                    print(self.transport.get_write_buffer_size())

            coro = serial.aio.create_serial_connection(
                self.loop, Output, PORT, baudrate=115200)
            self.loop.run_until_complete(coro)
            # runs until connection_lost() stops the loop
            self.loop.run_forever()
            self.assertEqual(b''.join(received), TEXT)
            # no flow control events (pause/resume) are expected
            self.assertEqual(actions, ['open', 'close'])
| 73 | |
if __name__ == '__main__':
    import sys

    sys.stdout.write(__doc__)
    # an optional first command line argument overrides the default port
    cli_args = sys.argv[1:]
    if cli_args:
        PORT = cli_args[0]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # hide the port argument from unittest and always run verbosely
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()