| import fnmatch |
| import glob |
| import os |
| import os.path |
| import shutil |
| import stat |
| |
| from .iterutil import iter_many |
| |
| |
| USE_CWD = object() |
| |
| |
| C_SOURCE_SUFFIXES = ('.c', '.h') |
| |
| |
| def create_backup(old, backup=None): |
| if isinstance(old, str): |
| filename = old |
| else: |
| filename = getattr(old, 'name', None) |
| if not filename: |
| return None |
| if not backup or backup is True: |
| backup = f'{filename}.bak' |
| try: |
| shutil.copyfile(filename, backup) |
| except FileNotFoundError as exc: |
| if exc.filename != filename: |
| raise # re-raise |
| backup = None |
| return backup |
| |
| |
| ################################## |
| # filenames |
| |
| def fix_filename(filename, relroot=USE_CWD, *, |
| fixroot=True, |
| _badprefix=f'..{os.path.sep}', |
| ): |
| """Return a normalized, absolute-path copy of the given filename.""" |
| if not relroot or relroot is USE_CWD: |
| return os.path.abspath(filename) |
| if fixroot: |
| relroot = os.path.abspath(relroot) |
| return _fix_filename(filename, relroot) |
| |
| |
| def _fix_filename(filename, relroot, *, |
| _badprefix=f'..{os.path.sep}', |
| ): |
| orig = filename |
| |
| # First we normalize. |
| filename = os.path.normpath(filename) |
| if filename.startswith(_badprefix): |
| raise ValueError(f'bad filename {orig!r} (resolves beyond relative root') |
| |
| # Now make sure it is absolute (relative to relroot). |
| if not os.path.isabs(filename): |
| filename = os.path.join(relroot, filename) |
| else: |
| relpath = os.path.relpath(filename, relroot) |
| if os.path.join(relroot, relpath) != filename: |
| raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}') |
| |
| return filename |
| |
| |
| def fix_filenames(filenames, relroot=USE_CWD): |
| if not relroot or relroot is USE_CWD: |
| filenames = (os.path.abspath(v) for v in filenames) |
| else: |
| relroot = os.path.abspath(relroot) |
| filenames = (_fix_filename(v, relroot) for v in filenames) |
| return filenames, relroot |
| |
| |
| def format_filename(filename, relroot=USE_CWD, *, |
| fixroot=True, |
| normalize=True, |
| _badprefix=f'..{os.path.sep}', |
| ): |
| """Return a consistent relative-path representation of the filename.""" |
| orig = filename |
| if normalize: |
| filename = os.path.normpath(filename) |
| if relroot is None: |
| # Otherwise leave it as-is. |
| return filename |
| elif relroot is USE_CWD: |
| # Make it relative to CWD. |
| filename = os.path.relpath(filename) |
| else: |
| # Make it relative to "relroot". |
| if fixroot: |
| relroot = os.path.abspath(relroot) |
| elif not relroot: |
| raise ValueError('missing relroot') |
| filename = os.path.relpath(filename, relroot) |
| if filename.startswith(_badprefix): |
| raise ValueError(f'bad filename {orig!r} (resolves beyond relative root') |
| return filename |
| |
| |
| ################################## |
| # find files |
| |
| def match_glob(filename, pattern): |
| if fnmatch.fnmatch(filename, pattern): |
| return True |
| |
| # fnmatch doesn't handle ** quite right. It will not match the |
| # following: |
| # |
| # ('x/spam.py', 'x/**/*.py') |
| # ('spam.py', '**/*.py') |
| # |
| # though it *will* match the following: |
| # |
| # ('x/y/spam.py', 'x/**/*.py') |
| # ('x/spam.py', '**/*.py') |
| |
| if '**/' not in pattern: |
| return False |
| |
| # We only accommodate the single-"**" case. |
| return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1)) |
| |
| |
| def process_filenames(filenames, *, |
| start=None, |
| include=None, |
| exclude=None, |
| relroot=USE_CWD, |
| ): |
| if relroot and relroot is not USE_CWD: |
| relroot = os.path.abspath(relroot) |
| if start: |
| start = fix_filename(start, relroot, fixroot=False) |
| if include: |
| include = set(fix_filename(v, relroot, fixroot=False) |
| for v in include) |
| if exclude: |
| exclude = set(fix_filename(v, relroot, fixroot=False) |
| for v in exclude) |
| |
| onempty = Exception('no filenames provided') |
| for filename, solo in iter_many(filenames, onempty): |
| filename = fix_filename(filename, relroot, fixroot=False) |
| relfile = format_filename(filename, relroot, fixroot=False, normalize=False) |
| check, start = _get_check(filename, start, include, exclude) |
| yield filename, relfile, check, solo |
| |
| |
| def expand_filenames(filenames): |
| for filename in filenames: |
| # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)? |
| if '**/' in filename: |
| yield from glob.glob(filename.replace('**/', '')) |
| yield from glob.glob(filename) |
| |
| |
| def _get_check(filename, start, include, exclude): |
| if start and filename != start: |
| return (lambda: '<skipped>'), start |
| else: |
| def check(): |
| if _is_excluded(filename, exclude, include): |
| return '<excluded>' |
| return None |
| return check, None |
| |
| |
| def _is_excluded(filename, exclude, include): |
| if include: |
| for included in include: |
| if match_glob(filename, included): |
| return False |
| return True |
| elif exclude: |
| for excluded in exclude: |
| if match_glob(filename, excluded): |
| return True |
| return False |
| else: |
| return False |
| |
| |
| def _walk_tree(root, *, |
| _walk=os.walk, |
| ): |
| # A wrapper around os.walk that resolves the filenames. |
| for parent, _, names in _walk(root): |
| for name in names: |
| yield os.path.join(parent, name) |
| |
| |
| def walk_tree(root, *, |
| suffix=None, |
| walk=_walk_tree, |
| ): |
| """Yield each file in the tree under the given directory name. |
| |
| If "suffix" is provided then only files with that suffix will |
| be included. |
| """ |
| if suffix and not isinstance(suffix, str): |
| raise ValueError('suffix must be a string') |
| |
| for filename in walk(root): |
| if suffix and not filename.endswith(suffix): |
| continue |
| yield filename |
| |
| |
| def glob_tree(root, *, |
| suffix=None, |
| _glob=glob.iglob, |
| ): |
| """Yield each file in the tree under the given directory name. |
| |
| If "suffix" is provided then only files with that suffix will |
| be included. |
| """ |
| suffix = suffix or '' |
| if not isinstance(suffix, str): |
| raise ValueError('suffix must be a string') |
| |
| for filename in _glob(f'{root}/*{suffix}'): |
| yield filename |
| for filename in _glob(f'{root}/**/*{suffix}'): |
| yield filename |
| |
| |
| def iter_files(root, suffix=None, relparent=None, *, |
| get_files=os.walk, |
| _glob=glob_tree, |
| _walk=walk_tree, |
| ): |
| """Yield each file in the tree under the given directory name. |
| |
| If "root" is a non-string iterable then do the same for each of |
| those trees. |
| |
| If "suffix" is provided then only files with that suffix will |
| be included. |
| |
| if "relparent" is provided then it is used to resolve each |
| filename as a relative path. |
| """ |
| if not isinstance(root, str): |
| roots = root |
| for root in roots: |
| yield from iter_files(root, suffix, relparent, |
| get_files=get_files, |
| _glob=_glob, _walk=_walk) |
| return |
| |
| # Use the right "walk" function. |
| if get_files in (glob.glob, glob.iglob, glob_tree): |
| get_files = _glob |
| else: |
| _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files |
| get_files = (lambda *a, **k: _walk(*a, walk=_files, **k)) |
| |
| # Handle a single suffix. |
| if suffix and not isinstance(suffix, str): |
| filenames = get_files(root) |
| suffix = tuple(suffix) |
| else: |
| filenames = get_files(root, suffix=suffix) |
| suffix = None |
| |
| for filename in filenames: |
| if suffix and not isinstance(suffix, str): # multiple suffixes |
| if not filename.endswith(suffix): |
| continue |
| if relparent: |
| filename = os.path.relpath(filename, relparent) |
| yield filename |
| |
| |
| def iter_files_by_suffix(root, suffixes, relparent=None, *, |
| walk=walk_tree, |
| _iter_files=iter_files, |
| ): |
| """Yield each file in the tree that has the given suffixes. |
| |
| Unlike iter_files(), the results are in the original suffix order. |
| """ |
| if isinstance(suffixes, str): |
| suffixes = [suffixes] |
| # XXX Ignore repeated suffixes? |
| for suffix in suffixes: |
| yield from _iter_files(root, suffix, relparent) |
| |
| |
| ################################## |
| # file info |
| |
| # XXX posix-only? |
| |
| S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
| S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH |
| S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH |
| |
| |
| def is_readable(file, *, user=None, check=False): |
| filename, st, mode = _get_file_info(file) |
| if check: |
| try: |
| okay = _check_file(filename, S_IRANY) |
| except NotImplementedError: |
| okay = NotImplemented |
| if okay is not NotImplemented: |
| return okay |
| # Fall back to checking the mode. |
| return _check_mode(st, mode, S_IRANY, user) |
| |
| |
| def is_writable(file, *, user=None, check=False): |
| filename, st, mode = _get_file_info(file) |
| if check: |
| try: |
| okay = _check_file(filename, S_IWANY) |
| except NotImplementedError: |
| okay = NotImplemented |
| if okay is not NotImplemented: |
| return okay |
| # Fall back to checking the mode. |
| return _check_mode(st, mode, S_IWANY, user) |
| |
| |
| def is_executable(file, *, user=None, check=False): |
| filename, st, mode = _get_file_info(file) |
| if check: |
| try: |
| okay = _check_file(filename, S_IXANY) |
| except NotImplementedError: |
| okay = NotImplemented |
| if okay is not NotImplemented: |
| return okay |
| # Fall back to checking the mode. |
| return _check_mode(st, mode, S_IXANY, user) |
| |
| |
| def _get_file_info(file): |
| filename = st = mode = None |
| if isinstance(file, int): |
| mode = file |
| elif isinstance(file, os.stat_result): |
| st = file |
| else: |
| if isinstance(file, str): |
| filename = file |
| elif hasattr(file, 'name') and os.path.exists(file.name): |
| filename = file.name |
| else: |
| raise NotImplementedError(file) |
| st = os.stat(filename) |
| return filename, st, mode or st.st_mode |
| |
| |
| def _check_file(filename, check): |
| if not isinstance(filename, str): |
| raise Exception(f'filename required to check file, got {filename}') |
| if check & S_IRANY: |
| flags = os.O_RDONLY |
| elif check & S_IWANY: |
| flags = os.O_WRONLY |
| elif check & S_IXANY: |
| # We can worry about S_IXANY later |
| return NotImplemented |
| else: |
| raise NotImplementedError(check) |
| |
| try: |
| fd = os.open(filename, flags) |
| except PermissionError: |
| return False |
| # We do not ignore other exceptions. |
| else: |
| os.close(fd) |
| return True |
| |
| |
| def _get_user_info(user): |
| import pwd |
| username = uid = gid = groups = None |
| if user is None: |
| uid = os.geteuid() |
| #username = os.getlogin() |
| username = pwd.getpwuid(uid)[0] |
| gid = os.getgid() |
| groups = os.getgroups() |
| else: |
| if isinstance(user, int): |
| uid = user |
| entry = pwd.getpwuid(uid) |
| username = entry.pw_name |
| elif isinstance(user, str): |
| username = user |
| entry = pwd.getpwnam(username) |
| uid = entry.pw_uid |
| else: |
| raise NotImplementedError(user) |
| gid = entry.pw_gid |
| os.getgrouplist(username, gid) |
| return username, uid, gid, groups |
| |
| |
| def _check_mode(st, mode, check, user): |
| orig = check |
| _, uid, gid, groups = _get_user_info(user) |
| if check & S_IRANY: |
| check -= S_IRANY |
| matched = False |
| if mode & stat.S_IRUSR: |
| if st.st_uid == uid: |
| matched = True |
| if mode & stat.S_IRGRP: |
| if st.st_uid == gid or st.st_uid in groups: |
| matched = True |
| if mode & stat.S_IROTH: |
| matched = True |
| if not matched: |
| return False |
| if check & S_IWANY: |
| check -= S_IWANY |
| matched = False |
| if mode & stat.S_IWUSR: |
| if st.st_uid == uid: |
| matched = True |
| if mode & stat.S_IWGRP: |
| if st.st_uid == gid or st.st_uid in groups: |
| matched = True |
| if mode & stat.S_IWOTH: |
| matched = True |
| if not matched: |
| return False |
| if check & S_IXANY: |
| check -= S_IXANY |
| matched = False |
| if mode & stat.S_IXUSR: |
| if st.st_uid == uid: |
| matched = True |
| if mode & stat.S_IXGRP: |
| if st.st_uid == gid or st.st_uid in groups: |
| matched = True |
| if mode & stat.S_IXOTH: |
| matched = True |
| if not matched: |
| return False |
| if check: |
| raise NotImplementedError((orig, check)) |
| return True |