| #!/usr/bin/python3 |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import logging |
| import logging.handlers |
| import multiprocessing |
| import six |
| |
| import common |
| |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes |
| from autotest_lib.client.cros import xmlrpc_server |
| from autotest_lib.client.cros import constants |
| from autotest_lib.client.cros import cros_ui |
| from autotest_lib.client.cros import tpm_store |
| from autotest_lib.client.cros.networking import shill_proxy |
| from autotest_lib.client.cros.networking import wifi_proxy |
| from autotest_lib.client.cros.power import sys_power |
| |
| |
| class ShillXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate): |
| """Exposes methods called remotely during WiFi autotests. |
| |
| All instance methods of this object without a preceding '_' are exposed via |
| an XMLRPC server. This is not a stateless handler object, which means that |
| if you store state inside the delegate, that state will remain around for |
| future calls. |
| |
| """ |
| |
| DEFAULT_TEST_PROFILE_NAME = 'test' |
| DBUS_DEVICE = 'Device' |
| |
| def __init__(self): |
| self._wifi_proxy = wifi_proxy.WifiProxy() |
| self._tpm_store = tpm_store.TPMStore() |
| |
| |
| def __enter__(self): |
| super(ShillXmlRpcDelegate, self).__enter__() |
| if not cros_ui.stop(allow_fail=True): |
| logging.error('UI did not stop, there could be trouble ahead.') |
| self._tpm_store.__enter__() |
| |
| |
| def __exit__(self, exception, value, traceback): |
| super(ShillXmlRpcDelegate, self).__exit__(exception, value, traceback) |
| self._tpm_store.__exit__(exception, value, traceback) |
| self.enable_ui() |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def create_profile(self, profile_name): |
| """Create a shill profile. |
| |
| @param profile_name string name of profile to create. |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.CreateProfile(profile_name) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def push_profile(self, profile_name): |
| """Push a shill profile. |
| |
| @param profile_name string name of profile to push. |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.PushProfile(profile_name) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def pop_profile(self, profile_name): |
| """Pop a shill profile. |
| |
| @param profile_name string name of profile to pop. |
| @return True on success, False otherwise. |
| |
| """ |
| if profile_name is None: |
| self._wifi_proxy.manager.PopAnyProfile() |
| else: |
| self._wifi_proxy.manager.PopProfile(profile_name) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def remove_profile(self, profile_name): |
| """Remove a profile from disk. |
| |
| @param profile_name string name of profile to remove. |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.RemoveProfile(profile_name) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def clean_profiles(self): |
| """Pop and remove shill profiles above the default profile. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| while True: |
| active_profile = self._wifi_proxy.get_active_profile() |
| if six.PY2: |
| profile_props = active_profile.GetProperties(utf8_strings=True) |
| else: |
| profile_props = active_profile.GetProperties() |
| profile_name = self._wifi_proxy.dbus2primitive( |
| profile_props['Name']) |
| if profile_name == 'default': |
| return True |
| self._wifi_proxy.manager.PopProfile(profile_name) |
| self._wifi_proxy.manager.RemoveProfile(profile_name) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def configure_service_by_guid(self, raw_params): |
| """Configure a service referenced by a GUID. |
| |
| @param raw_params serialized ConfigureServiceParameters. |
| |
| """ |
| params = xmlrpc_datatypes.deserialize(raw_params) |
| shill = self._wifi_proxy |
| properties = {} |
| if params.autoconnect is not None: |
| properties[shill.SERVICE_PROPERTY_AUTOCONNECT] = params.autoconnect |
| if params.passphrase is not None: |
| properties[shill.SERVICE_PROPERTY_PASSPHRASE] = params.passphrase |
| if properties: |
| self._wifi_proxy.configure_service_by_guid(params.guid, properties) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def configure_wifi_service(self, raw_params): |
| """Configure a WiFi service |
| |
| @param raw_params serialized AssociationParameters. |
| @return True on success, False otherwise. |
| |
| """ |
| params = xmlrpc_datatypes.deserialize(raw_params) |
| return self._wifi_proxy.configure_wifi_service( |
| params.ssid, |
| params.security, |
| params.security_parameters, |
| save_credentials=params.save_credentials, |
| station_type=params.station_type, |
| hidden_network=params.is_hidden, |
| guid=params.guid, |
| autoconnect=params.autoconnect) |
| |
| |
| def connect_wifi(self, raw_params): |
| """Block and attempt to connect to wifi network. |
| |
| @param raw_params serialized AssociationParameters. |
| @return serialized AssociationResult |
| |
| """ |
| logging.debug('connect_wifi()') |
| params = xmlrpc_datatypes.deserialize(raw_params) |
| params.security_config.install_client_credentials(self._tpm_store) |
| wifi_if = params.bgscan_config.interface |
| if wifi_if is None: |
| logging.info('Using default interface for bgscan configuration') |
| interfaces = self.list_controlled_wifi_interfaces() |
| if not interfaces: |
| return xmlrpc_datatypes.AssociationResult( |
| failure_reason='No wifi interfaces found?') |
| |
| if len(interfaces) > 1: |
| logging.error('Defaulting to first interface of %r', interfaces) |
| wifi_if = interfaces[0] |
| if not self._wifi_proxy.configure_bgscan( |
| wifi_if, |
| method=params.bgscan_config.method, |
| short_interval=params.bgscan_config.short_interval, |
| long_interval=params.bgscan_config.long_interval, |
| signal=params.bgscan_config.signal): |
| return xmlrpc_datatypes.AssociationResult( |
| failure_reason='Failed to configure bgscan') |
| |
| raw = self._wifi_proxy.connect_to_wifi_network( |
| params.ssid, |
| params.security, |
| params.security_parameters, |
| params.save_credentials, |
| station_type=params.station_type, |
| hidden_network=params.is_hidden, |
| guid=params.guid, |
| discovery_timeout_seconds=params.discovery_timeout, |
| association_timeout_seconds=params.association_timeout, |
| configuration_timeout_seconds=params.configuration_timeout) |
| result = xmlrpc_datatypes.AssociationResult.from_dbus_proxy_output(raw) |
| return result |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def delete_entries_for_ssid(self, ssid): |
| """Delete a profile entry. |
| |
| @param ssid string of WiFi service for which to delete entries. |
| @return True on success, False otherwise. |
| |
| """ |
| shill = self._wifi_proxy |
| for profile in shill.get_profiles(): |
| if six.PY2: |
| profile_properties = profile.GetProperties(utf8_strings=True) |
| else: |
| profile_properties = profile.GetProperties() |
| profile_properties = shill.dbus2primitive(profile_properties) |
| entry_ids = profile_properties[shill.PROFILE_PROPERTY_ENTRIES] |
| for entry_id in entry_ids: |
| entry = profile.GetEntry(entry_id) |
| if shill.dbus2primitive(entry[shill.ENTRY_FIELD_NAME]) == ssid: |
| profile.DeleteEntry(entry_id) |
| return True |
| |
| |
| def init_test_network_state(self): |
| """Create a clean slate for tests with respect to remembered networks. |
| |
| For shill, this means popping and removing profiles, removing all WiFi |
| entries from the default profile, and pushing a 'test' profile. |
| |
| @return True iff operation succeeded, False otherwise. |
| |
| """ |
| self.clean_profiles() |
| self._wifi_proxy.remove_all_wifi_entries() |
| self.remove_profile(self.DEFAULT_TEST_PROFILE_NAME) |
| worked = self.create_profile(self.DEFAULT_TEST_PROFILE_NAME) |
| if worked: |
| worked = self.push_profile(self.DEFAULT_TEST_PROFILE_NAME) |
| return worked |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def list_controlled_wifi_interfaces(self): |
| """List WiFi interfaces controlled by shill. |
| |
| @return list of string WiFi device names (e.g. ['mlan0']) |
| |
| """ |
| ret = [] |
| devices = self._wifi_proxy.get_devices() |
| for device in devices: |
| if six.PY2: |
| properties = device.GetProperties(utf8_strings=True) |
| else: |
| properties = device.GetProperties() |
| properties = self._wifi_proxy.dbus2primitive(properties) |
| if properties[self._wifi_proxy.DEVICE_PROPERTY_TYPE] != 'wifi': |
| continue |
| ret.append(properties[self._wifi_proxy.DEVICE_PROPERTY_NAME]) |
| return ret |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def disconnect(self, ssid): |
| """Attempt to disconnect from the given ssid. |
| |
| Blocks until disconnected or operation has timed out. Returns True iff |
| disconnect was successful. |
| |
| @param ssid string network to disconnect from. |
| @return bool True on success, False otherwise. |
| |
| """ |
| logging.debug('disconnect()') |
| result = self._wifi_proxy.disconnect_from_wifi_network(ssid) |
| successful, duration, message = result |
| if successful: |
| level = logging.info |
| else: |
| level = logging.error |
| level('Disconnect result: %r, duration: %d, reason: %s', |
| successful, duration, message) |
| return successful is True |
| |
| |
| def wait_for_service_states(self, ssid, states, timeout_seconds): |
| """Wait for service to achieve one state out of a list of states. |
| |
| @param ssid string the network to connect to (e.g. 'GoogleGuest'). |
| @param states tuple the states for which to wait |
| @param timeout_seconds int seconds to wait for a state |
| |
| """ |
| return self._wifi_proxy.wait_for_service_states( |
| ssid, states, timeout_seconds) |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def get_service_order(self): |
| """Get the shill service order. |
| |
| @return string service order on success, None otherwise. |
| |
| """ |
| return str(self._wifi_proxy.manager.GetServiceOrder()) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_service_order(self, order): |
| """Set the shill service order. |
| |
| @param order string comma-delimited service order (eg. 'ethernet,wifi') |
| @return bool True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.SetServiceOrder(dbus.String(order)) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def get_service_properties(self, ssid): |
| """Get a dict of properties for a service. |
| |
| @param ssid string service to get properties for. |
| @return dict of Python friendly built-in types or None on failures. |
| |
| """ |
| discovery_params = {self._wifi_proxy.SERVICE_PROPERTY_TYPE: 'wifi', |
| self._wifi_proxy.SERVICE_PROPERTY_NAME: ssid} |
| service_path = self._wifi_proxy.manager.FindMatchingService( |
| discovery_params) |
| service_object = self._wifi_proxy.get_dbus_object( |
| self._wifi_proxy.DBUS_TYPE_SERVICE, service_path) |
| if six.PY2: |
| service_properties = service_object.GetProperties( |
| utf8_strings=True) |
| else: |
| service_properties = service_object.GetProperties() |
| return self._wifi_proxy.dbus2primitive(service_properties) |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def get_manager_properties(self): |
| if six.PY2: |
| manager_props = self._wifi_proxy.manager.GetProperties( |
| utf8_strings=True) |
| else: |
| manager_props = self._wifi_proxy.manager.GetProperties() |
| return self._wifi_proxy.dbus2primitive(manager_props) |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def get_manager_property(self, property_name): |
| prop_value = self._wifi_proxy.get_dbus_property( |
| self._wifi_proxy.manager, property_name) |
| return self._wifi_proxy.dbus2primitive(prop_value) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_manager_property(self, property_name, property_value): |
| self._wifi_proxy.set_dbus_property(self._wifi_proxy.manager, |
| property_name, property_value) |
| return True |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_optional_manager_property(self, property_name, property_value): |
| """Set optional manager property. |
| |
| @param property_name String name of property to set |
| @param property_value String value to set property to |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.set_optional_dbus_property( |
| self._wifi_proxy.manager, property_name, property_value) |
| return True |
| |
| @xmlrpc_server.dbus_safe(False) |
| def get_active_wifi_SSIDs(self): |
| """@return list of string SSIDs with at least one BSS we've scanned.""" |
| return self._wifi_proxy.get_active_wifi_SSIDs() |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_sched_scan(self, enable): |
| """Configure scheduled scan. |
| |
| @param enable bool flag indicating to enable/disable scheduled scan. |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.set_sched_scan(enable) |
| return True |
| |
| |
| def enable_ui(self): |
| """@return True iff the UI was successfully started.""" |
| return cros_ui.start(allow_fail=True, wait_for_login_prompt=False) == 0 |
| |
| |
| def sync_time_to(self, epoch_seconds): |
| """Sync time on the DUT to |epoch_seconds| from the epoch. |
| |
| @param epoch_seconds: float number of seconds from the epoch. |
| |
| """ |
| utils.run('date -u --set=@%f' % epoch_seconds) |
| return True |
| |
| |
| @staticmethod |
| def do_suspend(seconds): |
| """Suspend DUT using the power manager. |
| |
| @param seconds: The number of seconds to suspend the device. |
| |
| """ |
| return sys_power.do_suspend(seconds) |
| |
| |
| @staticmethod |
| def do_suspend_bg(seconds): |
| """Suspend DUT using the power manager - non-blocking. |
| |
| @param seconds int The number of seconds to suspend the device. |
| |
| """ |
| process = multiprocessing.Process(target=sys_power.do_suspend, |
| args=(seconds, 1)) |
| process.start() |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(None) |
| def get_dbus_property_on_device(self, wifi_interface, prop_name): |
| """Get a property for the given WiFi device. |
| |
| @param wifi_interface: string name of interface being queried. |
| @param prop_name: the name of the property. |
| @return the current value of the property. |
| |
| """ |
| dbus_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': wifi_interface}) |
| if dbus_object is None: |
| return None |
| |
| object_properties = dbus_object.GetProperties(utf8_strings=True) |
| if prop_name not in object_properties: |
| return None |
| |
| return self._wifi_proxy.dbus2primitive( |
| object_properties[prop_name]) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_dbus_property_on_device(self, wifi_interface, prop_name, value): |
| """Set a property on the given WiFi device. |
| |
| @param wifi_interface: the device to set a property for. |
| @param prop_name: the name of the property. |
| @param value: the desired value of the property. |
| @return True if successful, False otherwise. |
| |
| """ |
| device_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': wifi_interface}) |
| if device_object is None: |
| return False |
| |
| shill_proxy.ShillProxy.set_dbus_property(device_object, |
| prop_name, |
| value) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def request_roam_dbus(self, bssid, interface): |
| """Request that we roam to the specified BSSID. |
| |
| Note that this operation assumes that: |
| |
| 1) We're connected to an SSID for which |bssid| is a member. |
| 2) There is a BSS with an appropriate ID in our scan results. |
| |
| @param bssid: string BSSID of BSS to roam to. |
| @param interface: string name of interface to request roam for. |
| |
| """ |
| |
| device_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': interface}) |
| if device_object is None: |
| return False |
| device_object.RequestRoam(bssid) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_device_enabled(self, wifi_interface, enabled): |
| """Enable or disable the WiFi device. |
| |
| @param wifi_interface: string name of interface being modified. |
| @param enabled: boolean; true if this device should be enabled, |
| false if this device should be disabled. |
| @return True if it worked; false, otherwise |
| |
| """ |
| interface = {'Name': wifi_interface} |
| dbus_object = self._wifi_proxy.find_object(self.DBUS_DEVICE, |
| interface) |
| if dbus_object is None: |
| return False |
| |
| if enabled: |
| dbus_object.Enable() |
| else: |
| dbus_object.Disable() |
| return True |
| |
| @xmlrpc_server.dbus_safe(False) |
| def add_wake_packet_source(self, wifi_interface, source_ip): |
| """Set up the NIC to wake on packets from the given source IP. |
| |
| @param wifi_interface: string name of interface to establish WoWLAN on. |
| @param source_ip: string IP address of packet source, i.e. "127.0.0.1" |
| |
| @return True on success, False otherwise. |
| |
| """ |
| device_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': wifi_interface}) |
| if device_object is None: |
| return False |
| device_object.AddWakeOnPacketConnection(source_ip) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def remove_wake_packet_source(self, wifi_interface, source_ip): |
| """Stop waking on packets from the given source IP. |
| |
| @param wifi_interface: string name of interface to establish WoWLAN on. |
| @param source_ip: string IP address of packet source, i.e. "127.0.0.1" |
| |
| @return True on success, False otherwise. |
| |
| """ |
| device_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': wifi_interface}) |
| if device_object is None: |
| return False |
| device_object.RemoveWakeOnPacketConnection(source_ip) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def remove_all_wake_packet_sources(self, wifi_interface): |
| """Stop waking on packets from any IP. |
| |
| @param wifi_interface: string name of interface to establish WoWLAN on. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| device_object = self._wifi_proxy.find_object( |
| self.DBUS_DEVICE, {'Name': wifi_interface}) |
| if device_object is None: |
| return False |
| device_object.RemoveAllWakeOnPacketConnections() |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def request_scan(self): |
| """Request a scan from shill. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| self._wifi_proxy.manager.RequestScan('wifi') |
| return True |
| |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| handler = logging.handlers.SysLogHandler(address = '/dev/log') |
| formatter = logging.Formatter( |
| 'shill_xmlrpc_server: [%(levelname)s] %(message)s') |
| handler.setFormatter(formatter) |
| logging.getLogger().addHandler(handler) |
| logging.debug('shill_xmlrpc_server main...') |
| server = xmlrpc_server.XmlRpcServer('localhost', |
| constants.SHILL_XMLRPC_SERVER_PORT) |
| server.register_delegate(ShillXmlRpcDelegate()) |
| server.run() |