blob: 8b82f3a8138e5d002b756938450c311702a8ce6b [file] [log] [blame] [edit]
from __future__ import annotations
import dataclasses
import os
from queue import Queue
from typing import Protocol
from watchdog.events import FileSystemEvent
from watchdog.observers.api import EventEmitter, ObservedWatch
from watchdog.utils import platform
Emitter: type[EventEmitter]
if platform.is_linux():
from watchdog.observers.inotify import InotifyEmitter as Emitter
from watchdog.observers.inotify import InotifyFullEmitter
elif platform.is_darwin():
from watchdog.observers.fsevents import FSEventsEmitter as Emitter
elif platform.is_windows():
from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter
elif platform.is_bsd():
from watchdog.observers.kqueue import KqueueEmitter as Emitter
class P(Protocol):
def __call__(self, *args: str) -> str: ...
class StartWatching(Protocol):
def __call__(
self,
*,
path: bytes | str | None = ...,
use_full_emitter: bool = ...,
recursive: bool = ...,
) -> EventEmitter: ...
class ExpectEvent(Protocol):
def __call__(self, expected_event: FileSystemEvent, *, timeout: float = ...) -> None: ...
TestEventQueue = Queue[tuple[FileSystemEvent, ObservedWatch]]
@dataclasses.dataclass()
class Helper:
tmp: str
emitters: list[EventEmitter] = dataclasses.field(default_factory=list)
event_queue: TestEventQueue = dataclasses.field(default_factory=Queue)
def joinpath(self, *args: str) -> str:
return os.path.join(self.tmp, *args)
def start_watching(
self,
*,
path: bytes | str | None = None,
use_full_emitter: bool = False,
recursive: bool = True,
) -> EventEmitter:
# TODO: check if other platforms expect the trailing slash (e.g. `p('')`)
path = self.tmp if path is None else path
watcher = ObservedWatch(path, recursive=recursive)
emitter_cls = InotifyFullEmitter if platform.is_linux() and use_full_emitter else Emitter
emitter = emitter_cls(self.event_queue, watcher)
if platform.is_darwin():
# TODO: I think this could be better... .suppress_history should maybe
# become a common attribute.
from watchdog.observers.fsevents import FSEventsEmitter
assert isinstance(emitter, FSEventsEmitter)
emitter.suppress_history = True
self.emitters.append(emitter)
emitter.start()
return emitter
def expect_event(self, expected_event: FileSystemEvent, timeout: float = 2) -> None:
"""Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue.
Provides some robustness for the otherwise flaky nature of asynchronous notifications.
"""
assert self.event_queue.get(timeout=timeout)[0] == expected_event
def close(self) -> None:
for emitter in self.emitters:
emitter.stop()
for emitter in self.emitters:
if emitter.is_alive():
emitter.join(5)
alive = [emitter.is_alive() for emitter in self.emitters]
self.emitters = []
assert alive == [False] * len(alive)