| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import print_function |
| from __future__ import absolute_import |
| from __future__ import unicode_literals |
| from __future__ import division |
| |
| import os |
| import json |
| import base64 |
| import logging |
| |
| import common |
| from autotest_lib.server.cros.servo.topology import topology_constants as stc |
| |
| |
class ServoTopologyError(Exception):
    """Base error type for every failure reported by ServoTopology."""
| |
| |
class MissingServoError(ServoTopologyError):
    """
    Exception to throw when child servo type is missing.
    """

    def __init__(self, message, servo_type):
        """Store the error message and the type of the missing servo.

        @params message: Human readable description of the failure.
        @params servo_type: Type of the servo which was not detected.
        """
        # Initialize the base exception so that `args`, repr() and
        # pickling/copying of the error carry the message.
        super(MissingServoError, self).__init__(message)
        self._servo_type = servo_type
        self.message = message

    def __str__(self):
        """Return the repr() of the message (i.e. the quoted message)."""
        return repr(self.message)
| |
| |
class ServoTopology(object):
    """Class to read, generate and validate servo topology in the lab.

    The class supports detection of servos listed in VID_PID_SERVO_TYPES.
    To save the servo topology to host-info the data passes two steps:
       - convert to json
       - encode to base64
    """
    # Command to get usb-path to device
    SERVOD_TOOL_USB_PATH = 'servodtool device -s %s usb-path'

    # Base folder where all servo devices will be enumerated.
    SERVOS_BASE_PATH = '/sys/bus/usb/devices'

    # Minimal length of usb-path for servo devices connected to the host.
    MIN_SERVO_PATH = len(SERVOS_BASE_PATH + '/X')

    def __init__(self, servo_host):
        """Create an empty topology bound to the servo host.

        @params servo_host: Host object; this class reads its `servo_serial`
                            attribute and runs commands through its `run()`.
        """
        self._host = servo_host
        self.reset()

    def read(self, host_info):
        """Reading servo-topology info.

        The cache is always reset first. It stays empty when host_info is
        not provided or when the stored value cannot be parsed.

        @params host_info: Object providing `get_label_value()`.
        """
        logging.info('Reading servo topology info...')
        self.reset()
        if not host_info:
            logging.info('The host_info not provided. Skip reading.')
            return
        b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        self._topology = _parse_string_as_topology(b64_val)
        logging.debug('Loaded servo topology: %s', self._topology)
        if self._topology:
            logging.info('Servo topology loaded successfully.')

    def save(self, host_info_store):
        """Saving servo-topology info.

        Nothing is written when the topology is empty, when the store is not
        provided, when the conversion to string fails or when the stored
        value already equals the new one.

        @params host_info_store: Object providing `get()` and `commit()`.
        """
        if self.is_empty():
            logging.info('Topology is empty. Skip saving.')
            return
        if not host_info_store:
            logging.info('The host_info_store not provided. Skip saving.')
            return
        logging.info('Saving servo topology info...')
        data = _convert_topology_to_string(self._topology)
        if not data:
            logging.info('Servo topology fail to save data.'
                         ' Please file a bug.')
            return
        host_info = host_info_store.get()
        prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        if prev_value and prev_value == data:
            logging.info('Servo topology was not changed. Skip saving.')
            return
        logging.debug('Previous saved topology: %s', prev_value)
        host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data)
        host_info_store.commit(host_info)
        logging.info('Servo topology saved successfully.')

    def reset(self):
        """Reset topology to the initial state.

        The cached topology is dropped (set to None).
        """
        self._topology = None

    def generate(self):
        """Read servo data and create topology.

        Best effort: any exception raised during generation is logged and
        the topology is left empty.
        """
        self.reset()
        try:
            self._topology = self._generate()
        except Exception as e:
            logging.debug('(Not critical) %s', e)
            logging.info('Fail to generate servo-topology')
        if not self.is_empty():
            logging.info('Servo topology successfully generated.')

    def is_empty(self):
        """Check if the topology cache holds no data.

        @returns: True if no topology is cached, otherwise False.
        """
        return not bool(self._topology)

    def validate(self, raise_error=False, dual_set=False, compare=False):
        """Validate topology against expected topology.

        Validation against:
         - set-up expectation: min one child or 2 for DUAL_V4
         - last saved topology: check if any device missed

        Update topology cache if validation passed successfully.

        @params raise_error: raise error if validate did not pass otherwise
                             return False.
        @params dual_set:    Check if servo expect DUAL_V4 setup.
        @params compare:     Validate against saved topology.

        @returns: True if validation passed, False if it failed and
                  raise_error is not set.
        @raises ServoTopologyError: validation failed and raise_error is set.
        @raises MissingServoError: an expected child servo is missing and
                                   raise_error is set.
        """
        new_st = self._generate()
        logging.debug("Generate topology: %s", new_st)
        if not new_st or not new_st.get(stc.ST_DEVICE_MAIN):
            message = 'Main device is not detected'
            return self._process_error(message, raise_error)
        children = new_st.get(stc.ST_DEVICE_CHILDREN)
        # basic setup has to have minimum one child.
        if not children:
            message = 'Each setup has at least one child'
            return self._process_error(message, raise_error)
        children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children]
        # DUAL_V4 setup has to have cr50 and one more child.
        if dual_set:
            if stc.ST_CR50_TYPE not in children_types:
                return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error)
            if len(children) < 2:
                message = 'Expected two children but have only one'
                return self._process_error(message, raise_error)
        if compare and not self.is_empty():
            main_device = new_st.get(stc.ST_DEVICE_MAIN)
            old_main = self._topology.get(stc.ST_DEVICE_MAIN)
            old_children = self._topology.get(stc.ST_DEVICE_CHILDREN)
            # Short-circuit on purpose: old_main may be None, in which case
            # calling old_main.get() must be skipped rather than evaluated
            # eagerly inside a list literal.
            old_data_is_valid = bool(old_children and old_main
                                     and old_main.get(stc.ST_DEVICE_HUB_PORT))
            if not old_data_is_valid:
                # Old data is invalid for comparison.
                # NOTE(review): the cache is not refreshed on this path;
                # kept as in the previous implementation - confirm intent.
                return True
            if not self._equal_item(old_main, main_device):
                message = 'Main servo was changed'
                return self._process_error(message, raise_error)
            for child in old_children:
                old_type = child.get(stc.ST_DEVICE_TYPE)
                if old_type not in children_types:
                    return self._missing_servo_error(old_type, raise_error)
            if len(children) < len(old_children):
                message = 'Some child is missed'
                return self._process_error(message, raise_error)
        logging.info('Servo topology successfully verified.')
        self._topology = new_st
        return True

    def is_servo_serial_provided(self):
        """Verify that root servo serial is provided.

        @returns: True if the host has a non-empty servo serial.
        """
        root_servo_serial = self._host.servo_serial
        if not root_servo_serial:
            logging.info('Root servo serial is not provided.')
            return False
        logging.debug('Root servo serial: %s', root_servo_serial)
        return True

    def _process_error(self, message, raise_error):
        """Report a validation failure.

        @params message: Description of the failure.
        @params raise_error: Raise instead of returning False.

        @returns: False when raise_error is not set.
        @raises ServoTopologyError: when raise_error is set.
        """
        if not raise_error:
            logging.info('Validate servo topology failed with: %s', message)
            return False
        raise ServoTopologyError(message)

    def _missing_servo_error(self, servo_type, raise_error):
        """Report that a servo of the given type is missing.

        @params servo_type: Type of the servo which was not found.
        @params raise_error: Raise instead of returning False.

        @returns: False when raise_error is not set.
        @raises MissingServoError: when raise_error is set.
        """
        message = 'Missed servo: %s!' % servo_type
        if not raise_error:
            logging.info('Validate servo topology failed with: %s', message)
            return False
        raise MissingServoError(message, servo_type)

    def _equal_item(self, old, new):
        """Compare two topology items by the fields used for comparison.

        A mismatch indicates, for example, that the servo was replugged to
        another port.

        @params old: Topology item from the saved topology.
        @params new: Topology item from the freshly generated topology.

        @returns: True if all compared fields are equal.
        """
        return all(
                old.get(field) == new.get(field)
                for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS)

    def _generate(self):
        """Generate and return topology structure.

        Read and generate topology structure without updating the state.

        @returns: Dict with the main device and the list of children, or
                  None if the servo serial is not provided or the root
                  servo was not found among the detected devices.
        """
        logging.debug('Trying generate a servo-topology')
        if not self.is_servo_serial_provided():
            return None
        root_servo_serial = self._host.servo_serial
        root_servo = None
        children = []
        for device in self.get_list_of_devices():
            if not device.is_good():
                logging.info('Skip %s as missing some data', device)
                continue
            if device.get_serial_number() == root_servo_serial:
                root_servo = device.get_topology_item()
            else:
                children.append(device.get_topology_item())
        if not root_servo:
            logging.debug('Root servo missed!')
            return None
        topology = {
                stc.ST_DEVICE_MAIN: root_servo,
                stc.ST_DEVICE_CHILDREN: children
        }
        logging.debug('Servo topology: %s', topology)
        return topology

    def _get_servo_usb_path(self, servo_serial):
        """Ask servodtool for the usb-path of the servo.

        @params servo_serial: Serial number of the servo.

        @returns: Path reported by servodtool, or None if the command
                  failed or the result is too short to be a device path.
        """
        cmd_hub = self.SERVOD_TOOL_USB_PATH % servo_serial
        servo_path = self._read_line(cmd_hub)
        logging.debug('Servo %s path: %s', servo_serial, servo_path)
        if not servo_path or len(servo_path) < self.MIN_SERVO_PATH:
            logging.info('Servo not detected.')
            return None
        return servo_path

    def _get_servo_hub_path(self, servo_serial):
        """Get path to the servo hub.

        The root servo is connected directly to the servo-hub. To find other
        servos connected to the hub we need to find the path to the servo-hub.
        The servod-tool always returns the direct path to the servo, like:
            /sys/bus/usb/devices/1-3.2.1
            base path:  /sys/bus/usb/devices/
            root-servo:  1-3.2.1
        the alternative path is '/sys/bus/usb/devices/1-3.2/1-3.2.1/'
        where '1-3.2' is the path to the servo-hub. To extract the path to
        the servo-hub the logic removes the last port (the one where the root
        servo is connected to the servo-hub).
            base path:  /sys/bus/usb/devices/
            servo-hub:  1-3.2
            root-servo: .1
        After that only the base path is joined with the servo-hub.

        @params servo_serial    Serial number of the servo connected to hub
        @returns: A string representation of fs-path to servo-hub device,
                  or None if the servo was not detected.
        """
        logging.debug('Try to find a hub-path for servo:%s', servo_serial)
        servo_path = self._get_servo_usb_path(servo_serial)
        if not servo_path:
            return None
        base_path = os.path.dirname(servo_path)
        root_servo_tail = os.path.basename(servo_path)
        # Removing the last port to get the hub the servo is plugged in.
        servo_hub_tail = '.'.join(root_servo_tail.split('.')[:-1])
        return os.path.join(base_path, servo_hub_tail)

    def get_root_servo(self):
        """Get root servo device.

        @returns: ConnectedServo if device found, otherwise None.
        """
        logging.debug('Try to find a root servo')
        if not self.is_servo_serial_provided():
            return None
        # Find the path to the root servo folder.
        servo_path = self._get_servo_usb_path(self._host.servo_serial)
        if not servo_path:
            return None
        device = self._get_device(servo_path)
        if device and device.is_good():
            return device
        return None

    def get_root_servo_from_cache(self):
        """Get root servo device based on topology cache data.

        First we try to find servo based on topology info.

        @returns: ConnectedServo if device found; None if the cache has no
                  usable data for the main device.
        @raises ServoTopologyError: the cache points to a port where no
                                    good servo device is present.
        """
        logging.info('Trying to find root device from topology cache!')
        main_item = None
        if self._topology:
            main_item = self._topology.get(stc.ST_DEVICE_MAIN)
        if not main_item:
            logging.info('Topology cache is empty or not present')
            return None
        # Use .get() so a partially filled cached item is treated the same
        # way as an empty hub port instead of raising KeyError.
        devpath = main_item.get(stc.ST_DEVICE_HUB_PORT)
        logging.debug('devpath=%s', devpath)
        if not devpath:
            return None
        # devpath represent sequence of ports used to detect device
        device_fs_port = '1-%s' % devpath
        logging.debug('device_fs_port=%s', device_fs_port)
        device_path = os.path.join(self.SERVOS_BASE_PATH, device_fs_port)
        device = self._get_device(device_path)
        logging.info('device=%s', device)
        if device and device.is_good():
            return device
        logging.debug('Trying to verify present of the hub!')
        hub_folder = '.'.join(device_fs_port.split('.')[:-1])
        logging.debug('servo_hub_folder=%s', hub_folder)
        hub_product = os.path.join(self.SERVOS_BASE_PATH, hub_folder,
                                   'product')
        logging.debug('hub_product=%s', hub_product)
        hub_name = self._read_line('cat %s' % hub_product)
        logging.debug('hub_name=%s', hub_name)
        # A readable hub product means the hub is present, so only the
        # servo itself is missing from the expected port.
        if hub_name:
            raise ServoTopologyError(
                    'Root servo hardware potentially missing!')
        raise ServoTopologyError(
                'No USB device on expected port for the servo!')

    def get_list_of_devices(self):
        """Generate list of devices with serials.

        Logic based on detecting all device enumerated under servo-hub device.

        @returns: Collection of detected device connected to the servo-hub.
        """
        logging.debug('Trying generate device-a servo-topology')
        if not self.is_servo_serial_provided():
            return []
        # Find the path to the servo-hub folder.
        hub_path = self._get_servo_hub_path(self._host.servo_serial)
        logging.debug('Servo hub path: %s', hub_path)
        if not hub_path:
            return []

        # Find all serial files of devices under servo-hub. Each device
        # has to have serial number.
        devices_cmd = 'find %s/* -name serial' % hub_path
        children = []
        for serial_file in self._read_multilines(devices_cmd):
            logging.debug('Child device %s', serial_file)
            child = self._get_device(os.path.dirname(serial_file))
            if not child:
                logging.debug('Child missed some data.')
                continue
            children.append(child)
        logging.debug('Detected devices: %s', len(children))
        return children

    def update_servo_version(self, device=None):
        """Update version of servo device.

        The version is re-read from the 'configuration' file of the device.

        @params device: ConnectedServo instance.
        """
        if not device:
            logging.debug('Device is not provided')
            return
        device._version = self._read_file(device.get_path(), 'configuration')
        logging.debug('New servo version: %s', device.get_version())

    def get_list_available_servos(self):
        """List all servos enumerated on the host.

        @returns: List of ConnectedServo for every recognized device.
        """
        logging.debug('Started process to collect all devices on the host.')
        devices = []
        # Looking only devices with Google vendor-id (18d1).
        cmd = 'grep -s -R "18d1" %s/*/idVendor' % self.SERVOS_BASE_PATH
        for path in self._read_multilines(cmd):
            # Everything before the first ':' of a result line is treated
            # as the path of the matched file.
            idVendor_path = path.split(':')[0]
            if not idVendor_path:
                logging.debug('Cannot extract path to file from: %s', path)
                continue
            base_path = os.path.dirname(idVendor_path)
            if not base_path:
                logging.debug('Cannot extract base path from: %s',
                              idVendor_path)
                continue
            device = self._get_device(base_path)
            if not device:
                logging.debug('Not found device under: %s', base_path)
                continue
            devices.append(device)
        return devices

    def _get_vid_pid(self, path):
        """Read VID and PID of the device.

        @params path    Absolute path to the device in FS.
        @returns: A string representation VID:PID of device, or None if
                  either value cannot be read.
        """
        vid = self._read_file(path, 'idVendor')
        pid = self._read_file(path, 'idProduct')
        if not vid or not pid:
            return None
        vid_pid = '%s:%s' % (vid, pid)
        logging.debug("VID/PID of device device: '%s'", vid_pid)
        return vid_pid

    def _get_device(self, path):
        """Create device representation.

        @params path:   Absolute path to the device in FS.
        @returns: ConnectedServo if VID/PID present and maps to a known
                  servo type, otherwise None.
        """
        vid_pid = self._get_vid_pid(path)
        if not vid_pid:
            return None
        serial = self._read_file(path, 'serial')
        product = self._read_file(path, 'product')
        hub_path = self._read_file(path, 'devpath')
        configuration = self._read_file(path, 'configuration')
        servo_type = stc.VID_PID_SERVO_TYPES.get(vid_pid)
        if not servo_type:
            return None
        return ConnectedServo(device_path=path,
                              device_product=product,
                              device_serial=serial,
                              device_type=servo_type,
                              device_vid_pid=vid_pid,
                              device_hub_path=hub_path,
                              device_version=configuration)

    def _read_file(self, path, file_name):
        """Read content of the file and return result as one line.

        If execution finished with error result will be empty string.

        @params path:       Path to the folder where file located.
        @params file_name:  The file name to read.
        """
        if not path or not file_name:
            return ''
        f = os.path.join(path, file_name)
        return self._read_line('cat %s' % f)

    def _read_line(self, command):
        """Execute terminal command and return result as one line.

        If execution finished with error result will be empty string.

        @params command:    String to execute.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.strip()
        return ''

    def _read_multilines(self, command):
        """Execute terminal command and return result as multi-line.

        If execution finished with error result will be an empty array.

        @params command:    String to execute.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.splitlines()
        return []
| |
| |
class ConnectedServo(object):
    """Class to hold info about a detected connected servo device."""

    def __init__(self,
                 device_path=None,
                 device_product=None,
                 device_serial=None,
                 device_type=None,
                 device_vid_pid=None,
                 device_hub_path=None,
                 device_version=None):
        """Store the data describing the device.

        @params device_path:     Path to the device folder.
        @params device_product:  Product name of the device.
        @params device_serial:   Serial number of the device.
        @params device_type:     Servo type of the device.
        @params device_vid_pid:  VID:PID string of the device.
        @params device_hub_path: Hub port path of the device.
        @params device_version:  Version of the device.
        """
        self._path = device_path
        self._product = device_product
        self._serial = device_serial
        self._type = device_type
        self._vid_pid = device_vid_pid
        self._hub_path = device_hub_path
        self._version = device_version

    def get_topology_item(self):
        """Extract as topology item.

        @returns: Dict with serial, type, product and hub port of the device.
        """
        return {
                stc.ST_DEVICE_SERIAL: self._serial,
                stc.ST_DEVICE_TYPE: self._type,
                stc.ST_DEVICE_PRODUCT: self._product,
                stc.ST_DEVICE_HUB_PORT: self._hub_path
        }

    def is_good(self):
        """Check if minimal data for topology item is present.

        @returns: True if serial, type and hub path are all set.
        """
        # Coerce to bool: a bare `and` chain would return the last operand
        # (a string or None) instead of True/False.
        return bool(self._serial and self._type and self._hub_path)

    def get_type(self):
        """Servo type."""
        return self._type

    def get_path(self):
        """Path to servo folder in sysfs."""
        return self._path

    def get_serial_number(self):
        """Servo serial number."""
        return self._serial

    def get_version(self):
        """Get servo version."""
        return self._version

    def __str__(self):
        """Return a short description of the device for log messages."""
        return ("Device %s:%s (%s, %s) version: %s" %
                (self._type, self._serial, self._vid_pid, self._hub_path,
                 self._version))
| |
| |
def _convert_topology_to_string(topology):
    """Serialize the topology for storing in the host-info file.

    The topology is dumped to compact json and the json is encoded with
    Base64. Any failure is logged and reported as an empty string.

    @params topology: Servo topology data
    @returns: topology representation in Base64 string; empty string if
              the topology is empty or cannot be converted
    """
    if not topology:
        return ''

    # Step 1: json. Compact separators keep the output similar to the one
    # generated by golang, which avoids extra updates.
    try:
        as_json = json.dumps(topology, separators=(',', ':'))
        logging.debug('Servo topology (json): %s', as_json)
    except Exception as e:
        logging.debug('(Not critical) %s', e)
        logging.info('Failed to convert topology to json')
        return ''

    # Step 2: base64. Encoding works on bytes (recommended for python 3),
    # the final value is converted back to a string.
    result = ''
    try:
        encoded = base64.b64encode(as_json.encode("utf-8"))
        logging.debug('Servo topology (b64): %s', encoded)
        result = encoded.decode()
    except Exception as e:
        logging.debug('(Not critical) %s', e)
        logging.info('Failed to convert topology to base64')
        result = ''
    return result
| |
| |
def _parse_string_as_topology(src):
    """Parse and load servo topology from string.

    Decode Base64 and load as json of servo-topology data. Any failure is
    logged and reported as None.

    @params src: topology representation in Base64 string
    @returns: servo topology data (dict); None if the source is empty,
              cannot be decoded or does not hold a json object
    """
    if not src:
        logging.debug('Servo topology data not present in host-info.')
        return None
    try:
        # b64decode returns bytes; decode explicitly so that json always
        # receives text and the log shows the json, not a bytes repr.
        json_string = base64.b64decode(src).decode('utf-8')
        logging.debug('Servo topology (json) from host-info: %s', json_string)
        topology = json.loads(json_string)
    except Exception as e:
        logging.debug('(Not critical) %s', e)
        logging.info('Fail to read servo-topology from host-info.')
        return None
    # The topology is accessed with dict methods by its users; any other
    # json value (list, number, string) is not a valid topology.
    if topology is not None and not isinstance(topology, dict):
        logging.debug('(Not critical) unexpected topology type: %s',
                      type(topology).__name__)
        logging.info('Fail to read servo-topology from host-info.')
        return None
    return topology