| #!/usr/bin/env python2 |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import print_function |
| from __future__ import absolute_import |
| from __future__ import unicode_literals |
| from __future__ import division |
| |
| import os |
| import copy |
| import json |
| import base64 |
| import logging |
| |
| import common |
| from autotest_lib.client.common_lib import hosts |
| from autotest_lib.server.cros.servo.topology import topology_constants as stc |
| |
| |
| class ServoTopologyError(Exception): |
| """ |
| Generic Exception for failures from ServoTopology object. |
| """ |
| pass |
| |
| |
| class MissingServoError(ServoTopologyError): |
| """ |
| Exception to throw when child servo type is missing. |
| """ |
| |
| def __init__(self, message, servo_type): |
| self._servo_type = servo_type |
| self.message = message |
| |
| def __str__(self): |
| return repr(self.message) |
| |
| |
| class ServoTopology(object): |
| """Class to read, generate and validate servo topology in the lab. |
| |
| The class support detection of servo listed in ST_PRODUCT_TYPES. |
| To save servo topology to host-info date passed two steps: |
| - convert to the json |
| - encode to base64 |
| """ |
| |
| def __init__(self, servo_host): |
| self._host = servo_host |
| self._topology = None |
| |
| def read(self, host_info): |
| """Reading servo-topology info.""" |
| logging.info('Reading servo topology info...') |
| self._topology = None |
| if not host_info: |
| logging.info('The host_info not provided. Skip reading.') |
| return |
| b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX) |
| self._topology = _parse_string_as_topology(b64_val) |
| logging.debug('Loaded servo topology: %s', self._topology) |
| if self._topology: |
| logging.info('Servo topology loaded successfully.') |
| |
| def save(self, host_info_store): |
| """Saving servo-topology info.""" |
| if self.is_empty(): |
| logging.info('Topology is empty. Skip saving.') |
| return |
| if not host_info_store: |
| logging.info('The host_info_store not provided. Skip saving.') |
| return |
| logging.info('Saving servo topology info...') |
| data = _convert_topology_to_string(self._topology) |
| if not data: |
| logging.info('Servo topology fail to save data.' |
| ' Please file a bug.') |
| return |
| host_info = host_info_store.get() |
| prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX) |
| if prev_value and prev_value == data: |
| logging.info('Servo topology was not changed. Skip saving.') |
| return |
| logging.debug('Previous saved topology: %s', prev_value) |
| host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data) |
| host_info_store.commit(host_info) |
| logging.info('Servo topology saved successfully.') |
| |
| def generate(self): |
| """Read servo data and create topology.""" |
| try: |
| self._topology = self._generate() |
| except Exception as e: |
| self._topology = None |
| logging.debug('(Not critical) %s', e) |
| logging.info('Fail to generate servo-topology') |
| if not self.is_empty(): |
| logging.info('Servo topology successfully generated.') |
| |
| def is_empty(self): |
| """If topology data was initialized.""" |
| return not bool(self._topology) |
| |
| def validate(self, raise_error=False, dual_set=False, compare=False): |
| """Validate topology against expected topology. |
| |
| Validation against: |
| - set-up expectation: min one child or 2 for DUAL_V4 |
| - last saved topology: check if any device missed |
| |
| @params raise_error: raise error if validate did not pass otherwise |
| return False. |
| @params dual_set: Check if servo expect DUAL_V4 setup. |
| @params compare: Validate against saved topology. |
| """ |
| new_st = self._generate() |
| if not new_st or not new_st.get(stc.ST_DEVICE_MAIN): |
| message = 'Main device is not detected' |
| return self._process_error(message, raise_error) |
| children = new_st.get(stc.ST_DEVICE_CHILDREN) |
| # basic setup has to have minimum one child. |
| if not children or len(children) < 1: |
| message = 'Each setup has at least one child' |
| return self._process_error(message, raise_error) |
| children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children] |
| # DUAL_V4 setup has to have cr50 and one more child. |
| if dual_set: |
| if stc.ST_CR50_TYPE not in children_types: |
| return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error) |
| if len(children) < 2: |
| message = 'Expected two children but have only one' |
| return self._process_error(message, raise_error) |
| if compare and not self.is_empty(): |
| main_device = new_st.get(stc.ST_DEVICE_MAIN) |
| t = self._topology |
| old_main = t.get(stc.ST_DEVICE_MAIN) |
| old_children = t.get(stc.ST_DEVICE_CHILDREN) |
| if not all([ |
| old_children, |
| old_main, |
| old_main.get(stc.ST_DEVICE_HUB_PORT), |
| ]): |
| # Old data is invalid for comparasing |
| return True |
| if not self._equal_item(old_main, main_device): |
| message = 'Main servo was changed' |
| return self._process_error(message, raise_error) |
| for child in old_children: |
| old_type = child.get(stc.ST_DEVICE_TYPE) |
| if old_type not in children_types: |
| return self._missing_servo_error(old_type, raise_error) |
| if len(children) < len(old_children): |
| message = 'Some child is missed' |
| return self._process_error(message, raise_error) |
| logging.info('Servo topology successfully verified.') |
| return True |
| |
| def _process_error(self, message, raise_error): |
| if not raise_error: |
| logging.info('Validate servo topology failed with: %s', message) |
| return False |
| raise ServoTopologyError(message) |
| |
| def _missing_servo_error(self, servo_type, raise_error): |
| message = 'Missed servo: %s!' % servo_type |
| if not raise_error: |
| logging.info('Validate servo topology failed with: %s', message) |
| return False |
| raise MissingServoError(message, servo_type) |
| |
| def _equal_item(self, old, new): |
| """Servo was replugged to another port""" |
| for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS: |
| if old.get(field) != new.get(field): |
| return False |
| return True |
| |
| def _generate(self): |
| """Generate and return topology structure. |
| |
| Read and generate topology structure with out update the state. |
| """ |
| logging.debug('Trying generate a servo-topology') |
| core_servo_serial = self._host.servo_serial |
| if not core_servo_serial: |
| logging.info('Servo serial is not provided.') |
| return None |
| logging.debug('Getting topology for core servo: %s', core_servo_serial) |
| # collect main device info |
| cmd_hub = 'servodtool device -s %s usb-path' % core_servo_serial |
| servo_path = self._read_line(cmd_hub) |
| logging.debug('Device -%s path: %s', core_servo_serial, servo_path) |
| if not servo_path: |
| logging.info('Core servo not detected.') |
| return None |
| if not self._is_expected_type(servo_path): |
| return None |
| main_device = self._read_device_info(servo_path) |
| if not main_device: |
| logging.debug('Core device missed some data') |
| return None |
| # collect child device info |
| children = [] |
| hub_path = servo_path[0:-2] |
| logging.debug('Core hub path: %s', hub_path) |
| devices_cmd = 'find %s/* -name serial' % hub_path |
| devices = self._read_multilines(devices_cmd) |
| core_device_port = main_device.get(stc.ST_DEVICE_HUB_PORT) |
| for device in devices: |
| logging.debug('Child device %s', device) |
| device_dir = os.path.dirname(device) |
| if not self._is_expected_type(device_dir): |
| # skip not expected device type like USB or hubs |
| continue |
| child = self._read_device_info(device_dir) |
| if not child: |
| logging.debug('Child missed some data.') |
| continue |
| if core_device_port == child.get(stc.ST_DEVICE_HUB_PORT): |
| logging.debug('Skip device if match with core device') |
| continue |
| children.append(child) |
| topology = { |
| stc.ST_DEVICE_MAIN: main_device, |
| stc.ST_DEVICE_CHILDREN: children |
| } |
| logging.debug('Servo topology: %s', topology) |
| return topology |
| |
| def _is_expected_type(self, path): |
| """Check if device type is known servo type. |
| |
| Please update ST_PRODUCT_TYPES to extend more servo types. |
| """ |
| product = self._read_file(path, 'product') |
| if bool(stc.ST_PRODUCT_TYPES.get(product)): |
| return True |
| logging.info('Unknown product: %s', product) |
| return False |
| |
| def _read_device_info(self, path): |
| """Read device details for topology. |
| |
| @params path: Absolute path to the device in FS. |
| """ |
| serial = self._read_file(path, 'serial') |
| product = self._read_file(path, 'product') |
| hub_path = self._read_file(path, 'devpath') |
| stype = stc.ST_PRODUCT_TYPES.get(product) |
| return self._create_item(serial, stype, product, hub_path) |
| |
| def _create_item(self, servo_serial, servo_type, product, hub_path): |
| """Create topology item. |
| |
| Return created item only if all details provided. |
| |
| @params servo_serial: Serial number of device. |
| @params servo_type: Product type code of the device. |
| @params product: Product name of the device. |
| @params hub_path: Device enumerated folder name. Show the |
| chain of used ports to connect the device. |
| """ |
| item = { |
| stc.ST_DEVICE_SERIAL: servo_serial, |
| stc.ST_DEVICE_TYPE: servo_type, |
| stc.ST_DEVICE_PRODUCT: product, |
| stc.ST_DEVICE_HUB_PORT: hub_path |
| } |
| if not (servo_serial and servo_type and product and hub_path): |
| logging.debug('Some data missing: %s', item) |
| return None |
| return item |
| |
| def _read_file(self, path, file_name): |
| """Read context of the file and return result as one line. |
| |
| If execution finished with error result will be empty string. |
| |
| @params path: Path to the folder where file located. |
| @params file_name: The file name to read. |
| """ |
| if not path or not file_name: |
| return '' |
| f = os.path.join(path, file_name) |
| return self._read_line('cat %s' % f) |
| |
| def _read_line(self, command): |
| """Execute terminal command and return result as one line. |
| |
| If execution finished with error result will be empty string. |
| |
| @params command: String to execute. |
| """ |
| r = self._host.run(command, ignore_status=True, timeout=30) |
| if r.exit_status == 0: |
| return r.stdout.strip() |
| return '' |
| |
| def _read_multilines(self, command): |
| """Execute terminal command and return result as multi-line. |
| |
| If execution finished with error result will be an empty array. |
| |
| @params command: String to execute. |
| """ |
| r = self._host.run(command, ignore_status=True, timeout=30) |
| if r.exit_status == 0: |
| return r.stdout.splitlines() |
| return [] |
| |
| |
| def _convert_topology_to_string(topology): |
| """Convert topology to the string respresentation. |
| |
| Convert topology to json and encode by Base64 for host-info file. |
| |
| @params topology: Servo topology data |
| @returns: topology representation in Base64 string |
| """ |
| if not topology: |
| return '' |
| try: |
| # generate json similar to golang to avoid extra updates |
| json_string = json.dumps(topology, separators=(',', ':')) |
| logging.debug('Servo topology (json): %s', json_string) |
| except Exception as e: |
| logging.debug('(Not critical) %s', e) |
| logging.info('Failed to convert topology to json') |
| return '' |
| try: |
| # recommended to convert to the bytes for python 3 |
| b64_string = base64.b64encode(json_string.encode("utf-8")) |
| logging.debug('Servo topology (b64): %s', b64_string) |
| return b64_string |
| except Exception as e: |
| logging.debug('(Not critical) %s', e) |
| logging.info('Failed to convert topology to base64') |
| return '' |
| |
| |
| def _parse_string_as_topology(src): |
| """Parse and load servo topology from string. |
| |
| Decode Base64 and load as json of servo-topology data. |
| |
| @params src: topology representation in Base64 string |
| @returns: servo topology data |
| """ |
| if not src: |
| logging.debug('Servo topology data not present in host-info.') |
| return None |
| try: |
| json_string = base64.b64decode(src) |
| logging.debug('Servo topology (json) from host-info: %s', json_string) |
| return json.loads(json_string) |
| except Exception as e: |
| logging.debug('(Not critical) %s', e) |
| logging.info('Fail to read servo-topology from host-info.') |
| return None |