blob: da81ab435b9a6ffcda41ebe5fdacc4e9b5836cc2 [file] [log] [blame]
Haibo Huangd8830302020-03-03 10:09:46 -08001"""Selector and proactor event loops for Windows."""
2
Yi Kong71199322022-08-30 15:53:45 +08003import sys
4
5if sys.platform != 'win32': # pragma: no cover
6 raise ImportError('win32 only')
7
Haibo Huangd8830302020-03-03 10:09:46 -08008import _overlapped
9import _winapi
10import errno
11import math
12import msvcrt
13import socket
14import struct
15import time
16import weakref
17
18from . import events
19from . import base_subprocess
20from . import futures
21from . import exceptions
22from . import proactor_events
23from . import selector_events
24from . import tasks
25from . import windows_utils
26from .log import logger
27
28
29__all__ = (
30 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
31 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
32 'WindowsProactorEventLoopPolicy',
33)
34
35
36NULL = 0
37INFINITE = 0xffffffff
38ERROR_CONNECTION_REFUSED = 1225
39ERROR_CONNECTION_ABORTED = 1236
40
41# Initial delay in seconds for connect_pipe() before retrying to connect
42CONNECT_PIPE_INIT_DELAY = 0.001
43
44# Maximum delay in seconds for connect_pipe() before retrying to connect
45CONNECT_PIPE_MAX_DELAY = 0.100
46
47
48class _OverlappedFuture(futures.Future):
49 """Subclass of Future which represents an overlapped operation.
50
51 Cancelling it will immediately cancel the overlapped operation.
52 """
53
54 def __init__(self, ov, *, loop=None):
55 super().__init__(loop=loop)
56 if self._source_traceback:
57 del self._source_traceback[-1]
58 self._ov = ov
59
60 def _repr_info(self):
61 info = super()._repr_info()
62 if self._ov is not None:
63 state = 'pending' if self._ov.pending else 'completed'
64 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
65 return info
66
67 def _cancel_overlapped(self):
68 if self._ov is None:
69 return
70 try:
71 self._ov.cancel()
72 except OSError as exc:
73 context = {
74 'message': 'Cancelling an overlapped future failed',
75 'exception': exc,
76 'future': self,
77 }
78 if self._source_traceback:
79 context['source_traceback'] = self._source_traceback
80 self._loop.call_exception_handler(context)
81 self._ov = None
82
Haibo Huang5eba2b42021-01-22 11:22:02 -080083 def cancel(self, msg=None):
Haibo Huangd8830302020-03-03 10:09:46 -080084 self._cancel_overlapped()
Haibo Huang5eba2b42021-01-22 11:22:02 -080085 return super().cancel(msg=msg)
Haibo Huangd8830302020-03-03 10:09:46 -080086
87 def set_exception(self, exception):
88 super().set_exception(exception)
89 self._cancel_overlapped()
90
91 def set_result(self, result):
92 super().set_result(result)
93 self._ov = None
94
95
96class _BaseWaitHandleFuture(futures.Future):
97 """Subclass of Future which represents a wait handle."""
98
99 def __init__(self, ov, handle, wait_handle, *, loop=None):
100 super().__init__(loop=loop)
101 if self._source_traceback:
102 del self._source_traceback[-1]
103 # Keep a reference to the Overlapped object to keep it alive until the
104 # wait is unregistered
105 self._ov = ov
106 self._handle = handle
107 self._wait_handle = wait_handle
108
109 # Should we call UnregisterWaitEx() if the wait completes
110 # or is cancelled?
111 self._registered = True
112
113 def _poll(self):
114 # non-blocking wait: use a timeout of 0 millisecond
115 return (_winapi.WaitForSingleObject(self._handle, 0) ==
116 _winapi.WAIT_OBJECT_0)
117
118 def _repr_info(self):
119 info = super()._repr_info()
120 info.append(f'handle={self._handle:#x}')
121 if self._handle is not None:
122 state = 'signaled' if self._poll() else 'waiting'
123 info.append(state)
124 if self._wait_handle is not None:
125 info.append(f'wait_handle={self._wait_handle:#x}')
126 return info
127
128 def _unregister_wait_cb(self, fut):
129 # The wait was unregistered: it's not safe to destroy the Overlapped
130 # object
131 self._ov = None
132
133 def _unregister_wait(self):
134 if not self._registered:
135 return
136 self._registered = False
137
138 wait_handle = self._wait_handle
139 self._wait_handle = None
140 try:
141 _overlapped.UnregisterWait(wait_handle)
142 except OSError as exc:
143 if exc.winerror != _overlapped.ERROR_IO_PENDING:
144 context = {
145 'message': 'Failed to unregister the wait handle',
146 'exception': exc,
147 'future': self,
148 }
149 if self._source_traceback:
150 context['source_traceback'] = self._source_traceback
151 self._loop.call_exception_handler(context)
152 return
153 # ERROR_IO_PENDING means that the unregister is pending
154
155 self._unregister_wait_cb(None)
156
Haibo Huang5eba2b42021-01-22 11:22:02 -0800157 def cancel(self, msg=None):
Haibo Huangd8830302020-03-03 10:09:46 -0800158 self._unregister_wait()
Haibo Huang5eba2b42021-01-22 11:22:02 -0800159 return super().cancel(msg=msg)
Haibo Huangd8830302020-03-03 10:09:46 -0800160
161 def set_exception(self, exception):
162 self._unregister_wait()
163 super().set_exception(exception)
164
165 def set_result(self, result):
166 self._unregister_wait()
167 super().set_result(result)
168
169
170class _WaitCancelFuture(_BaseWaitHandleFuture):
171 """Subclass of Future which represents a wait for the cancellation of a
172 _WaitHandleFuture using an event.
173 """
174
175 def __init__(self, ov, event, wait_handle, *, loop=None):
176 super().__init__(ov, event, wait_handle, loop=loop)
177
178 self._done_callback = None
179
180 def cancel(self):
181 raise RuntimeError("_WaitCancelFuture must not be cancelled")
182
183 def set_result(self, result):
184 super().set_result(result)
185 if self._done_callback is not None:
186 self._done_callback(self)
187
188 def set_exception(self, exception):
189 super().set_exception(exception)
190 if self._done_callback is not None:
191 self._done_callback(self)
192
193
194class _WaitHandleFuture(_BaseWaitHandleFuture):
195 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
196 super().__init__(ov, handle, wait_handle, loop=loop)
197 self._proactor = proactor
198 self._unregister_proactor = True
199 self._event = _overlapped.CreateEvent(None, True, False, None)
200 self._event_fut = None
201
202 def _unregister_wait_cb(self, fut):
203 if self._event is not None:
204 _winapi.CloseHandle(self._event)
205 self._event = None
206 self._event_fut = None
207
208 # If the wait was cancelled, the wait may never be signalled, so
209 # it's required to unregister it. Otherwise, IocpProactor.close() will
210 # wait forever for an event which will never come.
211 #
212 # If the IocpProactor already received the event, it's safe to call
213 # _unregister() because we kept a reference to the Overlapped object
214 # which is used as a unique key.
215 self._proactor._unregister(self._ov)
216 self._proactor = None
217
218 super()._unregister_wait_cb(fut)
219
220 def _unregister_wait(self):
221 if not self._registered:
222 return
223 self._registered = False
224
225 wait_handle = self._wait_handle
226 self._wait_handle = None
227 try:
228 _overlapped.UnregisterWaitEx(wait_handle, self._event)
229 except OSError as exc:
230 if exc.winerror != _overlapped.ERROR_IO_PENDING:
231 context = {
232 'message': 'Failed to unregister the wait handle',
233 'exception': exc,
234 'future': self,
235 }
236 if self._source_traceback:
237 context['source_traceback'] = self._source_traceback
238 self._loop.call_exception_handler(context)
239 return
240 # ERROR_IO_PENDING is not an error, the wait was unregistered
241
242 self._event_fut = self._proactor._wait_cancel(self._event,
243 self._unregister_wait_cb)
244
245
246class PipeServer(object):
247 """Class representing a pipe server.
248
249 This is much like a bound, listening socket.
250 """
251 def __init__(self, address):
252 self._address = address
253 self._free_instances = weakref.WeakSet()
254 # initialize the pipe attribute before calling _server_pipe_handle()
255 # because this function can raise an exception and the destructor calls
256 # the close() method
257 self._pipe = None
258 self._accept_pipe_future = None
259 self._pipe = self._server_pipe_handle(True)
260
261 def _get_unconnected_pipe(self):
262 # Create new instance and return previous one. This ensures
263 # that (until the server is closed) there is always at least
264 # one pipe handle for address. Therefore if a client attempt
265 # to connect it will not fail with FileNotFoundError.
266 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
267 return tmp
268
269 def _server_pipe_handle(self, first):
270 # Return a wrapper for a new pipe handle.
271 if self.closed():
272 return None
273 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
274 if first:
275 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
276 h = _winapi.CreateNamedPipe(
277 self._address, flags,
278 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
279 _winapi.PIPE_WAIT,
280 _winapi.PIPE_UNLIMITED_INSTANCES,
281 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
282 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
283 pipe = windows_utils.PipeHandle(h)
284 self._free_instances.add(pipe)
285 return pipe
286
287 def closed(self):
288 return (self._address is None)
289
290 def close(self):
291 if self._accept_pipe_future is not None:
292 self._accept_pipe_future.cancel()
293 self._accept_pipe_future = None
294 # Close all instances which have not been connected to by a client.
295 if self._address is not None:
296 for pipe in self._free_instances:
297 pipe.close()
298 self._pipe = None
299 self._address = None
300 self._free_instances.clear()
301
302 __del__ = close
303
304
305class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
306 """Windows version of selector event loop."""
307
308
309class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
310 """Windows version of proactor event loop using IOCP."""
311
312 def __init__(self, proactor=None):
313 if proactor is None:
314 proactor = IocpProactor()
315 super().__init__(proactor)
316
317 def run_forever(self):
318 try:
319 assert self._self_reading_future is None
320 self.call_soon(self._loop_self_reading)
321 super().run_forever()
322 finally:
323 if self._self_reading_future is not None:
324 ov = self._self_reading_future._ov
325 self._self_reading_future.cancel()
Haibo Huang5eba2b42021-01-22 11:22:02 -0800326 # self_reading_future was just cancelled so if it hasn't been
327 # finished yet, it never will be (it's possible that it has
328 # already finished and its callback is waiting in the queue,
329 # where it could still happen if the event loop is restarted).
330 # Unregister it otherwise IocpProactor.close will wait for it
331 # forever
Haibo Huangd8830302020-03-03 10:09:46 -0800332 if ov is not None:
333 self._proactor._unregister(ov)
334 self._self_reading_future = None
335
336 async def create_pipe_connection(self, protocol_factory, address):
337 f = self._proactor.connect_pipe(address)
338 pipe = await f
339 protocol = protocol_factory()
340 trans = self._make_duplex_pipe_transport(pipe, protocol,
341 extra={'addr': address})
342 return trans, protocol
343
344 async def start_serving_pipe(self, protocol_factory, address):
345 server = PipeServer(address)
346
347 def loop_accept_pipe(f=None):
348 pipe = None
349 try:
350 if f:
351 pipe = f.result()
352 server._free_instances.discard(pipe)
353
354 if server.closed():
355 # A client connected before the server was closed:
356 # drop the client (close the pipe) and exit
357 pipe.close()
358 return
359
360 protocol = protocol_factory()
361 self._make_duplex_pipe_transport(
362 pipe, protocol, extra={'addr': address})
363
364 pipe = server._get_unconnected_pipe()
365 if pipe is None:
366 return
367
368 f = self._proactor.accept_pipe(pipe)
369 except OSError as exc:
370 if pipe and pipe.fileno() != -1:
371 self.call_exception_handler({
372 'message': 'Pipe accept failed',
373 'exception': exc,
374 'pipe': pipe,
375 })
376 pipe.close()
377 elif self._debug:
378 logger.warning("Accept pipe failed on pipe %r",
379 pipe, exc_info=True)
380 except exceptions.CancelledError:
381 if pipe:
382 pipe.close()
383 else:
384 server._accept_pipe_future = f
385 f.add_done_callback(loop_accept_pipe)
386
387 self.call_soon(loop_accept_pipe)
388 return [server]
389
390 async def _make_subprocess_transport(self, protocol, args, shell,
391 stdin, stdout, stderr, bufsize,
392 extra=None, **kwargs):
393 waiter = self.create_future()
394 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
395 stdin, stdout, stderr, bufsize,
396 waiter=waiter, extra=extra,
397 **kwargs)
398 try:
399 await waiter
400 except (SystemExit, KeyboardInterrupt):
401 raise
402 except BaseException:
403 transp.close()
404 await transp._wait()
405 raise
406
407 return transp
408
409
410class IocpProactor:
411 """Proactor implementation using IOCP."""
412
413 def __init__(self, concurrency=0xffffffff):
414 self._loop = None
415 self._results = []
416 self._iocp = _overlapped.CreateIoCompletionPort(
417 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
418 self._cache = {}
419 self._registered = weakref.WeakSet()
420 self._unregistered = []
421 self._stopped_serving = weakref.WeakSet()
422
423 def _check_closed(self):
424 if self._iocp is None:
425 raise RuntimeError('IocpProactor is closed')
426
427 def __repr__(self):
428 info = ['overlapped#=%s' % len(self._cache),
429 'result#=%s' % len(self._results)]
430 if self._iocp is None:
431 info.append('closed')
432 return '<%s %s>' % (self.__class__.__name__, " ".join(info))
433
434 def set_loop(self, loop):
435 self._loop = loop
436
437 def select(self, timeout=None):
438 if not self._results:
439 self._poll(timeout)
440 tmp = self._results
441 self._results = []
442 return tmp
443
444 def _result(self, value):
445 fut = self._loop.create_future()
446 fut.set_result(value)
447 return fut
448
449 def recv(self, conn, nbytes, flags=0):
450 self._register_with_iocp(conn)
451 ov = _overlapped.Overlapped(NULL)
452 try:
453 if isinstance(conn, socket.socket):
454 ov.WSARecv(conn.fileno(), nbytes, flags)
455 else:
456 ov.ReadFile(conn.fileno(), nbytes)
457 except BrokenPipeError:
458 return self._result(b'')
459
460 def finish_recv(trans, key, ov):
461 try:
462 return ov.getresult()
463 except OSError as exc:
464 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
465 _overlapped.ERROR_OPERATION_ABORTED):
466 raise ConnectionResetError(*exc.args)
467 else:
468 raise
469
470 return self._register(ov, conn, finish_recv)
471
472 def recv_into(self, conn, buf, flags=0):
473 self._register_with_iocp(conn)
474 ov = _overlapped.Overlapped(NULL)
475 try:
476 if isinstance(conn, socket.socket):
477 ov.WSARecvInto(conn.fileno(), buf, flags)
478 else:
479 ov.ReadFileInto(conn.fileno(), buf)
480 except BrokenPipeError:
Haibo Huang5eba2b42021-01-22 11:22:02 -0800481 return self._result(0)
Haibo Huangd8830302020-03-03 10:09:46 -0800482
483 def finish_recv(trans, key, ov):
484 try:
485 return ov.getresult()
486 except OSError as exc:
487 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
488 _overlapped.ERROR_OPERATION_ABORTED):
489 raise ConnectionResetError(*exc.args)
490 else:
491 raise
492
493 return self._register(ov, conn, finish_recv)
494
495 def recvfrom(self, conn, nbytes, flags=0):
496 self._register_with_iocp(conn)
497 ov = _overlapped.Overlapped(NULL)
498 try:
499 ov.WSARecvFrom(conn.fileno(), nbytes, flags)
500 except BrokenPipeError:
501 return self._result((b'', None))
502
503 def finish_recv(trans, key, ov):
504 try:
505 return ov.getresult()
506 except OSError as exc:
507 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
508 _overlapped.ERROR_OPERATION_ABORTED):
509 raise ConnectionResetError(*exc.args)
510 else:
511 raise
512
513 return self._register(ov, conn, finish_recv)
514
515 def sendto(self, conn, buf, flags=0, addr=None):
516 self._register_with_iocp(conn)
517 ov = _overlapped.Overlapped(NULL)
518
519 ov.WSASendTo(conn.fileno(), buf, flags, addr)
520
521 def finish_send(trans, key, ov):
522 try:
523 return ov.getresult()
524 except OSError as exc:
525 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
526 _overlapped.ERROR_OPERATION_ABORTED):
527 raise ConnectionResetError(*exc.args)
528 else:
529 raise
530
531 return self._register(ov, conn, finish_send)
532
533 def send(self, conn, buf, flags=0):
534 self._register_with_iocp(conn)
535 ov = _overlapped.Overlapped(NULL)
536 if isinstance(conn, socket.socket):
537 ov.WSASend(conn.fileno(), buf, flags)
538 else:
539 ov.WriteFile(conn.fileno(), buf)
540
541 def finish_send(trans, key, ov):
542 try:
543 return ov.getresult()
544 except OSError as exc:
545 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
546 _overlapped.ERROR_OPERATION_ABORTED):
547 raise ConnectionResetError(*exc.args)
548 else:
549 raise
550
551 return self._register(ov, conn, finish_send)
552
553 def accept(self, listener):
554 self._register_with_iocp(listener)
555 conn = self._get_accept_socket(listener.family)
556 ov = _overlapped.Overlapped(NULL)
557 ov.AcceptEx(listener.fileno(), conn.fileno())
558
559 def finish_accept(trans, key, ov):
560 ov.getresult()
561 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
562 buf = struct.pack('@P', listener.fileno())
563 conn.setsockopt(socket.SOL_SOCKET,
564 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
565 conn.settimeout(listener.gettimeout())
566 return conn, conn.getpeername()
567
568 async def accept_coro(future, conn):
569 # Coroutine closing the accept socket if the future is cancelled
570 try:
571 await future
572 except exceptions.CancelledError:
573 conn.close()
574 raise
575
576 future = self._register(ov, listener, finish_accept)
577 coro = accept_coro(future, conn)
578 tasks.ensure_future(coro, loop=self._loop)
579 return future
580
581 def connect(self, conn, address):
582 if conn.type == socket.SOCK_DGRAM:
583 # WSAConnect will complete immediately for UDP sockets so we don't
584 # need to register any IOCP operation
585 _overlapped.WSAConnect(conn.fileno(), address)
586 fut = self._loop.create_future()
587 fut.set_result(None)
588 return fut
589
590 self._register_with_iocp(conn)
591 # The socket needs to be locally bound before we call ConnectEx().
592 try:
593 _overlapped.BindLocal(conn.fileno(), conn.family)
594 except OSError as e:
595 if e.winerror != errno.WSAEINVAL:
596 raise
597 # Probably already locally bound; check using getsockname().
598 if conn.getsockname()[1] == 0:
599 raise
600 ov = _overlapped.Overlapped(NULL)
601 ov.ConnectEx(conn.fileno(), address)
602
603 def finish_connect(trans, key, ov):
604 ov.getresult()
605 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
606 conn.setsockopt(socket.SOL_SOCKET,
607 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
608 return conn
609
610 return self._register(ov, conn, finish_connect)
611
612 def sendfile(self, sock, file, offset, count):
613 self._register_with_iocp(sock)
614 ov = _overlapped.Overlapped(NULL)
615 offset_low = offset & 0xffff_ffff
616 offset_high = (offset >> 32) & 0xffff_ffff
617 ov.TransmitFile(sock.fileno(),
618 msvcrt.get_osfhandle(file.fileno()),
619 offset_low, offset_high,
620 count, 0, 0)
621
622 def finish_sendfile(trans, key, ov):
623 try:
624 return ov.getresult()
625 except OSError as exc:
626 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
627 _overlapped.ERROR_OPERATION_ABORTED):
628 raise ConnectionResetError(*exc.args)
629 else:
630 raise
631 return self._register(ov, sock, finish_sendfile)
632
633 def accept_pipe(self, pipe):
634 self._register_with_iocp(pipe)
635 ov = _overlapped.Overlapped(NULL)
636 connected = ov.ConnectNamedPipe(pipe.fileno())
637
638 if connected:
639 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
640 # that the pipe is connected. There is no need to wait for the
641 # completion of the connection.
642 return self._result(pipe)
643
644 def finish_accept_pipe(trans, key, ov):
645 ov.getresult()
646 return pipe
647
648 return self._register(ov, pipe, finish_accept_pipe)
649
650 async def connect_pipe(self, address):
651 delay = CONNECT_PIPE_INIT_DELAY
652 while True:
653 # Unfortunately there is no way to do an overlapped connect to
654 # a pipe. Call CreateFile() in a loop until it doesn't fail with
655 # ERROR_PIPE_BUSY.
656 try:
657 handle = _overlapped.ConnectPipe(address)
658 break
659 except OSError as exc:
660 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
661 raise
662
663 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
664 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
665 await tasks.sleep(delay)
666
667 return windows_utils.PipeHandle(handle)
668
669 def wait_for_handle(self, handle, timeout=None):
670 """Wait for a handle.
671
672 Return a Future object. The result of the future is True if the wait
673 completed, or False if the wait did not complete (on timeout).
674 """
675 return self._wait_for_handle(handle, timeout, False)
676
677 def _wait_cancel(self, event, done_callback):
678 fut = self._wait_for_handle(event, None, True)
679 # add_done_callback() cannot be used because the wait may only complete
680 # in IocpProactor.close(), while the event loop is not running.
681 fut._done_callback = done_callback
682 return fut
683
684 def _wait_for_handle(self, handle, timeout, _is_cancel):
685 self._check_closed()
686
687 if timeout is None:
688 ms = _winapi.INFINITE
689 else:
690 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
691 # round away from zero to wait *at least* timeout seconds.
692 ms = math.ceil(timeout * 1e3)
693
694 # We only create ov so we can use ov.address as a key for the cache.
695 ov = _overlapped.Overlapped(NULL)
696 wait_handle = _overlapped.RegisterWaitWithQueue(
697 handle, self._iocp, ov.address, ms)
698 if _is_cancel:
699 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
700 else:
701 f = _WaitHandleFuture(ov, handle, wait_handle, self,
702 loop=self._loop)
703 if f._source_traceback:
704 del f._source_traceback[-1]
705
706 def finish_wait_for_handle(trans, key, ov):
707 # Note that this second wait means that we should only use
708 # this with handles types where a successful wait has no
709 # effect. So events or processes are all right, but locks
710 # or semaphores are not. Also note if the handle is
711 # signalled and then quickly reset, then we may return
712 # False even though we have not timed out.
713 return f._poll()
714
715 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
716 return f
717
718 def _register_with_iocp(self, obj):
719 # To get notifications of finished ops on this objects sent to the
720 # completion port, were must register the handle.
721 if obj not in self._registered:
722 self._registered.add(obj)
723 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
724 # XXX We could also use SetFileCompletionNotificationModes()
725 # to avoid sending notifications to completion port of ops
726 # that succeed immediately.
727
728 def _register(self, ov, obj, callback):
729 self._check_closed()
730
731 # Return a future which will be set with the result of the
732 # operation when it completes. The future's value is actually
733 # the value returned by callback().
734 f = _OverlappedFuture(ov, loop=self._loop)
735 if f._source_traceback:
736 del f._source_traceback[-1]
737 if not ov.pending:
738 # The operation has completed, so no need to postpone the
739 # work. We cannot take this short cut if we need the
740 # NumberOfBytes, CompletionKey values returned by
741 # PostQueuedCompletionStatus().
742 try:
743 value = callback(None, None, ov)
744 except OSError as e:
745 f.set_exception(e)
746 else:
747 f.set_result(value)
748 # Even if GetOverlappedResult() was called, we have to wait for the
749 # notification of the completion in GetQueuedCompletionStatus().
750 # Register the overlapped operation to keep a reference to the
751 # OVERLAPPED object, otherwise the memory is freed and Windows may
752 # read uninitialized memory.
753
754 # Register the overlapped operation for later. Note that
755 # we only store obj to prevent it from being garbage
756 # collected too early.
757 self._cache[ov.address] = (f, ov, obj, callback)
758 return f
759
760 def _unregister(self, ov):
761 """Unregister an overlapped object.
762
763 Call this method when its future has been cancelled. The event can
764 already be signalled (pending in the proactor event queue). It is also
765 safe if the event is never signalled (because it was cancelled).
766 """
767 self._check_closed()
768 self._unregistered.append(ov)
769
770 def _get_accept_socket(self, family):
771 s = socket.socket(family)
772 s.settimeout(0)
773 return s
774
775 def _poll(self, timeout=None):
776 if timeout is None:
777 ms = INFINITE
778 elif timeout < 0:
779 raise ValueError("negative timeout")
780 else:
781 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
782 # round away from zero to wait *at least* timeout seconds.
783 ms = math.ceil(timeout * 1e3)
784 if ms >= INFINITE:
785 raise ValueError("timeout too big")
786
787 while True:
788 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
789 if status is None:
790 break
791 ms = 0
792
793 err, transferred, key, address = status
794 try:
795 f, ov, obj, callback = self._cache.pop(address)
796 except KeyError:
797 if self._loop.get_debug():
798 self._loop.call_exception_handler({
799 'message': ('GetQueuedCompletionStatus() returned an '
800 'unexpected event'),
801 'status': ('err=%s transferred=%s key=%#x address=%#x'
802 % (err, transferred, key, address)),
803 })
804
805 # key is either zero, or it is used to return a pipe
806 # handle which should be closed to avoid a leak.
807 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
808 _winapi.CloseHandle(key)
809 continue
810
811 if obj in self._stopped_serving:
812 f.cancel()
813 # Don't call the callback if _register() already read the result or
814 # if the overlapped has been cancelled
815 elif not f.done():
816 try:
817 value = callback(transferred, key, ov)
818 except OSError as e:
819 f.set_exception(e)
820 self._results.append(f)
821 else:
822 f.set_result(value)
823 self._results.append(f)
824
825 # Remove unregistered futures
826 for ov in self._unregistered:
827 self._cache.pop(ov.address, None)
828 self._unregistered.clear()
829
830 def _stop_serving(self, obj):
831 # obj is a socket or pipe handle. It will be closed in
832 # BaseProactorEventLoop._stop_serving() which will make any
833 # pending operations fail quickly.
834 self._stopped_serving.add(obj)
835
836 def close(self):
837 if self._iocp is None:
838 # already closed
839 return
840
841 # Cancel remaining registered operations.
842 for address, (fut, ov, obj, callback) in list(self._cache.items()):
843 if fut.cancelled():
844 # Nothing to do with cancelled futures
845 pass
846 elif isinstance(fut, _WaitCancelFuture):
847 # _WaitCancelFuture must not be cancelled
848 pass
849 else:
850 try:
851 fut.cancel()
852 except OSError as exc:
853 if self._loop is not None:
854 context = {
855 'message': 'Cancelling a future failed',
856 'exception': exc,
857 'future': fut,
858 }
859 if fut._source_traceback:
860 context['source_traceback'] = fut._source_traceback
861 self._loop.call_exception_handler(context)
862
863 # Wait until all cancelled overlapped complete: don't exit with running
864 # overlapped to prevent a crash. Display progress every second if the
865 # loop is still running.
866 msg_update = 1.0
867 start_time = time.monotonic()
868 next_msg = start_time + msg_update
869 while self._cache:
870 if next_msg <= time.monotonic():
871 logger.debug('%r is running after closing for %.1f seconds',
872 self, time.monotonic() - start_time)
873 next_msg = time.monotonic() + msg_update
874
875 # handle a few events, or timeout
876 self._poll(msg_update)
877
878 self._results = []
879
880 _winapi.CloseHandle(self._iocp)
881 self._iocp = None
882
883 def __del__(self):
884 self.close()
885
886
887class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
888
889 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
890 self._proc = windows_utils.Popen(
891 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
892 bufsize=bufsize, **kwargs)
893
894 def callback(f):
895 returncode = self._proc.poll()
896 self._process_exited(returncode)
897
898 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
899 f.add_done_callback(callback)
900
901
902SelectorEventLoop = _WindowsSelectorEventLoop
903
904
905class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
906 _loop_factory = SelectorEventLoop
907
908
909class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
910 _loop_factory = ProactorEventLoop
911
912
913DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy