| from __future__ import annotations |
| |
| import os |
| import sys |
| import time |
| from unittest.mock import patch |
| |
| import pytest |
| |
| # Skip if import PyYAML failed. PyYAML missing possible because |
| # watchdog installed without watchmedo. See Installation section |
| # in README.rst |
| yaml = pytest.importorskip("yaml") |
| |
| from yaml.constructor import ConstructorError # noqa: E402 |
| from yaml.scanner import ScannerError # noqa: E402 |
| |
| from watchdog import watchmedo # noqa: E402 |
| from watchdog.events import FileModifiedEvent, FileOpenedEvent # noqa: E402 |
| from watchdog.tricks import AutoRestartTrick, ShellCommandTrick # noqa: E402 |
| from watchdog.utils import WatchdogShutdownError, platform # noqa: E402 |
| |
| |
| def test_load_config_valid(tmpdir): |
| """Verifies the load of a valid yaml file""" |
| |
| yaml_file = os.path.join(tmpdir, "config_file.yaml") |
| with open(yaml_file, "w") as f: |
| f.write("one: value\ntwo:\n- value1\n- value2\n") |
| |
| config = watchmedo.load_config(yaml_file) |
| assert isinstance(config, dict) |
| assert "one" in config |
| assert "two" in config |
| assert isinstance(config["two"], list) |
| assert config["one"] == "value" |
| assert config["two"] == ["value1", "value2"] |
| |
| |
| def test_load_config_invalid(tmpdir): |
| """Verifies if safe load avoid the execution |
| of untrusted code inside yaml files""" |
| |
| critical_dir = os.path.join(tmpdir, "critical") |
| yaml_file = os.path.join(tmpdir, "tricks_file.yaml") |
| with open(yaml_file, "w") as f: |
| content = f'one: value\nrun: !!python/object/apply:os.system ["mkdir {critical_dir}"]\n' |
| f.write(content) |
| |
| # PyYAML get_single_data() raises different exceptions for Linux and Windows |
| with pytest.raises((ConstructorError, ScannerError)): |
| watchmedo.load_config(yaml_file) |
| |
| assert not os.path.exists(critical_dir) |
| |
| |
| def make_dummy_script(tmpdir, n=10): |
| script = os.path.join(tmpdir, f"auto-test-{n}.py") |
| with open(script, "w") as f: |
| f.write('import time\nfor i in range(%d):\n\tprint("+++++ %%d" %% i, flush=True)\n\ttime.sleep(1)\n' % n) |
| return script |
| |
| |
| def test_kill_auto_restart(tmpdir, capfd): |
| script = make_dummy_script(tmpdir) |
| a = AutoRestartTrick([sys.executable, script]) |
| a.start() |
| time.sleep(3) |
| a.stop() |
| cap = capfd.readouterr() |
| assert "+++++ 0" in cap.out |
| assert "+++++ 9" not in cap.out # we killed the subprocess before the end |
| # in windows we seem to lose the subprocess stderr |
| # assert 'KeyboardInterrupt' in cap.err |
| |
| |
| def test_shell_command_wait_for_completion(tmpdir, capfd): |
| script = make_dummy_script(tmpdir, n=1) |
| command = f"{sys.executable} {script}" |
| trick = ShellCommandTrick(command, wait_for_process=True) |
| assert not trick.is_process_running() |
| start_time = time.monotonic() |
| trick.on_any_event(FileModifiedEvent("foo/bar.baz")) |
| elapsed = time.monotonic() - start_time |
| assert not trick.is_process_running() |
| assert elapsed >= 1 |
| |
| |
| def test_shell_command_subprocess_termination_nowait(tmpdir): |
| script = make_dummy_script(tmpdir, n=1) |
| command = f"{sys.executable} {script}" |
| trick = ShellCommandTrick(command, wait_for_process=False) |
| assert not trick.is_process_running() |
| trick.on_any_event(FileModifiedEvent("foo/bar.baz")) |
| assert trick.is_process_running() |
| time.sleep(5) |
| assert not trick.is_process_running() |
| |
| |
| def test_shell_command_subprocess_termination_not_happening_on_file_opened_event( |
| tmpdir, |
| ): |
| # FIXME: see issue #949, and find a way to better handle that scenario |
| script = make_dummy_script(tmpdir, n=1) |
| command = f"{sys.executable} {script}" |
| trick = ShellCommandTrick(command, wait_for_process=False) |
| assert not trick.is_process_running() |
| trick.on_any_event(FileOpenedEvent("foo/bar.baz")) |
| assert not trick.is_process_running() |
| time.sleep(5) |
| assert not trick.is_process_running() |
| |
| |
| def test_auto_restart_not_happening_on_file_opened_event(tmpdir, capfd): |
| # FIXME: see issue #949, and find a way to better handle that scenario |
| script = make_dummy_script(tmpdir, n=2) |
| trick = AutoRestartTrick([sys.executable, script]) |
| trick.start() |
| time.sleep(1) |
| trick.on_any_event(FileOpenedEvent("foo/bar.baz")) |
| trick.on_any_event(FileOpenedEvent("foo/bar2.baz")) |
| trick.on_any_event(FileOpenedEvent("foo/bar3.baz")) |
| time.sleep(1) |
| trick.stop() |
| cap = capfd.readouterr() |
| assert cap.out.splitlines(keepends=False).count("+++++ 0") == 1 |
| assert trick.restart_count == 0 |
| |
| |
| def test_auto_restart_on_file_change(tmpdir, capfd): |
| """Simulate changing 3 files. |
| |
| Expect 3 restarts. |
| """ |
| script = make_dummy_script(tmpdir, n=2) |
| trick = AutoRestartTrick([sys.executable, script]) |
| trick.start() |
| time.sleep(1) |
| trick.on_any_event(FileModifiedEvent("foo/bar.baz")) |
| trick.on_any_event(FileModifiedEvent("foo/bar2.baz")) |
| trick.on_any_event(FileModifiedEvent("foo/bar3.baz")) |
| time.sleep(1) |
| trick.stop() |
| cap = capfd.readouterr() |
| assert cap.out.splitlines(keepends=False).count("+++++ 0") >= 2 |
| assert trick.restart_count == 3 |
| |
| |
| @pytest.mark.xfail( |
| condition=platform.is_darwin() or platform.is_windows() or sys.implementation.name == "pypy", |
| reason="known to be problematic, see #973", |
| ) |
| def test_auto_restart_on_file_change_debounce(tmpdir, capfd): |
| """Simulate changing 3 files quickly and then another change later. |
| |
| Expect 2 restarts due to debouncing. |
| """ |
| script = make_dummy_script(tmpdir, n=2) |
| trick = AutoRestartTrick([sys.executable, script], debounce_interval_seconds=0.5) |
| trick.start() |
| time.sleep(1) |
| trick.on_any_event(FileModifiedEvent("foo/bar.baz")) |
| trick.on_any_event(FileModifiedEvent("foo/bar2.baz")) |
| time.sleep(0.1) |
| trick.on_any_event(FileModifiedEvent("foo/bar3.baz")) |
| time.sleep(1) |
| trick.on_any_event(FileModifiedEvent("foo/bar.baz")) |
| time.sleep(1) |
| trick.stop() |
| cap = capfd.readouterr() |
| assert cap.out.splitlines(keepends=False).count("+++++ 0") == 3 |
| assert trick.restart_count == 2 |
| |
| |
| @pytest.mark.flaky(max_runs=5, min_passes=1) |
| @pytest.mark.parametrize( |
| "restart_on_command_exit", |
| [ |
| True, |
| pytest.param( |
| False, |
| marks=pytest.mark.xfail( |
| condition=platform.is_darwin() or platform.is_windows(), |
| reason="known to be problematic, see #972", |
| ), |
| ), |
| ], |
| ) |
| def test_auto_restart_subprocess_termination(tmpdir, capfd, restart_on_command_exit): |
| """Run auto-restart with a script that terminates in about 2 seconds. |
| |
| After 5 seconds, expect it to have been restarted at least once. |
| """ |
| script = make_dummy_script(tmpdir, n=2) |
| trick = AutoRestartTrick([sys.executable, script], restart_on_command_exit=restart_on_command_exit) |
| trick.start() |
| time.sleep(5) |
| trick.stop() |
| cap = capfd.readouterr() |
| if restart_on_command_exit: |
| assert cap.out.splitlines(keepends=False).count("+++++ 0") > 1 |
| assert trick.restart_count >= 1 |
| else: |
| assert cap.out.splitlines(keepends=False).count("+++++ 0") == 1 |
| assert trick.restart_count == 0 |
| |
| |
| def test_auto_restart_arg_parsing_basic(): |
| args = watchmedo.cli.parse_args(["auto-restart", "-d", ".", "--recursive", "--debug-force-polling", "cmd"]) |
| assert args.func is watchmedo.auto_restart |
| assert args.command == "cmd" |
| assert args.directories == ["."] |
| assert args.recursive |
| assert args.debug_force_polling |
| |
| |
| def test_auto_restart_arg_parsing(): |
| args = watchmedo.cli.parse_args( |
| [ |
| "auto-restart", |
| "-d", |
| ".", |
| "--kill-after", |
| "12.5", |
| "--debounce-interval=0.2", |
| "cmd", |
| ] |
| ) |
| assert args.func is watchmedo.auto_restart |
| assert args.command == "cmd" |
| assert args.directories == ["."] |
| assert args.kill_after == pytest.approx(12.5) |
| assert args.debounce_interval == pytest.approx(0.2) |
| |
| |
| def test_shell_command_arg_parsing(): |
| args = watchmedo.cli.parse_args(["shell-command", "--command='cmd'"]) |
| assert args.command == "'cmd'" |
| |
| |
| @pytest.mark.parametrize("cmdline", [["auto-restart", "-d", ".", "cmd"], ["log", "."]]) |
| @pytest.mark.parametrize( |
| "verbosity", |
| [ |
| ([], "WARNING"), |
| (["-q"], "ERROR"), |
| (["--quiet"], "ERROR"), |
| (["-v"], "INFO"), |
| (["--verbose"], "INFO"), |
| (["-vv"], "DEBUG"), |
| (["-v", "-v"], "DEBUG"), |
| (["--verbose", "-v"], "DEBUG"), |
| ], |
| ) |
| def test_valid_verbosity(cmdline, verbosity): |
| (verbosity_cmdline_args, expected_log_level) = verbosity |
| cmd = [cmdline[0], *verbosity_cmdline_args, *cmdline[1:]] |
| args = watchmedo.cli.parse_args(cmd) |
| log_level = watchmedo._get_log_level_from_args(args) # noqa: SLF001 |
| assert log_level == expected_log_level |
| |
| |
| @pytest.mark.parametrize("cmdline", [["auto-restart", "-d", ".", "cmd"], ["log", "."]]) |
| @pytest.mark.parametrize( |
| "verbosity_cmdline_args", |
| [ |
| ["-q", "-v"], |
| ["-v", "-q"], |
| ["-qq"], |
| ["-q", "-q"], |
| ["--quiet", "--quiet"], |
| ["--quiet", "-q"], |
| ["-vvv"], |
| ["-vvvv"], |
| ["-v", "-v", "-v"], |
| ["-vv", "-v"], |
| ["--verbose", "-vv"], |
| ], |
| ) |
| def test_invalid_verbosity(cmdline, verbosity_cmdline_args): |
| cmd = [cmdline[0], *verbosity_cmdline_args, *cmdline[1:]] |
| with pytest.raises((watchmedo.LogLevelError, SystemExit)): # noqa: PT012 |
| args = watchmedo.cli.parse_args(cmd) |
| watchmedo._get_log_level_from_args(args) # noqa: SLF001 |
| |
| |
| @pytest.mark.parametrize("command", ["tricks-from", "tricks"]) |
| def test_tricks_from_file(command, tmp_path): |
| tricks_file = tmp_path / "tricks.yaml" |
| tricks_file.write_text( |
| """ |
| tricks: |
| - watchdog.tricks.LoggerTrick: |
| patterns: ["*.py", "*.js"] |
| """ |
| ) |
| args = watchmedo.cli.parse_args([command, str(tricks_file)]) |
| |
| checkpoint = False |
| |
| def mocked_sleep(_): |
| nonlocal checkpoint |
| checkpoint = True |
| raise WatchdogShutdownError |
| |
| with patch("time.sleep", mocked_sleep): |
| watchmedo.tricks_from(args) |
| assert checkpoint |