blob: 0372a189324001916d695bc00d9ca20fce09b265 [file] [log] [blame] [edit]
from __future__ import annotations
import pytest
from watchdog.utils import platform
if not platform.is_linux():
pytest.skip("GNU/Linux only.", allow_module_level=True)
import os
import random
import time
from watchdog.observers.inotify_buffer import InotifyBuffer
from .shell import mkdir, mount_tmpfs, mv, rm, touch, unmount
def wait_for_move_event(read_event):
while True:
event = read_event()
if isinstance(event, tuple) or event.is_move:
return event
@pytest.mark.timeout(5)
def test_move_from(p):
mkdir(p("dir1"))
mkdir(p("dir2"))
touch(p("dir1", "a"))
inotify = InotifyBuffer(p("dir1").encode())
mv(p("dir1", "a"), p("dir2", "b"))
event = wait_for_move_event(inotify.read_event)
assert event.is_moved_from
assert event.src_path == p("dir1", "a").encode()
inotify.close()
@pytest.mark.timeout(5)
def test_move_to(p):
mkdir(p("dir1"))
mkdir(p("dir2"))
touch(p("dir1", "a"))
inotify = InotifyBuffer(p("dir2").encode())
mv(p("dir1", "a"), p("dir2", "b"))
event = wait_for_move_event(inotify.read_event)
assert event.is_moved_to
assert event.src_path == p("dir2", "b").encode()
inotify.close()
@pytest.mark.timeout(5)
def test_move_internal(p):
mkdir(p("dir1"))
mkdir(p("dir2"))
touch(p("dir1", "a"))
inotify = InotifyBuffer(p("").encode(), recursive=True)
mv(p("dir1", "a"), p("dir2", "b"))
frm, to = wait_for_move_event(inotify.read_event)
assert frm.src_path == p("dir1", "a").encode()
assert to.src_path == p("dir2", "b").encode()
inotify.close()
@pytest.mark.timeout(10)
def test_move_internal_batch(p):
n = 100
mkdir(p("dir1"))
mkdir(p("dir2"))
files = [str(i) for i in range(n)]
for f in files:
touch(p("dir1", f))
inotify = InotifyBuffer(p("").encode(), recursive=True)
random.shuffle(files)
for f in files:
mv(p("dir1", f), p("dir2", f))
# Check that all n events are paired
for _ in range(n):
frm, to = wait_for_move_event(inotify.read_event)
assert os.path.dirname(frm.src_path).endswith(b"/dir1")
assert os.path.dirname(to.src_path).endswith(b"/dir2")
assert frm.name == to.name
inotify.close()
@pytest.mark.timeout(5)
def test_delete_watched_directory(p):
mkdir(p("dir"))
inotify = InotifyBuffer(p("dir").encode())
rm(p("dir"), recursive=True)
# Wait for the event to be picked up
inotify.read_event()
# Ensure InotifyBuffer shuts down cleanly without raising an exception
inotify.close()
@pytest.mark.timeout(5)
@pytest.mark.skipif("GITHUB_REF" not in os.environ, reason="sudo password prompt")
def test_unmount_watched_directory_filesystem(p):
mkdir(p("dir1"))
mount_tmpfs(p("dir1"))
mkdir(p("dir1/dir2"))
inotify = InotifyBuffer(p("dir1/dir2").encode())
unmount(p("dir1"))
# Wait for the event to be picked up
inotify.read_event()
# Ensure InotifyBuffer shuts down cleanly without raising an exception
inotify.close()
assert not inotify.is_alive()
def delay_call(function, seconds):
def delayed(*args, **kwargs):
time.sleep(seconds)
return function(*args, **kwargs)
return delayed
class InotifyBufferDelayedRead(InotifyBuffer):
def run(self, *args, **kwargs):
# Introduce a delay to trigger the race condition where the file descriptor is
# closed prior to a read being triggered.
self._inotify.read_events = delay_call(self._inotify.read_events, 1)
return super().run(*args, **kwargs)
@pytest.mark.parametrize(argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead])
def test_close_should_terminate_thread(p, cls):
inotify = cls(p("").encode(), recursive=True)
assert inotify.is_alive()
inotify.close()
assert not inotify.is_alive()