# blob: b0f4ce5240e7133c871d113dff2b4338b7e688f5 [file] [log] [blame] [edit]
from __future__ import annotations
import contextlib
import gc
import os
import threading
from functools import partial
import pytest
from .utils import ExpectEvent, Helper, P, StartWatching, TestEventQueue
@pytest.fixture
def p(tmpdir, *args):
    """
    Provide a path joiner rooted at the test's temporary directory.

    The returned callable is ``os.path.join`` with ``tmpdir`` pre-bound as
    its first component, so calling it with further components yields a
    path below ``tmpdir``. ``*args`` is accepted but not used.

    NOTE(review): another fixture in this file is also registered under the
    name "p" (via ``name="p"``) -- confirm which one tests actually receive.
    """
    join_under_tmpdir = partial(os.path.join, tmpdir)
    return join_under_tmpdir
@pytest.fixture(autouse=True)
def _no_thread_leaks():
    """
    Fail the test if it leaves additional threads running.

    The number of active threads is sampled before the test and compared
    with the number after it. pytest-threadleak is deliberately not used
    because it is not reliable.
    """
    baseline = threading.active_count()

    yield

    # Collect garbage first so objects held by other function-level
    # fixtures are cleared before the threads are counted again.
    gc.collect()

    # Only the threads that existed before the test may still be alive.
    assert threading.active_count() == baseline
@pytest.fixture(autouse=True)
def _no_warnings(recwarn):
    """
    Fail the test if it recorded a warning that is not explicitly tolerated.

    After the test body has run, every warning captured by ``recwarn`` is
    inspected. A warning is tolerated when its message contains one of the
    known fragments below, or when its filename contains "eventlet". Any
    other warning is reported as ``filename:lineno message`` in the
    assertion failure.
    """
    yield

    # Message fragments identifying warnings that must not fail the test.
    tolerated_fragments = (
        "Not importing directory",
        "Using or importing the ABCs",
        "dns.hash module will be removed in future versions",
        "is still running",
    )

    def is_tolerated(record):  # pragma: no cover
        """Return True when *record* matches one of the tolerated patterns."""
        text = str(record.message)
        origin = record.filename
        return any(fragment in text for fragment in tolerated_fragments) or "eventlet" in origin

    unexpected = [
        f"{record.filename}:{record.lineno} {record.message}"
        for record in recwarn
        if not is_tolerated(record)
    ]
    assert not unexpected, unexpected
@pytest.fixture(name="helper")
def helper_fixture(tmpdir):
    """
    Yield a ``Helper`` built on the test's temporary directory.

    The helper is constructed with ``tmp`` set to the filesystem path of
    ``tmpdir`` and is wrapped in ``contextlib.closing``, so its ``close()``
    method is called once the test using this fixture is finished.
    """
    tmp_path = os.fspath(tmpdir)
    managed = contextlib.closing(Helper(tmp=tmp_path))
    with managed as instance:
        yield instance
@pytest.fixture(name="p")
def p_fixture(helper: Helper) -> P:
    """Expose the ``joinpath`` attribute of the ``helper`` fixture as fixture "p"."""
    joinpath = helper.joinpath
    return joinpath
@pytest.fixture(name="event_queue")
def event_queue_fixture(helper: Helper) -> TestEventQueue:
    """Expose the ``event_queue`` attribute of the ``helper`` fixture as fixture "event_queue"."""
    queue = helper.event_queue
    return queue
@pytest.fixture(name="start_watching")
def start_watching_fixture(helper: Helper) -> StartWatching:
    """Expose the ``start_watching`` attribute of the ``helper`` fixture as fixture "start_watching"."""
    starter = helper.start_watching
    return starter
@pytest.fixture(name="expect_event")
def expect_event_fixture(helper: Helper) -> ExpectEvent:
    """Expose the ``expect_event`` attribute of the ``helper`` fixture as fixture "expect_event"."""
    expecter = helper.expect_event
    return expecter