blob: 37bf98df796723a806833f21903448788cf1a4b9 [file] [log] [blame]
Haibo Huangd8830302020-03-03 10:09:46 -08001"""Utility functions for copying and archiving files and directory trees.
2
3XXX The functions here don't copy the resource fork or other metadata on Mac.
4
5"""
6
7import os
8import sys
9import stat
10import fnmatch
11import collections
12import errno
13
14try:
15 import zlib
16 del zlib
17 _ZLIB_SUPPORTED = True
18except ImportError:
19 _ZLIB_SUPPORTED = False
20
21try:
22 import bz2
23 del bz2
24 _BZ2_SUPPORTED = True
25except ImportError:
26 _BZ2_SUPPORTED = False
27
28try:
29 import lzma
30 del lzma
31 _LZMA_SUPPORTED = True
32except ImportError:
33 _LZMA_SUPPORTED = False
34
Haibo Huangd8830302020-03-03 10:09:46 -080035_WINDOWS = os.name == 'nt'
36posix = nt = None
37if os.name == 'posix':
38 import posix
39elif _WINDOWS:
40 import nt
41
42COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
43_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
44_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
45
Haibo Huang5eba2b42021-01-22 11:22:02 -080046# CMD defaults in Windows 10
47_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
48
Haibo Huangd8830302020-03-03 10:09:46 -080049__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
50 "copytree", "move", "rmtree", "Error", "SpecialFileError",
51 "ExecError", "make_archive", "get_archive_formats",
52 "register_archive_format", "unregister_archive_format",
53 "get_unpack_formats", "register_unpack_format",
54 "unregister_unpack_format", "unpack_archive",
55 "ignore_patterns", "chown", "which", "get_terminal_size",
56 "SameFileError"]
57 # disk_usage is added later, if available on the platform
58
59class Error(OSError):
60 pass
61
62class SameFileError(Error):
63 """Raised when source and destination are the same file."""
64
65class SpecialFileError(OSError):
66 """Raised when trying to do a kind of operation (e.g. copying) which is
67 not supported on a special file (e.g. a named pipe)"""
68
69class ExecError(OSError):
70 """Raised when a command could not be executed"""
71
72class ReadError(OSError):
73 """Raised when an archive cannot be read"""
74
75class RegistryError(Exception):
76 """Raised when a registry operation with the archiving
77 and unpacking registries fails"""
78
79class _GiveupOnFastCopy(Exception):
80 """Raised as a signal to fallback on using raw read()/write()
81 file copy when fast-copy functions fail to do so.
82 """
83
84def _fastcopy_fcopyfile(fsrc, fdst, flags):
85 """Copy a regular file content or metadata by using high-performance
86 fcopyfile(3) syscall (macOS).
87 """
88 try:
89 infd = fsrc.fileno()
90 outfd = fdst.fileno()
91 except Exception as err:
92 raise _GiveupOnFastCopy(err) # not a regular file
93
94 try:
95 posix._fcopyfile(infd, outfd, flags)
96 except OSError as err:
97 err.filename = fsrc.name
98 err.filename2 = fdst.name
99 if err.errno in {errno.EINVAL, errno.ENOTSUP}:
100 raise _GiveupOnFastCopy(err)
101 else:
102 raise err from None
103
104def _fastcopy_sendfile(fsrc, fdst):
105 """Copy data from one regular mmap-like fd to another by using
106 high-performance sendfile(2) syscall.
107 This should work on Linux >= 2.6.33 only.
108 """
109 # Note: copyfileobj() is left alone in order to not introduce any
110 # unexpected breakage. Possible risks by using zero-copy calls
111 # in copyfileobj() are:
112 # - fdst cannot be open in "a"(ppend) mode
113 # - fsrc and fdst may be open in "t"(ext) mode
114 # - fsrc may be a BufferedReader (which hides unread data in a buffer),
115 # GzipFile (which decompresses data), HTTPResponse (which decodes
116 # chunks).
117 # - possibly others (e.g. encrypted fs/partition?)
118 global _USE_CP_SENDFILE
119 try:
120 infd = fsrc.fileno()
121 outfd = fdst.fileno()
122 except Exception as err:
123 raise _GiveupOnFastCopy(err) # not a regular file
124
125 # Hopefully the whole file will be copied in a single call.
126 # sendfile() is called in a loop 'till EOF is reached (0 return)
127 # so a bufsize smaller or bigger than the actual file size
128 # should not make any difference, also in case the file content
129 # changes while being copied.
130 try:
131 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
132 except OSError:
133 blocksize = 2 ** 27 # 128MiB
134 # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
135 # see bpo-38319.
136 if sys.maxsize < 2 ** 32:
137 blocksize = min(blocksize, 2 ** 30)
138
139 offset = 0
140 while True:
141 try:
142 sent = os.sendfile(outfd, infd, offset, blocksize)
143 except OSError as err:
144 # ...in oder to have a more informative exception.
145 err.filename = fsrc.name
146 err.filename2 = fdst.name
147
148 if err.errno == errno.ENOTSOCK:
149 # sendfile() on this platform (probably Linux < 2.6.33)
150 # does not support copies between regular files (only
151 # sockets).
152 _USE_CP_SENDFILE = False
153 raise _GiveupOnFastCopy(err)
154
155 if err.errno == errno.ENOSPC: # filesystem is full
156 raise err from None
157
158 # Give up on first call and if no data was copied.
159 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
160 raise _GiveupOnFastCopy(err)
161
162 raise err
163 else:
164 if sent == 0:
165 break # EOF
166 offset += sent
167
168def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
169 """readinto()/memoryview() based variant of copyfileobj().
170 *fsrc* must support readinto() method and both files must be
171 open in binary mode.
172 """
173 # Localize variable access to minimize overhead.
174 fsrc_readinto = fsrc.readinto
175 fdst_write = fdst.write
176 with memoryview(bytearray(length)) as mv:
177 while True:
178 n = fsrc_readinto(mv)
179 if not n:
180 break
181 elif n < length:
182 with mv[:n] as smv:
183 fdst.write(smv)
184 else:
185 fdst_write(mv)
186
187def copyfileobj(fsrc, fdst, length=0):
188 """copy data from file-like object fsrc to file-like object fdst"""
189 # Localize variable access to minimize overhead.
190 if not length:
191 length = COPY_BUFSIZE
192 fsrc_read = fsrc.read
193 fdst_write = fdst.write
194 while True:
195 buf = fsrc_read(length)
196 if not buf:
197 break
198 fdst_write(buf)
199
200def _samefile(src, dst):
201 # Macintosh, Unix.
202 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
203 try:
204 return os.path.samestat(src.stat(), os.stat(dst))
205 except OSError:
206 return False
207
208 if hasattr(os.path, 'samefile'):
209 try:
210 return os.path.samefile(src, dst)
211 except OSError:
212 return False
213
214 # All other platforms: check for same pathname.
215 return (os.path.normcase(os.path.abspath(src)) ==
216 os.path.normcase(os.path.abspath(dst)))
217
218def _stat(fn):
219 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
220
221def _islink(fn):
222 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
223
224def copyfile(src, dst, *, follow_symlinks=True):
225 """Copy data from src to dst in the most efficient way possible.
226
227 If follow_symlinks is not set and src is a symbolic link, a new
228 symlink will be created instead of copying the file it points to.
229
230 """
Haibo Huang5980f852020-03-05 12:22:08 -0800231 sys.audit("shutil.copyfile", src, dst)
232
Haibo Huangd8830302020-03-03 10:09:46 -0800233 if _samefile(src, dst):
234 raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
235
236 file_size = 0
237 for i, fn in enumerate([src, dst]):
238 try:
239 st = _stat(fn)
240 except OSError:
241 # File most likely does not exist
242 pass
243 else:
244 # XXX What about other special files? (sockets, devices...)
245 if stat.S_ISFIFO(st.st_mode):
246 fn = fn.path if isinstance(fn, os.DirEntry) else fn
247 raise SpecialFileError("`%s` is a named pipe" % fn)
248 if _WINDOWS and i == 0:
249 file_size = st.st_size
250
251 if not follow_symlinks and _islink(src):
252 os.symlink(os.readlink(src), dst)
253 else:
Yi Kong71199322022-08-30 15:53:45 +0800254 with open(src, 'rb') as fsrc:
255 try:
256 with open(dst, 'wb') as fdst:
257 # macOS
258 if _HAS_FCOPYFILE:
259 try:
260 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
261 return dst
262 except _GiveupOnFastCopy:
263 pass
264 # Linux
265 elif _USE_CP_SENDFILE:
266 try:
267 _fastcopy_sendfile(fsrc, fdst)
268 return dst
269 except _GiveupOnFastCopy:
270 pass
271 # Windows, see:
272 # https://github.com/python/cpython/pull/7160#discussion_r195405230
273 elif _WINDOWS and file_size > 0:
274 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
275 return dst
Haibo Huangd8830302020-03-03 10:09:46 -0800276
Yi Kong71199322022-08-30 15:53:45 +0800277 copyfileobj(fsrc, fdst)
278
279 # Issue 43219, raise a less confusing exception
280 except IsADirectoryError as e:
281 if not os.path.exists(dst):
282 raise FileNotFoundError(f'Directory does not exist: {dst}') from e
283 else:
284 raise
Haibo Huangd8830302020-03-03 10:09:46 -0800285
286 return dst
287
288def copymode(src, dst, *, follow_symlinks=True):
289 """Copy mode bits from src to dst.
290
291 If follow_symlinks is not set, symlinks aren't followed if and only
292 if both `src` and `dst` are symlinks. If `lchmod` isn't available
293 (e.g. Linux) this method does nothing.
294
295 """
Haibo Huang5980f852020-03-05 12:22:08 -0800296 sys.audit("shutil.copymode", src, dst)
297
Haibo Huangd8830302020-03-03 10:09:46 -0800298 if not follow_symlinks and _islink(src) and os.path.islink(dst):
299 if hasattr(os, 'lchmod'):
300 stat_func, chmod_func = os.lstat, os.lchmod
301 else:
302 return
303 else:
304 stat_func, chmod_func = _stat, os.chmod
305
306 st = stat_func(src)
307 chmod_func(dst, stat.S_IMODE(st.st_mode))
308
309if hasattr(os, 'listxattr'):
310 def _copyxattr(src, dst, *, follow_symlinks=True):
311 """Copy extended filesystem attributes from `src` to `dst`.
312
313 Overwrite existing attributes.
314
315 If `follow_symlinks` is false, symlinks won't be followed.
316
317 """
318
319 try:
320 names = os.listxattr(src, follow_symlinks=follow_symlinks)
321 except OSError as e:
322 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
323 raise
324 return
325 for name in names:
326 try:
327 value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
328 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
329 except OSError as e:
330 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
331 errno.EINVAL):
332 raise
333else:
334 def _copyxattr(*args, **kwargs):
335 pass
336
337def copystat(src, dst, *, follow_symlinks=True):
338 """Copy file metadata
339
340 Copy the permission bits, last access time, last modification time, and
341 flags from `src` to `dst`. On Linux, copystat() also copies the "extended
342 attributes" where possible. The file contents, owner, and group are
343 unaffected. `src` and `dst` are path-like objects or path names given as
344 strings.
345
346 If the optional flag `follow_symlinks` is not set, symlinks aren't
347 followed if and only if both `src` and `dst` are symlinks.
348 """
Haibo Huang5980f852020-03-05 12:22:08 -0800349 sys.audit("shutil.copystat", src, dst)
350
Haibo Huangd8830302020-03-03 10:09:46 -0800351 def _nop(*args, ns=None, follow_symlinks=None):
352 pass
353
354 # follow symlinks (aka don't not follow symlinks)
355 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
356 if follow:
357 # use the real function if it exists
358 def lookup(name):
359 return getattr(os, name, _nop)
360 else:
361 # use the real function only if it exists
362 # *and* it supports follow_symlinks
363 def lookup(name):
364 fn = getattr(os, name, _nop)
365 if fn in os.supports_follow_symlinks:
366 return fn
367 return _nop
368
369 if isinstance(src, os.DirEntry):
370 st = src.stat(follow_symlinks=follow)
371 else:
372 st = lookup("stat")(src, follow_symlinks=follow)
373 mode = stat.S_IMODE(st.st_mode)
374 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
375 follow_symlinks=follow)
376 # We must copy extended attributes before the file is (potentially)
377 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
378 _copyxattr(src, dst, follow_symlinks=follow)
379 try:
380 lookup("chmod")(dst, mode, follow_symlinks=follow)
381 except NotImplementedError:
382 # if we got a NotImplementedError, it's because
383 # * follow_symlinks=False,
384 # * lchown() is unavailable, and
385 # * either
386 # * fchownat() is unavailable or
387 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
388 # (it returned ENOSUP.)
389 # therefore we're out of options--we simply cannot chown the
390 # symlink. give up, suppress the error.
391 # (which is what shutil always did in this circumstance.)
392 pass
393 if hasattr(st, 'st_flags'):
394 try:
395 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
396 except OSError as why:
397 for err in 'EOPNOTSUPP', 'ENOTSUP':
398 if hasattr(errno, err) and why.errno == getattr(errno, err):
399 break
400 else:
401 raise
402
403def copy(src, dst, *, follow_symlinks=True):
404 """Copy data and mode bits ("cp src dst"). Return the file's destination.
405
406 The destination may be a directory.
407
408 If follow_symlinks is false, symlinks won't be followed. This
409 resembles GNU's "cp -P src dst".
410
411 If source and destination are the same file, a SameFileError will be
412 raised.
413
414 """
415 if os.path.isdir(dst):
416 dst = os.path.join(dst, os.path.basename(src))
417 copyfile(src, dst, follow_symlinks=follow_symlinks)
418 copymode(src, dst, follow_symlinks=follow_symlinks)
419 return dst
420
421def copy2(src, dst, *, follow_symlinks=True):
422 """Copy data and metadata. Return the file's destination.
423
424 Metadata is copied with copystat(). Please see the copystat function
425 for more information.
426
427 The destination may be a directory.
428
429 If follow_symlinks is false, symlinks won't be followed. This
430 resembles GNU's "cp -P src dst".
431 """
432 if os.path.isdir(dst):
433 dst = os.path.join(dst, os.path.basename(src))
434 copyfile(src, dst, follow_symlinks=follow_symlinks)
435 copystat(src, dst, follow_symlinks=follow_symlinks)
436 return dst
437
438def ignore_patterns(*patterns):
439 """Function that can be used as copytree() ignore parameter.
440
441 Patterns is a sequence of glob-style patterns
442 that are used to exclude files"""
443 def _ignore_patterns(path, names):
444 ignored_names = []
445 for pattern in patterns:
446 ignored_names.extend(fnmatch.filter(names, pattern))
447 return set(ignored_names)
448 return _ignore_patterns
449
450def _copytree(entries, src, dst, symlinks, ignore, copy_function,
451 ignore_dangling_symlinks, dirs_exist_ok=False):
452 if ignore is not None:
Haibo Huang5980f852020-03-05 12:22:08 -0800453 ignored_names = ignore(os.fspath(src), [x.name for x in entries])
Haibo Huangd8830302020-03-03 10:09:46 -0800454 else:
455 ignored_names = set()
456
457 os.makedirs(dst, exist_ok=dirs_exist_ok)
458 errors = []
459 use_srcentry = copy_function is copy2 or copy_function is copy
460
461 for srcentry in entries:
462 if srcentry.name in ignored_names:
463 continue
464 srcname = os.path.join(src, srcentry.name)
465 dstname = os.path.join(dst, srcentry.name)
466 srcobj = srcentry if use_srcentry else srcname
467 try:
468 is_symlink = srcentry.is_symlink()
469 if is_symlink and os.name == 'nt':
470 # Special check for directory junctions, which appear as
471 # symlinks but we want to recurse.
472 lstat = srcentry.stat(follow_symlinks=False)
473 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
474 is_symlink = False
475 if is_symlink:
476 linkto = os.readlink(srcname)
477 if symlinks:
478 # We can't just leave it to `copy_function` because legacy
479 # code with a custom `copy_function` may rely on copytree
480 # doing the right thing.
481 os.symlink(linkto, dstname)
482 copystat(srcobj, dstname, follow_symlinks=not symlinks)
483 else:
484 # ignore dangling symlink if the flag is on
485 if not os.path.exists(linkto) and ignore_dangling_symlinks:
486 continue
487 # otherwise let the copy occur. copy2 will raise an error
488 if srcentry.is_dir():
489 copytree(srcobj, dstname, symlinks, ignore,
490 copy_function, dirs_exist_ok=dirs_exist_ok)
491 else:
492 copy_function(srcobj, dstname)
493 elif srcentry.is_dir():
494 copytree(srcobj, dstname, symlinks, ignore, copy_function,
495 dirs_exist_ok=dirs_exist_ok)
496 else:
497 # Will raise a SpecialFileError for unsupported file types
498 copy_function(srcobj, dstname)
499 # catch the Error from the recursive copytree so that we can
500 # continue with other files
501 except Error as err:
502 errors.extend(err.args[0])
503 except OSError as why:
504 errors.append((srcname, dstname, str(why)))
505 try:
506 copystat(src, dst)
507 except OSError as why:
508 # Copying file access times may fail on Windows
509 if getattr(why, 'winerror', None) is None:
510 errors.append((src, dst, str(why)))
511 if errors:
512 raise Error(errors)
513 return dst
514
515def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
516 ignore_dangling_symlinks=False, dirs_exist_ok=False):
517 """Recursively copy a directory tree and return the destination directory.
518
519 dirs_exist_ok dictates whether to raise an exception in case dst or any
520 missing parent directory already exists.
521
522 If exception(s) occur, an Error is raised with a list of reasons.
523
524 If the optional symlinks flag is true, symbolic links in the
525 source tree result in symbolic links in the destination tree; if
526 it is false, the contents of the files pointed to by symbolic
527 links are copied. If the file pointed by the symlink doesn't
528 exist, an exception will be added in the list of errors raised in
529 an Error exception at the end of the copy process.
530
531 You can set the optional ignore_dangling_symlinks flag to true if you
532 want to silence this exception. Notice that this has no effect on
533 platforms that don't support os.symlink.
534
535 The optional ignore argument is a callable. If given, it
536 is called with the `src` parameter, which is the directory
537 being visited by copytree(), and `names` which is the list of
538 `src` contents, as returned by os.listdir():
539
540 callable(src, names) -> ignored_names
541
542 Since copytree() is called recursively, the callable will be
543 called once for each directory that is copied. It returns a
544 list of names relative to the `src` directory that should
545 not be copied.
546
547 The optional copy_function argument is a callable that will be used
548 to copy each file. It will be called with the source path and the
549 destination path as arguments. By default, copy2() is used, but any
550 function that supports the same signature (like copy()) can be used.
551
552 """
553 sys.audit("shutil.copytree", src, dst)
554 with os.scandir(src) as itr:
555 entries = list(itr)
556 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
557 ignore=ignore, copy_function=copy_function,
558 ignore_dangling_symlinks=ignore_dangling_symlinks,
559 dirs_exist_ok=dirs_exist_ok)
560
561if hasattr(os.stat_result, 'st_file_attributes'):
562 # Special handling for directory junctions to make them behave like
563 # symlinks for shutil.rmtree, since in general they do not appear as
564 # regular links.
565 def _rmtree_isdir(entry):
566 try:
567 st = entry.stat(follow_symlinks=False)
568 return (stat.S_ISDIR(st.st_mode) and not
569 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
570 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
571 except OSError:
572 return False
573
574 def _rmtree_islink(path):
575 try:
576 st = os.lstat(path)
577 return (stat.S_ISLNK(st.st_mode) or
578 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
579 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
580 except OSError:
581 return False
582else:
583 def _rmtree_isdir(entry):
584 try:
585 return entry.is_dir(follow_symlinks=False)
586 except OSError:
587 return False
588
589 def _rmtree_islink(path):
590 return os.path.islink(path)
591
592# version vulnerable to race conditions
593def _rmtree_unsafe(path, onerror):
594 try:
595 with os.scandir(path) as scandir_it:
596 entries = list(scandir_it)
597 except OSError:
598 onerror(os.scandir, path, sys.exc_info())
599 entries = []
600 for entry in entries:
601 fullname = entry.path
602 if _rmtree_isdir(entry):
603 try:
604 if entry.is_symlink():
605 # This can only happen if someone replaces
606 # a directory with a symlink after the call to
607 # os.scandir or entry.is_dir above.
608 raise OSError("Cannot call rmtree on a symbolic link")
609 except OSError:
610 onerror(os.path.islink, fullname, sys.exc_info())
611 continue
612 _rmtree_unsafe(fullname, onerror)
613 else:
614 try:
615 os.unlink(fullname)
616 except OSError:
617 onerror(os.unlink, fullname, sys.exc_info())
618 try:
619 os.rmdir(path)
620 except OSError:
621 onerror(os.rmdir, path, sys.exc_info())
622
623# Version using fd-based APIs to protect against races
624def _rmtree_safe_fd(topfd, path, onerror):
625 try:
626 with os.scandir(topfd) as scandir_it:
627 entries = list(scandir_it)
628 except OSError as err:
629 err.filename = path
630 onerror(os.scandir, path, sys.exc_info())
631 return
632 for entry in entries:
633 fullname = os.path.join(path, entry.name)
634 try:
635 is_dir = entry.is_dir(follow_symlinks=False)
636 except OSError:
637 is_dir = False
638 else:
639 if is_dir:
640 try:
641 orig_st = entry.stat(follow_symlinks=False)
642 is_dir = stat.S_ISDIR(orig_st.st_mode)
643 except OSError:
644 onerror(os.lstat, fullname, sys.exc_info())
645 continue
646 if is_dir:
647 try:
648 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
Yi Kong71199322022-08-30 15:53:45 +0800649 dirfd_closed = False
Haibo Huangd8830302020-03-03 10:09:46 -0800650 except OSError:
651 onerror(os.open, fullname, sys.exc_info())
652 else:
653 try:
654 if os.path.samestat(orig_st, os.fstat(dirfd)):
655 _rmtree_safe_fd(dirfd, fullname, onerror)
656 try:
Yi Kong71199322022-08-30 15:53:45 +0800657 os.close(dirfd)
658 dirfd_closed = True
Haibo Huangd8830302020-03-03 10:09:46 -0800659 os.rmdir(entry.name, dir_fd=topfd)
660 except OSError:
661 onerror(os.rmdir, fullname, sys.exc_info())
662 else:
663 try:
664 # This can only happen if someone replaces
665 # a directory with a symlink after the call to
666 # os.scandir or stat.S_ISDIR above.
667 raise OSError("Cannot call rmtree on a symbolic "
668 "link")
669 except OSError:
670 onerror(os.path.islink, fullname, sys.exc_info())
671 finally:
Yi Kong71199322022-08-30 15:53:45 +0800672 if not dirfd_closed:
673 os.close(dirfd)
Haibo Huangd8830302020-03-03 10:09:46 -0800674 else:
675 try:
676 os.unlink(entry.name, dir_fd=topfd)
677 except OSError:
678 onerror(os.unlink, fullname, sys.exc_info())
679
680_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
681 os.supports_dir_fd and
682 os.scandir in os.supports_fd and
683 os.stat in os.supports_follow_symlinks)
684
685def rmtree(path, ignore_errors=False, onerror=None):
686 """Recursively delete a directory tree.
687
688 If ignore_errors is set, errors are ignored; otherwise, if onerror
689 is set, it is called to handle the error with arguments (func,
690 path, exc_info) where func is platform and implementation dependent;
691 path is the argument to that function that caused it to fail; and
692 exc_info is a tuple returned by sys.exc_info(). If ignore_errors
693 is false and onerror is None, an exception is raised.
694
695 """
696 sys.audit("shutil.rmtree", path)
697 if ignore_errors:
698 def onerror(*args):
699 pass
700 elif onerror is None:
701 def onerror(*args):
702 raise
703 if _use_fd_functions:
704 # While the unsafe rmtree works fine on bytes, the fd based does not.
705 if isinstance(path, bytes):
706 path = os.fsdecode(path)
707 # Note: To guard against symlink races, we use the standard
708 # lstat()/open()/fstat() trick.
709 try:
710 orig_st = os.lstat(path)
711 except Exception:
712 onerror(os.lstat, path, sys.exc_info())
713 return
714 try:
715 fd = os.open(path, os.O_RDONLY)
Yi Kong71199322022-08-30 15:53:45 +0800716 fd_closed = False
Haibo Huangd8830302020-03-03 10:09:46 -0800717 except Exception:
Haibo Huang5eba2b42021-01-22 11:22:02 -0800718 onerror(os.open, path, sys.exc_info())
Haibo Huangd8830302020-03-03 10:09:46 -0800719 return
720 try:
721 if os.path.samestat(orig_st, os.fstat(fd)):
722 _rmtree_safe_fd(fd, path, onerror)
723 try:
Yi Kong71199322022-08-30 15:53:45 +0800724 os.close(fd)
725 fd_closed = True
Haibo Huangd8830302020-03-03 10:09:46 -0800726 os.rmdir(path)
727 except OSError:
728 onerror(os.rmdir, path, sys.exc_info())
729 else:
730 try:
731 # symlinks to directories are forbidden, see bug #1669
732 raise OSError("Cannot call rmtree on a symbolic link")
733 except OSError:
734 onerror(os.path.islink, path, sys.exc_info())
735 finally:
Yi Kong71199322022-08-30 15:53:45 +0800736 if not fd_closed:
737 os.close(fd)
Haibo Huangd8830302020-03-03 10:09:46 -0800738 else:
739 try:
740 if _rmtree_islink(path):
741 # symlinks to directories are forbidden, see bug #1669
742 raise OSError("Cannot call rmtree on a symbolic link")
743 except OSError:
744 onerror(os.path.islink, path, sys.exc_info())
745 # can't continue even if onerror hook returns
746 return
747 return _rmtree_unsafe(path, onerror)
748
749# Allow introspection of whether or not the hardening against symlink
750# attacks is supported on the current platform
751rmtree.avoids_symlink_attacks = _use_fd_functions
752
753def _basename(path):
Haibo Huang5eba2b42021-01-22 11:22:02 -0800754 """A basename() variant which first strips the trailing slash, if present.
755 Thus we always get the last component of the path, even for directories.
756
757 path: Union[PathLike, str]
758
759 e.g.
760 >>> os.path.basename('/bar/foo')
761 'foo'
762 >>> os.path.basename('/bar/foo/')
763 ''
764 >>> _basename('/bar/foo/')
765 'foo'
766 """
767 path = os.fspath(path)
Haibo Huangd8830302020-03-03 10:09:46 -0800768 sep = os.path.sep + (os.path.altsep or '')
769 return os.path.basename(path.rstrip(sep))
770
771def move(src, dst, copy_function=copy2):
772 """Recursively move a file or directory to another location. This is
773 similar to the Unix "mv" command. Return the file or directory's
774 destination.
775
776 If the destination is a directory or a symlink to a directory, the source
777 is moved inside the directory. The destination path must not already
778 exist.
779
780 If the destination already exists but is not a directory, it may be
781 overwritten depending on os.rename() semantics.
782
783 If the destination is on our current filesystem, then rename() is used.
784 Otherwise, src is copied to the destination and then removed. Symlinks are
785 recreated under the new name if os.rename() fails because of cross
786 filesystem renames.
787
788 The optional `copy_function` argument is a callable that will be used
789 to copy the source or it will be delegated to `copytree`.
790 By default, copy2() is used, but any function that supports the same
791 signature (like copy()) can be used.
792
793 A lot more could be done here... A look at a mv.c shows a lot of
794 the issues this implementation glosses over.
795
796 """
Haibo Huang5980f852020-03-05 12:22:08 -0800797 sys.audit("shutil.move", src, dst)
Haibo Huangd8830302020-03-03 10:09:46 -0800798 real_dst = dst
799 if os.path.isdir(dst):
800 if _samefile(src, dst):
801 # We might be on a case insensitive filesystem,
802 # perform the rename anyway.
803 os.rename(src, dst)
804 return
805
Haibo Huang5eba2b42021-01-22 11:22:02 -0800806 # Using _basename instead of os.path.basename is important, as we must
807 # ignore any trailing slash to avoid the basename returning ''
Haibo Huangd8830302020-03-03 10:09:46 -0800808 real_dst = os.path.join(dst, _basename(src))
Haibo Huang5eba2b42021-01-22 11:22:02 -0800809
Haibo Huangd8830302020-03-03 10:09:46 -0800810 if os.path.exists(real_dst):
811 raise Error("Destination path '%s' already exists" % real_dst)
812 try:
813 os.rename(src, real_dst)
814 except OSError:
815 if os.path.islink(src):
816 linkto = os.readlink(src)
817 os.symlink(linkto, real_dst)
818 os.unlink(src)
819 elif os.path.isdir(src):
820 if _destinsrc(src, dst):
821 raise Error("Cannot move a directory '%s' into itself"
822 " '%s'." % (src, dst))
Yi Kong71199322022-08-30 15:53:45 +0800823 if (_is_immutable(src)
824 or (not os.access(src, os.W_OK) and os.listdir(src)
825 and sys.platform == 'darwin')):
826 raise PermissionError("Cannot move the non-empty directory "
827 "'%s': Lacking write permission to '%s'."
828 % (src, src))
Haibo Huangd8830302020-03-03 10:09:46 -0800829 copytree(src, real_dst, copy_function=copy_function,
830 symlinks=True)
831 rmtree(src)
832 else:
833 copy_function(src, real_dst)
834 os.unlink(src)
835 return real_dst
836
837def _destinsrc(src, dst):
838 src = os.path.abspath(src)
839 dst = os.path.abspath(dst)
840 if not src.endswith(os.path.sep):
841 src += os.path.sep
842 if not dst.endswith(os.path.sep):
843 dst += os.path.sep
844 return dst.startswith(src)
845
Yi Kong71199322022-08-30 15:53:45 +0800846def _is_immutable(src):
847 st = _stat(src)
848 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
849 return hasattr(st, 'st_flags') and st.st_flags in immutable_states
850
Haibo Huangd8830302020-03-03 10:09:46 -0800851def _get_gid(name):
852 """Returns a gid, given a group name."""
Yi Kong71199322022-08-30 15:53:45 +0800853 if name is None:
Haibo Huangd8830302020-03-03 10:09:46 -0800854 return None
Yi Kong71199322022-08-30 15:53:45 +0800855
856 try:
857 from grp import getgrnam
858 except ImportError:
859 return None
860
Haibo Huangd8830302020-03-03 10:09:46 -0800861 try:
862 result = getgrnam(name)
863 except KeyError:
864 result = None
865 if result is not None:
866 return result[2]
867 return None
868
869def _get_uid(name):
870 """Returns an uid, given a user name."""
Yi Kong71199322022-08-30 15:53:45 +0800871 if name is None:
Haibo Huangd8830302020-03-03 10:09:46 -0800872 return None
Yi Kong71199322022-08-30 15:53:45 +0800873
874 try:
875 from pwd import getpwnam
876 except ImportError:
877 return None
878
Haibo Huangd8830302020-03-03 10:09:46 -0800879 try:
880 result = getpwnam(name)
881 except KeyError:
882 result = None
883 if result is not None:
884 return result[2]
885 return None
886
887def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
888 owner=None, group=None, logger=None):
889 """Create a (possibly compressed) tar file from all the files under
890 'base_dir'.
891
892 'compress' must be "gzip" (the default), "bzip2", "xz", or None.
893
894 'owner' and 'group' can be used to define an owner and a group for the
895 archive that is being built. If not provided, the current owner and group
896 will be used.
897
898 The output tar file will be named 'base_name' + ".tar", possibly plus
899 the appropriate compression extension (".gz", ".bz2", or ".xz").
900
901 Returns the output filename.
902 """
903 if compress is None:
904 tar_compression = ''
905 elif _ZLIB_SUPPORTED and compress == 'gzip':
906 tar_compression = 'gz'
907 elif _BZ2_SUPPORTED and compress == 'bzip2':
908 tar_compression = 'bz2'
909 elif _LZMA_SUPPORTED and compress == 'xz':
910 tar_compression = 'xz'
911 else:
912 raise ValueError("bad value for 'compress', or compression format not "
913 "supported : {0}".format(compress))
914
915 import tarfile # late import for breaking circular dependency
916
917 compress_ext = '.' + tar_compression if compress else ''
918 archive_name = base_name + '.tar' + compress_ext
919 archive_dir = os.path.dirname(archive_name)
920
921 if archive_dir and not os.path.exists(archive_dir):
922 if logger is not None:
923 logger.info("creating %s", archive_dir)
924 if not dry_run:
925 os.makedirs(archive_dir)
926
927 # creating the tarball
928 if logger is not None:
929 logger.info('Creating tar archive')
930
931 uid = _get_uid(owner)
932 gid = _get_gid(group)
933
934 def _set_uid_gid(tarinfo):
935 if gid is not None:
936 tarinfo.gid = gid
937 tarinfo.gname = group
938 if uid is not None:
939 tarinfo.uid = uid
940 tarinfo.uname = owner
941 return tarinfo
942
943 if not dry_run:
944 tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
945 try:
946 tar.add(base_dir, filter=_set_uid_gid)
947 finally:
948 tar.close()
949
950 return archive_name
951
952def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
953 """Create a zip file from all the files under 'base_dir'.
954
955 The output zip file will be named 'base_name' + ".zip". Returns the
956 name of the output zip file.
957 """
958 import zipfile # late import for breaking circular dependency
959
960 zip_filename = base_name + ".zip"
961 archive_dir = os.path.dirname(base_name)
962
963 if archive_dir and not os.path.exists(archive_dir):
964 if logger is not None:
965 logger.info("creating %s", archive_dir)
966 if not dry_run:
967 os.makedirs(archive_dir)
968
969 if logger is not None:
970 logger.info("creating '%s' and adding '%s' to it",
971 zip_filename, base_dir)
972
973 if not dry_run:
974 with zipfile.ZipFile(zip_filename, "w",
975 compression=zipfile.ZIP_DEFLATED) as zf:
976 path = os.path.normpath(base_dir)
977 if path != os.curdir:
978 zf.write(path, path)
979 if logger is not None:
980 logger.info("adding '%s'", path)
981 for dirpath, dirnames, filenames in os.walk(base_dir):
982 for name in sorted(dirnames):
983 path = os.path.normpath(os.path.join(dirpath, name))
984 zf.write(path, path)
985 if logger is not None:
986 logger.info("adding '%s'", path)
987 for name in filenames:
988 path = os.path.normpath(os.path.join(dirpath, name))
989 if os.path.isfile(path):
990 zf.write(path, path)
991 if logger is not None:
992 logger.info("adding '%s'", path)
993
994 return zip_filename
995
996_ARCHIVE_FORMATS = {
997 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
998}
999
1000if _ZLIB_SUPPORTED:
1001 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
1002 "gzip'ed tar-file")
1003 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")
1004
1005if _BZ2_SUPPORTED:
1006 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
1007 "bzip2'ed tar-file")
1008
1009if _LZMA_SUPPORTED:
1010 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
1011 "xz'ed tar-file")
1012
1013def get_archive_formats():
1014 """Returns a list of supported formats for archiving and unarchiving.
1015
1016 Each element of the returned sequence is a tuple (name, description)
1017 """
1018 formats = [(name, registry[2]) for name, registry in
1019 _ARCHIVE_FORMATS.items()]
1020 formats.sort()
1021 return formats
1022
1023def register_archive_format(name, function, extra_args=None, description=''):
1024 """Registers an archive format.
1025
1026 name is the name of the format. function is the callable that will be
1027 used to create archives. If provided, extra_args is a sequence of
1028 (name, value) tuples that will be passed as arguments to the callable.
1029 description can be provided to describe the format, and will be returned
1030 by the get_archive_formats() function.
1031 """
1032 if extra_args is None:
1033 extra_args = []
1034 if not callable(function):
1035 raise TypeError('The %s object is not callable' % function)
1036 if not isinstance(extra_args, (tuple, list)):
1037 raise TypeError('extra_args needs to be a sequence')
1038 for element in extra_args:
1039 if not isinstance(element, (tuple, list)) or len(element) !=2:
1040 raise TypeError('extra_args elements are : (arg_name, value)')
1041
1042 _ARCHIVE_FORMATS[name] = (function, extra_args, description)
1043
1044def unregister_archive_format(name):
1045 del _ARCHIVE_FORMATS[name]
1046
1047def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
1048 dry_run=0, owner=None, group=None, logger=None):
1049 """Create an archive file (eg. zip or tar).
1050
1051 'base_name' is the name of the file to create, minus any format-specific
1052 extension; 'format' is the archive format: one of "zip", "tar", "gztar",
1053 "bztar", or "xztar". Or any other registered format.
1054
1055 'root_dir' is a directory that will be the root directory of the
1056 archive; ie. we typically chdir into 'root_dir' before creating the
1057 archive. 'base_dir' is the directory where we start archiving from;
1058 ie. 'base_dir' will be the common prefix of all files and
1059 directories in the archive. 'root_dir' and 'base_dir' both default
1060 to the current directory. Returns the name of the archive file.
1061
1062 'owner' and 'group' are used when creating a tar archive. By default,
1063 uses the current owner and group.
1064 """
1065 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
1066 save_cwd = os.getcwd()
1067 if root_dir is not None:
1068 if logger is not None:
1069 logger.debug("changing into '%s'", root_dir)
1070 base_name = os.path.abspath(base_name)
1071 if not dry_run:
1072 os.chdir(root_dir)
1073
1074 if base_dir is None:
1075 base_dir = os.curdir
1076
1077 kwargs = {'dry_run': dry_run, 'logger': logger}
1078
1079 try:
1080 format_info = _ARCHIVE_FORMATS[format]
1081 except KeyError:
1082 raise ValueError("unknown archive format '%s'" % format) from None
1083
1084 func = format_info[0]
1085 for arg, val in format_info[1]:
1086 kwargs[arg] = val
1087
1088 if format != 'zip':
1089 kwargs['owner'] = owner
1090 kwargs['group'] = group
1091
1092 try:
1093 filename = func(base_name, base_dir, **kwargs)
1094 finally:
1095 if root_dir is not None:
1096 if logger is not None:
1097 logger.debug("changing back to '%s'", save_cwd)
1098 os.chdir(save_cwd)
1099
1100 return filename
1101
1102
1103def get_unpack_formats():
1104 """Returns a list of supported formats for unpacking.
1105
1106 Each element of the returned sequence is a tuple
1107 (name, extensions, description)
1108 """
1109 formats = [(name, info[0], info[3]) for name, info in
1110 _UNPACK_FORMATS.items()]
1111 formats.sort()
1112 return formats
1113
1114def _check_unpack_options(extensions, function, extra_args):
1115 """Checks what gets registered as an unpacker."""
1116 # first make sure no other unpacker is registered for this extension
1117 existing_extensions = {}
1118 for name, info in _UNPACK_FORMATS.items():
1119 for ext in info[0]:
1120 existing_extensions[ext] = name
1121
1122 for extension in extensions:
1123 if extension in existing_extensions:
1124 msg = '%s is already registered for "%s"'
1125 raise RegistryError(msg % (extension,
1126 existing_extensions[extension]))
1127
1128 if not callable(function):
1129 raise TypeError('The registered function must be a callable')
1130
1131
1132def register_unpack_format(name, extensions, function, extra_args=None,
1133 description=''):
1134 """Registers an unpack format.
1135
1136 `name` is the name of the format. `extensions` is a list of extensions
1137 corresponding to the format.
1138
1139 `function` is the callable that will be
1140 used to unpack archives. The callable will receive archives to unpack.
1141 If it's unable to handle an archive, it needs to raise a ReadError
1142 exception.
1143
1144 If provided, `extra_args` is a sequence of
1145 (name, value) tuples that will be passed as arguments to the callable.
1146 description can be provided to describe the format, and will be returned
1147 by the get_unpack_formats() function.
1148 """
1149 if extra_args is None:
1150 extra_args = []
1151 _check_unpack_options(extensions, function, extra_args)
1152 _UNPACK_FORMATS[name] = extensions, function, extra_args, description
1153
1154def unregister_unpack_format(name):
1155 """Removes the pack format from the registry."""
1156 del _UNPACK_FORMATS[name]
1157
1158def _ensure_directory(path):
1159 """Ensure that the parent directory of `path` exists"""
1160 dirname = os.path.dirname(path)
1161 if not os.path.isdir(dirname):
1162 os.makedirs(dirname)
1163
1164def _unpack_zipfile(filename, extract_dir):
1165 """Unpack zip `filename` to `extract_dir`
1166 """
1167 import zipfile # late import for breaking circular dependency
1168
1169 if not zipfile.is_zipfile(filename):
1170 raise ReadError("%s is not a zip file" % filename)
1171
1172 zip = zipfile.ZipFile(filename)
1173 try:
1174 for info in zip.infolist():
1175 name = info.filename
1176
1177 # don't extract absolute paths or ones with .. in them
1178 if name.startswith('/') or '..' in name:
1179 continue
1180
Yi Kong71199322022-08-30 15:53:45 +08001181 targetpath = os.path.join(extract_dir, *name.split('/'))
1182 if not targetpath:
Haibo Huangd8830302020-03-03 10:09:46 -08001183 continue
1184
Yi Kong71199322022-08-30 15:53:45 +08001185 _ensure_directory(targetpath)
Haibo Huangd8830302020-03-03 10:09:46 -08001186 if not name.endswith('/'):
1187 # file
Yi Kong71199322022-08-30 15:53:45 +08001188 with zip.open(name, 'r') as source, \
1189 open(targetpath, 'wb') as target:
1190 copyfileobj(source, target)
Haibo Huangd8830302020-03-03 10:09:46 -08001191 finally:
1192 zip.close()
1193
1194def _unpack_tarfile(filename, extract_dir):
1195 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`
1196 """
1197 import tarfile # late import for breaking circular dependency
1198 try:
1199 tarobj = tarfile.open(filename)
1200 except tarfile.TarError:
1201 raise ReadError(
1202 "%s is not a compressed or uncompressed tar file" % filename)
1203 try:
1204 tarobj.extractall(extract_dir)
1205 finally:
1206 tarobj.close()
1207
1208_UNPACK_FORMATS = {
1209 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
1210 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
1211}
1212
1213if _ZLIB_SUPPORTED:
1214 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
1215 "gzip'ed tar-file")
1216
1217if _BZ2_SUPPORTED:
1218 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
1219 "bzip2'ed tar-file")
1220
1221if _LZMA_SUPPORTED:
1222 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
1223 "xz'ed tar-file")
1224
1225def _find_unpack_format(filename):
1226 for name, info in _UNPACK_FORMATS.items():
1227 for extension in info[0]:
1228 if filename.endswith(extension):
1229 return name
1230 return None
1231
1232def unpack_archive(filename, extract_dir=None, format=None):
1233 """Unpack an archive.
1234
1235 `filename` is the name of the archive.
1236
1237 `extract_dir` is the name of the target directory, where the archive
1238 is unpacked. If not provided, the current working directory is used.
1239
1240 `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
1241 or "xztar". Or any other registered format. If not provided,
1242 unpack_archive will use the filename extension and see if an unpacker
1243 was registered for that extension.
1244
1245 In case none is found, a ValueError is raised.
1246 """
Haibo Huang5980f852020-03-05 12:22:08 -08001247 sys.audit("shutil.unpack_archive", filename, extract_dir, format)
1248
Haibo Huangd8830302020-03-03 10:09:46 -08001249 if extract_dir is None:
1250 extract_dir = os.getcwd()
1251
1252 extract_dir = os.fspath(extract_dir)
1253 filename = os.fspath(filename)
1254
1255 if format is not None:
1256 try:
1257 format_info = _UNPACK_FORMATS[format]
1258 except KeyError:
1259 raise ValueError("Unknown unpack format '{0}'".format(format)) from None
1260
1261 func = format_info[1]
1262 func(filename, extract_dir, **dict(format_info[2]))
1263 else:
1264 # we need to look at the registered unpackers supported extensions
1265 format = _find_unpack_format(filename)
1266 if format is None:
1267 raise ReadError("Unknown archive format '{0}'".format(filename))
1268
1269 func = _UNPACK_FORMATS[format][1]
1270 kwargs = dict(_UNPACK_FORMATS[format][2])
1271 func(filename, extract_dir, **kwargs)
1272
1273
1274if hasattr(os, 'statvfs'):
1275
1276 __all__.append('disk_usage')
1277 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1278 _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
1279 _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
1280 _ntuple_diskusage.free.__doc__ = 'Free space in bytes'
1281
1282 def disk_usage(path):
1283 """Return disk usage statistics about the given path.
1284
1285 Returned value is a named tuple with attributes 'total', 'used' and
1286 'free', which are the amount of total, used and free space, in bytes.
1287 """
1288 st = os.statvfs(path)
1289 free = st.f_bavail * st.f_frsize
1290 total = st.f_blocks * st.f_frsize
1291 used = (st.f_blocks - st.f_bfree) * st.f_frsize
1292 return _ntuple_diskusage(total, used, free)
1293
1294elif _WINDOWS:
1295
1296 __all__.append('disk_usage')
1297 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1298
1299 def disk_usage(path):
1300 """Return disk usage statistics about the given path.
1301
1302 Returned values is a named tuple with attributes 'total', 'used' and
1303 'free', which are the amount of total, used and free space, in bytes.
1304 """
1305 total, free = nt._getdiskusage(path)
1306 used = total - free
1307 return _ntuple_diskusage(total, used, free)
1308
1309
1310def chown(path, user=None, group=None):
1311 """Change owner user and group of the given path.
1312
1313 user and group can be the uid/gid or the user/group names, and in that case,
1314 they are converted to their respective uid/gid.
1315 """
Haibo Huang5980f852020-03-05 12:22:08 -08001316 sys.audit('shutil.chown', path, user, group)
Haibo Huangd8830302020-03-03 10:09:46 -08001317
1318 if user is None and group is None:
1319 raise ValueError("user and/or group must be set")
1320
1321 _user = user
1322 _group = group
1323
1324 # -1 means don't change it
1325 if user is None:
1326 _user = -1
1327 # user can either be an int (the uid) or a string (the system username)
1328 elif isinstance(user, str):
1329 _user = _get_uid(user)
1330 if _user is None:
1331 raise LookupError("no such user: {!r}".format(user))
1332
1333 if group is None:
1334 _group = -1
1335 elif not isinstance(group, int):
1336 _group = _get_gid(group)
1337 if _group is None:
1338 raise LookupError("no such group: {!r}".format(group))
1339
1340 os.chown(path, _user, _group)
1341
1342def get_terminal_size(fallback=(80, 24)):
1343 """Get the size of the terminal window.
1344
1345 For each of the two dimensions, the environment variable, COLUMNS
1346 and LINES respectively, is checked. If the variable is defined and
1347 the value is a positive integer, it is used.
1348
1349 When COLUMNS or LINES is not defined, which is the common case,
1350 the terminal connected to sys.__stdout__ is queried
1351 by invoking os.get_terminal_size.
1352
1353 If the terminal size cannot be successfully queried, either because
1354 the system doesn't support querying, or because we are not
1355 connected to a terminal, the value given in fallback parameter
1356 is used. Fallback defaults to (80, 24) which is the default
1357 size used by many terminal emulators.
1358
1359 The value returned is a named tuple of type os.terminal_size.
1360 """
1361 # columns, lines are the working values
1362 try:
1363 columns = int(os.environ['COLUMNS'])
1364 except (KeyError, ValueError):
1365 columns = 0
1366
1367 try:
1368 lines = int(os.environ['LINES'])
1369 except (KeyError, ValueError):
1370 lines = 0
1371
1372 # only query if necessary
1373 if columns <= 0 or lines <= 0:
1374 try:
1375 size = os.get_terminal_size(sys.__stdout__.fileno())
1376 except (AttributeError, ValueError, OSError):
1377 # stdout is None, closed, detached, or not a terminal, or
1378 # os.get_terminal_size() is unsupported
1379 size = os.terminal_size(fallback)
1380 if columns <= 0:
1381 columns = size.columns
1382 if lines <= 0:
1383 lines = size.lines
1384
1385 return os.terminal_size((columns, lines))
1386
1387
1388# Check that a given file can be accessed with the correct mode.
1389# Additionally check that `file` is not a directory, as on Windows
1390# directories pass the os.access check.
1391def _access_check(fn, mode):
1392 return (os.path.exists(fn) and os.access(fn, mode)
1393 and not os.path.isdir(fn))
1394
1395
1396def which(cmd, mode=os.F_OK | os.X_OK, path=None):
1397 """Given a command, mode, and a PATH string, return the path which
1398 conforms to the given mode on the PATH, or None if there is no such
1399 file.
1400
1401 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
1402 of os.environ.get("PATH"), or can be overridden with a custom search
1403 path.
1404
1405 """
1406 # If we're given a path with a directory part, look it up directly rather
1407 # than referring to PATH directories. This includes checking relative to the
1408 # current directory, e.g. ./script
1409 if os.path.dirname(cmd):
1410 if _access_check(cmd, mode):
1411 return cmd
1412 return None
1413
1414 use_bytes = isinstance(cmd, bytes)
1415
1416 if path is None:
1417 path = os.environ.get("PATH", None)
1418 if path is None:
1419 try:
1420 path = os.confstr("CS_PATH")
1421 except (AttributeError, ValueError):
1422 # os.confstr() or CS_PATH is not available
1423 path = os.defpath
1424 # bpo-35755: Don't use os.defpath if the PATH environment variable is
1425 # set to an empty string
1426
1427 # PATH='' doesn't match, whereas PATH=':' looks in the current directory
1428 if not path:
1429 return None
1430
1431 if use_bytes:
1432 path = os.fsencode(path)
1433 path = path.split(os.fsencode(os.pathsep))
1434 else:
1435 path = os.fsdecode(path)
1436 path = path.split(os.pathsep)
1437
1438 if sys.platform == "win32":
1439 # The current directory takes precedence on Windows.
1440 curdir = os.curdir
1441 if use_bytes:
1442 curdir = os.fsencode(curdir)
1443 if curdir not in path:
1444 path.insert(0, curdir)
1445
1446 # PATHEXT is necessary to check on Windows.
Haibo Huang5eba2b42021-01-22 11:22:02 -08001447 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
1448 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
1449
Haibo Huangd8830302020-03-03 10:09:46 -08001450 if use_bytes:
1451 pathext = [os.fsencode(ext) for ext in pathext]
1452 # See if the given file matches any of the expected path extensions.
1453 # This will allow us to short circuit when given "python.exe".
1454 # If it does match, only test that one, otherwise we have to try
1455 # others.
1456 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
1457 files = [cmd]
1458 else:
1459 files = [cmd + ext for ext in pathext]
1460 else:
1461 # On other platforms you don't have things like PATHEXT to tell you
1462 # what file suffixes are executable, so just pass on cmd as-is.
1463 files = [cmd]
1464
1465 seen = set()
1466 for dir in path:
1467 normdir = os.path.normcase(dir)
1468 if not normdir in seen:
1469 seen.add(normdir)
1470 for thefile in files:
1471 name = os.path.join(dir, thefile)
1472 if _access_check(name, mode):
1473 return name
1474 return None