| """Adapter for https://github.com/charliermarsh/ruff.""" |
| |
| from __future__ import annotations |
| |
| import argparse |
| import concurrent.futures |
| import dataclasses |
| import enum |
| import json |
| import logging |
| import os |
| import subprocess |
| import sys |
| import time |
| from typing import Any, BinaryIO |
| |
| |
| LINTER_CODE = "RUFF" |
| IS_WINDOWS: bool = os.name == "nt" |
| |
| |
| def eprint(*args: Any, **kwargs: Any) -> None: |
| """Print to stderr.""" |
| print(*args, file=sys.stderr, flush=True, **kwargs) |
| |
| |
| class LintSeverity(str, enum.Enum): |
| """Severity of a lint message.""" |
| |
| ERROR = "error" |
| WARNING = "warning" |
| ADVICE = "advice" |
| DISABLED = "disabled" |
| |
| |
| @dataclasses.dataclass(frozen=True) |
| class LintMessage: |
| """A lint message defined by https://docs.rs/lintrunner/latest/lintrunner/lint_message/struct.LintMessage.html.""" |
| |
| path: str | None |
| line: int | None |
| char: int | None |
| code: str |
| severity: LintSeverity |
| name: str |
| original: str | None |
| replacement: str | None |
| description: str | None |
| |
| def asdict(self) -> dict[str, Any]: |
| return dataclasses.asdict(self) |
| |
| def display(self) -> None: |
| """Print to stdout for lintrunner to consume.""" |
| print(json.dumps(self.asdict()), flush=True) |
| |
| |
| def as_posix(name: str) -> str: |
| return name.replace("\\", "/") if IS_WINDOWS else name |
| |
| |
| def _run_command( |
| args: list[str], |
| *, |
| timeout: int | None, |
| stdin: BinaryIO | None, |
| input: bytes | None, |
| check: bool, |
| cwd: os.PathLike[Any] | None, |
| ) -> subprocess.CompletedProcess[bytes]: |
| logging.debug("$ %s", " ".join(args)) |
| start_time = time.monotonic() |
| try: |
| if input is not None: |
| return subprocess.run( |
| args, |
| capture_output=True, |
| shell=False, |
| input=input, |
| timeout=timeout, |
| check=check, |
| cwd=cwd, |
| ) |
| |
| return subprocess.run( |
| args, |
| stdin=stdin, |
| capture_output=True, |
| shell=False, |
| timeout=timeout, |
| check=check, |
| cwd=cwd, |
| ) |
| finally: |
| end_time = time.monotonic() |
| logging.debug("took %dms", (end_time - start_time) * 1000) |
| |
| |
| def run_command( |
| args: list[str], |
| *, |
| retries: int = 0, |
| timeout: int | None = None, |
| stdin: BinaryIO | None = None, |
| input: bytes | None = None, |
| check: bool = False, |
| cwd: os.PathLike[Any] | None = None, |
| ) -> subprocess.CompletedProcess[bytes]: |
| remaining_retries = retries |
| while True: |
| try: |
| return _run_command( |
| args, timeout=timeout, stdin=stdin, input=input, check=check, cwd=cwd |
| ) |
| except subprocess.TimeoutExpired as err: |
| if remaining_retries == 0: |
| raise err |
| remaining_retries -= 1 |
| logging.warning( |
| "(%s/%s) Retrying because command failed with: %r", |
| retries - remaining_retries, |
| retries, |
| err, |
| ) |
| time.sleep(1) |
| |
| |
| def add_default_options(parser: argparse.ArgumentParser) -> None: |
| """Add default options to a parser. |
| |
| This should be called the last in the chain of add_argument calls. |
| """ |
| parser.add_argument( |
| "--retries", |
| type=int, |
| default=3, |
| help="number of times to retry if the linter times out.", |
| ) |
| parser.add_argument( |
| "--verbose", |
| action="store_true", |
| help="verbose logging", |
| ) |
| parser.add_argument( |
| "filenames", |
| nargs="+", |
| help="paths to lint", |
| ) |
| |
| |
| def explain_rule(code: str) -> str: |
| proc = run_command( |
| ["ruff", "rule", "--output-format=json", code], |
| check=True, |
| ) |
| rule = json.loads(str(proc.stdout, "utf-8").strip()) |
| return f"\n{rule['linter']}: {rule['summary']}" |
| |
| |
| def get_issue_severity(code: str) -> LintSeverity: |
| # "B901": `return x` inside a generator |
| # "B902": Invalid first argument to a method |
| # "B903": __slots__ efficiency |
| # "B950": Line too long |
| # "C4": Flake8 Comprehensions |
| # "C9": Cyclomatic complexity |
| # "E2": PEP8 horizontal whitespace "errors" |
| # "E3": PEP8 blank line "errors" |
| # "E5": PEP8 line length "errors" |
| # "T400": type checking Notes |
| # "T49": internal type checker errors or unmatched messages |
| if any( |
| code.startswith(x) |
| for x in ( |
| "B9", |
| "C4", |
| "C9", |
| "E2", |
| "E3", |
| "E5", |
| "T400", |
| "T49", |
| "PLC", |
| "PLR", |
| ) |
| ): |
| return LintSeverity.ADVICE |
| |
| # "F821": Undefined name |
| # "E999": syntax error |
| if any(code.startswith(x) for x in ("F821", "E999", "PLE")): |
| return LintSeverity.ERROR |
| |
| # "F": PyFlakes Error |
| # "B": flake8-bugbear Error |
| # "E": PEP8 "Error" |
| # "W": PEP8 Warning |
| # possibly other plugins... |
| return LintSeverity.WARNING |
| |
| |
| def format_lint_message( |
| message: str, code: str, rules: dict[str, str], show_disable: bool |
| ) -> str: |
| if rules: |
| message += f".\n{rules.get(code) or ''}" |
| message += ".\nSee https://beta.ruff.rs/docs/rules/" |
| if show_disable: |
| message += f".\n\nTo disable, use ` # noqa: {code}`" |
| return message |
| |
| |
| def check_files( |
| filenames: list[str], |
| severities: dict[str, LintSeverity], |
| *, |
| config: str | None, |
| retries: int, |
| timeout: int, |
| explain: bool, |
| show_disable: bool, |
| ) -> list[LintMessage]: |
| try: |
| proc = run_command( |
| [ |
| sys.executable, |
| "-m", |
| "ruff", |
| "check", |
| "--exit-zero", |
| "--quiet", |
| "--output-format=json", |
| *([f"--config={config}"] if config else []), |
| *filenames, |
| ], |
| retries=retries, |
| timeout=timeout, |
| check=True, |
| ) |
| except (OSError, subprocess.CalledProcessError) as err: |
| return [ |
| LintMessage( |
| path=None, |
| line=None, |
| char=None, |
| code=LINTER_CODE, |
| severity=LintSeverity.ERROR, |
| name="command-failed", |
| original=None, |
| replacement=None, |
| description=( |
| f"Failed due to {err.__class__.__name__}:\n{err}" |
| if not isinstance(err, subprocess.CalledProcessError) |
| else ( |
| f"COMMAND (exit code {err.returncode})\n" |
| f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" |
| f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" |
| f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" |
| ) |
| ), |
| ) |
| ] |
| |
| stdout = str(proc.stdout, "utf-8").strip() |
| vulnerabilities = json.loads(stdout) |
| |
| if explain: |
| all_codes = {v["code"] for v in vulnerabilities} |
| rules = {code: explain_rule(code) for code in all_codes} |
| else: |
| rules = {} |
| |
| return [ |
| LintMessage( |
| path=vuln["filename"], |
| name=vuln["code"], |
| description=( |
| format_lint_message( |
| vuln["message"], |
| vuln["code"], |
| rules, |
| show_disable, |
| ) |
| ), |
| line=int(vuln["location"]["row"]), |
| char=int(vuln["location"]["column"]), |
| code=LINTER_CODE, |
| severity=severities.get(vuln["code"], get_issue_severity(vuln["code"])), |
| original=None, |
| replacement=None, |
| ) |
| for vuln in vulnerabilities |
| ] |
| |
| |
| def check_file_for_fixes( |
| filename: str, |
| *, |
| config: str | None, |
| retries: int, |
| timeout: int, |
| ) -> list[LintMessage]: |
| try: |
| with open(filename, "rb") as f: |
| original = f.read() |
| with open(filename, "rb") as f: |
| proc_fix = run_command( |
| [ |
| sys.executable, |
| "-m", |
| "ruff", |
| "check", |
| "--fix-only", |
| "--exit-zero", |
| *([f"--config={config}"] if config else []), |
| "--stdin-filename", |
| filename, |
| "-", |
| ], |
| stdin=f, |
| retries=retries, |
| timeout=timeout, |
| check=True, |
| ) |
| except (OSError, subprocess.CalledProcessError) as err: |
| return [ |
| LintMessage( |
| path=None, |
| line=None, |
| char=None, |
| code=LINTER_CODE, |
| severity=LintSeverity.ERROR, |
| name="command-failed", |
| original=None, |
| replacement=None, |
| description=( |
| f"Failed due to {err.__class__.__name__}:\n{err}" |
| if not isinstance(err, subprocess.CalledProcessError) |
| else ( |
| f"COMMAND (exit code {err.returncode})\n" |
| f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" |
| f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" |
| f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" |
| ) |
| ), |
| ) |
| ] |
| |
| replacement = proc_fix.stdout |
| if original == replacement: |
| return [] |
| |
| return [ |
| LintMessage( |
| path=filename, |
| name="format", |
| description="Run `lintrunner -a` to apply this patch.", |
| line=None, |
| char=None, |
| code=LINTER_CODE, |
| severity=LintSeverity.WARNING, |
| original=original.decode("utf-8"), |
| replacement=replacement.decode("utf-8"), |
| ) |
| ] |
| |
| |
| def main() -> None: |
| parser = argparse.ArgumentParser( |
| description=f"Ruff linter. Linter code: {LINTER_CODE}. Use with RUFF-FIX to auto-fix issues.", |
| fromfile_prefix_chars="@", |
| ) |
| parser.add_argument( |
| "--config", |
| default=None, |
| help="Path to the `pyproject.toml` or `ruff.toml` file to use for configuration", |
| ) |
| parser.add_argument( |
| "--explain", |
| action="store_true", |
| help="Explain a rule", |
| ) |
| parser.add_argument( |
| "--show-disable", |
| action="store_true", |
| help="Show how to disable a lint message", |
| ) |
| parser.add_argument( |
| "--timeout", |
| default=90, |
| type=int, |
| help="Seconds to wait for ruff", |
| ) |
| parser.add_argument( |
| "--severity", |
| action="append", |
| help="map code to severity (e.g. `F401:advice`). This option can be used multiple times.", |
| ) |
| parser.add_argument( |
| "--no-fix", |
| action="store_true", |
| help="Do not suggest fixes", |
| ) |
| add_default_options(parser) |
| args = parser.parse_args() |
| |
| logging.basicConfig( |
| format="<%(threadName)s:%(levelname)s> %(message)s", |
| level=logging.NOTSET |
| if args.verbose |
| else logging.DEBUG |
| if len(args.filenames) < 1000 |
| else logging.INFO, |
| stream=sys.stderr, |
| ) |
| |
| severities: dict[str, LintSeverity] = {} |
| if args.severity: |
| for severity in args.severity: |
| parts = severity.split(":", 1) |
| assert len(parts) == 2, f"invalid severity `{severity}`" |
| severities[parts[0]] = LintSeverity(parts[1]) |
| |
| lint_messages = check_files( |
| args.filenames, |
| severities=severities, |
| config=args.config, |
| retries=args.retries, |
| timeout=args.timeout, |
| explain=args.explain, |
| show_disable=args.show_disable, |
| ) |
| for lint_message in lint_messages: |
| lint_message.display() |
| |
| if args.no_fix or not lint_messages: |
| # If we're not fixing, we can exit early |
| return |
| |
| files_with_lints = {lint.path for lint in lint_messages if lint.path is not None} |
| with concurrent.futures.ThreadPoolExecutor( |
| max_workers=os.cpu_count(), |
| thread_name_prefix="Thread", |
| ) as executor: |
| futures = { |
| executor.submit( |
| check_file_for_fixes, |
| path, |
| config=args.config, |
| retries=args.retries, |
| timeout=args.timeout, |
| ): path |
| for path in files_with_lints |
| } |
| for future in concurrent.futures.as_completed(futures): |
| try: |
| for lint_message in future.result(): |
| lint_message.display() |
| except Exception: # Catch all exceptions for lintrunner |
| logging.critical('Failed at "%s".', futures[future]) |
| raise |
| |
| |
| if __name__ == "__main__": |
| main() |