| #!/usr/bin/env python3 |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| import common |
| from autotest_lib.client.common_lib import utils as client_utils |
| from autotest_lib.server.cros.servo.topology import topology_constants |
| |
| try: |
| from autotest_lib.utils.frozen_chromite.lib import metrics |
| except ImportError: |
| metrics = client_utils.metrics_mock |
| |
| |
| class ServoFwVersionMissedError(Exception): |
| """Raised when Available version is not detected.""" |
| |
| |
| class ServoUpdaterError(Exception): |
| """Raised when detected issue with servo_updater.""" |
| |
| |
| class _BaseUpdateServoFw(object): |
| """Base class to update firmware on servo""" |
| |
| # Commands to kill active servo_updater fail with timeout |
| ACTIVE_UPDATER_CORE = 'ps aux | grep -ie [s]ervo_updater |grep "%s" ' |
| ACTIVE_UPDATER_PRINT = ACTIVE_UPDATER_CORE + "| awk '{print $2}' " |
| ACTIVE_UPDATER_KILL = ACTIVE_UPDATER_PRINT + "| xargs kill -9 " |
| |
| # Command to update FW for servo. Always reboot servo after update. |
| UPDATER_TAIL = '-b %s -s "%s" -c %s --reboot' |
| UPDATER_CMD = 'servo_updater ' + UPDATER_TAIL |
| UPDATER_CONTAINER_CMD = 'python /update_servo_firmware.py ' + UPDATER_TAIL |
| |
| # Command to get servo firmware version for requested board and channel. |
| LATEST_VERSION_CMD = 'servo_updater -p -b "%s" -c %s | grep firmware' |
| |
| # Default firmware channel. |
| DEFAULT_FW_CHANNEL = 'stable' |
| |
| def __init__(self, servo_host, device): |
| """Init servo-updater instance. |
| |
| @params servo_host: ServoHost instance to run terminal commands |
| @params device: ConnectedServo instance provided servo info |
| """ |
| self._host = servo_host |
| self._device = device |
| |
| def need_update(self, ignore_version=False, channel=None): |
| """Verify that servo_update is required. |
| |
| @params ignore_version: Do not check the version on the device. |
| @params channel: Channel for servo firmware. Supported from |
| version R90. Possible values: stable, prev, |
| dev, alpha. |
| |
| @returns: True if update required, False if not |
| """ |
| if not channel: |
| channel = self.DEFAULT_FW_CHANNEL |
| if not self._host: |
| logging.debug('Skip update as host is provided.') |
| return False |
| elif not self.get_serial_number(): |
| logging.debug('Skip update as servo serial is empty.') |
| return False |
| elif not (self._host.is_labstation() |
| or self._host.is_containerized_servod()): |
| logging.debug('Skip as we run onlu from labstation and container.') |
| return False |
| elif not ignore_version: |
| if not self._is_outdated_version(channel=channel): |
| logging.debug('Skip as version is up today') |
| return False |
| return True |
| |
| def update(self, force_update=False, ignore_version=False, channel=None): |
| """Update firmware on the servo. |
| |
| Steps: |
| 1) Verify servo is not updated by checking the versions. |
| 2) Try to get serial number for the servo. |
| 3) Updating firmware. |
| |
| @params force_update: Run updater with force option. |
| @params ignore_version: Do not check the version on the device. |
| @params channel: Channel for servo firmware. Supported from |
| version R90. Possible values: stable, prev, |
| dev, alpha. |
| """ |
| if not channel: |
| channel = self.DEFAULT_FW_CHANNEL |
| if not self.need_update(ignore_version, channel=channel): |
| logging.info("The board %s doesn't need update.", self.get_board()) |
| return |
| if not self.get_serial_number(): |
| logging.info('Serial number is not detected. It means no update' |
| ' will be performed on servo.') |
| return |
| if self._device.get_type() != self.get_board(): |
| logging.info('Attempt use incorrect updater for %s. Expected: %s.', |
| self._device.get_type(), self.get_board()) |
| return |
| self._update_firmware(force_update, channel) |
| |
| def get_board(self): |
| """Return servo type supported by updater.""" |
| raise NotImplementedError('Please implement method to return' |
| ' servo type') |
| |
| def get_device(self): |
| """Return ConnectedServo instance""" |
| return self._device |
| |
| def get_serial_number(self): |
| """Return serial number for servo device""" |
| return self._device.get_serial_number() |
| |
| def _get_updater_cmd(self, force_update, channel): |
| """Return command to run firmware updater for the servo device. |
| |
| @params force_update: Run updater with force option. |
| @params channel: Channel for servo firmware. |
| """ |
| if self._host.is_containerized_servod(): |
| cmd = self.UPDATER_CONTAINER_CMD |
| else: |
| cmd = self.UPDATER_CMD |
| board = self.get_board() |
| serial_number = self.get_serial_number() |
| cmd = cmd % (board, serial_number, channel.lower()) |
| if force_update: |
| cmd += ' --force ' |
| return cmd |
| |
| def _update_firmware(self, force_update, channel): |
| """Execute firmware updater command. |
| |
| @params force_update: Run updater with force option. |
| @params channel: UpdateCompare version from special firmware channel |
| """ |
| cmd = self._get_updater_cmd(force_update, channel) |
| logging.info('Try to update servo fw update by running: %s', cmd) |
| try: |
| res = self._host.run(cmd, timeout=120) |
| logging.debug('Servo fw update finished; %s', res.stdout.strip()) |
| logging.info('Servo fw update finished') |
| finally: |
| self._kill_active_update_process() |
| |
| def _kill_active_update_process(self): |
| """Kill active servo_update processes when stuck after attempt.""" |
| try: |
| cmd = self.ACTIVE_UPDATER_KILL % self.get_serial_number() |
| self._host.run(cmd, timeout=30, ignore_status=True) |
| except Exception as e: |
| logging.debug('Fail kill active processes; %s', e) |
| |
| def _current_version(self): |
| """Get current version on servo device""" |
| return self._device.get_version() |
| |
| def _latest_version(self, channel): |
| """Get latest available version from servo_updater. |
| |
| @params channel: Compare version from special firmware channel |
| """ |
| cmd = self.LATEST_VERSION_CMD % (self.get_board(), channel.lower()) |
| re = self._host.run(cmd, ignore_status=True) |
| if re.exit_status == 0: |
| result = re.stdout.strip().split(':') |
| if len(result) == 2: |
| return result[-1].strip() |
| return None |
| |
| def _is_outdated_version(self, channel): |
| """Compare version to determine request to update the Servo or not. |
| |
| @params channel: Compare version from special firmware channel |
| """ |
| current_version = self._current_version() |
| logging.debug('Servo fw on the device: "%s"', current_version) |
| latest_version = self._latest_version(channel) |
| logging.debug('Latest servo fw: "%s"', latest_version) |
| if not current_version: |
| return True |
| if not latest_version: |
| raise ServoFwVersionMissedError() |
| if current_version == latest_version: |
| return False |
| return True |
| |
| |
| class UpdateServoV4Fw(_BaseUpdateServoFw): |
| """Servo firmware updater for servo_v4.""" |
| |
| def get_board(self): |
| """Return servo type supported by updater""" |
| return topology_constants.ST_V4_TYPE |
| |
| |
| class UpdateServoV4p1Fw(_BaseUpdateServoFw): |
| """Servo firmware updater for servo_v4p1.""" |
| |
| def get_board(self): |
| """Return servo type supported by updater""" |
| return topology_constants.ST_V4P1_TYPE |
| |
| |
| class UpdateServoMicroFw(_BaseUpdateServoFw): |
| """Servo firmware updater for servo_micro.""" |
| |
| def get_board(self): |
| """Return servo type supported by updater""" |
| return topology_constants.ST_SERVO_MICRO_TYPE |
| |
| |
| class UpdateC2D2Fw(_BaseUpdateServoFw): |
| """Servo firmware updater for c2d2.""" |
| |
| def get_board(self): |
| """Return servo type supported by updater""" |
| return topology_constants.ST_C2D2_TYPE |
| |
| |
| class UpdateSweetberryFw(_BaseUpdateServoFw): |
| """Servo firmware updater for sweetberry.""" |
| |
| def get_board(self): |
| """Return servo type supported by updater""" |
| return topology_constants.ST_SWEETBERRY_TYPE |
| |
| |
| # List servo firmware updaters mapped to the type |
| SERVO_UPDATERS = { |
| topology_constants.ST_V4_TYPE: UpdateServoV4Fw, |
| topology_constants.ST_V4P1_TYPE: UpdateServoV4p1Fw, |
| topology_constants.ST_SERVO_MICRO_TYPE: UpdateServoMicroFw, |
| topology_constants.ST_C2D2_TYPE: UpdateC2D2Fw, |
| topology_constants.ST_SWEETBERRY_TYPE: UpdateSweetberryFw, |
| } |
| |
| # List known, tracking issue related to servo_updater. |
| SERVO_UPDATER_ISSUE_MSGS = ('Configuration not set', ) |
| |
| |
| def _run_update_attempt(updater, topology, try_count, force_update, |
| ignore_version, channel): |
| """Run servo update attempt. |
| |
| @params updater: Servo updater instance. |
| @params topology: ServoTopology instance to update version. |
| @params try_count: Count of attempt to run update. |
| @params force_update: Run updater with force option. |
| @params ignore_version: Do not check the version on the device. |
| @params channel: Request servo firmware from special channel |
| |
| @returns: True is finished without any error, False - with error |
| """ |
| board = updater.get_board() |
| success = False |
| for a in range(try_count): |
| msg = 'Starting attempt: %d (of %d) to update "%s".' |
| if force_update: |
| msg += ' with force' |
| logging.info(msg, a + 1, try_count, board) |
| try: |
| updater.update(force_update=force_update, |
| ignore_version=ignore_version, |
| channel=channel) |
| topology.update_servo_version(updater.get_device()) |
| if not updater.need_update(ignore_version=ignore_version, |
| channel=channel): |
| success = True |
| except Exception as er: |
| error_message = str(er) |
| logging.debug('(Not critical) fail to update %s; %s', board, |
| error_message) |
| for message in SERVO_UPDATER_ISSUE_MSGS: |
| if message in error_message: |
| raise ServoUpdaterError() |
| if success: |
| break |
| return success |
| |
| |
| def any_servo_needs_firmware_update(host): |
| """Verify if any servo requires firmware update. |
| |
| @params host: ServoHost instance to run required commands |
| and access to topology. |
| @returns: True if any servo requires an update. |
| """ |
| if not host: |
| raise ValueError('ServoHost is not provided.') |
| |
| has_servo_requires_update = False |
| for device in host.get_topology().get_list_of_devices(): |
| # Verify that device can provide serial and servo_type. |
| if not device.is_good(): |
| continue |
| board = device.get_type() |
| updater_type = SERVO_UPDATERS.get(board, None) |
| if not updater_type: |
| logging.debug('No specified updater for %s', board) |
| continue |
| logging.debug('Specified updater found for %s', board) |
| # Creating update instance |
| updater = updater_type(host, device) |
| if updater.need_update(ignore_version=False, |
| channel=host.servo_fw_channel): |
| logging.info('The servo: %s requires firmware update!', board) |
| has_servo_requires_update = True |
| else: |
| logging.info('The servo: %s does not require firmware update!', |
| board) |
| return has_servo_requires_update |
| |
| |
| def update_servo_firmware(host, |
| boards=None, |
| try_attempt_count=1, |
| force_update=False, |
| try_force_update=False, |
| ignore_version=False): |
| """Update firmware on servo devices. |
| |
| @params host: ServoHost instance to run required commands |
| and access to topology. |
| @params try_attempt_count: Count of attempts to update servo. For force |
| option the count attempts is always 1 (one). |
| @params try_force_update: Try force force option if fail to update in |
| normal mode. |
| @params force_update: Run updater with force option. Override |
| try_force_update option. |
| @params ignore_version: Do not check the version on the device. |
| |
| @returns: True is all servos updated or does not need it, |
| False if any device could not updated. |
| """ |
| if boards is None: |
| boards = [] |
| if ignore_version: |
| logging.info('Running servo_updater with ignore_version=True') |
| |
| if not host: |
| raise ValueError('ServoHost is not provided.') |
| |
| # Use force option as first attempt |
| use_force_option_as_first_attempt = False |
| # If requested to update with force then first attempt will be with force |
| # and there no second attempt. |
| if force_update: |
| try_attempt_count = 1 |
| try_force_update = False |
| use_force_option_as_first_attempt = True |
| # to run updater we need make sure the servod is not running |
| host.stop_servod() |
| if host.is_containerized_servod(): |
| # Starting container as servo_updated located in it. |
| # Starting without servod as it can block access to the servos. |
| host.start_containerized_servod(with_servod=False) |
| |
| # Collection to count which board failed to update |
| fail_boards = [] |
| |
| servo_topology = host.get_topology() |
| # Get list connected servos |
| for device in servo_topology.get_list_of_devices(): |
| # Verify that device can provide serial and servo_type. |
| if not device.is_good(): |
| continue |
| board = device.get_type() |
| if len(boards) > 0 and board not in boards: |
| logging.info('The %s is not requested for update', board) |
| continue |
| updater_type = SERVO_UPDATERS.get(board, None) |
| if not updater_type: |
| logging.info('No specified updater for %s', board) |
| continue |
| # Creating update instance |
| updater = updater_type(host, device) |
| is_success_update = _run_update_attempt( |
| updater=updater, |
| topology=servo_topology, |
| try_count=try_attempt_count, |
| force_update=use_force_option_as_first_attempt, |
| ignore_version=ignore_version, |
| channel=host.servo_fw_channel) |
| # If fail to update and we got requested to try force option then |
| # run second time with force. |
| if not is_success_update and try_force_update: |
| is_success_update = _run_update_attempt( |
| updater=updater, |
| topology=servo_topology, |
| try_count=1, |
| force_update=True, |
| ignore_version=ignore_version, |
| channel=host.servo_fw_channel) |
| if not is_success_update: |
| logging.info('Fail update firmware for %s', board) |
| hostname = host.get_dut_hostname() or host.hostname |
| metrics.Counter('chromeos/autotest/servo/fw_update_fail' |
| ).increment(fields={'host': hostname}) |
| fail_boards.append(board) |
| |
| # Need stop containr without servod we started above. |
| if host.is_containerized_servod(): |
| host.stop_servod() |
| |
| if len(fail_boards) == 0: |
| logging.info('Successfull updated all requested servos.') |
| return True |
| return False |