| # -*- coding: utf-8 -*- |
| |
| from concurrent.futures import Executor, Future, ThreadPoolExecutor |
| from types import TracebackType |
| from typing import Any, Callable, Dict, Optional, Tuple, Type |
| |
| from pyee.base import EventEmitter |
| |
| __all__ = ["ExecutorEventEmitter"] |
| |
| |
| class ExecutorEventEmitter(EventEmitter): |
| """An event emitter class which runs handlers in a ``concurrent.futures`` |
| executor. |
| |
| By default, this class creates a default ``ThreadPoolExecutor``, but |
| a custom executor may also be passed in explicitly to, for instance, |
| use a ``ProcessPoolExecutor`` instead. |
| |
| This class runs all emitted events on the configured executor. Errors |
| captured by the resulting Future are automatically emitted on the |
| ``error`` event. This is unlike the EventEmitter, which have no error |
| handling. |
| |
| The underlying executor may be shut down by calling the ``shutdown`` |
| method. Alternately you can treat the event emitter as a context manager:: |
| |
| with ExecutorEventEmitter() as ee: |
| # Underlying executor open |
| |
| @ee.on('data') |
| def handler(data): |
| print(data) |
| |
| ee.emit('event') |
| |
| # Underlying executor closed |
| |
| Since the function call is scheduled on an executor, emit is always |
| non-blocking. |
| |
| No effort is made to ensure thread safety, beyond using an executor. |
| """ |
| |
| def __init__(self, executor: Executor = None): |
| super(ExecutorEventEmitter, self).__init__() |
| if executor: |
| self._executor: Executor = executor |
| else: |
| self._executor = ThreadPoolExecutor() |
| |
| def _emit_run( |
| self, |
| f: Callable, |
| args: Tuple[Any, ...], |
| kwargs: Dict[str, Any], |
| ): |
| future: Future = self._executor.submit(f, *args, **kwargs) |
| |
| @future.add_done_callback |
| def _callback(f: Future) -> None: |
| exc: Optional[BaseException] = f.exception() |
| if isinstance(exc, Exception): |
| self.emit("error", exc) |
| elif exc is not None: |
| raise exc |
| |
| def shutdown(self, wait: bool = True) -> None: |
| """Call ``shutdown`` on the internal executor.""" |
| |
| self._executor.shutdown(wait=wait) |
| |
| def __enter__(self) -> "ExecutorEventEmitter": |
| return self |
| |
| def __exit__( |
| self, type: Type[Exception], value: Exception, traceback: TracebackType |
| ) -> Optional[bool]: |
| self.shutdown() |