#!/usr/bin/env python3
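"""
Queries HUD (https://hud.pytorch.org) for the recent job history of a branch,
detects jobs that failed with similar errors on consecutive commits, and prints
the resulting alerts as JSON. See parse_args() for the supported flags and
their environment-variable defaults.
"""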

from __future__ import annotations

import argparse
import json
import os
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Any

import requests
from setuptools import distutils  # type: ignore[import]

ALL_SKIPPED_THRESHOLD = 100
SIMILARITY_THRESHOLD = 0.75
FAILURE_CHAIN_THRESHOLD = 2
MAX_CONCURRENT_ALERTS = 1

FAILED_JOB_PATTERN = (
    r"^- \[(.*)\]\(.*\) failed consecutively starting with commit \[.*\]\(.*\)$"
)

PENDING = "pending"
NEUTRAL = "neutral"
SKIPPED = "skipped"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"

ISSUES_WITH_LABEL_QUERY = """
query ($owner: String!, $name: String!, $labels: [String!]) {
  repository(owner: $owner, name: $name, followRenames: false) {
    issues(last: 10, labels: $labels, states: [OPEN]) {
      nodes {
        id
        title
        closed
        number
        body
        createdAt
        comments(first: 100) {
          nodes {
            bodyText
            databaseId
          }
        }
      }
    }
  }
}
"""

NUM_ISSUES_QUERY = """
query ($query: String!) {
  search(type: ISSUE, query: $query) {
    issueCount
  }
}
"""

DISABLED_ALERTS = [
    "rerun_disabled_tests",
    "unstable",
]


class JobStatus:
    job_name: str = ""
    jobs: list[Any] = []
    current_status: Any = None
    job_statuses: list[Any] = []
    filtered_statuses: list[Any] = []
    failure_chain: list[Any] = []
    flaky_jobs: list[Any] = []

    def __init__(self, job_name: str, job_statuses: list[Any]) -> None:
        self.job_name = job_name
        self.job_statuses = job_statuses
        self.filtered_statuses = list(
            filter(lambda j: not is_job_skipped(j), job_statuses)
        )
        self.current_status = self.get_current_status()
        self.failure_chain = self.get_most_recent_failure_chain()
        self.flaky_jobs = self.get_flaky_jobs()

    def get_current_status(self) -> Any:
        """
        The current status is the most recent status that is not pending,
        whether it is a success or a failure.
        """
        for status in self.filtered_statuses:
            if status["conclusion"] != PENDING:
                return status
        return None

    def get_unique_failures(self, jobs: list[Any]) -> dict[str, list[Any]]:
        """
        Returns the given jobs grouped by similar failureCaptures: a dict of
        failure string -> list of failed jobs whose captures roughly match it.
        """
        failures = defaultdict(list)
        for job in jobs:
            if job["conclusion"] == FAILURE:
                found_similar_failure = False
                if "failureCaptures" not in job:
                    # Bucket all jobs without failureCaptures under one key.
                    failures["unclassified"].append(job)
                    continue
                # This is now a list returned by the HUD API, not a string
                failure_captures = " ".join(job["failureCaptures"])
                for failure in failures:
                    seq = SequenceMatcher(None, failure_captures, failure)
                    if seq.ratio() > SIMILARITY_THRESHOLD:
                        failures[failure].append(job)
                        found_similar_failure = True
                        break
                if not found_similar_failure:
                    failures[failure_captures] = [job]
        return failures
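
    # Illustrative example: captures such as "test_foo AssertionError" and
    # "test_foo AssertionError: 1 != 2" have a SequenceMatcher ratio of about
    # 0.85, above SIMILARITY_THRESHOLD, so jobs carrying either string land in
    # the same group, keyed by whichever capture was seen first.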

    # A job is considered flaky if it is the only job with that failureCapture
    # and it is not the most recent job.
    def get_flaky_jobs(self) -> list[Any]:
        unique_failures = self.get_unique_failures(self.filtered_statuses)
        flaky_jobs = []
        for failure_list in unique_failures.values():
            if (
                len(failure_list) == 1
                and self.current_status is not None
                and failure_list[0]["sha"] != self.current_status["sha"]
            ):
                flaky_jobs.append(failure_list[0])
        return flaky_jobs

    # The most recent failure chain is the streak of consecutive failed jobs
    # starting from the most recent commit. A success in the middle of the
    # chain terminates the chain; grouping by similar failures happens later,
    # in should_alert().
    def get_most_recent_failure_chain(self) -> list[Any]:
        failures = []
        found_most_recent_failure = False
        for job in self.filtered_statuses:
            if is_job_failed(job):
                failures.append(job)
                found_most_recent_failure = True
            elif found_most_recent_failure:
                break
        return failures

    def should_alert(self) -> bool:
        # Group the jobs in the chain by their failures; the alert is raised on
        # the length of the longest group of similar failures, so unrelated
        # one-off failures do not extend the chain.
        unique_failures = self.get_unique_failures(self.failure_chain)
        return (
            self.current_status is not None
            and self.current_status["conclusion"] != SUCCESS
            and any(
                len(failure_chain) >= FAILURE_CHAIN_THRESHOLD
                for failure_chain in unique_failures.values()
            )
            and all(
                disabled_alert not in self.job_name
                for disabled_alert in DISABLED_ALERTS
            )
        )

    def __repr__(self) -> str:
        return f"jobName: {self.job_name}"


def fetch_hud_data(repo: str, branch: str) -> Any:
    response = requests.get(f"https://hud.pytorch.org/api/hud/{repo}/{branch}/0")
    response.raise_for_status()
    hud_data = response.json()
    return (hud_data["jobNames"], hud_data["shaGrid"])
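
# The HUD payload, as consumed here, carries "jobNames" plus a "shaGrid" whose
# entries each hold a "jobs" list that is index-aligned with "jobNames".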

# Creates a dict of job name -> [job data]; essentially a column in HUD.
def map_job_data(jobNames: Any, shaGrid: Any) -> dict[str, Any]:
    jobData = defaultdict(list)
    for sha in shaGrid:
        for ind, job in enumerate(sha["jobs"]):
            jobData[jobNames[ind]].append(job)
    return jobData
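
# For instance (hypothetical entries a0, b0, a1, b1):
#     map_job_data(["build", "test"], [{"jobs": [a0, b0]}, {"jobs": [a1, b1]}])
#     == {"build": [a0, a1], "test": [b0, b1]}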


def is_job_failed(job: Any) -> bool:
    conclusion = job.get("conclusion")
    return conclusion is not None and conclusion not in (SUCCESS, PENDING)


def is_job_skipped(job: Any) -> bool:
    conclusion = job.get("conclusion")
    return conclusion in (NEUTRAL, SKIPPED) or conclusion is None


def get_failed_jobs(job_data: list[Any]) -> list[Any]:
    return [job for job in job_data if job["conclusion"] == FAILURE]


def classify_jobs(
    all_job_names: list[str], sha_grid: Any, filtered_jobs_names: set[str]
) -> tuple[list[JobStatus], list[Any]]:
    """
    Builds a JobStatus for every job, which encapsulates the logic for whether
    to alert and which jobs are flaky, then classifies the filtered jobs into
    jobs to alert on and flaky jobs.

    :param all_job_names: list of all job names as returned by the HUD
    :param sha_grid: list of all job data as returned by the HUD (parallel index to all_job_names)
    :param filtered_jobs_names: set of job names to actually consider
    :return: a tuple of (jobs to alert on, flaky jobs)
    """
    job_data = map_job_data(all_job_names, sha_grid)
    job_statuses: list[JobStatus] = []
    for job in job_data:
        job_statuses.append(JobStatus(job, job_data[job]))

    jobs_to_alert_on = []
    flaky_jobs = []
    for job_status in job_statuses:
        if job_status.job_name not in filtered_jobs_names:
            continue
        if job_status.should_alert():
            jobs_to_alert_on.append(job_status)
        flaky_jobs.extend(job_status.flaky_jobs)

    return jobs_to_alert_on, flaky_jobs


# Filters out job names that don't match the regex.
def filter_job_names(job_names: list[str], job_name_regex: str) -> list[str]:
    if job_name_regex:
        return [
            job_name for job_name in job_names if re.match(job_name_regex, job_name)
        ]
    return job_names
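
# For example, filter_job_names(["linux-build", "win-build"], "^linux") returns
# ["linux-build"]; note that re.match anchors the regex at the start of the name.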


def get_recurrently_failing_jobs_alerts(
    repo: str, branch: str, job_name_regex: str
) -> list[dict[str, Any]]:
    job_names, sha_grid = fetch_hud_data(repo=repo, branch=branch)

    filtered_job_names = set(filter_job_names(job_names, job_name_regex))
    if job_name_regex:
        print()
        print(f"Filtered to {len(filtered_job_names)} jobs:")
        if len(filtered_job_names) == 0:
            print("No jobs matched the regex")
        elif len(filtered_job_names) == len(job_names):
            print("All jobs matched the regex")
        else:
            print("\n".join(filtered_job_names))

    (recurrently_failing_jobs, flaky_jobs) = classify_jobs(
        job_names, sha_grid, filtered_job_names
    )

    alerts = []
    for job in recurrently_failing_jobs:
        entry = {
            "AlertType": "Recurrently Failing Job",
            "AlertObject": job.job_name,
            "OncallTeams": [],
            "OncallIndividuals": [],
            "Flags": [],
            "sha": job.failure_chain[-1]["sha"],
            "branch": branch,
        }
        alerts.append(entry)
    return alerts


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--repo",
        help="Repository to do checks for",
        type=str,
        default=os.getenv("REPO_TO_CHECK", "pytorch/pytorch"),
    )
    parser.add_argument(
        "--branch",
        help="Branch to do checks for",
        type=str,
        default=os.getenv("BRANCH_TO_CHECK", "main"),
    )
    parser.add_argument(
        "--job-name-regex",
        help="Consider only job names matching the given regex (if omitted, all jobs are matched)",
        type=str,
        default=os.getenv("JOB_NAME_REGEX", ""),
    )
    parser.add_argument(
        "--with-flaky-test-alert",
        help="Run this script with flaky test alerting",
        type=distutils.util.strtobool,
        default=os.getenv("WITH_FLAKY_TEST_ALERT", "YES"),
    )
    parser.add_argument(
        "--dry-run",
        help="Whether or not to actually post issues",
        type=distutils.util.strtobool,
        default=os.getenv("DRY_RUN", "YES"),
    )
    return parser.parse_args()
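

# Example invocation (the filename is whatever this script is saved as):
#     python3 <this_file>.py --repo pytorch/pytorch --branch main \
#         --job-name-regex "^linux" --dry-run YES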


if __name__ == "__main__":
    args = parse_args()
    data = json.dumps(
        get_recurrently_failing_jobs_alerts(args.repo, args.branch, args.job_name_regex)
    )
    print(data)