| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| Provides graphics related utils, like capturing screenshots or checking on |
| the state of the graphics driver. |
| """ |
| |
| import collections |
| import contextlib |
| import fcntl |
| import glob |
| import logging |
| import os |
| import re |
| import struct |
| import sys |
| import time |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros.input_playback import input_playback |
| from autotest_lib.client.cros.power import power_utils |
| from functools import wraps |
| |
| # The uinput module might not be available at SDK test time. |
| try: |
| from autotest_lib.client.cros.graphics import graphics_uinput |
| except ImportError: |
| graphics_uinput = None |
| |
| |
class GraphicsTest(test.test):
    """Base class for graphics tests.

    Every subclass of GraphicsTest should call GraphicsTest's
    initialize/cleanup methods: they create and finalize a
    GraphicsStateChecker and report failure counts to the Chrome Perf
    dashboard.

    Attributes:
        _test_failure_description(str): Failure name reported to chrome perf
                                        dashboard. (Default: Failures)
        _test_failure_report_enable(bool): Enable/Disable reporting
                                            failures to chrome perf dashboard
                                            automatically. (Default: True)
        _test_failure_report_subtest(bool): Enable/Disable reporting
                                            subtests failure to chrome perf
                                            dashboard automatically.
                                            (Default: False)
    """
    version = 1
    _GSC = None

    _test_failure_description = "Failures"
    _test_failure_report_enable = True
    _test_failure_report_subtest = False

    def __init__(self, *args, **kwargs):
        """Initialize the failure bookkeeping and the input player slot."""
        super(GraphicsTest, self).__init__(*args, **kwargs)
        # Maps a failure description to its record dict (see add_failures).
        self._failures_by_description = {}
        # Lazily created by input_check().
        self._player = None

    def initialize(self, raise_error_on_hang=False, *args, **kwargs):
        """Create the state checker and report the initial perf value.

        @param raise_error_on_hang: forwarded to GraphicsStateChecker.
        """
        self._GSC = GraphicsStateChecker(
            raise_error_on_hang=raise_error_on_hang,
            run_on_sw_rasterizer=utils.is_virtual_machine())

        # Reported as 1 here and overwritten with 0 in _output_perf(), so a
        # test that never reaches cleanup leaves the value at 1.
        self.output_perf_value(
            description='Timeout_Reboot',
            value=1,
            units='count',
            higher_is_better=False,
            replace_existing_values=True
        )

        if hasattr(super(GraphicsTest, self), "initialize"):
            utils.cherry_pick_call(super(GraphicsTest, self).initialize,
                                   *args, **kwargs)

    def input_check(self):
        """Create and set up the keyboard input player if it does not exist."""
        if self._player is None:
            self._player = input_playback.InputPlayback()
            self._player.emulate(input_type='keyboard')
            self._player.find_connected_inputs()

    def cleanup(self, *args, **kwargs):
        """Finalize state checker and report values to perf dashboard."""
        if self._GSC:
            self._GSC.finalize()

        self._output_perf()
        if self._player:
            self._player.close()

        if hasattr(super(GraphicsTest, self), "cleanup"):
            utils.cherry_pick_call(super(GraphicsTest, self).cleanup,
                                   *args, **kwargs)

    @contextlib.contextmanager
    def failure_report(self, name, subtest=None):
        """Record the failure of an operation to self._failures_by_description.

        The failure |name| is recorded up front and removed again only if the
        wrapped operation finishes normally or raises error.TestWarn /
        error.TestNAError. Any other exception leaves the record in place so
        that it is reported to the chrome perf dashboard in the cleanup stage.

        Usage:
            # Record failure of doSomething
            with self.failure_report('doSomething'):
                doSomething()

        @param name: failure name.
        @param subtest: subtest name shown in cros-perf; defaults to name.
        """
        # Assume failed at the beginning.
        self.add_failures(name, subtest=subtest)
        try:
            yield {}
            self.remove_failures(name, subtest=subtest)
        except (error.TestWarn, error.TestNAError):
            self.remove_failures(name, subtest=subtest)
            # Bare raise keeps the original traceback.
            raise

    @classmethod
    def failure_report_decorator(cls, name, subtest=None):
        """Record the failure if the decorated function fails to finish.

        This decorator should only be applied to methods of GraphicsTest.
        In addition, decorated methods must be called without unnamed
        (positional) arguments other than the instance itself.

        Usage:
            @GraphicsTest.failure_report_decorator('graphics_test')
            def Foo(self, bar='test'):
                return doStuff()

            is equivalent to

            def Foo(self, bar='test'):
                with self.failure_report('graphics_test'):
                    return doStuff()

            # Correct call.
            self.Foo(bar='test_name')

            # Incorrect call: 'test_name' is passed as an unnamed argument,
            # which raises error.TestError.
            self.Foo('test_name')

        @param name: failure name.
        @param subtest: subtest name shown in cros-perf; defaults to name.
        """
        def decorator(fn):
            @wraps(fn)
            def wrapper(*args, **kwargs):
                if len(args) > 1:
                    raise error.TestError('Unnamed arguments is not accepted. '
                                          'Please apply this decorator to '
                                          'function without unnamed args.')
                # A member function of GraphicsTest is decorated. The first
                # argument is the instance itself.
                instance = args[0]
                with instance.failure_report(name, subtest):
                    # Cherry pick the arguments for the wrapped function.
                    d_args, d_kwargs = utils.cherry_pick_args(fn, args, kwargs)
                    return fn(instance, *d_args, **d_kwargs)
            return wrapper
        return decorator

    def add_failures(self, name, subtest=None):
        """
        Add a record to failures list which will report back to chrome perf
        dashboard at cleanup stage.

        @param name: failure name.
        @param subtest: subtest which will appear in cros-perf. If None is
                        specified, use name instead.

        @return: the failure record (dict) the name was added to.
        """
        target = self._get_failure(name, subtest=subtest)
        if target:
            target['names'].append(name)
        else:
            target = {
                'description': self._get_failure_description(name, subtest),
                'unit': 'count',
                'higher_is_better': False,
                'graph': self._get_failure_graph_name(),
                'names': [name],
            }
            self._failures_by_description[target['description']] = target
        return target

    def remove_failures(self, name, subtest=None):
        """
        Remove a record from failures list which will report back to chrome perf
        dashboard at cleanup stage.

        Removing a name that was never added is a no-op.

        @param name: failure name.
        @param subtest: subtest which will appear in cros-perf. If None is
                        specified, use name instead.
        """
        target = self._get_failure(name, subtest=subtest)
        # _get_failure returns None when nothing was recorded under this
        # description; there is nothing to remove in that case.
        if target is not None and name in target['names']:
            target['names'].remove(name)

    def _output_perf(self):
        """Report recorded failures back to chrome perf."""
        self.output_perf_value(
            description='Timeout_Reboot',
            value=0,
            units='count',
            higher_is_better=False,
            replace_existing_values=True
        )

        if not self._test_failure_report_enable:
            return

        total_failures = 0
        # Report subtests failures
        for failure in self._failures_by_description.values():
            if len(failure['names']) > 0:
                logging.debug('GraphicsTest failure: %s', failure['names'])
                total_failures += len(failure['names'])

            if not self._test_failure_report_subtest:
                continue

            self.output_perf_value(
                description=failure['description'],
                value=len(failure['names']),
                units=failure['unit'],
                higher_is_better=failure['higher_is_better'],
                graph=failure['graph']
            )

        # Report the count of all failures
        self.output_perf_value(
            description=self._get_failure_graph_name(),
            value=total_failures,
            units='count',
            higher_is_better=False,
        )

    def _get_failure_graph_name(self):
        """Return the graph name used for failure reporting."""
        return self._test_failure_description

    def _get_failure_description(self, name, subtest):
        """Return the description key: the subtest if given, else the name."""
        return subtest or name

    def _get_failure(self, name, subtest):
        """Return the failure record for name/subtest, or None if absent."""
        description = self._get_failure_description(name, subtest=subtest)
        return self._failures_by_description.get(description, None)

    def get_failures(self):
        """
        Get currently recorded failures list.

        @return: flat list of all failure names across all records.
        """
        return [name for failure in self._failures_by_description.values()
                for name in failure['names']]

    def open_vt1(self):
        """Switch to VT1 with keyboard."""
        self.input_check()
        self._player.blocking_playback_of_default_file(
            input_type='keyboard', filename='keyboard_ctrl+alt+f1')
        time.sleep(5)

    def open_vt2(self):
        """Switch to VT2 with keyboard."""
        self.input_check()
        self._player.blocking_playback_of_default_file(
            input_type='keyboard', filename='keyboard_ctrl+alt+f2')
        time.sleep(5)

    def wake_screen_with_keyboard(self):
        """Use the vt1 keyboard shortcut to bring the devices screen back on.

        This is useful if you want to take screenshots of the UI. If you try
        to take them while the screen is off, it will fail.
        """
        self.open_vt1()
| |
| |
def screen_disable_blanking():
    """No-op hook called from power_Backlight to disable screen blanking.

    Unexpected screensavers or DPMS are not a concern here, so there is
    nothing to do.
    """
    return None
| |
| |
def screen_disable_energy_saving():
    """Immediately disable energy saving; called from power_Consumption.

    Enabling all displays via Chrome is all that is required.
    """
    all_displays_on = power_utils.DISPLAY_POWER_ALL_ON
    power_utils.set_display_power(all_displays_on)
| |
| |
def screen_toggle_fullscreen():
    """Toggle fullscreen mode by sending the F11 key."""
    fullscreen_combo = ['KEY_F11']
    press_keys(fullscreen_combo)
| |
| |
def screen_toggle_mirrored():
    """Toggle the mirrored screen by sending Ctrl+F4."""
    mirror_combo = ['KEY_LEFTCTRL', 'KEY_F4']
    press_keys(mirror_combo)
| |
| |
def hide_cursor():
    """Hide the mouse cursor by sending a keystroke (the up arrow)."""
    keystroke = ['KEY_UP']
    press_keys(keystroke)
| |
| |
def hide_typing_cursor():
    """Hide the typing cursor by tabbing out of the typing bar."""
    tab_key = ['KEY_TAB']
    press_keys(tab_key)
| |
| |
def screen_wakeup():
    """Wake up the screen if it is dark.

    Nudges the relative mouse one unit right and then one unit left, so the
    cursor ends where it started.
    """
    mouse = graphics_uinput.get_device_mouse_rel()
    for delta in (1, -1):
        graphics_uinput.emit(mouse, 'REL_X', delta)
| |
| |
def switch_screen_on(on):
    """
    Turn the touch screen on/off. Not implemented: always fails the test.

    @param on: On or off.

    @raise error.TestFail: always.
    """
    message = 'switch_screen_on is not implemented.'
    raise error.TestFail(message)
| |
| |
def press_keys(key_list):
    """Press the given keys together as a single combination.

    Please do not leak uinput dependencies outside of the file.

    @param key_list: A list of key strings, e.g. ['KEY_LEFTCTRL', 'KEY_F4']
    """
    emit_combo = graphics_uinput.emit_combo
    emit_combo(graphics_uinput.get_device_keyboard(), key_list)
| |
| |
def click_mouse():
    """Click the left mouse button at the current cursor position.

    Presumably only hacky tests use this function.
    """
    logging.info('click_mouse()')
    mouse = graphics_uinput.get_device_mouse_rel()
    # (event, value, seconds to sleep afterwards or None). Some sleeping is
    # needed between events, otherwise events disappear.
    steps = (
        ('REL_X', 1, 0.1),     # Move a little to make the cursor appear.
        ('REL_X', -1, 0.1),    # Move cursor back to not drift.
        ('BTN_LEFT', 1, 0.2),  # Click down.
        ('BTN_LEFT', 0, None), # Release click.
    )
    for event, value, pause in steps:
        graphics_uinput.emit(mouse, event, value)
        if pause is not None:
            time.sleep(pause)
| |
| |
| # TODO(ihf): this function is broken. Make it work. |
def activate_focus_at(rel_x, rel_y):
    """Tap the touch device at a relative screen position.

    This is a pretty hacky method. Using this will probably lead to
    flaky tests as page layout changes over time.

    @param rel_x: relative horizontal position between 0 and 1.
    @param rel_y: relative vertical position between 0 and 1.
    """
    screen_w, screen_h = get_internal_resolution()
    touch = graphics_uinput.get_device_touch()
    emit = graphics_uinput.emit
    # Describe one touch contact; only the final event of the group syncs.
    emit(touch, 'ABS_MT_SLOT', 0, syn=False)
    emit(touch, 'ABS_MT_TRACKING_ID', 1, syn=False)
    emit(touch, 'ABS_MT_POSITION_X', int(rel_x * screen_w), syn=False)
    emit(touch, 'ABS_MT_POSITION_Y', int(rel_y * screen_h), syn=False)
    emit(touch, 'BTN_TOUCH', 1, syn=True)
    time.sleep(0.2)
    emit(touch, 'BTN_TOUCH', 0, syn=True)
| |
| |
def take_screenshot(resultsdir, fname_prefix):
    """Take a screenshot and save it to a new file in the results dir.

    The file is named '<fname_prefix>-<N>.png', where N is the number of
    files already matching '<fname_prefix>-*.png' in resultsdir.

    @param resultsdir: Directory to store the output in.
    @param fname_prefix: Prefix for the output fname.

    @return: the path of the saved screenshot file.
    """
    # Remember whether we were called while another exception is being
    # handled; in that case a screenshot failure must not mask it.
    handling_exception = sys.exc_info()[0] is not None

    existing = glob.glob(os.path.join(resultsdir, '%s-*.png' % fname_prefix))
    screenshot_file = os.path.join(
        resultsdir, '%s-%d.png' % (fname_prefix, len(existing)))
    logging.info('Saving screenshot to %s.', screenshot_file)

    try:
        utils.run('screenshot "%s"' % screenshot_file)
    except Exception as err:
        if not handling_exception:
            raise
        logging.error(err)

    return screenshot_file
| |
| |
def take_screenshot_crop(fullpath, box=None, crtc_id=None):
    """
    Take a screenshot with the screenshot tool, optionally cropped to box.

    @param fullpath: path, full path to save the image to.
    @param box: 4-tuple giving the upper left and lower right pixel coordinates.
    @param crtc_id: if set, take a screen shot of the specified CRTC;
                    otherwise the internal display is captured.

    @return: fullpath.
    """
    parts = ['screenshot']
    if crtc_id is None:
        parts.append('--internal')
    else:
        parts.append('--crtc-id=%d' % crtc_id)
    if box:
        left, top, right, bottom = box
        # The tool expects WIDTHxHEIGHT+X+Y.
        parts.append('--crop=%dx%d+%d+%d' %
                     (right - left, bottom - top, left, top))
    parts.append('"%s"' % fullpath)
    utils.run(' '.join(parts))
    return fullpath
| |
| |
| # id encoder status name size (mm) modes encoders |
| # 39 0 connected eDP-1 256x144 1 38 |
| _MODETEST_CONNECTOR_PATTERN = re.compile( |
| r'^(\d+)\s+(\d+)\s+(connected|disconnected)\s+(\S+)\s+\d+x\d+\s+\d+\s+\d+') |
| |
| # id crtc type possible crtcs possible clones |
| # 38 0 TMDS 0x00000002 0x00000000 |
| _MODETEST_ENCODER_PATTERN = re.compile( |
| r'^(\d+)\s+(\d+)\s+\S+\s+0x[0-9a-fA-F]+\s+0x[0-9a-fA-F]+') |
| |
| # Group names match the drmModeModeInfo struct |
| _MODETEST_MODE_PATTERN = re.compile( |
| r'\s+(?P<name>.+)' |
| r'\s+(?P<vrefresh>\d+)' |
| r'\s+(?P<hdisplay>\d+)' |
| r'\s+(?P<hsync_start>\d+)' |
| r'\s+(?P<hsync_end>\d+)' |
| r'\s+(?P<htotal>\d+)' |
| r'\s+(?P<vdisplay>\d+)' |
| r'\s+(?P<vsync_start>\d+)' |
| r'\s+(?P<vsync_end>\d+)' |
| r'\s+(?P<vtotal>\d+)' |
| r'\s+(?P<clock>\d+)' |
| r'\s+flags:.+type:' |
| r' preferred') |
| |
| _MODETEST_CRTCS_START_PATTERN = re.compile(r'^id\s+fb\s+pos\s+size') |
| |
| _MODETEST_CRTC_PATTERN = re.compile( |
| r'^(\d+)\s+(\d+)\s+\((\d+),(\d+)\)\s+\((\d+)x(\d+)\)') |
| |
| _MODETEST_PLANES_START_PATTERN = re.compile( |
| r'^id\s+crtc\s+fb\s+CRTC\s+x,y\s+x,y\s+gamma\s+size\s+possible\s+crtcs') |
| |
| _MODETEST_PLANE_PATTERN = re.compile( |
| r'^(\d+)\s+(\d+)\s+(\d+)\s+(\d+),(\d+)\s+(\d+),(\d+)\s+(\d+)\s+(0x)(\d+)') |
| |
# Record types filled in from parsed modetest output.

# Connector fields:
#   cid        connector id (integer)
#   eid        encoder id (integer)
#   ctype      connector type, e.g. 'eDP', 'HDMI-A', 'DP'
#   connected  boolean
#   size       current screen size in mm, e.g. (256, 144)
#   encoder    encoder id (integer)
#   modes      list of resolution tuples, e.g. [(1920,1080), (1600,900), ...]
Connector = collections.namedtuple(
    'Connector',
    ('cid', 'eid', 'ctype', 'connected', 'size', 'encoder', 'modes'))

# Encoder fields:
#   eid      encoder id (integer)
#   crtc_id  CRTC id (integer)
Encoder = collections.namedtuple('Encoder', ('eid', 'crtc_id'))

# CRTC fields:
#   id           crtc id
#   fb           fb id
#   pos          position, e.g. (0,0)
#   size         size, e.g. (1366,768)
#   is_internal  True if for the internal display
CRTC = collections.namedtuple(
    'CRTC', ('id', 'fb', 'pos', 'size', 'is_internal'))

# Plane fields:
#   id              plane id
#   possible_crtcs  possible associated CRTC indexes.
Plane = collections.namedtuple('Plane', ('id', 'possible_crtcs'))

| |
def get_display_resolution():
    """
    Determine the display resolution of the DUT from modetest output.

    @return: the size tuple of the first connected connector, or None if no
             connector is connected.
    """
    for connector in get_modetest_connectors():
        if not connector.connected:
            continue
        return connector.size
    return None
| |
| |
def _get_num_outputs_connected():
    """
    Count the connected displays reported by modetest.

    @return: The number of connected displays
    """
    return sum(1 for connector in get_modetest_connectors()
               if connector.connected)
| |
| |
def get_num_outputs_on():
    """
    Retrieves the number of connected outputs that are on.

    This is currently the number of connected outputs.

    Return value: integer value of number of connected outputs that are on.
    """
    num_connected = _get_num_outputs_connected()
    return num_connected
| |
| |
def get_modetest_connectors():
    """
    Retrieves a list of Connectors using modetest.

    Each connector row in `modetest -c` creates a Connector with a
    placeholder size of (-1, -1). A following mode line flagged "preferred"
    sets the size of the most recently seen connector to that mode's
    (hdisplay, vdisplay).

    Return value: List of Connectors.
    """
    connectors = []
    modetest_output = utils.system_output('modetest -c')
    for line in modetest_output.splitlines():
        # First search for a new connector.
        connector_match = _MODETEST_CONNECTOR_PATTERN.match(line)
        if connector_match is not None:
            connectors.append(Connector(
                cid=int(connector_match.group(1)),
                eid=int(connector_match.group(2)),
                ctype=connector_match.group(4),
                connected=(connector_match.group(3) == 'connected'),
                size=(-1, -1),
                encoder=-1,
                modes=None))
            continue

        # See if we find corresponding line with modes, sizes etc.
        mode_match = _MODETEST_MODE_PATTERN.match(line)
        if mode_match is None:
            continue
        if not connectors:
            # A mode line before any connector row has nothing to attach to;
            # ignore it instead of failing on an empty list.
            continue
        size = (int(mode_match.group('hdisplay')),
                int(mode_match.group('vdisplay')))
        # Update display size of last connector in list.
        connectors[-1] = connectors[-1]._replace(size=size)
    return connectors
| |
| |
def get_modetest_encoders():
    """
    Retrieves a list of Encoders using modetest.

    Lines of `modetest -e` that do not look like an encoder row are ignored.

    Return value: List of Encoders.
    """
    encoders = []
    for line in utils.system_output('modetest -e').splitlines():
        row = _MODETEST_ENCODER_PATTERN.match(line)
        if row is not None:
            # The pattern captures exactly (encoder id, crtc id).
            eid, crtc_id = [int(value) for value in row.groups()]
            encoders.append(Encoder(eid, crtc_id))
    return encoders
| |
| |
def find_eid_from_crtc_id(crtc_id):
    """
    Finds the integer Encoder ID matching a CRTC ID.

    @param crtc_id: The integer CRTC ID.

    @return: The integer Encoder ID of the first matching encoder, or None.
    """
    matching_eids = (encoder.eid for encoder in get_modetest_encoders()
                     if encoder.crtc_id == crtc_id)
    return next(matching_eids, None)
| |
| |
def find_connector_from_eid(eid):
    """
    Finds the Connector object matching an Encoder ID.

    @param eid: The integer Encoder ID.

    @return: The first Connector whose eid matches, or None.
    """
    matching = (connector for connector in get_modetest_connectors()
                if connector.eid == eid)
    return next(matching, None)
| |
| |
def get_modetest_crtcs():
    """
    Returns a list of CRTC data for the enabled CRTCs.

    Parsing starts after the CRTC table header in `modetest -p` output and
    stops at the first subsequent non-indented line that is not a CRTC row.

    Sample:
        [CRTC(id=19, fb=50, pos=(0, 0), size=(1366, 768), is_internal=True),
         CRTC(id=22, fb=54, pos=(0, 0), size=(1920, 1080), is_internal=False)]
    """
    crtcs = []
    in_crtc_table = False
    for line in utils.system_output('modetest -p').splitlines():
        if in_crtc_table:
            row = _MODETEST_CRTC_PATTERN.match(line)
            if row is not None:
                crtc_id, fb, x, y, width, height = [
                    int(value) for value in row.groups()]
                # CRTCs with fb=0 are disabled, but lets skip anything with
                # trivial width/height just in case.
                if fb != 0 and width != 0 and height != 0:
                    connector = find_connector_from_eid(
                        find_eid_from_crtc_id(crtc_id))
                    is_internal = (
                        connector is not None and
                        connector.ctype == get_internal_connector_name())
                    crtcs.append(CRTC(crtc_id, fb, (x, y), (width, height),
                                      is_internal))
            elif line and not line[0].isspace():
                # A new non-indented line ends the CRTC table.
                return crtcs
        if _MODETEST_CRTCS_START_PATTERN.match(line) is not None:
            in_crtc_table = True
    return crtcs
| |
| |
def get_modetest_planes():
    """
    Returns a list of planes information.

    Parsing starts after the plane table header in `modetest -p` output and
    stops at the first subsequent non-indented line that is not a plane row.
    Planes with id 0 or an empty possible-CRTC mask are skipped.

    Sample:
        [Plane(id=26, possible_crtcs=1),
         Plane(id=29, possible_crtcs=1)]
    """
    planes = []
    modetest_output = utils.system_output('modetest -p')
    in_plane_table = False
    for line in modetest_output.splitlines():
        if in_plane_table:
            plane_match = _MODETEST_PLANE_PATTERN.match(line)
            if plane_match is not None:
                plane_id = int(plane_match.group(1))
                # The "possible crtcs" column is a hexadecimal bitmask
                # (e.g. 0x0000003f). Re-read the whole hex token from where
                # the '0x' prefix (group 9) starts and parse it base 16, so
                # masks containing a-f digits or values >= 0x10 are correct.
                mask_match = re.match(r'0x([0-9a-fA-F]+)',
                                      line[plane_match.start(9):])
                possible_crtcs = int(mask_match.group(1), 16)
                if plane_id != 0 and possible_crtcs != 0:
                    planes.append(Plane(plane_id, possible_crtcs))
            elif line and not line[0].isspace():
                # A new non-indented line ends the plane table.
                return planes
        if _MODETEST_PLANES_START_PATTERN.match(line) is not None:
            in_plane_table = True
    return planes
| |
| |
def is_nv12_supported_by_drm_planes():
    """
    Returns whether the `modetest -p` output mentions the NV12 format.

    This is a crude way to figure out if the device will not be able to promote
    video frames to overlays at all, which happens for example on Broadwell.
    The check is a case-insensitive substring search over the whole output.
    """
    planes_dump = utils.system_output('modetest -p')
    lowered = planes_dump.lower()
    return "nv12" in lowered

| |
def get_modetest_output_state():
    """
    Reduce the output of get_modetest_connectors to a dictionary mapping
    connector type to its connected state. Only connected connectors are
    included.
    """
    # TODO(ihf): Figure out why modetest output needs filtering.
    return {connector.ctype: connector.connected
            for connector in get_modetest_connectors()
            if connector.connected}
| |
| |
def get_output_rect(output):
    """Gets the size and position of the given output on the screen buffer.

    The offset is always reported as (0, 0).

    @param output: The output name as a string.

    @return A tuple of the rectangle (width, height, fb_offset_x,
            fb_offset_y) of ints; (0, 0, 0, 0) if the output is not found.
    """
    for connector in get_modetest_connectors():
        if connector.ctype != output:
            continue
        width, height = connector.size
        # TODO(ihf): Should we use CRTC.pos?
        return (width, height, 0, 0)
    return (0, 0, 0, 0)
| |
| |
def get_internal_crtc():
    """Return the first CRTC flagged as internal, or None if there is none."""
    internal_crtcs = (crtc for crtc in get_modetest_crtcs()
                      if crtc.is_internal)
    return next(internal_crtcs, None)
| |
| |
def get_external_crtc(index=0):
    """Return the index-th CRTC that is not internal.

    @param index: zero-based position among the non-internal CRTCs.

    @return: the CRTC, or None if there are not enough external CRTCs.
    """
    remaining = index
    for crtc in get_modetest_crtcs():
        if crtc.is_internal:
            continue
        if remaining == 0:
            return crtc
        remaining -= 1
    return None
| |
| |
def get_internal_resolution():
    """Return the size of the internal CRTC, or (-1, -1) if there is none."""
    crtc = get_internal_crtc()
    return crtc.size if crtc else (-1, -1)
| |
| |
def has_internal_display():
    """Checks whether the DUT is equipped with an internal display.

    @return True if an internal connector name is found; False otherwise.
    """
    internal_name = get_internal_connector_name()
    return bool(internal_name)
| |
| |
def get_external_resolution():
    """Gets the resolution of the (first) external display.

    @return A tuple of (width, height) or None if no external display is
            connected.
    """
    crtc = get_external_crtc()
    return crtc.size if crtc else None
| |
| |
def get_display_output_state():
    """
    Retrieves output status of connected display(s).

    Return value: dictionary of connected display states, as produced by
    get_modetest_output_state().
    """
    output_state = get_modetest_output_state()
    return output_state
| |
| |
def set_modetest_output(output_name, enable):
    """Placeholder for switching an output on or off; currently a no-op.

    @param output_name: The output name as a string (unused).
    @param enable: Desired state (unused).
    """
    # TODO(ihf): figure out what to do here. Don't think this is the right command.
    # modetest -s <connector_id>[,<connector_id>][@<crtc_id>]:<mode>[-<vrefresh>][@<format>]  set a mode
    return None
| |
| |
def set_display_output(output_name, enable):
    """
    Sets the output given by |output_name| on or off.

    Delegates to set_modetest_output().

    @param output_name: The output name as a string.
    @param enable: True to switch the output on, False to switch it off.
    """
    set_modetest_output(output_name=output_name, enable=enable)
| |
| |
| # TODO(ihf): Fix this for multiple external connectors. |
def get_external_crtc_id(index=0):
    """Return the id of the index-th external CRTC, or -1 if not found.

    @param index: zero-based position among the non-internal CRTCs.
    """
    crtc = get_external_crtc(index)
    return -1 if crtc is None else crtc.id
| |
| |
def get_internal_crtc_id():
    """Return the id of the internal CRTC, or -1 if there is none."""
    crtc = get_internal_crtc()
    return -1 if crtc is None else crtc.id
| |
| |
| # TODO(ihf): Fix this for multiple external connectors. |
def get_external_connector_name():
    """Gets the name of the external output connector.

    An output counts as external when it is connected and its name starts
    with 'HDMI', 'DP', 'DVI' or 'VGA'.

    @return The external output connector name as a string, if any.
            Otherwise, return False.
    """
    external_prefixes = ('HDMI', 'DP', 'DVI', 'VGA')
    outputs = get_display_output_state()
    # Iterate the dict directly: works on both Python 2 and Python 3,
    # unlike dict.iterkeys().
    for output in outputs:
        if outputs[output] and output.startswith(external_prefixes):
            return output
    return False
| |
| |
def get_internal_connector_name():
    """Gets the name of the internal output connector.

    An output counts as internal when its name starts with 'eDP', 'LVDS' or
    'DSI'.

    @return The internal output connector name as a string, if any.
            Otherwise, return False.
    """
    # reference: chromium_org/chromeos/display/output_util.cc
    internal_prefixes = ('eDP', 'LVDS', 'DSI')
    # Iterate the dict directly: works on both Python 2 and Python 3,
    # unlike dict.iterkeys().
    for output in get_display_output_state():
        if output.startswith(internal_prefixes):
            return output
    return False
| |
| |
def wait_output_connected(output):
    """Wait for output to connect.

    Polls the display output state through utils.wait_for_value until the
    output is reported as connected.

    @param output: The output name as a string.

    @return: True if output is connected; False otherwise.
    """
    def _is_connected():
        """Report the current state of |output|; False if it is not listed."""
        return get_display_output_state().get(output, False)

    return utils.wait_for_value(_is_connected, expected_value=True)
| |
| |
def set_content_protection(output_name, state):
    """
    Sets the content protection to the given state. Not implemented: always
    fails the test.

    @param output_name: The output name as a string.
    @param state: One of the states 'Undesired', 'Desired', or 'Enabled'

    @raise error.TestFail: always.
    """
    message = 'freon: set_content_protection not implemented'
    raise error.TestFail(message)
| |
| |
def get_content_protection(output_name):
    """
    Gets the state of the content protection. Not implemented: always fails
    the test.

    @param output_name: The output name as a string.
    @return: A string of the state, like 'Undesired', 'Desired', or 'Enabled'.
             False if not supported.

    @raise error.TestFail: always.
    """
    message = 'freon: get_content_protection not implemented'
    raise error.TestFail(message)
| |
| |
def is_sw_rasterizer():
    """Return true if OpenGL is using a software rendering.

    Looks for 'llvmpipe' or 'soft' (case-insensitively) in the first
    "OpenGL renderer string" line printed by wflinfo.
    """
    cmd = ''.join([utils.wflinfo_cmd(), ' | grep "OpenGL renderer string"'])
    renderer = utils.run(cmd).stdout.splitlines()[0]
    logging.info('wflinfo: %s', renderer)
    # TODO(ihf): Find exhaustive error conditions (especially ARM).
    lowered = renderer.lower()
    return any(marker in lowered for marker in ('llvmpipe', 'soft'))
| |
| |
def get_gles_version():
    """Return the OpenGL ES version reported by wflinfo.

    @return: tuple (major, minor) of ints taken from the first
             "OpenGL version string" line, or (None, None) if no such line
             is found.
    """
    cmd = utils.wflinfo_cmd()
    wflinfo = utils.system_output(cmd, retain_output=False, ignore_status=False)
    # OpenGL version string: OpenGL ES 3.0 Mesa 10.5.0-devel
    # The dot is escaped so that only a literal "major.minor" is accepted.
    version = re.search(r'OpenGL version string: '
                        r'OpenGL ES ([0-9]+)\.([0-9]+)', wflinfo)
    if version is None:
        return (None, None)
    return (int(version.group(1)), int(version.group(2)))
| |
| |
def get_egl_version():
    """Return the EGL version reported by eglinfo.

    @return: tuple (major, minor) of ints taken from the first
             "EGL version string" line, or (None, None) if no such line is
             found.
    """
    cmd = 'eglinfo'
    eglinfo = utils.system_output(cmd, retain_output=False, ignore_status=False)
    # EGL version string: 1.4 (DRI2)
    # The dot is escaped so that only a literal "major.minor" is accepted.
    version = re.search(r'EGL version string: ([0-9]+)\.([0-9]+)', eglinfo)
    if version is None:
        return (None, None)
    return (int(version.group(1)), int(version.group(2)))
| |
| |
| class GraphicsKernelMemory(object): |
| """ |
| Reads from sysfs to determine kernel gem objects and memory info. |
| """ |
| # These are sysfs fields that will be read by this test. For different |
| # architectures, the sysfs field paths are different. The "paths" are given |
| # as lists of strings because the actual path may vary depending on the |
| # system. This test will read from the first sysfs path in the list that is |
| # present. |
| # e.g. ".../memory" vs ".../gpu_memory" -- if the system has either one of |
| # these, the test will read from that path. |
| amdgpu_fields = { |
| 'gem_objects': ['/sys/kernel/debug/dri/0/amdgpu_gem_info'], |
| 'memory': ['/sys/kernel/debug/dri/0/amdgpu_gtt_mm'], |
| } |
| arm_fields = {} |
| exynos_fields = { |
| 'gem_objects': ['/sys/kernel/debug/dri/?/exynos_gem_objects'], |
| 'memory': ['/sys/class/misc/mali0/device/memory', |
| '/sys/class/misc/mali0/device/gpu_memory'], |
| } |
| mediatek_fields = {} |
| # TODO(crosbug.com/p/58189) Add mediatek GPU memory nodes |
| qualcomm_fields = {} |
| # TODO(b/119269602) Add qualcomm GPU memory nodes once GPU patches land |
| rockchip_fields = {} |
| tegra_fields = { |
| 'memory': ['/sys/kernel/debug/memblock/memory'], |
| } |
| i915_fields = { |
| 'gem_objects': ['/sys/kernel/debug/dri/0/i915_gem_objects'], |
| 'memory': ['/sys/kernel/debug/dri/0/i915_gem_gtt'], |
| } |
| # In Linux Kernel 5, i915_gem_gtt merged into i915_gem_objects |
| i915_fields_kernel_5 = { |
| 'gem_objects': ['/sys/kernel/debug/dri/0/i915_gem_objects'], |
| } |
| cirrus_fields = {} |
| virtio_fields = {} |
| |
| arch_fields = { |
| 'amdgpu': amdgpu_fields, |
| 'arm': arm_fields, |
| 'cirrus': cirrus_fields, |
| 'exynos5': exynos_fields, |
| 'i915': i915_fields, |
| 'i915_kernel_5': i915_fields_kernel_5, |
| 'mediatek': mediatek_fields, |
| 'qualcomm': qualcomm_fields, |
| 'rockchip': rockchip_fields, |
| 'tegra': tegra_fields, |
| 'virtio': virtio_fields, |
| } |
| |
| |
| num_errors = 0 |
| |
def __init__(self):
    """Snapshot the graphics memory keyvals at construction time."""
    baseline = self.get_memory_keyvals()
    self._initial_memory = baseline
| |
def get_memory_difference_keyvals(self):
    """
    Reads the graphics memory values again and returns, for every key
    recorded at initialization, the initial value minus the current value.
    """
    current_memory = self.get_memory_keyvals()
    differences = {}
    for key in self._initial_memory:
        differences[key] = self._initial_memory[key] - current_memory[key]
    return differences
| |
def get_memory_keyvals(self):
    """
    Reads the graphics memory values and returns them as keyvals.

    The set of sysfs fields to read is selected by SoC family; on x86 the
    family is derived from the VGA device reported by lspci (or from the
    lshw video driver for virtio). Each field is read from the first
    candidate path that exists. Missing fields and fields reporting 0 bytes
    are logged and counted in self.num_errors.

    @return: dict of '<field>_<key>' values parsed from sysfs plus
             'meminfo_MemUsed' and 'meminfo_SwapUsed'.

    @raise error.TestFail: if the detected architecture is not supported.
    """
    keyvals = {}

    # Get architecture type and list of sysfs fields to read.
    soc = utils.get_cpu_soc_family()

    arch = utils.get_cpu_arch()
    kernel_version = utils.get_kernel_version()[0:4].rstrip(".")
    if arch == 'x86_64' or arch == 'i386':
        pci_vga_device = utils.run("lspci | grep VGA").stdout.rstrip('\n')
        if "Advanced Micro Devices" in pci_vga_device:
            soc = 'amdgpu'
        elif "Intel Corporation" in pci_vga_device:
            soc = 'i915'
            # In Linux Kernel 5, i915_gem_gtt merged into i915_gem_objects.
            if utils.compare_versions(kernel_version, "4.19") > 0:
                soc = 'i915_kernel_5'
        elif "Cirrus Logic" in pci_vga_device:
            # Used on qemu with kernels 3.18 and lower. Limited to 800x600
            # resolution.
            soc = 'cirrus'
        else:
            pci_vga_device = utils.run('lshw -c video').stdout.rstrip()
            # Raw string: '\S' is a regex class, not a string escape.
            groups = re.search(r'configuration:.*driver=(\S*)',
                               pci_vga_device)
            if groups and 'virtio' in groups.group(1):
                soc = 'virtio'

    if soc not in self.arch_fields:
        raise error.TestFail('Error: Architecture "%s" not yet supported.' % soc)
    fields = self.arch_fields[soc]

    for field_name in fields:
        possible_field_paths = fields[field_name]
        field_value = None
        for path in possible_field_paths:
            # A non-zero exit status from ls means the path does not exist.
            if utils.system('ls %s' % path, ignore_status=True):
                continue
            field_value = utils.system_output('cat %s' % path)
            break

        if not field_value:
            logging.error('Unable to find any sysfs paths for field "%s"',
                          field_name)
            self.num_errors += 1
            continue

        parsed_results = GraphicsKernelMemory._parse_sysfs(field_value)

        for key in parsed_results:
            keyvals['%s_%s' % (field_name, key)] = parsed_results[key]

        if 'bytes' in parsed_results and parsed_results['bytes'] == 0:
            logging.error('%s reported 0 bytes', field_name)
            self.num_errors += 1

    keyvals['meminfo_MemUsed'] = (utils.read_from_meminfo('MemTotal') -
                                  utils.read_from_meminfo('MemFree'))
    keyvals['meminfo_SwapUsed'] = (utils.read_from_meminfo('SwapTotal') -
                                   utils.read_from_meminfo('SwapFree'))
    return keyvals
| |
@staticmethod
def _parse_sysfs(output):
    """
    Parses output of graphics memory sysfs to determine the number of
    buffer objects and bytes.

    Arguments:
        output      Unprocessed sysfs output
    Return value:
        Dictionary containing integer values of number bytes and objects.
        They may have the keys 'bytes' and 'objects', respectively. However
        the result may not contain both of these values.
    """
    results = {}
    labels = ['bytes', 'objects']

    # First handle i915_gem_objects in 5.x kernels. Example:
    #     296 shrinkable [0 free] objects, 274833408 bytes
    #     frecon: 3 objects, 72192000 bytes (0 active, 0 inactive, 0 unbound, 0 closed)
    #     chrome: 6 objects, 74629120 bytes (0 active, 0 inactive, 376832 unbound, 0 closed)
    #     <snip>
    # "\d+" (not "\d*") so a match always carries digits that int() accepts.
    i915_gem_objects_match = re.match(
        r'(?P<objects>\d+) shrinkable.*objects, (?P<bytes>\d+) bytes',
        output)
    if i915_gem_objects_match is not None:
        results['bytes'] = int(i915_gem_objects_match.group('bytes'))
        results['objects'] = int(i915_gem_objects_match.group('objects'))
        return results

    for line in output.split('\n'):
        # Strip any commas to make parsing easier.
        line_words = line.replace(',', '').split()

        prev_word = None
        for word in line_words:
            # When a label has been found, the previous word should be the
            # value. e.g. "3200 bytes". Only the first value seen for each
            # label is kept, and a non-numeric previous word (e.g.
            # "active objects") is ignored instead of crashing int().
            if (word in labels and word not in results and prev_word
                    and prev_word.isdigit()):
                results[word] = int(prev_word)

            prev_word = word

        # Once all values have been parsed, return.
        if len(results) == len(labels):
            return results

    return results
| |
| |
class GraphicsStateChecker(object):
    """
    Analyzes the state of the GPU and log history. Should be instantiated at the
    beginning of each graphics_* test.
    """
    dirty_writeback_centisecs = 0
    # Class-level default kept for backward compatibility only; __init__ gives
    # every instance its own dictionary so hang history is never shared.
    existing_hangs = {}

    _BROWSER_VERSION_COMMAND = '/opt/google/chrome/chrome --version'
    _HANGCHECK = ['drm:i915_hangcheck_elapsed', 'drm:i915_hangcheck_hung',
                  'Hangcheck timer elapsed...',
                  'drm/i915: Resetting chip after gpu hang']
    _HANGCHECK_WARNING = ['render ring idle']
    _MESSAGES_FILE = '/var/log/messages'

    def __init__(self, raise_error_on_hang=True, run_on_sw_rasterizer=False):
        """
        Analyzes the initial state of the GPU and log history.

        @param raise_error_on_hang: if True, finalize() raises TestError for
                a new GPU hang; otherwise it raises TestWarn.
        @param run_on_sw_rasterizer: if False, refuse to run (TestFail) when
                is_sw_rasterizer() reports a software rasterizer.
        """
        # Hang lines already present in the log when the test started.
        self.existing_hangs = {}
        # Attempt flushing system logs every second instead of every 10 minutes.
        self.dirty_writeback_centisecs = utils.get_dirty_writeback_centisecs()
        utils.set_dirty_writeback_centisecs(100)
        self._raise_error_on_hang = raise_error_on_hang
        logging.info(utils.get_board_with_frequency_and_memory())
        self.graphics_kernel_memory = GraphicsKernelMemory()
        self._run_on_sw_rasterizer = run_on_sw_rasterizer

        if utils.get_cpu_arch() != 'arm':
            if not self._run_on_sw_rasterizer and is_sw_rasterizer():
                raise error.TestFail('Refusing to run on SW rasterizer.')
            logging.info('Initialize: Checking for old GPU hangs...')
            for line in self._read_hang_lines():
                logging.info(line)
                self.existing_hangs[line] = line

    def _read_hang_lines(self):
        """
        Returns the lines of _MESSAGES_FILE that contain a _HANGCHECK marker.

        The file is always closed, even if reading fails part way through.
        """
        with open(self._MESSAGES_FILE, 'r') as messages:
            return [line for line in messages
                    if any(hang in line for hang in self._HANGCHECK)]

    def finalize(self):
        """
        Analyzes the state of the GPU and log history, and emits warnings or
        errors if the state changed since initialize. Also restores the dirty
        writeback interval saved by __init__.

        @raises error.TestFail: finished on a SW rasterizer that was not
                explicitly allowed.
        @raises error.TestError: new GPU hang and raise_error_on_hang is set.
        @raises error.TestWarn: new GPU hang (raise_error_on_hang unset) or
                new GPU hang warning.
        """
        utils.set_dirty_writeback_centisecs(self.dirty_writeback_centisecs)
        new_gpu_hang = False
        new_gpu_warning = False
        if utils.get_cpu_arch() != 'arm':
            logging.info('Cleanup: Checking for new GPU hangs...')
            for line in self._read_hang_lines():
                # Lines recorded by __init__ predate the test.
                if line in self.existing_hangs:
                    continue
                logging.info(line)
                # A line carrying a warning marker is only a warning; any
                # other new hang line is a real hang.
                for warn in self._HANGCHECK_WARNING:
                    if warn in line:
                        new_gpu_warning = True
                        logging.warning(
                                'Saw GPU hang warning during test.')
                    else:
                        logging.warning('Saw GPU hang during test.')
                        new_gpu_hang = True

            if not self._run_on_sw_rasterizer and is_sw_rasterizer():
                logging.warning('Finished test on SW rasterizer.')
                raise error.TestFail('Finished test on SW rasterizer.')
            if self._raise_error_on_hang and new_gpu_hang:
                raise error.TestError('Detected GPU hang during test.')
            if new_gpu_hang:
                raise error.TestWarn('Detected GPU hang during test.')
            if new_gpu_warning:
                raise error.TestWarn('Detected GPU warning during test.')

    def get_memory_access_errors(self):
        """ Returns the number of errors while reading memory stats. """
        return self.graphics_kernel_memory.num_errors

    def get_memory_difference_keyvals(self):
        """ Returns the memory stats difference since initialization. """
        return self.graphics_kernel_memory.get_memory_difference_keyvals()

    def get_memory_keyvals(self):
        """ Returns memory stats. """
        return self.graphics_kernel_memory.get_memory_keyvals()
| |
class GraphicsApiHelper(object):
    """
    Report on the available graphics APIs.
    Ex. gles2, gles3, gles31, and vk
    """
    # Class-level default kept for backward compatibility only; __init__ gives
    # every instance its own list so results never accumulate across instances.
    _supported_apis = []

    DEQP_BASEDIR = os.path.join('/usr', 'local', 'deqp')
    DEQP_EXECUTABLE = {
        'gles2': os.path.join('modules', 'gles2', 'deqp-gles2'),
        'gles3': os.path.join('modules', 'gles3', 'deqp-gles3'),
        'gles31': os.path.join('modules', 'gles31', 'deqp-gles31'),
        'vk': os.path.join('external', 'vulkancts', 'modules',
                           'vulkan', 'deqp-vk')
    }

    def __init__(self):
        """Detect which APIs are supported and record them.

        @raises error.TestFail: if get_gles_version() cannot report a
                version.
        """
        self._supported_apis = []

        # Determine which executable should be run. Right now never egl.
        major, minor = get_gles_version()
        # Validate before any "%d" formatting: formatting None with "%d"
        # raises TypeError and would hide the intended TestFail.
        if major is None or minor is None:
            raise error.TestFail(
                'Failed: Could not get gles version information (%s, %s).' %
                (major, minor)
            )
        logging.info('Found gles%d.%d.', major, minor)
        if major >= 2:
            self._supported_apis.append('gles2')
        if major >= 3:
            self._supported_apis.append('gles3')
            if major > 3 or minor >= 1:
                self._supported_apis.append('gles31')

        # If libvulkan is installed, then assume the board supports vulkan.
        has_libvulkan = any(
            os.path.exists(os.path.join(libdir, 'libvulkan.so'))
            for libdir in ('/usr/lib', '/usr/lib64',
                           '/usr/local/lib', '/usr/local/lib64'))

        if has_libvulkan:
            executable_path = os.path.join(
                self.DEQP_BASEDIR,
                self.DEQP_EXECUTABLE['vk']
            )
            if os.path.exists(executable_path):
                self._supported_apis.append('vk')
            else:
                logging.warning('Found libvulkan.so but did not find deqp-vk '
                                'binary for testing.')

    def get_supported_apis(self):
        """Return the list of supported apis. eg. gles2, gles3, vk etc.
        @returns: a copy of the supported api list will be returned
        """
        return list(self._supported_apis)

    def get_deqp_executable(self, api):
        """Return the path to the api executable.

        @param api: one of the keys of DEQP_EXECUTABLE.
        @raises KeyError: if api is not a key of DEQP_EXECUTABLE.
        """
        if api not in self.DEQP_EXECUTABLE:
            raise KeyError(
                "%s is not a supported api for GraphicsApiHelper." % api
            )

        return os.path.join(self.DEQP_BASEDIR, self.DEQP_EXECUTABLE[api])
| |
# Possible paths of the kernel DRI debug text file. They are probed in this
# order (0, 1, 2) and the first one that exists is used.
_DRI_DEBUG_FILE_PATH_0 = "/sys/kernel/debug/dri/0/state"
_DRI_DEBUG_FILE_PATH_1 = "/sys/kernel/debug/dri/1/state"
_DRI_DEBUG_FILE_PATH_2 = "/sys/kernel/debug/dri/2/state"

# The DRI debug file will have a lot of information, including the position and
# sizes of each plane. Some planes might be disabled but have some lingering
# crtc-pos information; those are skipped.
# Marks the start of each plane section in the debug text.
_CRTC_PLANE_START_PATTERN = re.compile(r'plane\[')
# Matches a plane that is not attached to any CRTC.
_CRTC_DISABLED_PLANE = re.compile(r'crtc=\(null\)')
# Matches a "crtc-pos=" entry whose value is not the all-zero "0x0+0+0".
_CRTC_POS_AND_SIZE_PATTERN = re.compile(r'crtc-pos=(?!0x0\+0\+0)')
| |
def get_num_hardware_overlays():
    """
    Counts the amount of hardware overlay planes in use. There's always at
    least 2 overlays active: the whole screen and the cursor -- unless the
    cursor has never moved (e.g. in autotests), and it's not present.

    The first existing DRI debug file (index 0, then 1, then 2) is read and
    split into per-plane sections; empty sections and sections of planes
    without a CRTC are skipped, and every remaining section is counted.

    @returns: the number of counted plane sections.

    Raises: RuntimeError if the DRI debug file is not present.
            OSError/IOError if the file cannot be open()ed or read().
    """
    candidate_paths = (_DRI_DEBUG_FILE_PATH_0, _DRI_DEBUG_FILE_PATH_1,
                       _DRI_DEBUG_FILE_PATH_2)
    file_path = next(
        (path for path in candidate_paths if os.path.exists(path)), None)
    if file_path is None:
        # Report every path that was probed, not just the first two.
        raise RuntimeError('No DRI debug file exists (%s, %s, %s)' %
                           candidate_paths)

    with open(file_path) as debug_file:
        filetext = debug_file.read()
    logging.debug(filetext)

    matches = []
    # Split the debug output by planes, skip the disabled ones and extract the
    # position and size information of the others.
    for plane in re.split(_CRTC_PLANE_START_PATTERN, filetext):
        if not plane:
            continue
        if re.search(_CRTC_DISABLED_PLANE, plane):
            continue

        # NOTE(review): a section is counted even when no position/size entry
        # matches (an empty list is appended) -- confirm this is intended.
        matches.append(re.findall(_CRTC_POS_AND_SIZE_PATTERN, plane))

    # TODO(crbug.com/865112): return also the sizes/locations.
    return len(matches)
| |
def is_drm_debug_supported():
    """
    @returns True as soon as one of the DRI debug files (index 0, 1 or 2) is
             found to exist, False if none of them does.
    """
    for debug_path in (_DRI_DEBUG_FILE_PATH_0,
                       _DRI_DEBUG_FILE_PATH_1,
                       _DRI_DEBUG_FILE_PATH_2):
        if os.path.exists(debug_path):
            return True
    return False
| |
# Folder holding the DRI device nodes, and the glob pattern (not a regex; it
# is expanded with glob.glob) matching the card device files inside it.
_DEV_DRI_FOLDER_PATH = '/dev/dri'
_DEV_DRI_CARD_PATH = '/dev/dri/card?'

# IOCTL code and associated parameter to set the atomic cap. Defined originally
# in the kernel's include/uapi/drm/drm.h file.
_DRM_IOCTL_SET_CLIENT_CAP = 0x4010640d
_DRM_CLIENT_CAP_ATOMIC = 3
| |
def is_drm_atomic_supported():
    """
    @returns true if there is at least a /dev/dri/card? file that seems to
    support drm_atomic mode (accepts a _DRM_IOCTL_SET_CLIENT_CAP ioctl).

    @raises error.TestError: if the DRI device folder does not exist, if the
            ioctl returns an unexpected result, or if a device supports
            atomic but no DRI debug file is present.
    """
    if not os.path.isdir(_DEV_DRI_FOLDER_PATH):
        # This should never ever happen.
        raise error.TestError('path %s inexistent' % _DEV_DRI_FOLDER_PATH)

    # Pack a struct drm_set_client_cap: two u64.
    drm_pack = struct.pack("QQ", _DRM_CLIENT_CAP_ATOMIC, 1)

    for dev_path in glob.glob(_DEV_DRI_CARD_PATH):
        try:
            logging.debug('trying device %s', dev_path)
            # 'rw' is not a valid mode on Python 3 (open() raises ValueError,
            # which the IOError handler below would not catch). Open the
            # device read/write, binary and unbuffered instead.
            with open(dev_path, 'r+b', 0) as dev:
                result = fcntl.ioctl(dev, _DRM_IOCTL_SET_CLIENT_CAP, drm_pack)

                if result is None or len(result) != len(drm_pack):
                    # This should never ever happen.
                    raise error.TestError('ioctl failure')

                logging.debug('%s supports atomic', dev_path)

                if not is_drm_debug_supported():
                    raise error.TestError('platform supports DRM but there '
                                          ' are no debug files for it')
                return True
        except IOError as err:
            # A device that rejects the ioctl is skipped; try the next one.
            logging.warning('ioctl failed on %s: %s', dev_path, str(err))

    logging.debug('No dev files seems to support atomic')
    return False
| |
def get_max_num_available_drm_planes():
    """
    @returns The maximum number of DRM planes available in the system
    (associated to the same CRTC), or 0 if something went wrong (e.g. modetest
    failed, etc).
    """
    planes = get_modetest_planes()
    if not planes:
        return 0

    # Each |possible_crtcs| value is a bit field of possible CRTCs, e.g.
    # 0x9 (b1001) means the plane can be associated with CRTCs index 0 and 3
    # but not with index 1 nor 2.
    crtc_masks = [plane.possible_crtcs for plane in planes]

    # Count, for every CRTC index (bit position), how many planes 'vote' for
    # it, and return the highest count. Bits are tested by position so masks
    # of any width stay correctly aligned with each other.
    num_bits = max(1, max(mask.bit_length() for mask in crtc_masks))
    return max(sum((mask >> bit) & 1 for mask in crtc_masks)
               for bit in range(num_bits))