blob: 6cce4994d783293183853158411a484d561e1c21 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import argparse
import logging
import re
from typing import (TYPE_CHECKING, Any, Dict, Generic, List, Optional, Sequence,
Tuple, Type, TypeVar, cast)
from ordered_set import OrderedSet
from crossbench import helper
from crossbench.cli.parser import CrossBenchArgumentParser
from crossbench.flags.base import Flags
from crossbench.parse import ObjectParser
from crossbench.stories.press_benchmark import PressBenchmarkStory
from crossbench.stories.story import Story
if TYPE_CHECKING:
from crossbench import path as pth
from crossbench.browsers.browser import Browser
from crossbench.runner.runner import Runner
class BenchmarkProbeMixin:
NAME: str = ""
IS_GENERAL_PURPOSE: bool = False
def __init__(self, *args, **kwargs):
self._benchmark = kwargs.pop("benchmark")
assert isinstance(self._benchmark, Benchmark)
super().__init__(*args, **kwargs)
@property
def benchmark(self) -> Benchmark:
return self._benchmark
class Benchmark(abc.ABC):
NAME: str = ""
DEFAULT_STORY_CLS: Type[Story] = Story
PROBES: Tuple[Type[BenchmarkProbeMixin], ...] = ()
DEFAULT_REPETITIONS: int = 1
@classmethod
def cli_help(cls) -> str:
assert cls.__doc__, (f"Benchmark class {cls} must provide a doc string.")
# Return the first non-empty line
return cls.__doc__.strip().splitlines()[0]
@classmethod
def cli_description(cls) -> str:
assert cls.__doc__
return cls.__doc__.strip()
@classmethod
def cli_epilog(cls) -> str:
return ""
@classmethod
def aliases(cls) -> Tuple[str, ...]:
return tuple()
@classmethod
def add_cli_parser(
cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
parser = subparsers.add_parser(
cls.NAME,
formatter_class=argparse.RawDescriptionHelpFormatter,
help=cls.cli_help(),
description=cls.cli_description(),
epilog=cls.cli_epilog(),
aliases=aliases)
assert isinstance(parser, CrossBenchArgumentParser)
return parser
@classmethod
def describe(cls) -> Dict[str, Any]:
return {
"name": cls.NAME,
"description": "\n".join(helper.wrap_lines(cls.cli_description(), 70)),
"stories": [],
"probes-default": {
probe_cls.NAME:
"\n".join(
list(
helper.wrap_lines((probe_cls.__doc__ or "").strip(),
70))) for probe_cls in cls.PROBES
}
}
@classmethod
def default_probe_config_path(cls) -> Optional[pth.LocalPath]:
return None
@classmethod
def default_network_config_path(cls) -> Optional[pth.LocalPath]:
return None
@classmethod
def extra_flags(cls, browser: Browser) -> Flags:
del browser
return Flags()
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
del args
return {}
@classmethod
def from_cli_args(cls, args: argparse.Namespace) -> Benchmark:
kwargs = cls.kwargs_from_cli(args)
return cls(**kwargs)
def __init__(self, stories: Sequence[Story]) -> None:
assert self.NAME is not None, f"{self} has no .NAME property"
assert self.DEFAULT_STORY_CLS != Story, (
f"{self} has no .DEFAULT_STORY_CLS property")
self.stories: List[Story] = self._validate_stories(stories)
def _validate_stories(self, stories: Sequence[Story]) -> List[Story]:
assert stories, "No stories provided"
for story in stories:
assert isinstance(story, self.DEFAULT_STORY_CLS), (
f"story={story} should be a subclass/the same "
f"class as {self.DEFAULT_STORY_CLS}")
return list(stories)
def setup(self, runner: Runner) -> None:
del runner
StoryT = TypeVar("StoryT", bound=Story)
class StoryFilter(Generic[StoryT], metaclass=abc.ABCMeta):
CAN_COMBINE_STORIES: bool = True
@classmethod
def add_cli_parser(
cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument(
"--stories",
"--story",
dest="stories",
default="default",
help="Comma-separated list of story names. "
"Use 'all' for selecting all available stories. "
"Use 'default' for the standard selection of stories.")
if cls.CAN_COMBINE_STORIES:
is_combined_group = parser.add_mutually_exclusive_group()
is_combined_group.add_argument(
"--combined",
dest="separate",
default=False,
action="store_false",
help="Run each story in the same session. (default)")
is_combined_group.add_argument(
"--separate",
action="store_true",
help="Run each story in a fresh browser.")
return parser
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
return {"patterns": args.stories.split(",")}
@classmethod
def from_cli_args(cls, story_cls: Type[StoryT],
args: argparse.Namespace) -> StoryFilter[StoryT]:
kwargs = cls.kwargs_from_cli(args)
return cls(story_cls, **kwargs)
def __init__(self,
story_cls: Type[StoryT],
patterns: Sequence[str],
separate: bool = False) -> None:
self.story_cls: Type[StoryT] = story_cls
assert issubclass(
story_cls, Story), (f"Subclass of {Story} expected, found {story_cls}")
# Using order-preserving dict instead of set
self._known_names: Dict[str,
None] = dict.fromkeys(story_cls.all_story_names())
self.stories: Sequence[StoryT] = []
# TODO: only use one method.
self.process_all(patterns)
self.stories = self.create_stories(separate)
@abc.abstractmethod
def process_all(self, patterns: Sequence[str]) -> None:
pass
@abc.abstractmethod
def create_stories(self, separate: bool) -> Sequence[StoryT]:
pass
class SubStoryBenchmark(Benchmark, metaclass=abc.ABCMeta):
STORY_FILTER_CLS: Type[StoryFilter] = StoryFilter
@classmethod
def add_cli_parser(
cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
parser = super().add_cli_parser(subparsers, aliases)
return parser
@classmethod
def cli_description(cls) -> str:
desc = super().cli_description()
desc += "\n\n"
desc += ("Stories (alternatively use the 'describe benchmark "
f"{cls.NAME}' command):\n")
desc += ", ".join(cls.all_story_names())
desc += "\n\n"
desc += "Filtering (for --stories): "
assert cls.STORY_FILTER_CLS.__doc__, (
f"{cls.STORY_FILTER_CLS} has no doc string.")
desc += cls.STORY_FILTER_CLS.__doc__.strip()
return desc
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
kwargs = super().kwargs_from_cli(args)
kwargs["stories"] = cls.stories_from_cli_args(args)
return kwargs
@classmethod
def stories_from_cli_args(cls, args: argparse.Namespace) -> Sequence[Story]:
return cls.STORY_FILTER_CLS.from_cli_args(cls.DEFAULT_STORY_CLS,
args).stories
@classmethod
def describe(cls) -> Dict[str, Any]:
data = super().describe()
data["stories"] = cls.all_story_names()
return data
@classmethod
def all_story_names(cls) -> Sequence[str]:
return sorted(cls.DEFAULT_STORY_CLS.all_story_names())
PressBenchmarkStoryT = TypeVar(
"PressBenchmarkStoryT", bound=PressBenchmarkStory)
class PressBenchmarkStoryFilter(StoryFilter[PressBenchmarkStoryT],
Generic[PressBenchmarkStoryT]):
"""
Filter stories by name or regexp.
Syntax:
"all" Include all stories (defaults to story_names).
"name" Include story with the given name.
"-name" Exclude story with the given name'
"foo.*" Include stories whose name matches the regexp.
"-foo.*" Exclude stories whose name matches the regexp.
These patterns can be combined:
[".*", "-foo", "-bar"] Includes all except the "foo" and "bar" story
"""
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
kwargs = super().kwargs_from_cli(args)
kwargs["separate"] = args.separate
kwargs["url"] = args.custom_benchmark_url
return kwargs
def __init__(self,
story_cls: Type[PressBenchmarkStoryT],
patterns: Sequence[str],
separate: bool = False,
url: Optional[str] = None):
self.url: Optional[str] = url
self._selected_names: OrderedSet[str] = OrderedSet()
super().__init__(story_cls, patterns, separate)
assert issubclass(self.story_cls, PressBenchmarkStory)
for name in self._known_names:
assert name, "Invalid empty story name"
assert not name.startswith("-"), (
f"Known story names cannot start with '-', but got '{name}'.")
assert not name == "all", "Known story name cannot match 'all'."
def process_all(self, patterns: Sequence[str]) -> None:
if not isinstance(patterns, (list, tuple)):
raise ValueError("Expected Sequence of story name or patterns "
f"but got '{type(patterns)}'.")
for pattern in patterns:
self.process_pattern(pattern)
def process_pattern(self, pattern: str) -> None:
if pattern.startswith("-"):
self.remove(pattern[1:])
else:
self.add(pattern)
def add(self, pattern: str) -> None:
self._check_processed_pattern(pattern)
regexp = self._pattern_to_regexp(pattern)
self._add_matching(regexp, pattern)
def remove(self, pattern: str) -> None:
self._check_processed_pattern(pattern)
regexp = self._pattern_to_regexp(pattern)
self._remove_matching(regexp, pattern)
def _pattern_to_regexp(self, pattern: str) -> re.Pattern:
if pattern == "all":
return re.compile(".*")
if pattern == "default":
default_story_names = self.story_cls.default_story_names()
if default_story_names == self.story_cls.all_story_names():
return re.compile(".*")
joined_names = "|".join(re.escape(name) for name in default_story_names)
return re.compile(f"^({joined_names})$")
if pattern in self._known_names:
return re.compile(re.escape(pattern))
return re.compile(pattern)
def _check_processed_pattern(self, pattern: str) -> None:
if not pattern:
raise ValueError("Empty pattern is not allowed")
if pattern == "-":
raise ValueError(f"Empty remove pattern not allowed: '{pattern}'")
if pattern[0] == "-":
raise ValueError(f"Unprocessed negative pattern not allowed: '{pattern}'")
def _add_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
substories = self._regexp_match(regexp, original_pattern)
self._selected_names.update(substories)
def _remove_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
substories = self._regexp_match(regexp, original_pattern)
for substory in substories:
try:
self._selected_names.remove(substory)
except KeyError as e:
raise ValueError(
"Removing Story failed: "
f"name='{substory}' extracted by pattern='{original_pattern}'"
"is not in the filtered story list") from e
def _regexp_match(self, regexp: re.Pattern,
original_pattern: str) -> List[str]:
substories = [
substory for substory in self._known_names if regexp.fullmatch(substory)
]
if not substories:
logging.warning(
"No matching stories, using case-insensitive fallback regexp.")
iregexp: re.Pattern = re.compile(regexp.pattern, flags=re.IGNORECASE)
substories = [
substory for substory in self._known_names
if iregexp.fullmatch(substory)
]
if not substories:
raise ValueError(f"'{original_pattern}' didn't match any stories.")
if len(substories) == len(self._known_names) and self._selected_names:
raise ValueError(f"'{original_pattern}' matched all and overrode all"
"previously filtered story names.")
return substories
def create_stories(self, separate: bool) -> Sequence[PressBenchmarkStoryT]:
logging.info("SELECTED STORIES: %s",
str(list(map(str, self._selected_names))))
names = list(self._selected_names)
return self.create_stories_from_names(names, separate)
def create_stories_from_names(
self, names: List[str], separate: bool) -> Sequence[PressBenchmarkStoryT]:
return self.story_cls.from_names(names, separate=separate, url=self.url)
class PressBenchmark(SubStoryBenchmark):
STORY_FILTER_CLS = PressBenchmarkStoryFilter
DEFAULT_STORY_CLS: Type[PressBenchmarkStory] = PressBenchmarkStory
@classmethod
@abc.abstractmethod
def short_base_name(cls) -> str:
raise NotImplementedError()
@classmethod
@abc.abstractmethod
def base_name(cls) -> str:
raise NotImplementedError()
@classmethod
@abc.abstractmethod
def version(cls) -> Tuple[int, ...]:
raise NotImplementedError()
@classmethod
def aliases(cls) -> Tuple[str, ...]:
version = [str(v) for v in cls.version()]
assert version, "Expected non-empty version tuple."
version_names = []
dot_version = ".".join(version)
for name in (cls.short_base_name(), cls.base_name()):
assert name, "Expected non-empty base name."
version_names.append(f"{name}{dot_version}")
version_names.append(f"{name}_{dot_version}")
return tuple(version_names)
@classmethod
def add_cli_parser(
cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
parser = super().add_cli_parser(subparsers, aliases)
# TODO: Move story-related args to dedicated PressBenchmarkStoryFilter class
benchmark_url_group = parser.add_mutually_exclusive_group()
live_url = cls.DEFAULT_STORY_CLS.URL
local_url = cls.DEFAULT_STORY_CLS.URL_LOCAL
official_url = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
benchmark_url_group.add_argument(
"--live",
"--live-url",
"--browser-ben",
dest="custom_benchmark_url",
const=None,
action="store_const",
help=(f"Use chrome live benchmark url ({live_url}) "
"on https://browserben.ch."))
benchmark_url_group.add_argument(
"--official",
"--official-url",
dest="custom_benchmark_url",
const=official_url,
action="store_const",
help=(f"Use officially hosted live/online benchmark url "
f"({official_url})."))
benchmark_url_group.add_argument(
"--local",
"--local-url",
"--url",
"--custom-benchmark-url",
type=ObjectParser.httpx_url_str,
nargs="?",
dest="custom_benchmark_url",
const=local_url,
help=(f"Use custom or locally (default={local_url}) "
"hosted benchmark url."))
cls.STORY_FILTER_CLS.add_cli_parser(parser)
return parser
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
kwargs = super().kwargs_from_cli(args)
kwargs["custom_url"] = args.custom_benchmark_url
return kwargs
@classmethod
def describe(cls) -> Dict[str, Any]:
data = super().describe()
assert issubclass(cls.DEFAULT_STORY_CLS, PressBenchmarkStory)
data["url"] = cls.DEFAULT_STORY_CLS.URL
data["url-official"] = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
data["url-local"] = cls.DEFAULT_STORY_CLS.URL_LOCAL
return data
def __init__(self,
stories: Sequence[Story],
custom_url: Optional[str] = None):
super().__init__(stories)
self.custom_url = custom_url
if custom_url:
for story in stories:
press_story = cast(PressBenchmarkStory, story)
assert press_story.url == custom_url
def setup(self, runner: Runner) -> None:
super().setup(runner)
self.validate_url(runner)
def validate_url(self, runner: Runner) -> None:
if self.custom_url:
if runner.has_any_live_network():
self._validate_custom_url(runner, self.custom_url)
return
first_story = cast(PressBenchmarkStory, self.stories[0])
url = first_story.url
if not runner.has_all_live_network() and not url:
# For non-live networks we create a matching URL
return
if not url:
raise ValueError("Invalid empty url")
if all(runner.env.validate_url(url, p) for p in runner.platforms):
return
msg = [
f"Could not reach live benchmark URL: '{url}'."
f"Please make sure you're connected to the internet."
]
local_url = first_story.URL_LOCAL
if local_url:
msg.append(
f"Alternatively use --local for the default local URL: {local_url}")
raise ValueError("\n".join(msg))
def _validate_custom_url(self, runner: Runner, url: str) -> None:
if not all(runner.env.validate_url(url, p) for p in runner.platforms):
raise ValueError(
f"Could not reach custom benchmark URL: '{self.custom_url}'. "
f"Please make sure your local web server is running.")