blob: 5d0aa28ae68b1740a47b9823f26d4980f34234fc [file] [edit]
from __future__ import annotations
import logging
import os
import stat
import time
from queue import Empty
from typing import TYPE_CHECKING
import pytest
from watchdog.events import (
DirCreatedEvent,
DirDeletedEvent,
DirModifiedEvent,
DirMovedEvent,
FileClosedEvent,
FileClosedNoWriteEvent,
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
FileOpenedEvent,
)
from watchdog.utils import platform
from .shell import mkdir, mkfile, mv, rm, touch
if TYPE_CHECKING:
from .utils import ExpectEvent, P, StartWatching, TestEventQueue
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
if platform.is_darwin():
# enable more verbose logs
fsevents_logger = logging.getLogger("fsevents")
fsevents_logger.setLevel(logging.DEBUG)
def rerun_filter(exc, *args):
time.sleep(5)
return bool(issubclass(exc[0], Empty) and platform.is_windows())
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_create(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
start_watching()
open(p("a"), "a").close()
expect_event(FileCreatedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileOpenedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
@pytest.mark.skipif(not platform.is_linux(), reason="FileClosed*Event only supported in GNU/Linux")
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_closed(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
with open(p("a"), "a"):
start_watching()
# After file creation/open in append mode
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
event = event_queue.get(timeout=5)[0]
assert os.path.normpath(event.src_path) == os.path.normpath(p(""))
assert isinstance(event, DirModifiedEvent)
# After read-only, only IN_CLOSE_NOWRITE is emitted
open(p("a")).close()
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileOpenedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedNoWriteEvent)
assert event_queue.empty()
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
platform.is_darwin() or platform.is_windows(),
reason="Windows and macOS enforce proper encoding",
)
def test_create_wrong_encoding(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
start_watching()
open(p("a_\udce4"), "a").close()
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a_\udce4")
assert isinstance(event, FileCreatedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert os.path.normpath(event.src_path) == os.path.normpath(p(""))
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_delete(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkfile(p("a"))
start_watching()
rm(p("a"))
expect_event(FileDeletedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_modify(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkfile(p("a"))
start_watching()
touch(p("a"))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileOpenedEvent)
expect_event(FileModifiedEvent(p("a")))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_chmod(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkfile(p("a"))
start_watching()
# Note: We use S_IREAD here because chmod on Windows only
# allows setting the read-only flag.
os.chmod(p("a"), stat.S_IREAD)
expect_event(FileModifiedEvent(p("a")))
# Reset permissions to allow cleanup.
os.chmod(p("a"), stat.S_IWRITE)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching()
mv(p("dir1", "a"), p("dir2", "b"))
if not platform.is_windows():
expect_event(FileMovedEvent(p("dir1", "a"), p("dir2", "b")))
else:
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "a")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "b")
assert isinstance(event, FileCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_case_change(
p: P,
event_queue: TestEventQueue,
start_watching: StartWatching,
expect_event: ExpectEvent,
) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "file"))
start_watching()
mv(p("dir1", "file"), p("dir2", "FILE"))
if not platform.is_windows():
expect_event(FileMovedEvent(p("dir1", "file"), p("dir2", "FILE")))
else:
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "file")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "FILE")
assert isinstance(event, FileCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move_to(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(path=p("dir2"))
mv(p("dir1", "a"), p("dir2", "b"))
expect_event(FileCreatedEvent(p("dir2", "b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p("dir2")))
@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux")
def test_move_to_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(path=p("dir2"), use_full_emitter=True)
mv(p("dir1", "a"), p("dir2", "b"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, FileMovedEvent)
assert event.dest_path == p("dir2", "b")
assert event.src_path == "" # Should be blank since the path was not watched
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move_from(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(path=p("dir1"))
mv(p("dir1", "a"), p("dir2", "b"))
expect_event(FileDeletedEvent(p("dir1", "a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p("dir1")))
@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux")
def test_move_from_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(path=p("dir1"), use_full_emitter=True)
mv(p("dir1", "a"), p("dir2", "b"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, FileMovedEvent)
assert event.src_path == p("dir1", "a")
assert event.dest_path == "" # Should be blank since path not watched
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_separate_consecutive_moves(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkdir(p("dir1"))
mkfile(p("dir1", "a"))
mkfile(p("b"))
start_watching(path=p("dir1"))
mv(p("dir1", "a"), p("c"))
mv(p("b"), p("dir1", "d"))
dir_modif = DirModifiedEvent(p("dir1"))
a_deleted = FileDeletedEvent(p("dir1", "a"))
d_created = FileCreatedEvent(p("dir1", "d"))
expected_events = [a_deleted, dir_modif, d_created, dir_modif]
if platform.is_windows():
expected_events = [a_deleted, d_created]
if platform.is_bsd():
# Due to the way kqueue works, we can't really order
# 'Created' and 'Deleted' events in time, so creation queues first
expected_events = [d_created, a_deleted, dir_modif, dir_modif]
for expected_event in expected_events:
expect_event(expected_event)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test")
def test_delete_self(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
mkdir(p("dir1"))
emitter = start_watching(path=p("dir1"))
rm(p("dir1"), recursive=True)
expect_event(DirDeletedEvent(p("dir1")))
emitter.join(5)
assert not emitter.is_alive()
@pytest.mark.skipif(
platform.is_windows() or platform.is_bsd(),
reason="Windows|BSD create another set of events for this test",
)
def test_fast_subdirectory_creation_deletion(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
root_dir = p("dir1")
sub_dir = p("dir1", "subdir1")
times = 30
mkdir(root_dir)
start_watching(path=root_dir)
for _ in range(times):
mkdir(sub_dir)
rm(sub_dir, recursive=True)
time.sleep(0.1) # required for macOS emitter to catch up with us
count = {DirCreatedEvent: 0, DirModifiedEvent: 0, DirDeletedEvent: 0}
etype_for_dir = {
DirCreatedEvent: sub_dir,
DirModifiedEvent: root_dir,
DirDeletedEvent: sub_dir,
}
for _ in range(times * 4):
event = event_queue.get(timeout=5)[0]
logger.debug(event)
etype = type(event)
count[etype] += 1
assert event.src_path == etype_for_dir[etype]
assert count[DirCreatedEvent] >= count[DirDeletedEvent]
assert count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent]
assert count == {
DirCreatedEvent: times,
DirModifiedEvent: times * 2,
DirDeletedEvent: times,
}
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_passing_unicode_should_give_unicode(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
start_watching(path=str(p()))
mkfile(p("a"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event.src_path, str)
@pytest.mark.skipif(
platform.is_windows(),
reason="Windows ReadDirectoryChangesW supports only" " unicode for paths.",
)
def test_passing_bytes_should_give_bytes(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
start_watching(path=p().encode())
mkfile(p("a"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event.src_path, bytes)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_recursive_on(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
mkdir(p("dir1", "dir2", "dir3"), parents=True)
start_watching()
touch(p("dir1", "dir2", "dir3", "a"))
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileCreatedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3")
assert isinstance(event, DirModifiedEvent)
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileOpenedEvent)
if not platform.is_bsd():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_recursive_off(
p: P,
event_queue: TestEventQueue,
start_watching: StartWatching,
expect_event: ExpectEvent,
) -> None:
mkdir(p("dir1"))
start_watching(recursive=False)
touch(p("dir1", "a"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkfile(p("b"))
expect_event(FileCreatedEvent(p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
expect_event(FileOpenedEvent(p("b")))
expect_event(FileClosedEvent(p("b")))
# currently limiting these additional events to macOS only, see https://github.com/gorakhargosh/watchdog/pull/779
if platform.is_darwin():
mkdir(p("dir1", "dir2"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkfile(p("dir1", "dir2", "somefile"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkdir(p("dir3"))
expect_event(DirModifiedEvent(p())) # the contents of the parent directory changed
mv(p("dir1", "dir2", "somefile"), p("somefile"))
expect_event(FileMovedEvent(p("dir1", "dir2", "somefile"), p("somefile")))
expect_event(DirModifiedEvent(p()))
mv(p("dir1", "dir2"), p("dir2"))
expect_event(DirMovedEvent(p("dir1", "dir2"), p("dir2")))
expect_event(DirModifiedEvent(p()))
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_renaming_top_level_directory(
p: P,
event_queue: TestEventQueue,
start_watching: StartWatching,
expect_event: ExpectEvent,
) -> None:
start_watching()
mkdir(p("a"))
expect_event(DirCreatedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
mkdir(p("a", "b"))
expect_event(DirCreatedEvent(p("a", "b")))
expect_event(DirModifiedEvent(p("a")))
mv(p("a"), p("a2"))
expect_event(DirMovedEvent(p("a"), p("a2")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
expect_event(DirModifiedEvent(p()))
expect_event(DirMovedEvent(p("a", "b"), p("a2", "b"), is_synthetic=True))
if platform.is_bsd():
expect_event(DirModifiedEvent(p()))
open(p("a2", "b", "c"), "a").close()
# DirModifiedEvent may emitted, but sometimes after waiting time is out.
events = []
while True:
events.append(event_queue.get(timeout=5)[0])
if event_queue.empty():
break
assert all(
isinstance(e, (FileCreatedEvent, FileMovedEvent, FileOpenedEvent, DirModifiedEvent, FileClosedEvent))
for e in events
)
for event in events:
if isinstance(event, FileCreatedEvent):
assert event.src_path == p("a2", "b", "c")
elif isinstance(event, FileMovedEvent):
assert event.dest_path == p("a2", "b", "c")
assert event.src_path == p("a", "b", "c")
elif isinstance(event, DirModifiedEvent):
assert event.src_path == p("a2", "b")
@pytest.mark.skipif(platform.is_windows(), reason="Windows create another set of events for this test")
def test_move_nested_subdirectories(
p: P,
event_queue: TestEventQueue,
start_watching: StartWatching,
expect_event: ExpectEvent,
) -> None:
mkdir(p("dir1/dir2/dir3"), parents=True)
mkfile(p("dir1/dir2/dir3", "a"))
start_watching()
mv(p("dir1/dir2"), p("dir2"))
expect_event(DirMovedEvent(p("dir1", "dir2"), p("dir2")))
expect_event(DirModifiedEvent(p("dir1")))
expect_event(DirModifiedEvent(p()))
expect_event(DirMovedEvent(p("dir1", "dir2", "dir3"), p("dir2", "dir3"), is_synthetic=True))
expect_event(FileMovedEvent(p("dir1", "dir2", "dir3", "a"), p("dir2", "dir3", "a"), is_synthetic=True))
if platform.is_bsd():
event = event_queue.get(timeout=5)[0]
assert p(event.src_path) == p()
assert isinstance(event, DirModifiedEvent)
event = event_queue.get(timeout=5)[0]
assert p(event.src_path) == p("dir1")
assert isinstance(event, DirModifiedEvent)
touch(p("dir2/dir3", "a"))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2/dir3", "a")
assert isinstance(event, FileOpenedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2/dir3", "a")
assert isinstance(event, FileModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
not platform.is_windows(),
reason="Non-Windows create another set of events for this test",
)
def test_move_nested_subdirectories_on_windows(
p: P,
event_queue: TestEventQueue,
start_watching: StartWatching,
) -> None:
mkdir(p("dir1/dir2/dir3"), parents=True)
mkfile(p("dir1/dir2/dir3", "a"))
start_watching(path=p(""))
mv(p("dir1/dir2"), p("dir2"))
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2")
assert isinstance(event, DirCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "dir3")
assert isinstance(event, DirCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "dir3", "a")
assert isinstance(event, FileCreatedEvent)
touch(p("dir2/dir3", "a"))
events = []
while True:
events.append(event_queue.get(timeout=5)[0])
if event_queue.empty():
break
assert all(isinstance(e, (FileModifiedEvent, DirModifiedEvent)) for e in events)
for event in events:
if isinstance(event, FileModifiedEvent):
assert event.src_path == p("dir2", "dir3", "a")
elif isinstance(event, DirModifiedEvent):
assert event.src_path in [p("dir2"), p("dir2", "dir3")]
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test")
def test_file_lifecyle(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None:
start_watching()
mkfile(p("a"))
touch(p("a"))
mv(p("a"), p("b"))
rm(p("b"))
expect_event(FileCreatedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
expect_event(FileOpenedEvent(p("a")))
expect_event(FileClosedEvent(p("a")))
expect_event(DirModifiedEvent(p()))
expect_event(FileOpenedEvent(p("a")))
expect_event(FileModifiedEvent(p("a")))
if platform.is_linux():
expect_event(FileClosedEvent(p("a")))
expect_event(DirModifiedEvent(p()))
expect_event(FileMovedEvent(p("a"), p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
expect_event(DirModifiedEvent(p()))
expect_event(FileDeletedEvent(p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))