| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging, os, re, time |
| |
| from autotest_lib.server import test |
| from autotest_lib.client.common_lib import error |
| |
| _WAIT_DELAY = 25 |
| _USB_DIR = '/sys/bus/usb/devices' |
| |
| class kernel_ExternalUsbPeripheralsDetectionTest(test.test): |
| """Uses servo to repeatedly connect/remove USB devices during boot.""" |
| version = 1 |
| |
| |
| def set_hub_power(self, on=True): |
| """Setting USB hub power status |
| |
| @param: on To power on the servo-usb hub or not |
| |
| """ |
| reset = 'off' |
| if not on: |
| reset = 'on' |
| self.host.servo.set('dut_hub1_rst1', reset) |
| self.pluged_status = on |
| time.sleep(_WAIT_DELAY) |
| |
| |
| def check_usb_peripherals_details(self): |
| """Checks the effect from plugged in USB peripherals. |
| |
| @returns True if command line output is matched successfuly; Else False |
| """ |
| failed = list() |
| for cmd in self.usb_checks.keys(): |
| out_match_list = self.usb_checks.get(cmd) |
| logging.info('Running %s', cmd) |
| |
| # Run the usb check command |
| cmd_out_lines = (self.host.run(cmd, ignore_status=True). |
| stdout.strip().split('\n')) |
| for out_match in out_match_list: |
| match_result = False |
| for cmd_out_line in cmd_out_lines: |
| match_result = (match_result or |
| re.search(out_match, cmd_out_line) != None) |
| if not match_result: |
| failed.append((cmd,out_match)) |
| return failed |
| |
| |
| def get_usb_device_dirs(self): |
| """Gets the usb device dirs from _USB_DIR path. |
| |
| @returns list with number of device dirs else None |
| """ |
| usb_dir_list = [] |
| cmd = 'ls -1 %s' % _USB_DIR |
| tmp = self.host.run(cmd).stdout.strip().split('\n') |
| for d in tmp: |
| usb_dir_list.append(os.path.join(_USB_DIR, d)) |
| return usb_dir_list |
| |
| |
| def get_vendor_id_dict_from_dut(self, dir_list): |
| """Finds the vendor id from provided dir list. |
| |
| @param dir_list: full path of directories |
| @returns dict of all vendor ids vs file path |
| """ |
| vendor_id_dict = dict() |
| for d in dir_list: |
| file_name = os.path.join(d, 'idVendor') |
| if self._exists_on(file_name): |
| vendor_id = self.host.run('cat %s' % file_name).stdout.strip() |
| if vendor_id: |
| vendor_id_dict[vendor_id] = d |
| logging.info('%s', vendor_id_dict) |
| return vendor_id_dict |
| |
| |
| def _exists_on(self, path): |
| """Checks if file exists on host or not. |
| |
| @returns True or False |
| """ |
| return self.host.run('ls %s' % path, |
| ignore_status=True).exit_status == 0 |
| |
| |
| |
| def run_once(self, host, usb_checks=None, |
| vendor_id_dict_control_file=None): |
| """Main function to run the autotest. |
| |
| @param host: name of the host |
| @param usb_checks: dictionary defined in control file |
| @param vendor_id_list: dictionary defined in control file |
| """ |
| self.host = host |
| self.usb_checks = usb_checks |
| |
| self.host.servo.switch_usbkey('dut') |
| self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey') |
| time.sleep(_WAIT_DELAY) |
| |
| self.set_hub_power(False) |
| # Collect the USB devices directories before switching on hub |
| usb_list_dir_off = self.get_usb_device_dirs() |
| |
| self.set_hub_power(True) |
| # Collect the USB devices directories after switching on hub |
| usb_list_dir_on = self.get_usb_device_dirs() |
| |
| diff_list = list(set(usb_list_dir_on).difference(set(usb_list_dir_off))) |
| if len(diff_list) == 0: |
| # Fail if no devices detected after |
| raise error.TestError('No connected devices were detected. Make ' |
| 'sure the devices are connected to USB_KEY ' |
| 'and DUT_HUB1_USB on the servo board.') |
| logging.debug('Connected devices list: %s', diff_list) |
| |
| # Test 1: check USB peripherals info in detail |
| failed = self.check_usb_peripherals_details() |
| if len(failed)> 0: |
| raise error.TestError('USB device not detected %s', str(failed)) |
| |
| # Test 2: check USB device dir under /sys/bus/usb/devices |
| vendor_ids = {} |
| # Gets a dict idVendor: dir_path |
| vendor_ids = self.get_vendor_id_dict_from_dut(diff_list) |
| for vid in vendor_id_dict_control_file.keys(): |
| peripheral = vendor_id_dict_control_file[vid] |
| if vid not in vendor_ids.keys(): |
| raise error.TestFail('%s is not detected at %s dir' |
| % (peripheral, _USB_DIR)) |
| else: |
| # Test 3: check driver symlink and dir for each USB device |
| tmp_list = [device_dir for device_dir in |
| self.host.run('ls -1 %s' % vendor_ids[vid], |
| ignore_status=True).stdout.split('\n') |
| if re.match(r'\d-\d.*:\d\.\d', device_dir)] |
| if not tmp_list: |
| raise error.TestFail('No driver created/loaded for %s' |
| % peripheral) |
| logging.info('---- Drivers for %s ----', peripheral) |
| flag = False |
| for device_dir in tmp_list: |
| driver_path = os.path.join(vendor_ids[vid], |
| '%s/driver' % device_dir) |
| if self._exists_on(driver_path): |
| flag = True |
| link = (self.host.run('ls -l %s | grep ^l' |
| '| grep driver' |
| % driver_path, ignore_status=True) |
| .stdout.strip()) |
| logging.info('%s', link) |
| if not flag: |
| raise error.TestFail('Driver for %s is not loaded - %s' |
| % (peripheral, driver_path)) |