blob: 5df8ef2f358a03344f61fba0aa931f48c6d675ef [file] [log] [blame]
Chris Liechti242f8c42016-05-23 23:38:02 +02001#!/usr/bin/env python
2#
3# This file is part of pySerial - Cross platform serial port support for Python
4# (C) 2016 Chris Liechti <cliechti@gmx.net>
5#
6# SPDX-License-Identifier: BSD-3-Clause
7"""\
8Test asyncio related functionality.
9"""
10
Chris Liechtic57d2bc2016-05-25 01:50:37 +020011import os
Chris Liechti242f8c42016-05-23 23:38:02 +020012import unittest
13import serial
14
15# on which port should the tests be performed:
16PORT = '/dev/ttyUSB0'
17
18try:
19 import asyncio
20 import serial.aio
21except (ImportError, SyntaxError):
22 # not compatible with python 2.x
23 pass
24else:
25
Chris Liechtic57d2bc2016-05-25 01:50:37 +020026 @unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
Chris Liechti242f8c42016-05-23 23:38:02 +020027 class Test_asyncio(unittest.TestCase):
28 """Test asyncio related functionality"""
29
30 def setUp(self):
31 self.loop = asyncio.get_event_loop()
32 # create a closed serial port
33
34 def tearDown(self):
35 self.loop.close()
36
37 def test_asyncio(self):
38 TEXT = b'hello world\n'
39 received = []
40 actions = []
41
42 class Output(asyncio.Protocol):
43 def connection_made(self, transport):
44 self.transport = transport
45 actions.append('open')
46 transport.serial.rts = False
47 transport.write(TEXT)
48
49 def data_received(self, data):
50 #~ print('data received', repr(data))
51 received.append(data)
52 if b'\n' in data:
53 self.transport.close()
54
55 def connection_lost(self, exc):
56 actions.append('close')
57 asyncio.get_event_loop().stop()
58
59 def pause_writing(self):
60 actions.append('pause')
61 print(self.transport.get_write_buffer_size())
62
63 def resume_writing(self):
64 actions.append('resume')
65 print(self.transport.get_write_buffer_size())
66
67 coro = serial.aio.create_serial_connection(self.loop, Output, PORT, baudrate=115200)
68 self.loop.run_until_complete(coro)
69 self.loop.run_forever()
70 self.assertEqual(b''.join(received), TEXT)
71 self.assertEqual(actions, ['open', 'close'])
72
73
74if __name__ == '__main__':
75 import sys
76 sys.stdout.write(__doc__)
77 if len(sys.argv) > 1:
78 PORT = sys.argv[1]
Chris Liechti3debab22016-06-20 22:52:22 +020079 sys.stdout.write("Testing port: {!r}\n".format(PORT))
Chris Liechti242f8c42016-05-23 23:38:02 +020080 sys.argv[1:] = ['-v']
81 # When this module is executed from the command-line, it runs all its tests
82 unittest.main()