| # -*- coding: utf-8 -*- |
| |
| from collections import OrderedDict |
| from threading import Lock |
| from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union |
| |
| |
class PyeeException(Exception):
    """Exception type used for errors that are internal to pyee."""
| |
| |
# Type variable for event handlers: any callable. Using a TypeVar (rather than
# plain ``Callable``) lets registration methods return exactly the handler type
# they were given. The name is passed positionally, as static type checkers
# expect the first argument of ``TypeVar`` to be a positional string literal.
Handler = TypeVar("Handler", bound=Callable)
| |
| |
class EventEmitter:
    """The base event emitter class. All other event emitters inherit from
    this class.

    Most events are registered with an emitter via the ``on`` and ``once``
    methods, and fired with the ``emit`` method. However, pyee event emitters
    have two *special* events:

    - ``new_listener``: Fires whenever a new listener is created. Listeners for
      this event do not fire upon their own creation.

    - ``error``: When emitted raises an Exception by default, behavior can be
      overridden by attaching callback to the event.

      For example::

          @ee.on('error')
          def on_error(message):
              logging.error(message)

          ee.emit('error', Exception('something blew up'))

    All callbacks are handled in a synchronous, blocking manner. As in node.js,
    raised exceptions are not automatically handled for you---you must catch
    your own exceptions, and treat them accordingly.
    """

    def __init__(self) -> None:
        # Maps an event name to an ordered mapping of
        # {handler as registered by the caller: callable actually invoked}.
        # The two differ only for ``once`` handlers, whose value is a
        # self-removing wrapper around the key.
        self._events: Dict[
            str,
            "OrderedDict[Callable, Callable]",
        ] = dict()
        # Held while ``_events`` is mutated or snapshotted for an emit;
        # handlers themselves are always invoked with the lock released.
        self._lock: Lock = Lock()

    def on(
        self, event: str, f: Optional[Handler] = None
    ) -> Union[Handler, Callable[[Handler], Handler]]:
        """Registers the function ``f`` to the event name ``event``, if provided.

        If ``f`` isn't provided, this method calls ``EventEmitter#listens_to``, and
        otherwise calls ``EventEmitter#add_listener``. In other words, you may either
        use it as a decorator::

            @ee.on('data')
            def data_handler(data):
                print(data)

        Or directly::

            ee.on('data', data_handler)

        In both the decorated and undecorated forms, the event handler is
        returned. The upshot of this is that you can call decorated handlers
        directly, as well as use them in remove_listener calls.

        Note that this method's return type is a union type. If you are using
        mypy or pyright, you will probably want to use either
        ``EventEmitter#listens_to`` or ``EventEmitter#add_listener``.
        """
        if f is None:
            return self.listens_to(event)
        return self.add_listener(event, f)

    def listens_to(self, event: str) -> Callable[[Handler], Handler]:
        """Returns a decorator which will register the decorated function to
        the event name ``event``::

            @ee.listens_to("event")
            def data_handler(data):
                print(data)

        By only supporting the decorator use case, this method has improved
        type safety over ``EventEmitter#on``.
        """

        def on(f: Handler) -> Handler:
            self._add_event_handler(event, f, f)
            return f

        return on

    def add_listener(self, event: str, f: Handler) -> Handler:
        """Register the function ``f`` to the event name ``event``::

            def data_handler(data):
                print(data)

            h = ee.add_listener("event", data_handler)

        By not supporting the decorator use case, this method has improved
        type safety over ``EventEmitter#on``.
        """
        self._add_event_handler(event, f, f)
        return f

    def _add_event_handler(self, event: str, k: Callable, v: Callable) -> None:
        """Store ``v`` as the callable to run for ``event``, keyed by ``k``.

        ``k`` is the handler as the caller knows it (and what
        ``remove_listener`` and ``listeners`` operate on); ``v`` is what
        ``emit`` actually invokes.
        """
        # Fire 'new_listener' *before* adding the new listener! This is why
        # listeners for 'new_listener' do not fire upon their own creation.
        self.emit("new_listener", event, k)

        # Add the necessary function
        # Note that k and v are the same for `on` handlers, but
        # different for `once` handlers, where v is a wrapped version
        # of k which removes itself before calling k
        with self._lock:
            if event not in self._events:
                self._events[event] = OrderedDict()
            self._events[event][k] = v

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        """Invoke a single handler ``f`` with the emitted arguments.

        The handler's return value is discarded here.
        """
        f(*args, **kwargs)

    def event_names(self) -> Set[str]:
        """Get a set of events that this emitter is listening to."""
        return set(self._events.keys())

    def _emit_handle_potential_error(self, event: str, error: Any) -> None:
        """Raise for an ``error`` event; do nothing for any other event.

        ``emit`` calls this only when no handler ran for ``event``. If
        ``error`` is an ``Exception`` it is raised as-is; any other value is
        wrapped in a ``PyeeException``.
        """
        if event == "error":
            if isinstance(error, Exception):
                raise error
            else:
                raise PyeeException(f"Uncaught, unspecified 'error' event: {error}")

    def _call_handlers(
        self,
        event: str,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> bool:
        """Run every callable registered for ``event``, in registration order.

        Returns ``True`` if at least one callable was run, ``False`` otherwise.
        """
        handled = False

        # Snapshot the callables under the lock, then release it before
        # calling them: handlers may themselves add or remove listeners
        # (``once`` wrappers do), which requires taking the lock again.
        with self._lock:
            funcs = list(self._events.get(event, OrderedDict()).values())
        for f in funcs:
            self._emit_run(f, args, kwargs)
            handled = True

        return handled

    def emit(
        self,
        event: str,
        *args: Any,
        **kwargs: Any,
    ) -> bool:
        """Emit ``event``, passing ``*args`` and ``**kwargs`` to each attached
        function. Returns ``True`` if any functions are attached to ``event``;
        otherwise returns ``False``.

        Example::

            ee.emit('data', '00101001')

        Assuming ``data`` is an attached function, this will call
        ``data('00101001')``.

        If ``event`` is ``'error'`` and no function is attached to it, the
        first positional argument is raised if it is an ``Exception``;
        otherwise a ``PyeeException`` is raised.
        """
        handled = self._call_handlers(event, args, kwargs)

        if not handled:
            # Only the first positional argument is treated as the error.
            self._emit_handle_potential_error(event, args[0] if args else None)

        return handled

    def once(
        self,
        event: str,
        f: Optional[Callable] = None,
    ) -> Callable:
        """The same as ``ee.on``, except that the listener is automatically
        removed after being called.

        Like ``ee.on``, this may be called directly with ``f`` (in which case
        ``f`` is returned) or used as a decorator when ``f`` is omitted.
        """

        def _wrapper(f: Callable) -> Callable:
            def g(
                *args: Any,
                **kwargs: Any,
            ) -> Any:
                with self._lock:
                    # Check that the event wasn't removed already right
                    # before the lock
                    if event in self._events and f in self._events[event]:
                        self._remove_listener(event, f)
                    else:
                        return None
                # f may return a coroutine, so we need to return that
                # result here so that emit can schedule it
                return f(*args, **kwargs)

            # Registered under ``f`` so that callers can remove the listener
            # using the function they passed in, not the wrapper.
            self._add_event_handler(event, f, g)
            return f

        if f is None:
            return _wrapper
        return _wrapper(f)

    def _remove_listener(self, event: str, f: Callable) -> None:
        """Naked unprotected removal.

        The caller must already hold ``self._lock``. Raises ``KeyError`` if
        ``event`` has no listeners or ``f`` is not registered to it.
        """
        self._events[event].pop(f)
        # Drop the event entirely once its last listener is gone so that it
        # no longer shows up in ``event_names``.
        if not self._events[event]:
            del self._events[event]

    def remove_listener(self, event: str, f: Callable) -> None:
        """Removes the function ``f`` from ``event``.

        Raises ``KeyError`` if ``f`` is not registered to ``event``.
        """
        with self._lock:
            self._remove_listener(event, f)

    def remove_all_listeners(self, event: Optional[str] = None) -> None:
        """Remove all listeners attached to ``event``.
        If ``event`` is ``None``, remove all listeners on all events.

        Removing the listeners of an event that has none is a no-op.
        """
        with self._lock:
            if event is not None:
                # Remove the entry itself rather than leaving an empty
                # mapping behind, so that ``event_names`` stays consistent
                # with what ``_remove_listener`` does for the last listener.
                self._events.pop(event, None)
            else:
                self._events = dict()

    def listeners(self, event: str) -> List[Callable]:
        """Returns a list of all listeners registered to the ``event``.

        For ``once`` listeners this is the original function, not the
        internal self-removing wrapper.
        """
        return list(self._events.get(event, OrderedDict()).keys())