| # Copyright 2009 Brian Quinlan. All Rights Reserved. |
| # Licensed to PSF under a Contributor Agreement. |
| |
| """Implements ProcessPoolExecutor. |
| |
| The following diagram and text describe the data-flow through the system: |
| |
| |======================= In-process =====================|== Out-of-process ==| |
| |
| +----------+ +----------+ +--------+ +-----------+ +---------+ |
| | | => | Work Ids | | | | Call Q | | Process | |
| | | +----------+ | | +-----------+ | Pool | |
| | | | ... | | | | ... | +---------+ |
| | | | 6 | => | | => | 5, call() | => | | |
| | | | 7 | | | | ... | | | |
| | Process | | ... | | Local | +-----------+ | Process | |
| | Pool | +----------+ | Worker | | #1..n | |
| | Executor | | Thread | | | |
| | | +----------- + | | +-----------+ | | |
| | | <=> | Work Items | <=> | | <= | Result Q | <= | | |
| | | +------------+ | | +-----------+ | | |
| | | | 6: call() | | | | ... | | | |
| | | | future | | | | 4, result | | | |
| | | | ... | | | | 3, except | | | |
| +----------+ +------------+ +--------+ +-----------+ +---------+ |
| |
| Executor.submit() called: |
| - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict |
| - adds the id of the _WorkItem to the "Work Ids" queue |
| |
| Local worker thread: |
| - reads work ids from the "Work Ids" queue and looks up the corresponding |
| WorkItem from the "Work Items" dict: if the work item has been cancelled then |
| it is simply removed from the dict, otherwise it is repackaged as a |
| _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" |
| until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because |
| calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). |
| - reads _ResultItems from "Result Q", updates the future stored in the |
| "Work Items" dict and deletes the dict entry |
| |
| Process #1..n: |
| - reads _CallItems from "Call Q", executes the calls, and puts the resulting |
| _ResultItems in "Result Q" |
| """ |
| |
| __author__ = 'Brian Quinlan ([email protected])' |
| |
| import os |
| from concurrent.futures import _base |
| import queue |
| import multiprocessing as mp |
| import multiprocessing.connection |
| from multiprocessing.queues import Queue |
| import threading |
| import weakref |
| from functools import partial |
| import itertools |
| import sys |
| import traceback |
| |
| |
| _threads_wakeups = weakref.WeakKeyDictionary() |
| _global_shutdown = False |
| |
| |
| class _ThreadWakeup: |
| def __init__(self): |
| self._closed = False |
| self._reader, self._writer = mp.Pipe(duplex=False) |
| |
| def close(self): |
| if not self._closed: |
| self._closed = True |
| self._writer.close() |
| self._reader.close() |
| |
| def wakeup(self): |
| if not self._closed: |
| self._writer.send_bytes(b"") |
| |
| def clear(self): |
| if not self._closed: |
| while self._reader.poll(): |
| self._reader.recv_bytes() |
| |
| |
| def _python_exit(): |
| global _global_shutdown |
| _global_shutdown = True |
| items = list(_threads_wakeups.items()) |
| for _, thread_wakeup in items: |
| # call not protected by ProcessPoolExecutor._shutdown_lock |
| thread_wakeup.wakeup() |
| for t, _ in items: |
| t.join() |
| |
| # Register for `_python_exit()` to be called just before joining all |
| # non-daemon threads. This is used instead of `atexit.register()` for |
| # compatibility with subinterpreters, which no longer support daemon threads. |
| # See bpo-39812 for context. |
| threading._register_atexit(_python_exit) |
| |
| # Controls how many more calls than processes will be queued in the call queue. |
| # A smaller number will mean that processes spend more time idle waiting for |
| # work while a larger number will make Future.cancel() succeed less frequently |
| # (Futures in the call queue cannot be cancelled). |
| EXTRA_QUEUED_CALLS = 1 |
| |
| |
| # On Windows, WaitForMultipleObjects is used to wait for processes to finish. |
| # It can wait on, at most, 63 objects. There is an overhead of two objects: |
| # - the result queue reader |
| # - the thread wakeup reader |
| _MAX_WINDOWS_WORKERS = 63 - 2 |
| |
| # Hack to embed stringification of remote traceback in local traceback |
| |
| class _RemoteTraceback(Exception): |
| def __init__(self, tb): |
| self.tb = tb |
| def __str__(self): |
| return self.tb |
| |
| class _ExceptionWithTraceback: |
| def __init__(self, exc, tb): |
| tb = traceback.format_exception(type(exc), exc, tb) |
| tb = ''.join(tb) |
| self.exc = exc |
| self.tb = '\n"""\n%s"""' % tb |
| def __reduce__(self): |
| return _rebuild_exc, (self.exc, self.tb) |
| |
| def _rebuild_exc(exc, tb): |
| exc.__cause__ = _RemoteTraceback(tb) |
| return exc |
| |
| class _WorkItem(object): |
| def __init__(self, future, fn, args, kwargs): |
| self.future = future |
| self.fn = fn |
| self.args = args |
| self.kwargs = kwargs |
| |
| class _ResultItem(object): |
| def __init__(self, work_id, exception=None, result=None): |
| self.work_id = work_id |
| self.exception = exception |
| self.result = result |
| |
| class _CallItem(object): |
| def __init__(self, work_id, fn, args, kwargs): |
| self.work_id = work_id |
| self.fn = fn |
| self.args = args |
| self.kwargs = kwargs |
| |
| |
| class _SafeQueue(Queue): |
| """Safe Queue set exception to the future object linked to a job""" |
| def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, |
| thread_wakeup): |
| self.pending_work_items = pending_work_items |
| self.shutdown_lock = shutdown_lock |
| self.thread_wakeup = thread_wakeup |
| super().__init__(max_size, ctx=ctx) |
| |
| def _on_queue_feeder_error(self, e, obj): |
| if isinstance(obj, _CallItem): |
| tb = traceback.format_exception(type(e), e, e.__traceback__) |
| e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) |
| work_item = self.pending_work_items.pop(obj.work_id, None) |
| with self.shutdown_lock: |
| self.thread_wakeup.wakeup() |
| # work_item can be None if another process terminated. In this |
| # case, the executor_manager_thread fails all work_items |
| # with BrokenProcessPool |
| if work_item is not None: |
| work_item.future.set_exception(e) |
| else: |
| super()._on_queue_feeder_error(e, obj) |
| |
| |
| def _get_chunks(*iterables, chunksize): |
| """ Iterates over zip()ed iterables in chunks. """ |
| it = zip(*iterables) |
| while True: |
| chunk = tuple(itertools.islice(it, chunksize)) |
| if not chunk: |
| return |
| yield chunk |
| |
| |
| def _process_chunk(fn, chunk): |
| """ Processes a chunk of an iterable passed to map. |
| |
| Runs the function passed to map() on a chunk of the |
| iterable passed to map. |
| |
| This function is run in a separate process. |
| |
| """ |
| return [fn(*args) for args in chunk] |
| |
| |
| def _sendback_result(result_queue, work_id, result=None, exception=None): |
| """Safely send back the given result or exception""" |
| try: |
| result_queue.put(_ResultItem(work_id, result=result, |
| exception=exception)) |
| except BaseException as e: |
| exc = _ExceptionWithTraceback(e, e.__traceback__) |
| result_queue.put(_ResultItem(work_id, exception=exc)) |
| |
| |
| def _process_worker(call_queue, result_queue, initializer, initargs): |
| """Evaluates calls from call_queue and places the results in result_queue. |
| |
| This worker is run in a separate process. |
| |
| Args: |
| call_queue: A ctx.Queue of _CallItems that will be read and |
| evaluated by the worker. |
| result_queue: A ctx.Queue of _ResultItems that will written |
| to by the worker. |
| initializer: A callable initializer, or None |
| initargs: A tuple of args for the initializer |
| """ |
| if initializer is not None: |
| try: |
| initializer(*initargs) |
| except BaseException: |
| _base.LOGGER.critical('Exception in initializer:', exc_info=True) |
| # The parent will notice that the process stopped and |
| # mark the pool broken |
| return |
| while True: |
| call_item = call_queue.get(block=True) |
| if call_item is None: |
| # Wake up queue management thread |
| result_queue.put(os.getpid()) |
| return |
| try: |
| r = call_item.fn(*call_item.args, **call_item.kwargs) |
| except BaseException as e: |
| exc = _ExceptionWithTraceback(e, e.__traceback__) |
| _sendback_result(result_queue, call_item.work_id, exception=exc) |
| else: |
| _sendback_result(result_queue, call_item.work_id, result=r) |
| del r |
| |
| # Liberate the resource as soon as possible, to avoid holding onto |
| # open files or shared memory that is not needed anymore |
| del call_item |
| |
| |
| class _ExecutorManagerThread(threading.Thread): |
| """Manages the communication between this process and the worker processes. |
| |
| The manager is run in a local thread. |
| |
| Args: |
| executor: A reference to the ProcessPoolExecutor that owns |
| this thread. A weakref will be own by the manager as well as |
| references to internal objects used to introspect the state of |
| the executor. |
| """ |
| |
| def __init__(self, executor): |
| # Store references to necessary internals of the executor. |
| |
| # A _ThreadWakeup to allow waking up the queue_manager_thread from the |
| # main Thread and avoid deadlocks caused by permanently locked queues. |
| self.thread_wakeup = executor._executor_manager_thread_wakeup |
| self.shutdown_lock = executor._shutdown_lock |
| |
| # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used |
| # to determine if the ProcessPoolExecutor has been garbage collected |
| # and that the manager can exit. |
| # When the executor gets garbage collected, the weakref callback |
| # will wake up the queue management thread so that it can terminate |
| # if there is no pending work item. |
| def weakref_cb(_, |
| thread_wakeup=self.thread_wakeup, |
| shutdown_lock=self.shutdown_lock): |
| mp.util.debug('Executor collected: triggering callback for' |
| ' QueueManager wakeup') |
| with shutdown_lock: |
| thread_wakeup.wakeup() |
| |
| self.executor_reference = weakref.ref(executor, weakref_cb) |
| |
| # A list of the ctx.Process instances used as workers. |
| self.processes = executor._processes |
| |
| # A ctx.Queue that will be filled with _CallItems derived from |
| # _WorkItems for processing by the process workers. |
| self.call_queue = executor._call_queue |
| |
| # A ctx.SimpleQueue of _ResultItems generated by the process workers. |
| self.result_queue = executor._result_queue |
| |
| # A queue.Queue of work ids e.g. Queue([5, 6, ...]). |
| self.work_ids_queue = executor._work_ids |
| |
| # A dict mapping work ids to _WorkItems e.g. |
| # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} |
| self.pending_work_items = executor._pending_work_items |
| |
| super().__init__() |
| |
| def run(self): |
| # Main loop for the executor manager thread. |
| |
| while True: |
| self.add_call_item_to_queue() |
| |
| result_item, is_broken, cause = self.wait_result_broken_or_wakeup() |
| |
| if is_broken: |
| self.terminate_broken(cause) |
| return |
| if result_item is not None: |
| self.process_result_item(result_item) |
| # Delete reference to result_item to avoid keeping references |
| # while waiting on new results. |
| del result_item |
| |
| # attempt to increment idle process count |
| executor = self.executor_reference() |
| if executor is not None: |
| executor._idle_worker_semaphore.release() |
| del executor |
| |
| if self.is_shutting_down(): |
| self.flag_executor_shutting_down() |
| |
| # Since no new work items can be added, it is safe to shutdown |
| # this thread if there are no pending work items. |
| if not self.pending_work_items: |
| self.join_executor_internals() |
| return |
| |
| def add_call_item_to_queue(self): |
| # Fills call_queue with _WorkItems from pending_work_items. |
| # This function never blocks. |
| while True: |
| if self.call_queue.full(): |
| return |
| try: |
| work_id = self.work_ids_queue.get(block=False) |
| except queue.Empty: |
| return |
| else: |
| work_item = self.pending_work_items[work_id] |
| |
| if work_item.future.set_running_or_notify_cancel(): |
| self.call_queue.put(_CallItem(work_id, |
| work_item.fn, |
| work_item.args, |
| work_item.kwargs), |
| block=True) |
| else: |
| del self.pending_work_items[work_id] |
| continue |
| |
| def wait_result_broken_or_wakeup(self): |
| # Wait for a result to be ready in the result_queue while checking |
| # that all worker processes are still running, or for a wake up |
| # signal send. The wake up signals come either from new tasks being |
| # submitted, from the executor being shutdown/gc-ed, or from the |
| # shutdown of the python interpreter. |
| result_reader = self.result_queue._reader |
| assert not self.thread_wakeup._closed |
| wakeup_reader = self.thread_wakeup._reader |
| readers = [result_reader, wakeup_reader] |
| worker_sentinels = [p.sentinel for p in list(self.processes.values())] |
| ready = mp.connection.wait(readers + worker_sentinels) |
| |
| cause = None |
| is_broken = True |
| result_item = None |
| if result_reader in ready: |
| try: |
| result_item = result_reader.recv() |
| is_broken = False |
| except BaseException as e: |
| cause = traceback.format_exception(type(e), e, e.__traceback__) |
| |
| elif wakeup_reader in ready: |
| is_broken = False |
| |
| with self.shutdown_lock: |
| self.thread_wakeup.clear() |
| |
| return result_item, is_broken, cause |
| |
| def process_result_item(self, result_item): |
| # Process the received a result_item. This can be either the PID of a |
| # worker that exited gracefully or a _ResultItem |
| |
| if isinstance(result_item, int): |
| # Clean shutdown of a worker using its PID |
| # (avoids marking the executor broken) |
| assert self.is_shutting_down() |
| p = self.processes.pop(result_item) |
| p.join() |
| if not self.processes: |
| self.join_executor_internals() |
| return |
| else: |
| # Received a _ResultItem so mark the future as completed. |
| work_item = self.pending_work_items.pop(result_item.work_id, None) |
| # work_item can be None if another process terminated (see above) |
| if work_item is not None: |
| if result_item.exception: |
| work_item.future.set_exception(result_item.exception) |
| else: |
| work_item.future.set_result(result_item.result) |
| |
| def is_shutting_down(self): |
| # Check whether we should start shutting down the executor. |
| executor = self.executor_reference() |
| # No more work items can be added if: |
| # - The interpreter is shutting down OR |
| # - The executor that owns this worker has been collected OR |
| # - The executor that owns this worker has been shutdown. |
| return (_global_shutdown or executor is None |
| or executor._shutdown_thread) |
| |
| def terminate_broken(self, cause): |
| # Terminate the executor because it is in a broken state. The cause |
| # argument can be used to display more information on the error that |
| # lead the executor into becoming broken. |
| |
| # Mark the process pool broken so that submits fail right now. |
| executor = self.executor_reference() |
| if executor is not None: |
| executor._broken = ('A child process terminated ' |
| 'abruptly, the process pool is not ' |
| 'usable anymore') |
| executor._shutdown_thread = True |
| executor = None |
| |
| # All pending tasks are to be marked failed with the following |
| # BrokenProcessPool error |
| bpe = BrokenProcessPool("A process in the process pool was " |
| "terminated abruptly while the future was " |
| "running or pending.") |
| if cause is not None: |
| bpe.__cause__ = _RemoteTraceback( |
| f"\n'''\n{''.join(cause)}'''") |
| |
| # Mark pending tasks as failed. |
| for work_id, work_item in self.pending_work_items.items(): |
| work_item.future.set_exception(bpe) |
| # Delete references to object. See issue16284 |
| del work_item |
| self.pending_work_items.clear() |
| |
| # Terminate remaining workers forcibly: the queues or their |
| # locks may be in a dirty state and block forever. |
| for p in self.processes.values(): |
| p.terminate() |
| |
| # clean up resources |
| self.join_executor_internals() |
| |
| def flag_executor_shutting_down(self): |
| # Flag the executor as shutting down and cancel remaining tasks if |
| # requested as early as possible if it is not gc-ed yet. |
| executor = self.executor_reference() |
| if executor is not None: |
| executor._shutdown_thread = True |
| # Cancel pending work items if requested. |
| if executor._cancel_pending_futures: |
| # Cancel all pending futures and update pending_work_items |
| # to only have futures that are currently running. |
| new_pending_work_items = {} |
| for work_id, work_item in self.pending_work_items.items(): |
| if not work_item.future.cancel(): |
| new_pending_work_items[work_id] = work_item |
| self.pending_work_items = new_pending_work_items |
| # Drain work_ids_queue since we no longer need to |
| # add items to the call queue. |
| while True: |
| try: |
| self.work_ids_queue.get_nowait() |
| except queue.Empty: |
| break |
| # Make sure we do this only once to not waste time looping |
| # on running processes over and over. |
| executor._cancel_pending_futures = False |
| |
| def shutdown_workers(self): |
| n_children_to_stop = self.get_n_children_alive() |
| n_sentinels_sent = 0 |
| # Send the right number of sentinels, to make sure all children are |
| # properly terminated. |
| while (n_sentinels_sent < n_children_to_stop |
| and self.get_n_children_alive() > 0): |
| for i in range(n_children_to_stop - n_sentinels_sent): |
| try: |
| self.call_queue.put_nowait(None) |
| n_sentinels_sent += 1 |
| except queue.Full: |
| break |
| |
| def join_executor_internals(self): |
| self.shutdown_workers() |
| # Release the queue's resources as soon as possible. |
| self.call_queue.close() |
| self.call_queue.join_thread() |
| with self.shutdown_lock: |
| self.thread_wakeup.close() |
| # If .join() is not called on the created processes then |
| # some ctx.Queue methods may deadlock on Mac OS X. |
| for p in self.processes.values(): |
| p.join() |
| |
| def get_n_children_alive(self): |
| # This is an upper bound on the number of children alive. |
| return sum(p.is_alive() for p in self.processes.values()) |
| |
| |
| _system_limits_checked = False |
| _system_limited = None |
| |
| |
| def _check_system_limits(): |
| global _system_limits_checked, _system_limited |
| if _system_limits_checked: |
| if _system_limited: |
| raise NotImplementedError(_system_limited) |
| _system_limits_checked = True |
| try: |
| import multiprocessing.synchronize |
| except ImportError: |
| _system_limited = ( |
| "This Python build lacks multiprocessing.synchronize, usually due " |
| "to named semaphores being unavailable on this platform." |
| ) |
| raise NotImplementedError(_system_limited) |
| try: |
| nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") |
| except (AttributeError, ValueError): |
| # sysconf not available or setting not available |
| return |
| if nsems_max == -1: |
| # indetermined limit, assume that limit is determined |
| # by available memory only |
| return |
| if nsems_max >= 256: |
| # minimum number of semaphores available |
| # according to POSIX |
| return |
| _system_limited = ("system provides too few semaphores (%d" |
| " available, 256 necessary)" % nsems_max) |
| raise NotImplementedError(_system_limited) |
| |
| |
| def _chain_from_iterable_of_lists(iterable): |
| """ |
| Specialized implementation of itertools.chain.from_iterable. |
| Each item in *iterable* should be a list. This function is |
| careful not to keep references to yielded objects. |
| """ |
| for element in iterable: |
| element.reverse() |
| while element: |
| yield element.pop() |
| |
| |
| class BrokenProcessPool(_base.BrokenExecutor): |
| """ |
| Raised when a process in a ProcessPoolExecutor terminated abruptly |
| while a future was in the running state. |
| """ |
| |
| |
| class ProcessPoolExecutor(_base.Executor): |
| def __init__(self, max_workers=None, mp_context=None, |
| initializer=None, initargs=()): |
| """Initializes a new ProcessPoolExecutor instance. |
| |
| Args: |
| max_workers: The maximum number of processes that can be used to |
| execute the given calls. If None or not given then as many |
| worker processes will be created as the machine has processors. |
| mp_context: A multiprocessing context to launch the workers. This |
| object should provide SimpleQueue, Queue and Process. |
| initializer: A callable used to initialize worker processes. |
| initargs: A tuple of arguments to pass to the initializer. |
| """ |
| _check_system_limits() |
| |
| if max_workers is None: |
| self._max_workers = os.cpu_count() or 1 |
| if sys.platform == 'win32': |
| self._max_workers = min(_MAX_WINDOWS_WORKERS, |
| self._max_workers) |
| else: |
| if max_workers <= 0: |
| raise ValueError("max_workers must be greater than 0") |
| elif (sys.platform == 'win32' and |
| max_workers > _MAX_WINDOWS_WORKERS): |
| raise ValueError( |
| f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") |
| |
| self._max_workers = max_workers |
| |
| if mp_context is None: |
| mp_context = mp.get_context() |
| self._mp_context = mp_context |
| |
| if initializer is not None and not callable(initializer): |
| raise TypeError("initializer must be a callable") |
| self._initializer = initializer |
| self._initargs = initargs |
| |
| # Management thread |
| self._executor_manager_thread = None |
| |
| # Map of pids to processes |
| self._processes = {} |
| |
| # Shutdown is a two-step process. |
| self._shutdown_thread = False |
| self._shutdown_lock = threading.Lock() |
| self._idle_worker_semaphore = threading.Semaphore(0) |
| self._broken = False |
| self._queue_count = 0 |
| self._pending_work_items = {} |
| self._cancel_pending_futures = False |
| |
| # _ThreadWakeup is a communication channel used to interrupt the wait |
| # of the main loop of executor_manager_thread from another thread (e.g. |
| # when calling executor.submit or executor.shutdown). We do not use the |
| # _result_queue to send wakeup signals to the executor_manager_thread |
| # as it could result in a deadlock if a worker process dies with the |
| # _result_queue write lock still acquired. |
| # |
| # _shutdown_lock must be locked to access _ThreadWakeup. |
| self._executor_manager_thread_wakeup = _ThreadWakeup() |
| |
| # Create communication channels for the executor |
| # Make the call queue slightly larger than the number of processes to |
| # prevent the worker processes from idling. But don't make it too big |
| # because futures in the call queue cannot be cancelled. |
| queue_size = self._max_workers + EXTRA_QUEUED_CALLS |
| self._call_queue = _SafeQueue( |
| max_size=queue_size, ctx=self._mp_context, |
| pending_work_items=self._pending_work_items, |
| shutdown_lock=self._shutdown_lock, |
| thread_wakeup=self._executor_manager_thread_wakeup) |
| # Killed worker processes can produce spurious "broken pipe" |
| # tracebacks in the queue's own worker thread. But we detect killed |
| # processes anyway, so silence the tracebacks. |
| self._call_queue._ignore_epipe = True |
| self._result_queue = mp_context.SimpleQueue() |
| self._work_ids = queue.Queue() |
| |
| def _start_executor_manager_thread(self): |
| if self._executor_manager_thread is None: |
| # Start the processes so that their sentinels are known. |
| self._executor_manager_thread = _ExecutorManagerThread(self) |
| self._executor_manager_thread.start() |
| _threads_wakeups[self._executor_manager_thread] = \ |
| self._executor_manager_thread_wakeup |
| |
| def _adjust_process_count(self): |
| # if there's an idle process, we don't need to spawn a new one. |
| if self._idle_worker_semaphore.acquire(blocking=False): |
| return |
| |
| process_count = len(self._processes) |
| if process_count < self._max_workers: |
| p = self._mp_context.Process( |
| target=_process_worker, |
| args=(self._call_queue, |
| self._result_queue, |
| self._initializer, |
| self._initargs)) |
| p.start() |
| self._processes[p.pid] = p |
| |
| def submit(self, fn, /, *args, **kwargs): |
| with self._shutdown_lock: |
| if self._broken: |
| raise BrokenProcessPool(self._broken) |
| if self._shutdown_thread: |
| raise RuntimeError('cannot schedule new futures after shutdown') |
| if _global_shutdown: |
| raise RuntimeError('cannot schedule new futures after ' |
| 'interpreter shutdown') |
| |
| f = _base.Future() |
| w = _WorkItem(f, fn, args, kwargs) |
| |
| self._pending_work_items[self._queue_count] = w |
| self._work_ids.put(self._queue_count) |
| self._queue_count += 1 |
| # Wake up queue management thread |
| self._executor_manager_thread_wakeup.wakeup() |
| |
| self._adjust_process_count() |
| self._start_executor_manager_thread() |
| return f |
| submit.__doc__ = _base.Executor.submit.__doc__ |
| |
| def map(self, fn, *iterables, timeout=None, chunksize=1): |
| """Returns an iterator equivalent to map(fn, iter). |
| |
| Args: |
| fn: A callable that will take as many arguments as there are |
| passed iterables. |
| timeout: The maximum number of seconds to wait. If None, then there |
| is no limit on the wait time. |
| chunksize: If greater than one, the iterables will be chopped into |
| chunks of size chunksize and submitted to the process pool. |
| If set to one, the items in the list will be sent one at a time. |
| |
| Returns: |
| An iterator equivalent to: map(func, *iterables) but the calls may |
| be evaluated out-of-order. |
| |
| Raises: |
| TimeoutError: If the entire result iterator could not be generated |
| before the given timeout. |
| Exception: If fn(*args) raises for any values. |
| """ |
| if chunksize < 1: |
| raise ValueError("chunksize must be >= 1.") |
| |
| results = super().map(partial(_process_chunk, fn), |
| _get_chunks(*iterables, chunksize=chunksize), |
| timeout=timeout) |
| return _chain_from_iterable_of_lists(results) |
| |
| def shutdown(self, wait=True, *, cancel_futures=False): |
| with self._shutdown_lock: |
| self._cancel_pending_futures = cancel_futures |
| self._shutdown_thread = True |
| if self._executor_manager_thread_wakeup is not None: |
| # Wake up queue management thread |
| self._executor_manager_thread_wakeup.wakeup() |
| |
| if self._executor_manager_thread is not None and wait: |
| self._executor_manager_thread.join() |
| # To reduce the risk of opening too many files, remove references to |
| # objects that use file descriptors. |
| self._executor_manager_thread = None |
| self._call_queue = None |
| if self._result_queue is not None and wait: |
| self._result_queue.close() |
| self._result_queue = None |
| self._processes = None |
| self._executor_manager_thread_wakeup = None |
| |
| shutdown.__doc__ = _base.Executor.shutdown.__doc__ |