| import errno |
| import os |
| import random |
| import selectors |
| import signal |
| import socket |
| import sys |
| from test import support |
| from test.support import os_helper |
| from test.support import socket_helper |
| from time import sleep |
| import unittest |
| import unittest.mock |
| import tempfile |
| from time import monotonic as time |
| try: |
| import resource |
| except ImportError: |
| resource = None |
| |
| |
| if support.is_emscripten or support.is_wasi: |
| raise unittest.SkipTest("Cannot create socketpair on Emscripten/WASI.") |
| |
| |
| if hasattr(socket, 'socketpair'): |
| socketpair = socket.socketpair |
| else: |
| def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): |
| with socket.socket(family, type, proto) as l: |
| l.bind((socket_helper.HOST, 0)) |
| l.listen() |
| c = socket.socket(family, type, proto) |
| try: |
| c.connect(l.getsockname()) |
| caddr = c.getsockname() |
| while True: |
| a, addr = l.accept() |
| # check that we've got the correct client |
| if addr == caddr: |
| return c, a |
| a.close() |
| except OSError: |
| c.close() |
| raise |
| |
| |
| def find_ready_matching(ready, flag): |
| match = [] |
| for key, events in ready: |
| if events & flag: |
| match.append(key.fileobj) |
| return match |
| |
| |
| class BaseSelectorTestCase: |
| |
| def make_socketpair(self): |
| rd, wr = socketpair() |
| self.addCleanup(rd.close) |
| self.addCleanup(wr.close) |
| return rd, wr |
| |
| def test_register(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| key = s.register(rd, selectors.EVENT_READ, "data") |
| self.assertIsInstance(key, selectors.SelectorKey) |
| self.assertEqual(key.fileobj, rd) |
| self.assertEqual(key.fd, rd.fileno()) |
| self.assertEqual(key.events, selectors.EVENT_READ) |
| self.assertEqual(key.data, "data") |
| |
| # register an unknown event |
| self.assertRaises(ValueError, s.register, 0, 999999) |
| |
| # register an invalid FD |
| self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ) |
| |
| # register twice |
| self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ) |
| |
| # register the same FD, but with a different object |
| self.assertRaises(KeyError, s.register, rd.fileno(), |
| selectors.EVENT_READ) |
| |
| def test_unregister(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| s.register(rd, selectors.EVENT_READ) |
| s.unregister(rd) |
| |
| # unregister an unknown file obj |
| self.assertRaises(KeyError, s.unregister, 999999) |
| |
| # unregister twice |
| self.assertRaises(KeyError, s.unregister, rd) |
| |
| def test_unregister_after_fd_close(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| rd, wr = self.make_socketpair() |
| r, w = rd.fileno(), wr.fileno() |
| s.register(r, selectors.EVENT_READ) |
| s.register(w, selectors.EVENT_WRITE) |
| rd.close() |
| wr.close() |
| s.unregister(r) |
| s.unregister(w) |
| |
| @unittest.skipUnless(os.name == 'posix', "requires posix") |
| def test_unregister_after_fd_close_and_reuse(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| rd, wr = self.make_socketpair() |
| r, w = rd.fileno(), wr.fileno() |
| s.register(r, selectors.EVENT_READ) |
| s.register(w, selectors.EVENT_WRITE) |
| rd2, wr2 = self.make_socketpair() |
| rd.close() |
| wr.close() |
| os.dup2(rd2.fileno(), r) |
| os.dup2(wr2.fileno(), w) |
| self.addCleanup(os.close, r) |
| self.addCleanup(os.close, w) |
| s.unregister(r) |
| s.unregister(w) |
| |
| def test_unregister_after_socket_close(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| rd, wr = self.make_socketpair() |
| s.register(rd, selectors.EVENT_READ) |
| s.register(wr, selectors.EVENT_WRITE) |
| rd.close() |
| wr.close() |
| s.unregister(rd) |
| s.unregister(wr) |
| |
| def test_modify(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| key = s.register(rd, selectors.EVENT_READ) |
| |
| # modify events |
| key2 = s.modify(rd, selectors.EVENT_WRITE) |
| self.assertNotEqual(key.events, key2.events) |
| self.assertEqual(key2, s.get_key(rd)) |
| |
| s.unregister(rd) |
| |
| # modify data |
| d1 = object() |
| d2 = object() |
| |
| key = s.register(rd, selectors.EVENT_READ, d1) |
| key2 = s.modify(rd, selectors.EVENT_READ, d2) |
| self.assertEqual(key.events, key2.events) |
| self.assertNotEqual(key.data, key2.data) |
| self.assertEqual(key2, s.get_key(rd)) |
| self.assertEqual(key2.data, d2) |
| |
| # modify unknown file obj |
| self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ) |
| |
| # modify use a shortcut |
| d3 = object() |
| s.register = unittest.mock.Mock() |
| s.unregister = unittest.mock.Mock() |
| |
| s.modify(rd, selectors.EVENT_READ, d3) |
| self.assertFalse(s.register.called) |
| self.assertFalse(s.unregister.called) |
| |
| def test_modify_unregister(self): |
| # Make sure the fd is unregister()ed in case of error on |
| # modify(): http://bugs.python.org/issue30014 |
| if self.SELECTOR.__name__ == 'EpollSelector': |
| patch = unittest.mock.patch( |
| 'selectors.EpollSelector._selector_cls') |
| elif self.SELECTOR.__name__ == 'PollSelector': |
| patch = unittest.mock.patch( |
| 'selectors.PollSelector._selector_cls') |
| elif self.SELECTOR.__name__ == 'DevpollSelector': |
| patch = unittest.mock.patch( |
| 'selectors.DevpollSelector._selector_cls') |
| else: |
| raise self.skipTest("") |
| |
| with patch as m: |
| m.return_value.modify = unittest.mock.Mock( |
| side_effect=ZeroDivisionError) |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| rd, wr = self.make_socketpair() |
| s.register(rd, selectors.EVENT_READ) |
| self.assertEqual(len(s._map), 1) |
| with self.assertRaises(ZeroDivisionError): |
| s.modify(rd, selectors.EVENT_WRITE) |
| self.assertEqual(len(s._map), 0) |
| |
| def test_close(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| mapping = s.get_map() |
| rd, wr = self.make_socketpair() |
| |
| s.register(rd, selectors.EVENT_READ) |
| s.register(wr, selectors.EVENT_WRITE) |
| |
| s.close() |
| self.assertRaises(RuntimeError, s.get_key, rd) |
| self.assertRaises(RuntimeError, s.get_key, wr) |
| self.assertRaises(KeyError, mapping.__getitem__, rd) |
| self.assertRaises(KeyError, mapping.__getitem__, wr) |
| |
| def test_get_key(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| key = s.register(rd, selectors.EVENT_READ, "data") |
| self.assertEqual(key, s.get_key(rd)) |
| |
| # unknown file obj |
| self.assertRaises(KeyError, s.get_key, 999999) |
| |
| def test_get_map(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| keys = s.get_map() |
| self.assertFalse(keys) |
| self.assertEqual(len(keys), 0) |
| self.assertEqual(list(keys), []) |
| key = s.register(rd, selectors.EVENT_READ, "data") |
| self.assertIn(rd, keys) |
| self.assertEqual(key, keys[rd]) |
| self.assertEqual(len(keys), 1) |
| self.assertEqual(list(keys), [rd.fileno()]) |
| self.assertEqual(list(keys.values()), [key]) |
| |
| # unknown file obj |
| with self.assertRaises(KeyError): |
| keys[999999] |
| |
| # Read-only mapping |
| with self.assertRaises(TypeError): |
| del keys[rd] |
| |
| def test_select(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| s.register(rd, selectors.EVENT_READ) |
| wr_key = s.register(wr, selectors.EVENT_WRITE) |
| |
| result = s.select() |
| for key, events in result: |
| self.assertTrue(isinstance(key, selectors.SelectorKey)) |
| self.assertTrue(events) |
| self.assertFalse(events & ~(selectors.EVENT_READ | |
| selectors.EVENT_WRITE)) |
| |
| self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result) |
| |
| def test_context_manager(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| with s as sel: |
| sel.register(rd, selectors.EVENT_READ) |
| sel.register(wr, selectors.EVENT_WRITE) |
| |
| self.assertRaises(RuntimeError, s.get_key, rd) |
| self.assertRaises(RuntimeError, s.get_key, wr) |
| |
| def test_fileno(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| if hasattr(s, 'fileno'): |
| fd = s.fileno() |
| self.assertTrue(isinstance(fd, int)) |
| self.assertGreaterEqual(fd, 0) |
| |
| def test_selector(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| NUM_SOCKETS = 12 |
| MSG = b" This is a test." |
| MSG_LEN = len(MSG) |
| readers = [] |
| writers = [] |
| r2w = {} |
| w2r = {} |
| |
| for i in range(NUM_SOCKETS): |
| rd, wr = self.make_socketpair() |
| s.register(rd, selectors.EVENT_READ) |
| s.register(wr, selectors.EVENT_WRITE) |
| readers.append(rd) |
| writers.append(wr) |
| r2w[rd] = wr |
| w2r[wr] = rd |
| |
| bufs = [] |
| |
| while writers: |
| ready = s.select() |
| ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE) |
| if not ready_writers: |
| self.fail("no sockets ready for writing") |
| wr = random.choice(ready_writers) |
| wr.send(MSG) |
| |
| for i in range(10): |
| ready = s.select() |
| ready_readers = find_ready_matching(ready, |
| selectors.EVENT_READ) |
| if ready_readers: |
| break |
| # there might be a delay between the write to the write end and |
| # the read end is reported ready |
| sleep(0.1) |
| else: |
| self.fail("no sockets ready for reading") |
| self.assertEqual([w2r[wr]], ready_readers) |
| rd = ready_readers[0] |
| buf = rd.recv(MSG_LEN) |
| self.assertEqual(len(buf), MSG_LEN) |
| bufs.append(buf) |
| s.unregister(r2w[rd]) |
| s.unregister(rd) |
| writers.remove(r2w[rd]) |
| |
| self.assertEqual(bufs, [MSG] * NUM_SOCKETS) |
| |
| @unittest.skipIf(sys.platform == 'win32', |
| 'select.select() cannot be used with empty fd sets') |
| def test_empty_select(self): |
| # Issue #23009: Make sure EpollSelector.select() works when no FD is |
| # registered. |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| self.assertEqual(s.select(timeout=0), []) |
| |
| def test_timeout(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| s.register(wr, selectors.EVENT_WRITE) |
| t = time() |
| self.assertEqual(1, len(s.select(0))) |
| self.assertEqual(1, len(s.select(-1))) |
| self.assertLess(time() - t, 0.5) |
| |
| s.unregister(wr) |
| s.register(rd, selectors.EVENT_READ) |
| t = time() |
| self.assertFalse(s.select(0)) |
| self.assertFalse(s.select(-1)) |
| self.assertLess(time() - t, 0.5) |
| |
| t0 = time() |
| self.assertFalse(s.select(1)) |
| t1 = time() |
| dt = t1 - t0 |
| # Tolerate 2.0 seconds for very slow buildbots |
| self.assertTrue(0.8 <= dt <= 2.0, dt) |
| |
| @unittest.skipUnless(hasattr(signal, "alarm"), |
| "signal.alarm() required for this test") |
| def test_select_interrupt_exc(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| class InterruptSelect(Exception): |
| pass |
| |
| def handler(*args): |
| raise InterruptSelect |
| |
| orig_alrm_handler = signal.signal(signal.SIGALRM, handler) |
| self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) |
| |
| try: |
| signal.alarm(1) |
| |
| s.register(rd, selectors.EVENT_READ) |
| t = time() |
| # select() is interrupted by a signal which raises an exception |
| with self.assertRaises(InterruptSelect): |
| s.select(30) |
| # select() was interrupted before the timeout of 30 seconds |
| self.assertLess(time() - t, 5.0) |
| finally: |
| signal.alarm(0) |
| |
| @unittest.skipUnless(hasattr(signal, "alarm"), |
| "signal.alarm() required for this test") |
| def test_select_interrupt_noraise(self): |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| rd, wr = self.make_socketpair() |
| |
| orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None) |
| self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) |
| |
| try: |
| signal.alarm(1) |
| |
| s.register(rd, selectors.EVENT_READ) |
| t = time() |
| # select() is interrupted by a signal, but the signal handler doesn't |
| # raise an exception, so select() should by retries with a recomputed |
| # timeout |
| self.assertFalse(s.select(1.5)) |
| self.assertGreaterEqual(time() - t, 1.0) |
| finally: |
| signal.alarm(0) |
| |
| |
| class ScalableSelectorMixIn: |
| |
| # see issue #18963 for why it's skipped on older OS X versions |
| @support.requires_mac_ver(10, 5) |
| @unittest.skipUnless(resource, "Test needs resource module") |
| def test_above_fd_setsize(self): |
| # A scalable implementation should have no problem with more than |
| # FD_SETSIZE file descriptors. Since we don't know the value, we just |
| # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling. |
| soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) |
| try: |
| resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) |
| self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE, |
| (soft, hard)) |
| NUM_FDS = min(hard, 2**16) |
| except (OSError, ValueError): |
| NUM_FDS = soft |
| |
| # guard for already allocated FDs (stdin, stdout...) |
| NUM_FDS -= 32 |
| |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| for i in range(NUM_FDS // 2): |
| try: |
| rd, wr = self.make_socketpair() |
| except OSError: |
| # too many FDs, skip - note that we should only catch EMFILE |
| # here, but apparently *BSD and Solaris can fail upon connect() |
| # or bind() with EADDRNOTAVAIL, so let's be safe |
| self.skipTest("FD limit reached") |
| |
| try: |
| s.register(rd, selectors.EVENT_READ) |
| s.register(wr, selectors.EVENT_WRITE) |
| except OSError as e: |
| if e.errno == errno.ENOSPC: |
| # this can be raised by epoll if we go over |
| # fs.epoll.max_user_watches sysctl |
| self.skipTest("FD limit reached") |
| raise |
| |
| try: |
| fds = s.select() |
| except OSError as e: |
| if e.errno == errno.EINVAL and sys.platform == 'darwin': |
| # unexplainable errors on macOS don't need to fail the test |
| self.skipTest("Invalid argument error calling poll()") |
| raise |
| self.assertEqual(NUM_FDS // 2, len(fds)) |
| |
| |
| class DefaultSelectorTestCase(BaseSelectorTestCase, unittest.TestCase): |
| |
| SELECTOR = selectors.DefaultSelector |
| |
| |
| class SelectSelectorTestCase(BaseSelectorTestCase, unittest.TestCase): |
| |
| SELECTOR = selectors.SelectSelector |
| |
| |
| @unittest.skipUnless(hasattr(selectors, 'PollSelector'), |
| "Test needs selectors.PollSelector") |
| class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, |
| unittest.TestCase): |
| |
| SELECTOR = getattr(selectors, 'PollSelector', None) |
| |
| |
| @unittest.skipUnless(hasattr(selectors, 'EpollSelector'), |
| "Test needs selectors.EpollSelector") |
| class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, |
| unittest.TestCase): |
| |
| SELECTOR = getattr(selectors, 'EpollSelector', None) |
| |
| def test_register_file(self): |
| # epoll(7) returns EPERM when given a file to watch |
| s = self.SELECTOR() |
| with tempfile.NamedTemporaryFile() as f: |
| with self.assertRaises(IOError): |
| s.register(f, selectors.EVENT_READ) |
| # the SelectorKey has been removed |
| with self.assertRaises(KeyError): |
| s.get_key(f) |
| |
| |
| @unittest.skipUnless(hasattr(selectors, 'KqueueSelector'), |
| "Test needs selectors.KqueueSelector)") |
| class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, |
| unittest.TestCase): |
| |
| SELECTOR = getattr(selectors, 'KqueueSelector', None) |
| |
| def test_register_bad_fd(self): |
| # a file descriptor that's been closed should raise an OSError |
| # with EBADF |
| s = self.SELECTOR() |
| bad_f = os_helper.make_bad_fd() |
| with self.assertRaises(OSError) as cm: |
| s.register(bad_f, selectors.EVENT_READ) |
| self.assertEqual(cm.exception.errno, errno.EBADF) |
| # the SelectorKey has been removed |
| with self.assertRaises(KeyError): |
| s.get_key(bad_f) |
| |
| def test_empty_select_timeout(self): |
| # Issues #23009, #29255: Make sure timeout is applied when no fds |
| # are registered. |
| s = self.SELECTOR() |
| self.addCleanup(s.close) |
| |
| t0 = time() |
| self.assertEqual(s.select(1), []) |
| t1 = time() |
| dt = t1 - t0 |
| # Tolerate 2.0 seconds for very slow buildbots |
| self.assertTrue(0.8 <= dt <= 2.0, dt) |
| |
| |
| @unittest.skipUnless(hasattr(selectors, 'DevpollSelector'), |
| "Test needs selectors.DevpollSelector") |
| class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, |
| unittest.TestCase): |
| |
| SELECTOR = getattr(selectors, 'DevpollSelector', None) |
| |
| |
| def tearDownModule(): |
| support.reap_children() |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |