| # Lint as: python2, python3 |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import dbus |
| import logging |
| import six |
| import time |
| |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.client.cros.networking import shill_proxy |
| |
| |
| class WifiProxy(shill_proxy.ShillProxy): |
| """Wrapper around shill dbus interface used by wifi tests.""" |
| |
| |
| def set_logging_for_wifi_test(self): |
| """Set the logging in shill for a test of wifi technology. |
| |
| Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes |
| to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for |
| |ShillProxy.TECHNOLOGY_WIFI|. |
| |
| """ |
| self.set_logging_for_test(self.TECHNOLOGY_WIFI) |
| |
| |
| def remove_all_wifi_entries(self): |
| """Iterate over all pushed profiles and remove WiFi entries.""" |
| profiles = self.get_profiles() |
| for profile in profiles: |
| profile_properties = profile.GetProperties() |
| entries = profile_properties[self.PROFILE_PROPERTY_ENTRIES] |
| for entry_id in entries: |
| try: |
| entry = profile.GetEntry(entry_id) |
| except dbus.exceptions.DBusException as e: |
| logging.error('Unable to retrieve entry %s:%r', entry_id, e) |
| continue |
| if entry[self.ENTRY_FIELD_TYPE] == 'wifi': |
| profile.DeleteEntry(entry_id) |
| |
| |
| def configure_wifi_service(self, ssid, security, security_parameters=None, |
| save_credentials=True, station_type=None, |
| hidden_network=False, guid=None, |
| autoconnect=None): |
| """Configure a WiFi service. |
| |
| @param ssid string name of network to connect to. |
| @param security string type of security used in network (e.g. psk) |
| @param security_parameters dict of service property/value pairs that |
| make up the credentials and settings for the given security |
| type (e.g. the passphrase for psk security). |
| @param save_credentials bool True if we should save EAP credentials. |
| @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. |
| @param hidden_network bool True when the SSID is not broadcasted. |
| @param guid string unique identifier for network. |
| @param autoconnect bool or None. None indicates that this should not |
| be set one way or the other, while a boolean indicates a desired |
| value. |
| |
| """ |
| # |mode| is derived from the station type we're attempting to join. It |
| # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a |
| # shill connection mode. |
| mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] |
| |
| if security_parameters is None: |
| security_parameters = {} |
| |
| config_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', |
| self.SERVICE_PROPERTY_HIDDEN: hidden_network, |
| self.SERVICE_PROPERTY_SSID: ssid, |
| self.SERVICE_PROPERTY_SECURITY_CLASS: security, |
| self.SERVICE_PROPERTY_MODE: mode} |
| if autoconnect is not None: |
| config_params[self.SERVICE_PROPERTY_AUTOCONNECT] = autoconnect |
| config_params.update(security_parameters) |
| if guid is not None: |
| config_params[self.SERVICE_PROPERTY_GUID] = guid |
| try: |
| self.configure_service(config_params) |
| except dbus.exceptions.DBusException as e: |
| logging.error('Caught an error while configuring a WiFi ' |
| 'service: %r', e) |
| return False |
| |
| logging.info('Configured service: %s', ssid) |
| return True |
| |
| |
| def connect_to_wifi_network(self, |
| ssid, |
| security, |
| security_parameters, |
| save_credentials, |
| station_type=None, |
| hidden_network=False, |
| guid=None, |
| autoconnect=None, |
| discovery_timeout_seconds=15, |
| association_timeout_seconds=15, |
| configuration_timeout_seconds=15): |
| """ |
| Connect to a WiFi network with the given association parameters. |
| |
| @param ssid string name of network to connect to. |
| @param security string type of security used in network (e.g. psk) |
| @param security_parameters dict of service property/value pairs that |
| make up the credentials and settings for the given security |
| type (e.g. the passphrase for psk security). |
| @param save_credentials bool True if we should save EAP credentials. |
| @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. |
| @param hidden_network bool True when the SSID is not broadcasted. |
| @param guid string unique identifier for network. |
| @param discovery_timeout_seconds float timeout for service discovery. |
| @param association_timeout_seconds float timeout for service |
| association. |
| @param configuration_timeout_seconds float timeout for DHCP |
| negotiations. |
| @param autoconnect: bool or None. None indicates that this should not |
| be set one way or the other, while a boolean indicates a desired |
| value. |
| @return (successful, discovery_time, association_time, |
| configuration_time, reason) |
| where successful is True iff the operation succeeded, *_time is |
| the time spent waiting for each transition, and reason is a string |
| which may contain a meaningful description of failures. |
| |
| """ |
| logging.info('Attempting to connect to %s', ssid) |
| start_time = time.time() |
| discovery_time = -1.0 |
| association_time = -1.0 |
| configuration_time = -1.0 |
| if station_type not in self.SUPPORTED_WIFI_STATION_TYPES: |
| return (False, discovery_time, association_time, |
| configuration_time, |
| 'FAIL(Invalid station type specified.)') |
| |
| # |mode| is derived from the station type we're attempting to join. It |
| # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a |
| # shill connection mode. |
| mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] |
| |
| if hidden_network: |
| logging.info('Configuring %s as a hidden network.', ssid) |
| if not self.configure_wifi_service( |
| ssid, security, save_credentials=save_credentials, |
| station_type=station_type, hidden_network=True, |
| autoconnect=autoconnect): |
| return (False, discovery_time, association_time, |
| configuration_time, |
| 'FAIL(Failed to configure hidden SSID)') |
| |
| logging.info('Configured hidden service: %s', ssid) |
| |
| |
| logging.info('Discovering...') |
| discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', |
| self.SERVICE_PROPERTY_NAME: ssid, |
| self.SERVICE_PROPERTY_SECURITY_CLASS: security, |
| self.SERVICE_PROPERTY_MODE: mode} |
| while time.time() - start_time < discovery_timeout_seconds: |
| discovery_time = time.time() - start_time |
| service_object = self.find_matching_service(discovery_params) |
| if service_object: |
| try: |
| service_properties = service_object.GetProperties() |
| except dbus.exceptions.DBusException: |
| # This usually means the service handle has become invalid. |
| # Which is sort of like not getting a handle back from |
| # find_matching_service in the first place. |
| continue |
| strength = self.dbus2primitive( |
| service_properties[self.SERVICE_PROPERTY_STRENGTH]) |
| if strength > 0: |
| logging.info('Discovered service: %s. Strength: %r.', |
| ssid, strength) |
| break |
| |
| # This is spammy, but shill handles that for us. |
| self.manager.RequestScan('wifi') |
| time.sleep(self.POLLING_INTERVAL_SECONDS) |
| else: |
| return (False, discovery_time, association_time, |
| configuration_time, 'FAIL(Discovery timed out)') |
| |
| # At this point, we know |service| is in the service list. Attempt |
| # to connect it, and watch the states roll by. |
| logging.info('Connecting...') |
| try: |
| for service_property, value in six.iteritems(security_parameters): |
| service_object.SetProperty(service_property, value) |
| if guid is not None: |
| service_object.SetProperty(self.SERVICE_PROPERTY_GUID, guid) |
| if autoconnect is not None: |
| service_object.SetProperty(self.SERVICE_PROPERTY_AUTOCONNECT, |
| autoconnect) |
| service_object.Connect() |
| logging.info('Called connect on service') |
| except dbus.exceptions.DBusException as e: |
| logging.error('Caught an error while trying to connect: %s', |
| e.get_dbus_message()) |
| return (False, discovery_time, association_time, |
| configuration_time, 'FAIL(Failed to call connect)') |
| |
| logging.info('Associating...') |
| result = self.wait_for_property_in( |
| service_object, |
| self.SERVICE_PROPERTY_STATE, |
| self.SERVICE_CONNECTED_STATES + ['configuration'], |
| association_timeout_seconds) |
| (successful, _, association_time) = result |
| if not successful: |
| return (False, discovery_time, association_time, |
| configuration_time, 'FAIL(Association timed out)') |
| |
| logging.info('Associated with service: %s', ssid) |
| |
| logging.info('Configuring...') |
| result = self.wait_for_property_in( |
| service_object, |
| self.SERVICE_PROPERTY_STATE, |
| self.SERVICE_CONNECTED_STATES, |
| configuration_timeout_seconds) |
| (successful, _, configuration_time) = result |
| if not successful: |
| return (False, discovery_time, association_time, |
| configuration_time, 'FAIL(Configuration timed out)') |
| |
| logging.info('Configured service: %s', ssid) |
| |
| # Great success! |
| logging.info('Connected to WiFi service.') |
| return (True, discovery_time, association_time, configuration_time, |
| 'SUCCESS(Connection successful)') |
| |
| |
| def disconnect_from_wifi_network(self, ssid, timeout=None): |
| """Disconnect from the specified WiFi network. |
| |
| Method will succeed if it observes the specified network in the idle |
| state after calling Disconnect. |
| |
| @param ssid string name of network to disconnect. |
| @param timeout float number of seconds to wait for idle. |
| @return tuple(success, duration, reason) where: |
| success is a bool (True on success). |
| duration is a float number of seconds the operation took. |
| reason is a string containing an informative error on failure. |
| |
| """ |
| if timeout is None: |
| timeout = self.SERVICE_DISCONNECT_TIMEOUT |
| service_description = {self.SERVICE_PROPERTY_TYPE: 'wifi', |
| self.SERVICE_PROPERTY_NAME: ssid} |
| service = self.find_matching_service(service_description) |
| if service is None: |
| return (False, |
| 0.0, |
| 'Failed to disconnect from %s, service not found.' % ssid) |
| |
| service.Disconnect() |
| result = self.wait_for_property_in(service, |
| self.SERVICE_PROPERTY_STATE, |
| ('idle',), |
| timeout) |
| (successful, final_state, duration) = result |
| message = 'Success.' |
| if not successful: |
| message = ('Failed to disconnect from %s, ' |
| 'timed out in state: %s.' % (ssid, final_state)) |
| return (successful, duration, message) |
| |
| |
| def configure_bgscan(self, interface, method=None, short_interval=None, |
| long_interval=None, signal=None): |
| """Configures bgscan parameters for shill and wpa_supplicant. |
| |
| @param interface string name of interface to configure (e.g. 'mlan0'). |
| @param method string bgscan method (e.g. 'none'). |
| @param short_interval int short scanning interval. |
| @param long_interval int normal scanning interval. |
| @param signal int signal threshold. |
| |
| """ |
| device = self.find_object('Device', {'Name': interface}) |
| if device is None: |
| logging.error('No device found with name: %s', interface) |
| return False |
| |
| attributes = {'ScanInterval': (dbus.UInt16, long_interval), |
| 'BgscanMethod': (dbus.String, method), |
| 'BgscanShortInterval': (dbus.UInt16, short_interval), |
| 'BgscanSignalThreshold': (dbus.Int32, signal)} |
| for k, (type_cast, value) in six.iteritems(attributes): |
| if value is None: |
| continue |
| |
| # 'default' is defined in: |
| # client/common_lib/cros/network/xmlrpc_datatypes.py |
| # but we don't have access to that file here. |
| if value == 'default': |
| device.ClearProperty(k) |
| else: |
| device.SetProperty(k, type_cast(value)) |
| return True |
| |
| |
| def get_active_wifi_SSIDs(self): |
| """@return list of string SSIDs with at least one BSS we've scanned.""" |
| properties = self.manager.GetProperties() |
| services = [self.get_dbus_object(self.DBUS_TYPE_SERVICE, path) |
| for path in properties[self.MANAGER_PROPERTY_SERVICES]] |
| wifi_services = [] |
| for service in services: |
| try: |
| service_properties = self.dbus2primitive( |
| service.GetProperties()) |
| except dbus.exceptions.DBusException: |
| pass # Probably the service disappeared before GetProperties(). |
| logging.debug('Considering service with properties: %r', |
| service_properties) |
| service_type = service_properties[self.SERVICE_PROPERTY_TYPE] |
| strength = service_properties[self.SERVICE_PROPERTY_STRENGTH] |
| if service_type == 'wifi' and strength > 0: |
| # Note that this may cause terrible things if the SSID |
| # is not a valid ASCII string. |
| ssid = service_properties[self.SERVICE_PROPERTY_HEX_SSID] |
| logging.info('Found active WiFi service: %s', ssid) |
| wifi_services.append(six.ensure_text(ssid, 'hex')) |
| return wifi_services |
| |
| |
| def wait_for_service_states(self, ssid, states, timeout_seconds): |
| """Wait for a service (ssid) to achieve one of a number of states. |
| |
| @param ssid string name of network for whose state we're waiting. |
| @param states tuple states for which to wait. |
| @param timeout_seconds seconds to wait for property to be achieved |
| @return tuple(successful, final_value, duration) |
| where successful is True iff we saw one of |states|, final_value |
| is the final state we saw, and duration is how long we waited to |
| see that value. |
| |
| """ |
| discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', |
| self.SERVICE_PROPERTY_NAME: ssid} |
| start_time = time.time() |
| try: |
| # Find a matching service in any state (only_visible=False) to |
| # make it possible to detect the state of services that are not |
| # visible because they're not in a connected state. |
| service_object = utils.poll_for_condition( |
| condition=lambda: self.find_matching_service( |
| discovery_params, only_visible=False), |
| timeout=timeout_seconds, |
| sleep_interval=self.POLLING_INTERVAL_SECONDS, |
| desc='Find a matching service to the discovery params') |
| |
| return self.wait_for_property_in( |
| service_object, |
| self.SERVICE_PROPERTY_STATE, |
| states, |
| timeout_seconds - (time.time() - start_time)) |
| |
| # poll_for_condition timed out |
| except utils.TimeoutError: |
| logging.error('Timed out waiting for %s states', ssid) |
| return False, 'unknown', timeout_seconds |