blob: 0a401e69fc8f8ec5d9e913e29b1a1696a1e9259e [file] [log] [blame]
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time
from enum import Enum
from typing import Any, NamedTuple
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
print(*args, file=sys.stderr, flush=True, **kwargs)
class LintSeverity(str, Enum):
ERROR = "error"
WARNING = "warning"
ADVICE = "advice"
DISABLED = "disabled"
class LintMessage(NamedTuple):
path: str | None
line: int | None
char: int | None
code: str
severity: LintSeverity
name: str
original: str | None
replacement: str | None
description: str | None
def as_posix(name: str) -> str:
return name.replace("\\", "/") if IS_WINDOWS else name
# fmt: off
# https://www.flake8rules.com/
DOCUMENTED_IN_FLAKE8RULES: set[str] = {
"E101", "E111", "E112", "E113", "E114", "E115", "E116", "E117",
"E121", "E122", "E123", "E124", "E125", "E126", "E127", "E128", "E129",
"E131", "E133",
"E201", "E202", "E203",
"E211",
"E221", "E222", "E223", "E224", "E225", "E226", "E227", "E228",
"E231",
"E241", "E242",
"E251",
"E261", "E262", "E265", "E266",
"E271", "E272", "E273", "E274", "E275",
"E301", "E302", "E303", "E304", "E305", "E306",
"E401", "E402",
"E501", "E502",
"E701", "E702", "E703", "E704",
"E711", "E712", "E713", "E714",
"E721", "E722",
"E731",
"E741", "E742", "E743",
"E901", "E902", "E999",
"W191",
"W291", "W292", "W293",
"W391",
"W503", "W504",
"W601", "W602", "W603", "W604", "W605",
"F401", "F402", "F403", "F404", "F405",
"F811", "F812",
"F821", "F822", "F823",
"F831",
"F841",
"F901",
"C901",
}
# https://pypi.org/project/flake8-comprehensions/#rules
DOCUMENTED_IN_FLAKE8COMPREHENSIONS: set[str] = {
"C400", "C401", "C402", "C403", "C404", "C405", "C406", "C407", "C408", "C409",
"C410",
"C411", "C412", "C413", "C414", "C415", "C416",
}
# https://github.com/PyCQA/flake8-bugbear#list-of-warnings
DOCUMENTED_IN_BUGBEAR: set[str] = {
"B001", "B002", "B003", "B004", "B005", "B006", "B007", "B008", "B009", "B010",
"B011", "B012", "B013", "B014", "B015",
"B301", "B302", "B303", "B304", "B305", "B306",
"B901", "B902", "B903", "B950",
}
# fmt: on
# stdin:2: W802 undefined name 'foo'
# stdin:3:6: T484 Name 'foo' is not defined
# stdin:3:-100: W605 invalid escape sequence '\/'
# stdin:3:1: E302 expected 2 blank lines, found 1
RESULTS_RE: re.Pattern[str] = re.compile(
r"""(?mx)
^
(?P<file>.*?):
(?P<line>\d+):
(?:(?P<column>-?\d+):)?
\s(?P<code>\S+?):?
\s(?P<message>.*)
$
"""
)
def _test_results_re() -> None:
"""
>>> def t(s): return RESULTS_RE.search(s).groupdict()
>>> t(r"file.py:80:1: E302 expected 2 blank lines, found 1")
... # doctest: +NORMALIZE_WHITESPACE
{'file': 'file.py', 'line': '80', 'column': '1', 'code': 'E302',
'message': 'expected 2 blank lines, found 1'}
>>> t(r"file.py:7:1: P201: Resource `stdout` is acquired but not always released.")
... # doctest: +NORMALIZE_WHITESPACE
{'file': 'file.py', 'line': '7', 'column': '1', 'code': 'P201',
'message': 'Resource `stdout` is acquired but not always released.'}
>>> t(r"file.py:8:-10: W605 invalid escape sequence '/'")
... # doctest: +NORMALIZE_WHITESPACE
{'file': 'file.py', 'line': '8', 'column': '-10', 'code': 'W605',
'message': "invalid escape sequence '/'"}
"""
pass
def _run_command(
args: list[str],
*,
extra_env: dict[str, str] | None,
) -> subprocess.CompletedProcess[str]:
logging.debug(
"$ %s",
" ".join(
([f"{k}={v}" for (k, v) in extra_env.items()] if extra_env else []) + args
),
)
start_time = time.monotonic()
try:
return subprocess.run(
args,
capture_output=True,
check=True,
encoding="utf-8",
)
finally:
end_time = time.monotonic()
logging.debug("took %dms", (end_time - start_time) * 1000)
def run_command(
args: list[str],
*,
extra_env: dict[str, str] | None,
retries: int,
) -> subprocess.CompletedProcess[str]:
remaining_retries = retries
while True:
try:
return _run_command(args, extra_env=extra_env)
except subprocess.CalledProcessError as err:
if remaining_retries == 0 or not re.match(
r"^ERROR:1:1: X000 linting with .+ timed out after \d+ seconds",
err.stdout,
):
raise err
remaining_retries -= 1
logging.warning(
"(%s/%s) Retrying because command failed with: %r",
retries - remaining_retries,
retries,
err,
)
time.sleep(1)
def get_issue_severity(code: str) -> LintSeverity:
# "B901": `return x` inside a generator
# "B902": Invalid first argument to a method
# "B903": __slots__ efficiency
# "B950": Line too long
# "C4": Flake8 Comprehensions
# "C9": Cyclomatic complexity
# "E2": PEP8 horizontal whitespace "errors"
# "E3": PEP8 blank line "errors"
# "E5": PEP8 line length "errors"
# "F401": Name imported but unused
# "F403": Star imports used
# "F405": Name possibly from star imports
# "T400": type checking Notes
# "T49": internal type checker errors or unmatched messages
if any(
code.startswith(x)
for x in [
"B9",
"C4",
"C9",
"E2",
"E3",
"E5",
"F401",
"F403",
"F405",
"T400",
"T49",
]
):
return LintSeverity.ADVICE
# "F821": Undefined name
# "E999": syntax error
if any(code.startswith(x) for x in ["F821", "E999"]):
return LintSeverity.ERROR
# "F": PyFlakes Error
# "B": flake8-bugbear Error
# "E": PEP8 "Error"
# "W": PEP8 Warning
# possibly other plugins...
return LintSeverity.WARNING
def get_issue_documentation_url(code: str) -> str:
if code in DOCUMENTED_IN_FLAKE8RULES:
return f"https://www.flake8rules.com/rules/{code}.html"
if code in DOCUMENTED_IN_FLAKE8COMPREHENSIONS:
return "https://pypi.org/project/flake8-comprehensions/#rules"
if code in DOCUMENTED_IN_BUGBEAR:
return "https://github.com/PyCQA/flake8-bugbear#list-of-warnings"
return ""
def check_files(
filenames: list[str],
flake8_plugins_path: str | None,
severities: dict[str, LintSeverity],
retries: int,
) -> list[LintMessage]:
try:
proc = run_command(
[sys.executable, "-mflake8", "--exit-zero"] + filenames,
extra_env={"FLAKE8_PLUGINS_PATH": flake8_plugins_path}
if flake8_plugins_path
else None,
retries=retries,
)
except (OSError, subprocess.CalledProcessError) as err:
return [
LintMessage(
path=None,
line=None,
char=None,
code="FLAKE8",
severity=LintSeverity.ERROR,
name="command-failed",
original=None,
replacement=None,
description=(
f"Failed due to {err.__class__.__name__}:\n{err}"
if not isinstance(err, subprocess.CalledProcessError)
else (
"COMMAND (exit code {returncode})\n"
"{command}\n\n"
"STDERR\n{stderr}\n\n"
"STDOUT\n{stdout}"
).format(
returncode=err.returncode,
command=" ".join(as_posix(x) for x in err.cmd),
stderr=err.stderr.strip() or "(empty)",
stdout=err.stdout.strip() or "(empty)",
)
),
)
]
return [
LintMessage(
path=match["file"],
name=match["code"],
description=f"{match['message']}\nSee {get_issue_documentation_url(match['code'])}",
line=int(match["line"]),
char=int(match["column"])
if match["column"] is not None and not match["column"].startswith("-")
else None,
code="FLAKE8",
severity=severities.get(match["code"]) or get_issue_severity(match["code"]),
original=None,
replacement=None,
)
for match in RESULTS_RE.finditer(proc.stdout)
]
def main() -> None:
parser = argparse.ArgumentParser(
description="Flake8 wrapper linter.",
fromfile_prefix_chars="@",
)
parser.add_argument(
"--flake8-plugins-path",
help="FLAKE8_PLUGINS_PATH env value",
)
parser.add_argument(
"--severity",
action="append",
help="map code to severity (e.g. `B950:advice`)",
)
parser.add_argument(
"--retries",
default=3,
type=int,
help="times to retry timed out flake8",
)
parser.add_argument(
"--verbose",
action="store_true",
help="verbose logging",
)
parser.add_argument(
"filenames",
nargs="+",
help="paths to lint",
)
args = parser.parse_args()
logging.basicConfig(
format="<%(threadName)s:%(levelname)s> %(message)s",
level=logging.NOTSET
if args.verbose
else logging.DEBUG
if len(args.filenames) < 1000
else logging.INFO,
stream=sys.stderr,
)
flake8_plugins_path = (
None
if args.flake8_plugins_path is None
else os.path.realpath(args.flake8_plugins_path)
)
severities: dict[str, LintSeverity] = {}
if args.severity:
for severity in args.severity:
parts = severity.split(":", 1)
assert len(parts) == 2, f"invalid severity `{severity}`"
severities[parts[0]] = LintSeverity(parts[1])
lint_messages = check_files(
args.filenames, flake8_plugins_path, severities, args.retries
)
for lint_message in lint_messages:
print(json.dumps(lint_message._asdict()), flush=True)
if __name__ == "__main__":
main()