| import collections.abc |
| import contextlib |
| import errno |
| import os |
| import re |
| import stat |
| import sys |
| import time |
| import unittest |
| import warnings |
| |
| |
| # Filename used for testing |
| if os.name == 'java': |
| # Jython disallows @ in module names |
| TESTFN_ASCII = '$test' |
| else: |
| TESTFN_ASCII = '@test' |
| |
| # Disambiguate TESTFN for parallel testing, while letting it remain a valid |
| # module name. |
| TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid()) |
| |
| # TESTFN_UNICODE is a non-ascii filename |
| TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f" |
| if sys.platform == 'darwin': |
| # In Mac OS X's VFS API file names are, by definition, canonically |
| # decomposed Unicode, encoded using UTF-8. See QA1173: |
| # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html |
| import unicodedata |
| TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) |
| |
| # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be |
| # encoded by the filesystem encoding (in strict mode). It can be None if we |
| # cannot generate such filename. |
| TESTFN_UNENCODABLE = None |
| if os.name == 'nt': |
| # skip win32s (0) or Windows 9x/ME (1) |
| if sys.getwindowsversion().platform >= 2: |
| # Different kinds of characters from various languages to minimize the |
| # probability that the whole name is encodable to MBCS (issue #9819) |
| TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80" |
| try: |
| TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding()) |
| except UnicodeEncodeError: |
| pass |
| else: |
| print('WARNING: The filename %r CAN be encoded by the filesystem ' |
| 'encoding (%s). Unicode filename tests may not be effective' |
| % (TESTFN_UNENCODABLE, sys.getfilesystemencoding())) |
| TESTFN_UNENCODABLE = None |
| # macOS and Emscripten deny unencodable filenames (invalid utf-8) |
| elif sys.platform not in {'darwin', 'emscripten', 'wasi'}: |
| try: |
| # ascii and utf-8 cannot encode the byte 0xff |
| b'\xff'.decode(sys.getfilesystemencoding()) |
| except UnicodeDecodeError: |
| # 0xff will be encoded using the surrogate character u+DCFF |
| TESTFN_UNENCODABLE = TESTFN_ASCII \ |
| + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape') |
| else: |
| # File system encoding (eg. ISO-8859-* encodings) can encode |
| # the byte 0xff. Skip some unicode filename tests. |
| pass |
| |
| # FS_NONASCII: non-ASCII character encodable by os.fsencode(), |
| # or an empty string if there is no such character. |
| FS_NONASCII = '' |
| for character in ( |
| # First try printable and common characters to have a readable filename. |
| # For each character, the encoding list are just example of encodings able |
| # to encode the character (the list is not exhaustive). |
| |
| # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 |
| '\u00E6', |
| # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 |
| '\u0130', |
| # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 |
| '\u0141', |
| # U+03C6 (Greek Small Letter Phi): cp1253 |
| '\u03C6', |
| # U+041A (Cyrillic Capital Letter Ka): cp1251 |
| '\u041A', |
| # U+05D0 (Hebrew Letter Alef): Encodable to cp424 |
| '\u05D0', |
| # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic |
| '\u060C', |
| # U+062A (Arabic Letter Teh): cp720 |
| '\u062A', |
| # U+0E01 (Thai Character Ko Kai): cp874 |
| '\u0E01', |
| |
| # Then try more "special" characters. "special" because they may be |
| # interpreted or displayed differently depending on the exact locale |
| # encoding and the font. |
| |
| # U+00A0 (No-Break Space) |
| '\u00A0', |
| # U+20AC (Euro Sign) |
| '\u20AC', |
| ): |
| try: |
| # If Python is set up to use the legacy 'mbcs' in Windows, |
| # 'replace' error mode is used, and encode() returns b'?' |
| # for characters missing in the ANSI codepage |
| if os.fsdecode(os.fsencode(character)) != character: |
| raise UnicodeError |
| except UnicodeError: |
| pass |
| else: |
| FS_NONASCII = character |
| break |
| |
| # Save the initial cwd |
| SAVEDCWD = os.getcwd() |
| |
| # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be |
| # decoded from the filesystem encoding (in strict mode). It can be None if we |
| # cannot generate such filename (ex: the latin1 encoding can decode any byte |
| # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks |
| # to the surrogateescape error handler (PEP 383), but not from the filesystem |
| # encoding in strict mode. |
| TESTFN_UNDECODABLE = None |
| for name in ( |
| # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows |
| # accepts it to create a file or a directory, or don't accept to enter to |
| # such directory (when the bytes name is used). So test b'\xe7' first: |
| # it is not decodable from cp932. |
| b'\xe7w\xf0', |
| # undecodable from ASCII, UTF-8 |
| b'\xff', |
| # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 |
| # and cp857 |
| b'\xae\xd5' |
| # undecodable from UTF-8 (UNIX and Mac OS X) |
| b'\xed\xb2\x80', b'\xed\xb4\x80', |
| # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, |
| # cp1253, cp1254, cp1255, cp1257, cp1258 |
| b'\x81\x98', |
| ): |
| try: |
| name.decode(sys.getfilesystemencoding()) |
| except UnicodeDecodeError: |
| TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name |
| break |
| |
| if FS_NONASCII: |
| TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII |
| else: |
| TESTFN_NONASCII = None |
| TESTFN = TESTFN_NONASCII or TESTFN_ASCII |
| |
| |
| def make_bad_fd(): |
| """ |
| Create an invalid file descriptor by opening and closing a file and return |
| its fd. |
| """ |
| file = open(TESTFN, "wb") |
| try: |
| return file.fileno() |
| finally: |
| file.close() |
| unlink(TESTFN) |
| |
| |
| _can_symlink = None |
| |
| |
| def can_symlink(): |
| global _can_symlink |
| if _can_symlink is not None: |
| return _can_symlink |
| # WASI / wasmtime prevents symlinks with absolute paths, see man |
| # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute |
| # paths. Skip symlink tests on WASI for now. |
| src = os.path.abspath(TESTFN) |
| symlink_path = src + "can_symlink" |
| try: |
| os.symlink(src, symlink_path) |
| can = True |
| except (OSError, NotImplementedError, AttributeError): |
| can = False |
| else: |
| os.remove(symlink_path) |
| _can_symlink = can |
| return can |
| |
| |
| def skip_unless_symlink(test): |
| """Skip decorator for tests that require functional symlink""" |
| ok = can_symlink() |
| msg = "Requires functional symlink implementation" |
| return test if ok else unittest.skip(msg)(test) |
| |
| |
| _can_xattr = None |
| |
| |
| def can_xattr(): |
| import tempfile |
| global _can_xattr |
| if _can_xattr is not None: |
| return _can_xattr |
| if not hasattr(os, "setxattr"): |
| can = False |
| else: |
| import platform |
| tmp_dir = tempfile.mkdtemp() |
| tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) |
| try: |
| with open(TESTFN, "wb") as fp: |
| try: |
| # TESTFN & tempfile may use different file systems with |
| # different capabilities |
| os.setxattr(tmp_fp, b"user.test", b"") |
| os.setxattr(tmp_name, b"trusted.foo", b"42") |
| os.setxattr(fp.fileno(), b"user.test", b"") |
| # Kernels < 2.6.39 don't respect setxattr flags. |
| kernel_version = platform.release() |
| m = re.match(r"2.6.(\d{1,2})", kernel_version) |
| can = m is None or int(m.group(1)) >= 39 |
| except OSError: |
| can = False |
| finally: |
| unlink(TESTFN) |
| unlink(tmp_name) |
| rmdir(tmp_dir) |
| _can_xattr = can |
| return can |
| |
| |
| def skip_unless_xattr(test): |
| """Skip decorator for tests that require functional extended attributes""" |
| ok = can_xattr() |
| msg = "no non-broken extended attribute support" |
| return test if ok else unittest.skip(msg)(test) |
| |
| |
| _can_chmod = None |
| |
| def can_chmod(): |
| global _can_chmod |
| if _can_chmod is not None: |
| return _can_chmod |
| if not hasattr(os, "chown"): |
| _can_chmod = False |
| return _can_chmod |
| try: |
| with open(TESTFN, "wb") as f: |
| try: |
| os.chmod(TESTFN, 0o777) |
| mode1 = os.stat(TESTFN).st_mode |
| os.chmod(TESTFN, 0o666) |
| mode2 = os.stat(TESTFN).st_mode |
| except OSError as e: |
| can = False |
| else: |
| can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2) |
| finally: |
| unlink(TESTFN) |
| _can_chmod = can |
| return can |
| |
| |
| def skip_unless_working_chmod(test): |
| """Skip tests that require working os.chmod() |
| |
| WASI SDK 15.0 cannot change file mode bits. |
| """ |
| ok = can_chmod() |
| msg = "requires working os.chmod()" |
| return test if ok else unittest.skip(msg)(test) |
| |
| |
| # Check whether the current effective user has the capability to override |
| # DAC (discretionary access control). Typically user root is able to |
| # bypass file read, write, and execute permission checks. The capability |
| # is independent of the effective user. See capabilities(7). |
| _can_dac_override = None |
| |
| def can_dac_override(): |
| global _can_dac_override |
| |
| if not can_chmod(): |
| _can_dac_override = False |
| if _can_dac_override is not None: |
| return _can_dac_override |
| |
| try: |
| with open(TESTFN, "wb") as f: |
| os.chmod(TESTFN, 0o400) |
| try: |
| with open(TESTFN, "wb"): |
| pass |
| except OSError: |
| _can_dac_override = False |
| else: |
| _can_dac_override = True |
| finally: |
| unlink(TESTFN) |
| |
| return _can_dac_override |
| |
| |
| def skip_if_dac_override(test): |
| ok = not can_dac_override() |
| msg = "incompatible with CAP_DAC_OVERRIDE" |
| return test if ok else unittest.skip(msg)(test) |
| |
| |
| def skip_unless_dac_override(test): |
| ok = can_dac_override() |
| msg = "requires CAP_DAC_OVERRIDE" |
| return test if ok else unittest.skip(msg)(test) |
| |
| |
| def unlink(filename): |
| try: |
| _unlink(filename) |
| except (FileNotFoundError, NotADirectoryError): |
| pass |
| |
| |
| if sys.platform.startswith("win"): |
| def _waitfor(func, pathname, waitall=False): |
| # Perform the operation |
| func(pathname) |
| # Now setup the wait loop |
| if waitall: |
| dirname = pathname |
| else: |
| dirname, name = os.path.split(pathname) |
| dirname = dirname or '.' |
| # Check for `pathname` to be removed from the filesystem. |
| # The exponential backoff of the timeout amounts to a total |
| # of ~1 second after which the deletion is probably an error |
| # anyway. |
| # Testing on an [email protected] shows that usually only 1 iteration is |
| # required when contention occurs. |
| timeout = 0.001 |
| while timeout < 1.0: |
| # Note we are only testing for the existence of the file(s) in |
| # the contents of the directory regardless of any security or |
| # access rights. If we have made it this far, we have sufficient |
| # permissions to do that much using Python's equivalent of the |
| # Windows API FindFirstFile. |
| # Other Windows APIs can fail or give incorrect results when |
| # dealing with files that are pending deletion. |
| L = os.listdir(dirname) |
| if not (L if waitall else name in L): |
| return |
| # Increase the timeout and try again |
| time.sleep(timeout) |
| timeout *= 2 |
| warnings.warn('tests may fail, delete still pending for ' + pathname, |
| RuntimeWarning, stacklevel=4) |
| |
| def _unlink(filename): |
| _waitfor(os.unlink, filename) |
| |
| def _rmdir(dirname): |
| _waitfor(os.rmdir, dirname) |
| |
| def _rmtree(path): |
| from test.support import _force_run |
| |
| def _rmtree_inner(path): |
| for name in _force_run(path, os.listdir, path): |
| fullname = os.path.join(path, name) |
| try: |
| mode = os.lstat(fullname).st_mode |
| except OSError as exc: |
| print("support.rmtree(): os.lstat(%r) failed with %s" |
| % (fullname, exc), |
| file=sys.__stderr__) |
| mode = 0 |
| if stat.S_ISDIR(mode): |
| _waitfor(_rmtree_inner, fullname, waitall=True) |
| _force_run(fullname, os.rmdir, fullname) |
| else: |
| _force_run(fullname, os.unlink, fullname) |
| _waitfor(_rmtree_inner, path, waitall=True) |
| _waitfor(lambda p: _force_run(p, os.rmdir, p), path) |
| |
| def _longpath(path): |
| try: |
| import ctypes |
| except ImportError: |
| # No ctypes means we can't expands paths. |
| pass |
| else: |
| buffer = ctypes.create_unicode_buffer(len(path) * 2) |
| length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, |
| len(buffer)) |
| if length: |
| return buffer[:length] |
| return path |
| else: |
| _unlink = os.unlink |
| _rmdir = os.rmdir |
| |
| def _rmtree(path): |
| import shutil |
| try: |
| shutil.rmtree(path) |
| return |
| except OSError: |
| pass |
| |
| def _rmtree_inner(path): |
| from test.support import _force_run |
| for name in _force_run(path, os.listdir, path): |
| fullname = os.path.join(path, name) |
| try: |
| mode = os.lstat(fullname).st_mode |
| except OSError: |
| mode = 0 |
| if stat.S_ISDIR(mode): |
| _rmtree_inner(fullname) |
| _force_run(path, os.rmdir, fullname) |
| else: |
| _force_run(path, os.unlink, fullname) |
| _rmtree_inner(path) |
| os.rmdir(path) |
| |
| def _longpath(path): |
| return path |
| |
| |
| def rmdir(dirname): |
| try: |
| _rmdir(dirname) |
| except FileNotFoundError: |
| pass |
| |
| |
| def rmtree(path): |
| try: |
| _rmtree(path) |
| except FileNotFoundError: |
| pass |
| |
| |
| @contextlib.contextmanager |
| def temp_dir(path=None, quiet=False): |
| """Return a context manager that creates a temporary directory. |
| |
| Arguments: |
| |
| path: the directory to create temporarily. If omitted or None, |
| defaults to creating a temporary directory using tempfile.mkdtemp. |
| |
| quiet: if False (the default), the context manager raises an exception |
| on error. Otherwise, if the path is specified and cannot be |
| created, only a warning is issued. |
| |
| """ |
| import tempfile |
| dir_created = False |
| if path is None: |
| path = tempfile.mkdtemp() |
| dir_created = True |
| path = os.path.realpath(path) |
| else: |
| try: |
| os.mkdir(path) |
| dir_created = True |
| except OSError as exc: |
| if not quiet: |
| raise |
| warnings.warn(f'tests may fail, unable to create ' |
| f'temporary directory {path!r}: {exc}', |
| RuntimeWarning, stacklevel=3) |
| if dir_created: |
| pid = os.getpid() |
| try: |
| yield path |
| finally: |
| # In case the process forks, let only the parent remove the |
| # directory. The child has a different process id. (bpo-30028) |
| if dir_created and pid == os.getpid(): |
| rmtree(path) |
| |
| |
| @contextlib.contextmanager |
| def change_cwd(path, quiet=False): |
| """Return a context manager that changes the current working directory. |
| |
| Arguments: |
| |
| path: the directory to use as the temporary current working directory. |
| |
| quiet: if False (the default), the context manager raises an exception |
| on error. Otherwise, it issues only a warning and keeps the current |
| working directory the same. |
| |
| """ |
| saved_dir = os.getcwd() |
| try: |
| os.chdir(os.path.realpath(path)) |
| except OSError as exc: |
| if not quiet: |
| raise |
| warnings.warn(f'tests may fail, unable to change the current working ' |
| f'directory to {path!r}: {exc}', |
| RuntimeWarning, stacklevel=3) |
| try: |
| yield os.getcwd() |
| finally: |
| os.chdir(saved_dir) |
| |
| |
| @contextlib.contextmanager |
| def temp_cwd(name='tempcwd', quiet=False): |
| """ |
| Context manager that temporarily creates and changes the CWD. |
| |
| The function temporarily changes the current working directory |
| after creating a temporary directory in the current directory with |
| name *name*. If *name* is None, the temporary directory is |
| created using tempfile.mkdtemp. |
| |
| If *quiet* is False (default) and it is not possible to |
| create or change the CWD, an error is raised. If *quiet* is True, |
| only a warning is raised and the original CWD is used. |
| |
| """ |
| with temp_dir(path=name, quiet=quiet) as temp_path: |
| with change_cwd(temp_path, quiet=quiet) as cwd_dir: |
| yield cwd_dir |
| |
| |
| def create_empty_file(filename): |
| """Create an empty file. If the file already exists, truncate it.""" |
| fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) |
| os.close(fd) |
| |
| |
| @contextlib.contextmanager |
| def open_dir_fd(path): |
| """Open a file descriptor to a directory.""" |
| assert os.path.isdir(path) |
| flags = os.O_RDONLY |
| if hasattr(os, "O_DIRECTORY"): |
| flags |= os.O_DIRECTORY |
| dir_fd = os.open(path, flags) |
| try: |
| yield dir_fd |
| finally: |
| os.close(dir_fd) |
| |
| |
| def fs_is_case_insensitive(directory): |
| """Detects if the file system for the specified directory |
| is case-insensitive.""" |
| import tempfile |
| with tempfile.NamedTemporaryFile(dir=directory) as base: |
| base_path = base.name |
| case_path = base_path.upper() |
| if case_path == base_path: |
| case_path = base_path.lower() |
| try: |
| return os.path.samefile(base_path, case_path) |
| except FileNotFoundError: |
| return False |
| |
| |
| class FakePath: |
| """Simple implementing of the path protocol. |
| """ |
| def __init__(self, path): |
| self.path = path |
| |
| def __repr__(self): |
| return f'<FakePath {self.path!r}>' |
| |
| def __fspath__(self): |
| if (isinstance(self.path, BaseException) or |
| isinstance(self.path, type) and |
| issubclass(self.path, BaseException)): |
| raise self.path |
| else: |
| return self.path |
| |
| |
| def fd_count(): |
| """Count the number of open file descriptors. |
| """ |
| if sys.platform.startswith(('linux', 'freebsd', 'emscripten')): |
| try: |
| names = os.listdir("/proc/self/fd") |
| # Subtract one because listdir() internally opens a file |
| # descriptor to list the content of the /proc/self/fd/ directory. |
| return len(names) - 1 |
| except FileNotFoundError: |
| pass |
| |
| MAXFD = 256 |
| if hasattr(os, 'sysconf'): |
| try: |
| MAXFD = os.sysconf("SC_OPEN_MAX") |
| except OSError: |
| pass |
| |
| old_modes = None |
| if sys.platform == 'win32': |
| # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process |
| # on invalid file descriptor if Python is compiled in debug mode |
| try: |
| import msvcrt |
| msvcrt.CrtSetReportMode |
| except (AttributeError, ImportError): |
| # no msvcrt or a release build |
| pass |
| else: |
| old_modes = {} |
| for report_type in (msvcrt.CRT_WARN, |
| msvcrt.CRT_ERROR, |
| msvcrt.CRT_ASSERT): |
| old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, |
| 0) |
| |
| try: |
| count = 0 |
| for fd in range(MAXFD): |
| try: |
| # Prefer dup() over fstat(). fstat() can require input/output |
| # whereas dup() doesn't. |
| fd2 = os.dup(fd) |
| except OSError as e: |
| if e.errno != errno.EBADF: |
| raise |
| else: |
| os.close(fd2) |
| count += 1 |
| finally: |
| if old_modes is not None: |
| for report_type in (msvcrt.CRT_WARN, |
| msvcrt.CRT_ERROR, |
| msvcrt.CRT_ASSERT): |
| msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) |
| |
| return count |
| |
| |
| if hasattr(os, "umask"): |
| @contextlib.contextmanager |
| def temp_umask(umask): |
| """Context manager that temporarily sets the process umask.""" |
| oldmask = os.umask(umask) |
| try: |
| yield |
| finally: |
| os.umask(oldmask) |
| else: |
| @contextlib.contextmanager |
| def temp_umask(umask): |
| """no-op on platforms without umask()""" |
| yield |
| |
| |
| class EnvironmentVarGuard(collections.abc.MutableMapping): |
| |
| """Class to help protect the environment variable properly. Can be used as |
| a context manager.""" |
| |
| def __init__(self): |
| self._environ = os.environ |
| self._changed = {} |
| |
| def __getitem__(self, envvar): |
| return self._environ[envvar] |
| |
| def __setitem__(self, envvar, value): |
| # Remember the initial value on the first access |
| if envvar not in self._changed: |
| self._changed[envvar] = self._environ.get(envvar) |
| self._environ[envvar] = value |
| |
| def __delitem__(self, envvar): |
| # Remember the initial value on the first access |
| if envvar not in self._changed: |
| self._changed[envvar] = self._environ.get(envvar) |
| if envvar in self._environ: |
| del self._environ[envvar] |
| |
| def keys(self): |
| return self._environ.keys() |
| |
| def __iter__(self): |
| return iter(self._environ) |
| |
| def __len__(self): |
| return len(self._environ) |
| |
| def set(self, envvar, value): |
| self[envvar] = value |
| |
| def unset(self, envvar): |
| del self[envvar] |
| |
| def copy(self): |
| # We do what os.environ.copy() does. |
| return dict(self) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, *ignore_exc): |
| for (k, v) in self._changed.items(): |
| if v is None: |
| if k in self._environ: |
| del self._environ[k] |
| else: |
| self._environ[k] = v |
| os.environ = self._environ |