| # Copyright 2024 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import annotations |
| |
| from typing import TYPE_CHECKING, Iterable, Optional |
| |
| from crossbench import exception |
| from crossbench.benchmarks.loading.action_runner.action_runner_listener import \ |
| ActionRunnerListener |
| from crossbench.benchmarks.loading.input_source import InputSource |
| |
| if TYPE_CHECKING: |
| from crossbench.benchmarks.loading import action as i_action |
| from crossbench.benchmarks.loading.config.pages import ActionBlock |
| from crossbench.benchmarks.loading.page import (CombinedPage, InteractivePage, |
| LivePage, Page) |
| from crossbench.benchmarks.loading.tab_controller import TabController |
| from crossbench.path import LocalPath |
| from crossbench.runner.run import Run |
| |
| |
| class ActionNotImplementedError(NotImplementedError): |
| |
| def __init__(self, |
| runner: ActionRunner, |
| action: i_action.Action, |
| msg_context: str = "") -> None: |
| self.runner = runner |
| self.action = action |
| |
| if msg_context: |
| msg_context = ". Context: " + msg_context |
| |
| message = (f"{str(action.TYPE).capitalize()}-action " |
| f"not implemented in {type(runner).__name__}{msg_context}") |
| super().__init__(message) |
| |
| |
| class InputSourceNotImplementedError(ActionNotImplementedError): |
| |
| def __init__(self, |
| runner: ActionRunner, |
| action: i_action.Action, |
| input_source: InputSource, |
| msg_context: str = "") -> None: |
| |
| if msg_context: |
| msg_context = ". Context: " + msg_context |
| |
| input_source_message = (f"Source: '{input_source}'" |
| f"not implemented{msg_context}") |
| |
| super().__init__(runner, action, input_source_message) |
| |
| |
| |
| class ActionRunner: |
| |
| def __init__(self): |
| self._listener = ActionRunnerListener() |
| |
| def set_listener(self, listener): |
| self._listener = listener |
| |
| # TODO: Don't share state across runs |
| _info_stack: Optional[exception.TInfoStack] |
| |
| # info_stack is a unique identifier for the currently running or most recently |
| # run action. |
| @property |
| def info_stack(self) -> exception.TInfoStack: |
| if not self._info_stack: |
| raise RuntimeError("info_stack can not be called before run_blocks") |
| return self._info_stack |
| |
| def run_blocks(self, run: Run, page: InteractivePage, |
| blocks: Iterable[ActionBlock]) -> None: |
| for block in blocks: |
| block.run_with(self, run, page) |
| |
| def run_block(self, run, block: ActionBlock) -> None: |
| block_index = block.index |
| # TODO: Instead maybe just pass context down. |
| # Or pass unique path to every action __init__ |
| with exception.annotate(f"Running block {block_index}: {block.label}"): |
| for action_index, action in enumerate(block, start=1): |
| self._info_stack = (f"block_{block_index}", f"action_{action_index}") |
| action.run_with(run, self) |
| |
| def wait(self, run: Run, action: i_action.WaitAction) -> None: |
| with run.actions("WaitAction", measure=False) as actions: |
| actions.wait(action.duration) |
| |
| def js(self, run: Run, action: i_action.JsAction) -> None: |
| with run.actions("JS", measure=False) as actions: |
| actions.js(action.script, action.timeout) |
| |
| def click(self, run: Run, action: i_action.ClickAction) -> None: |
| input_source = action.input_source |
| if input_source is InputSource.JS: |
| self.click_js(run, action) |
| elif input_source is InputSource.TOUCH: |
| self.click_touch(run, action) |
| elif input_source is InputSource.MOUSE: |
| self.click_mouse(run, action) |
| else: |
| raise RuntimeError(f"Unsupported input source: '{input_source}'") |
| |
| def scroll(self, run: Run, action: i_action.ScrollAction) -> None: |
| input_source = action.input_source |
| if input_source is InputSource.JS: |
| self.scroll_js(run, action) |
| elif input_source is InputSource.TOUCH: |
| self.scroll_touch(run, action) |
| elif input_source is InputSource.MOUSE: |
| self.scroll_mouse(run, action) |
| else: |
| raise RuntimeError(f"Unsupported input source: '{input_source}'") |
| |
| def get(self, run: Run, action: i_action.GetAction) -> None: |
| raise ActionNotImplementedError(self, action) |
| |
| def text_input(self, run: Run, action: i_action.TextInputAction) -> None: |
| input_source = action.input_source |
| if input_source is InputSource.JS: |
| self.text_input_js(run, action) |
| elif input_source is InputSource.KEYBOARD: |
| self.text_input_keyboard(run, action) |
| else: |
| raise RuntimeError(f"Unsupported input source: '{input_source}'") |
| |
| def click_js(self, run: Run, action: i_action.ClickAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def click_touch(self, run: Run, action: i_action.ClickAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def click_mouse(self, run: Run, action: i_action.ClickAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def scroll_touch(self, run: Run, action: i_action.ScrollAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def scroll_mouse(self, run: Run, action: i_action.ScrollAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def text_input_js(self, run: Run, action: i_action.TextInputAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def text_input_keyboard(self, run: Run, |
| action: i_action.TextInputAction) -> None: |
| raise InputSourceNotImplementedError(self, action, action.input_source) |
| |
| def swipe(self, run: Run, action: i_action.SwipeAction) -> None: |
| raise ActionNotImplementedError(self, action) |
| |
| def wait_for_element(self, run: Run, |
| action: i_action.WaitForElementAction) -> None: |
| raise ActionNotImplementedError(self, action) |
| |
| def wait_for_ready_state(self, run: Run, |
| action: i_action.WaitForReadyStateAction) -> None: |
| raise ActionNotImplementedError(self, action) |
| |
| def inject_new_document_script( |
| self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None: |
| raise ActionNotImplementedError(self, action) |
| |
| # screenshot_path is a helper for screenshot that generates the full path of a |
| # screenshot file based on info_stack. The screenshot dir is created if |
| # necessary. |
| # TODO: the folder management should be done in a probe. |
| def screenshot_path(self, out_dir: LocalPath, suffix: str) -> LocalPath: |
| screenshot_path = out_dir / "screenshot" |
| screenshot_path.mkdir(exist_ok=True) |
| filename = "_".join(self.info_stack) + f"_{suffix}.png" |
| return screenshot_path / filename |
| |
| # TODO: Move this into a probe, which can have multiple implementations for |
| # different platforms or fullscreen vs. window, etc. |
| def screenshot_impl(self, run: Run, suffix: str) -> None: |
| run.browser.screenshot(self.screenshot_path(run.out_dir, suffix)) |
| |
| def screenshot(self, run: Run, action: i_action.ScreenshotAction) -> None: |
| del action |
| self.screenshot_impl(run, "screenshot") |
| |
| def _maybe_navigate_to_about_blank(self, run: Run, page: Page) -> None: |
| if duration := page.about_blank_duration: |
| run.browser.show_url("about:blank") |
| run.runner.wait(duration) |
| |
| def run_page_once(self, run: Run, page: LivePage): |
| run.browser.show_url(page.url) |
| run.runner.wait(page.duration) |
| self._maybe_navigate_to_about_blank(run, page) |
| |
| def run_page_multiple_tabs(self, run: Run, tabs: TabController, |
| pages: Iterable[Page]): |
| # TODO: refactor possible logics to TabController. |
| browser = run.browser |
| for _ in tabs: |
| try: |
| for i, page in enumerate(pages): |
| # Create a new tab for the multiple_tab case. |
| if i > 0: |
| browser.switch_to_new_tab() |
| self._listener.handle_new_tab() |
| page.run_with(run, self, False) |
| self._listener.handle_page_run(run) |
| browser.switch_to_new_tab() |
| self._listener.handle_new_tab() |
| except Exception as e: |
| self._listener.handle_error(e) |
| raise |
| |
| def run_page(self, run: Run, page: LivePage, multiple_tabs: bool): |
| if multiple_tabs: |
| self.run_page_multiple_tabs(run, page.tabs, [page]) |
| else: |
| self.run_page_once(run, page) |
| |
| def run_combined_page(self, run: Run, page: CombinedPage, |
| multiple_tabs: bool): |
| if multiple_tabs: |
| self.run_page_multiple_tabs(run, page.tabs, page.pages) |
| else: |
| for sub_page in page.pages: |
| sub_page.run_with(run, self, False) |
| |
| def run_interactive_page(self, run: Run, page: InteractivePage, |
| multiple_tabs: bool): |
| # TODO(lsuhua): support multiple tabs for interactive page if needed. |
| if multiple_tabs: |
| raise NotImplementedError( |
| "Multiple tabs test for interactive page is not supported.") |
| try: |
| self.run_blocks(run, page, page.blocks) |
| self._maybe_navigate_to_about_blank(run, page) |
| except Exception: |
| page.failure_screenshot(run) |
| raise |
| |
| def run_setup(self, run: Run, page: InteractivePage, setup: ActionBlock): |
| try: |
| with exception.annotate("setup"): |
| setup.run_with(self, run, page) |
| except Exception: |
| page.failure_screenshot(run, "setup-failure") |
| raise |
| |
| def run_login(self, run: Run, page: InteractivePage, login: ActionBlock): |
| try: |
| with exception.annotate("login"): |
| login.run_with(self, run, page) |
| except Exception: |
| page.failure_screenshot(run, "login-failure") |
| raise |
| |
| def switch_tab(self, run: Run, action: i_action.SwitchTabAction): |
| raise ActionNotImplementedError(self, action) |