blob: 13527409f1af96c8ef471efbc16757359388b7ea [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
# Provide a markdown-formatted message summarizing the reasons why a pipeline failed.
# Marge bot can use this script to provide more helpful comments when CI fails.
# Example for running locally:
# ./bin/ci/ --project-id 176 --pipeline-id 1310098
import argparse
import asyncio
import logging
from typing import Any
import aiohttp
PER_PAGE: int = 6000
async def get_pipeline_status(
session: aiohttp.ClientSession, project_id: str, pipeline_id: str
url = f"{project_id}/pipelines/{pipeline_id}""Fetching pipeline status from {url}")
async with session.get(url) as response:
pipeline_details = await response.json()
return pipeline_details.get("status")
async def get_jobs_for_pipeline(
session: aiohttp.ClientSession, project_id: str, pipeline_id: str
url = f"{project_id}/pipelines/{pipeline_id}/jobs"
jobs = []
params = {"per_page": PER_PAGE}
async with session.get(url, params=params) as response:
jobs = await response.json()
return jobs
def get_problem_jobs(jobs: list[dict[str, Any]]):
ignore_stage_list = [
problem_jobs = []
for job in jobs:
if any(ignore.lower() in job["stage"] for ignore in ignore_stage_list):
if job["status"] in {"failed", "canceled"}:
return problem_jobs
def unexpected_improvements(failed_test_array):
if failed_test_array["unexpected_improvements"]:
unexpected_improvements_count = len(
return f" {unexpected_improvements_count} improved test{'s' if unexpected_improvements_count != 1 else ''}"
return ""
def fails(failed_test_array):
if failed_test_array["fails"]:
fails_count = len(failed_test_array["fails"])
return f" {fails_count} failed test{'s' if fails_count != 1 else ''}"
return ""
def crashes(failed_test_array):
if failed_test_array["crashes"]:
crash_count = len(failed_test_array["crashes"])
return f" {crash_count} crashed test{'s' if crash_count != 1 else ''}"
return ""
def get_failed_test_details(failed_test_array):
message = ""
max_tests_to_display = 5
if failed_test_array["unexpected_improvements"]:
for i, test in enumerate(failed_test_array["unexpected_improvements"]):
if i > max_tests_to_display:
message += " \nand more...<br>"
message += f"{test}<br>"
if failed_test_array["fails"]:
for i, test in enumerate(failed_test_array["fails"]):
if i > max_tests_to_display:
message += " \nand more...<br>"
message += f"{test}<br>"
if failed_test_array["crashes"]:
for i, test in enumerate(failed_test_array["crashes"]):
if i > max_tests_to_display:
message += " \nand more...<br>"
message += f"{test}<br>"
return message
def get_failed_test_summary_message(failed_test_array):
summary_msg = "<summary>"
summary_msg += unexpected_improvements(failed_test_array)
summary_msg += fails(failed_test_array)
summary_msg += crashes(failed_test_array)
summary_msg += "</summary>"
return summary_msg
def sort_failed_tests_by_status(failures_csv):
failed_test_array = {
"unexpected_improvements": [],
"fails": [],
"crashes": [],
"timeouts": [],
for test in failures_csv.splitlines():
if "UnexpectedImprovement" in test:
elif "Fail" in test:
elif "Crash" in test:
elif "Timeout" in test:
return failed_test_array
async def get_failures_csv(session, project_id, job):
job_id = job["id"]
url = f"{project_id}/jobs/{job_id}/artifacts/results/failures.csv"
async with session.get(url) as response:
if response.status == 200:
text = await response.text()
return text
logging.debug(f"No response from: {url}")
return ""
async def get_test_failures(session, project_id, job):
failures_csv = await get_failures_csv(session, project_id, job)
if not failures_csv:
return ""
# If just one test failed, don't bother with more complicated sorting
lines = failures_csv.splitlines()
if len(lines) == 1:
return ": " + lines[0] + "<br>"
failed_test_array = sort_failed_tests_by_status(failures_csv)
failures_msg = "<details>"
failures_msg += get_failed_test_summary_message(failed_test_array)
failures_msg += get_failed_test_details(failed_test_array)
failures_msg += "</details>"
return failures_msg
async def get_trace_failures(session, project_id, job):
project_json = await get_project_json(session, project_id)
path = project_json.get("path", "")
if not path:
return ""
job_id = job["id"]
url = f"{path}/-/jobs/{job_id}/artifacts/results/summary/problems.html"
async with session.get(url) as response:
if response.status == 200:
return url
logging.debug(f"No response from: {url}")
return ""
async def get_project_json(session, project_id):
url_project_id = f"{project_id}"
async with session.get(url_project_id) as response:
if response.status == 200:
return await response.json()
logging.debug(f"No response from: {url_project_id}")
return ""
async def get_job_log(session: aiohttp.ClientSession, project_id: str, job_id: int):
project_json = await get_project_json(session, project_id)
path_with_namespace = project_json.get("path_with_namespace", "")
if not path_with_namespace:
return ""
url_job_log = (
async with session.get(url_job_log) as response:
if response.status == 200:
return await response.text()
logging.debug(f"No response from job log: {url_job_log}")
return ""
async def search_job_log_for_errors(session, project_id, job):
log_error_message = ""
# Bypass these generic error messages in hopes of finding a more specific error.
# The entries are case insensitive. Keep them in alphabetical order and don't
# forget to add a comma after each entry
ignore_list = [
"403: b",
"building c",
"error_msg : None",
"error generated",
"errors generated",
"exit code",
"exit status",
"exiting now",
"job failed",
"no files to upload",
"performing test",
"ret code",
job_log = await get_job_log(session, project_id, job["id"])
for line in reversed(job_log.splitlines()):
if "fatal" in line.lower():
# remove date and formatting before fatal message
log_error_message = line[line.lower().find("fatal") :]
if "error" in line.lower():
if any(ignore.lower() in line.lower() for ignore in ignore_list):
# remove date and formatting before error message
log_error_message = line[line.lower().find("error") :].strip()
# if there is no further info after the word error then it's not helpful
# so reset the message and try again.
if log_error_message.lower() in {"error", "errors", "error:", "errors:"}:
log_error_message = ""
# timeout msg from .gitlab-ci/lava/
if "expected to take at least" in line.lower():
log_error_message = line
return log_error_message
async def process_single_job(session, project_id, job):
job_url = job.get("web_url", "")
if not job_url:"Job {job['name']} is missing a web_url")
job_name = job.get("name", "Unnamed Job")
message = f"[{job_name}]({job_url})"
# if a job times out it's cancelled, so worth mentioning here
if job["status"] == "canceled":
return f"{message}: canceled<br>"
# if it's not a script failure then all we can do is give the gitlab assigned reason
if job["failure_reason"] != "script_failure":
return f"{message}: {job['failure_reason']}<br>"
test_failures = await get_test_failures(session, project_id, job)
if test_failures:
return f"{message}{test_failures}"
trace_failures = await get_trace_failures(session, project_id, job)
if trace_failures:
return f"{message}: has a [trace failure]({trace_failures})<br>"
log_error_message = await search_job_log_for_errors(session, project_id, job)
if log_error_message:
return f"{message}: {log_error_message}<br>"
return f"{message}<br>"
async def process_job_with_limit(session, project_id, job):
# Use at most 10 concurrent tasks
semaphore = asyncio.Semaphore(10)
async with semaphore:
return await process_single_job(session, project_id, job)
async def process_problem_jobs(session, project_id, problem_jobs):
problem_jobs_count = len(problem_jobs)
if problem_jobs_count == 1:
message = f"<br>There were problems with job: "
message += await process_single_job(session, project_id, problem_jobs[0])
return message
message = f"<details>"
message += f"<summary>"
message += f"There were problems with {problem_jobs_count} jobs: "
message += "</summary>"
tasks = [process_job_with_limit(session, project_id, job) for job in problem_jobs]
results = await asyncio.gather(*tasks)
for result in results:
message += result
message += f"</details>"
return message
async def main(pipeline_id: str, project_id: str = "176") -> str:
message = ""
timeout = aiohttp.ClientTimeout(total=120)
async with aiohttp.ClientSession(timeout=timeout) as session:
pipeline_status = await get_pipeline_status(
session, project_id, pipeline_id
logging.debug(f"Pipeline status: {pipeline_status}")
if pipeline_status != "failed":
return message
jobs = await get_jobs_for_pipeline(session, project_id, pipeline_id)
problem_jobs = get_problem_jobs(jobs)
if len(problem_jobs) == 0:
return message
message = await process_problem_jobs(session, project_id, problem_jobs)
except Exception as e:
logging.error(f"An error occurred: {e}")
return ""
return message
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fetch GitLab pipeline details")
"--project-id", default="176", help="Project ID (default: 176 i.e. mesa/mesa)"
parser.add_argument("--pipeline-id", required=True, help="Pipeline ID")
args = parser.parse_args()
message =, args.project_id))