| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import base64 |
| import json |
| import logging |
| import logging.handlers |
| |
| import common |
| from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_sdp_socket |
| from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket |
| from autotest_lib.client.cros import constants |
| from autotest_lib.client.cros import xmlrpc_server |
| |
| |
class BluetoothTesterXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
    """Exposes Tester methods called remotely during Bluetooth autotests.

    All instance methods of this object without a preceding '_' are exposed via
    an XML-RPC server. This is not a stateless handler object, which means that
    if you store state inside the delegate, that state will remain around for
    future calls.
    """

    # Management settings requested for the dual-mode (BR/EDR + LE) profile.
    BR_EDR_LE_PROFILE = (
            bluetooth_socket.MGMT_SETTING_POWERED |
            bluetooth_socket.MGMT_SETTING_CONNECTABLE |
            bluetooth_socket.MGMT_SETTING_PAIRABLE |
            bluetooth_socket.MGMT_SETTING_SSP |
            bluetooth_socket.MGMT_SETTING_BREDR |
            bluetooth_socket.MGMT_SETTING_LE)

    # Management settings requested for the LE-only profile.
    LE_PROFILE = (
            bluetooth_socket.MGMT_SETTING_POWERED |
            bluetooth_socket.MGMT_SETTING_CONNECTABLE |
            bluetooth_socket.MGMT_SETTING_PAIRABLE |
            bluetooth_socket.MGMT_SETTING_LE)

    # Profile name -> settings bitmask that setup() must end up with.
    PROFILE_SETTINGS = {
        'computer': BR_EDR_LE_PROFILE,
        'peripheral': LE_PROFILE
    }

    # Profile name -> Class of Device. None means no class is configured;
    # setup() only programs the class when BR/EDR is part of the profile.
    PROFILE_CLASS = {
        'computer': 0x000104,
        'peripheral': None
    }

    # Profile name -> (local name, short name) advertised by the tester.
    PROFILE_NAMES = {
        'computer': ('ChromeOS Bluetooth Tester', 'Tester'),
        'peripheral': ('ChromeOS Bluetooth Tester', 'Tester')
    }


    def __init__(self):
        super(BluetoothTesterXmlRpcDelegate, self).__init__()

        # Open the Bluetooth Control socket to the kernel which provides us
        # the needed raw management access to the Bluetooth Host Subsystem.
        self._control = bluetooth_socket.BluetoothControlSocket()
        # Open the Bluetooth SDP socket to the kernel which provides us the
        # needed interface to use SDP commands.
        self._sdp = bluetooth_sdp_socket.BluetoothSDPSocket()
        # This is almost a constant, but it might not be forever.
        self.index = 0


    def setup(self, profile):
        """Set up the tester with the given profile.

        @param profile: Profile to use for this test, valid values are:
                computer - a standard computer profile (BR/EDR and LE)
                peripheral - an LE-only profile

        @return True on success, False otherwise.

        @raises KeyError if profile is not one of the valid values.

        """
        profile_settings = self.PROFILE_SETTINGS[profile]
        profile_class = self.PROFILE_CLASS[profile]
        (profile_name, profile_short_name) = self.PROFILE_NAMES[profile]

        # Make sure the controller actually exists.
        if self.index not in self._control.read_index_list():
            logging.warning('Bluetooth Controller missing on tester')
            return False

        # Make sure all of the settings are supported by the controller.
        ( address, bluetooth_version, manufacturer_id,
          supported_settings, current_settings, class_of_device,
          name, short_name ) = self._control.read_info(self.index)
        if profile_settings & supported_settings != profile_settings:
            logging.warning('Controller does not support requested settings')
            # '%b' is not a valid conversion for text %-formatting, so the
            # bitmasks are rendered with bin() and passed through '%s'.
            logging.debug('Supported: %s; Requested: %s',
                          bin(supported_settings), bin(profile_settings))
            return False

        # The high 8 bits of class_of_device is part of Class of Service field
        # which are not actually updated in kernel with
        # self._control.set_device_class() below due to a bug in kernel
        # mgmt.c: set_dev_class(). Hence, we should keep these bits in
        # profile_class to make the test setup correctly.
        # Refer to this link about Class of Device/Service bits.
        # https://www.bluetooth.com/specifications/assigned-numbers/baseband
        # Since the class of device is used as an indication only and is not
        # practically useful in autotest, the service class bits are just
        # copied from previous self._control.read_info() request.
        # Refer to Bluetooth Spec. 4.2, "Vol 3, Part C, 3.2.4.4 Usage" about
        # why it is not actually important.
        # Refer to "Vol 2. Part E, 7.3.26 Write Class of Device Command" about
        # the correct parameters to pass to set_device_class() which require
        # 3 bytes instead of 2 bytes.
        # Remove the following statement which modifies profile_class only
        # when kernel is fixed and 3 bytes are passed in set_dev_class().
        # However, this is of very low priority.
        #
        # Profiles without a class (PROFILE_CLASS value None, e.g.
        # 'peripheral') are left untouched; masking None would raise
        # TypeError before any setting is applied.
        if profile_class is not None:
            profile_class = ((class_of_device & 0xFF0000) |
                             (profile_class & 0x00FFFF))

        # Before beginning, force the adapter power off, even if it's already
        # off; this is enough to persuade an AP-mode Intel chip to accept
        # settings.
        if not self._control.set_powered(self.index, False):
            logging.warning('Failed to power off adapter to accept settings')
            return False

        # Set the controller up as either BR/EDR only, LE only or Dual Mode.
        # This is a bit tricky because it rejects commands outright unless
        # it's in dual mode, so we actually have to figure out what changes
        # we have to make, and we have to turn things on before we turn them
        # off.
        turn_on = (current_settings ^ profile_settings) & profile_settings
        if turn_on & bluetooth_socket.MGMT_SETTING_BREDR:
            if self._control.set_bredr(self.index, True) is None:
                logging.warning('Failed to enable BR/EDR')
                return False
        if turn_on & bluetooth_socket.MGMT_SETTING_LE:
            if self._control.set_le(self.index, True) is None:
                logging.warning('Failed to enable LE')
                return False

        turn_off = (current_settings ^ profile_settings) & current_settings
        if turn_off & bluetooth_socket.MGMT_SETTING_BREDR:
            if self._control.set_bredr(self.index, False) is None:
                logging.warning('Failed to disable BR/EDR')
                return False
        if turn_off & bluetooth_socket.MGMT_SETTING_LE:
            if self._control.set_le(self.index, False) is None:
                logging.warning('Failed to disable LE')
                return False
        if turn_off & bluetooth_socket.MGMT_SETTING_SECURE_CONNECTIONS:
            if self._control.set_secure_connections(self.index, False) is None:
                logging.warning('Failed to disable secure connections')
                return False

        # Adjust settings that are BR/EDR specific that we need to set before
        # powering on the adapter, and would be rejected otherwise.
        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
            if (self._control.set_link_security(
                    self.index,
                    (profile_settings &
                     bluetooth_socket.MGMT_SETTING_LINK_SECURITY))
                        is None):
                logging.warning('Failed to set link security setting')
                return False
            if (self._control.set_ssp(
                    self.index,
                    profile_settings & bluetooth_socket.MGMT_SETTING_SSP)
                        is None):
                logging.warning('Failed to set SSP setting')
                return False
            if (self._control.set_hs(
                    self.index,
                    profile_settings & bluetooth_socket.MGMT_SETTING_HS)
                        is None):
                logging.warning('Failed to set High Speed setting')
                return False

            # Split out the major and minor class; it's listed as a kernel bug
            # that we supply these to the kernel without shifting the bits over
            # to take out the CoD format field, so this might have to change
            # one day.
            major_class = (profile_class & 0x00ff00) >> 8
            minor_class = profile_class & 0x0000ff
            if (self._control.set_device_class(
                    self.index, major_class, minor_class)
                        is None):
                logging.warning('Failed to set device class')
                return False

        # Setup generic settings that apply to either BR/EDR, LE or dual-mode
        # that still require the power to be off.
        if (self._control.set_connectable(
                self.index,
                profile_settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE)
                    is None):
            logging.warning('Failed to set connectable setting')
            return False
        if (self._control.set_pairable(
                self.index,
                profile_settings & bluetooth_socket.MGMT_SETTING_PAIRABLE)
                    is None):
            logging.warning('Failed to set pairable setting')
            return False

        if (self._control.set_local_name(
                self.index, profile_name, profile_short_name)
                    is None):
            logging.warning('Failed to set local name')
            return False

        # Check and set discoverable property; only touched when the current
        # state differs from what the profile asks for.
        if ((profile_settings ^ current_settings) &
            bluetooth_socket.MGMT_SETTING_DISCOVERABLE):
            logging.debug('Set discoverable to %x ',
                          profile_settings &
                          bluetooth_socket.MGMT_SETTING_DISCOVERABLE)
            if self._control.set_discoverable(
                    self.index,
                    profile_settings &
                    bluetooth_socket.MGMT_SETTING_DISCOVERABLE) is None:
                logging.warning('Failed to set discoverable setting')
                return False

        # Now the settings have been set, power up the adapter.
        if not self._control.set_powered(
                self.index,
                profile_settings & bluetooth_socket.MGMT_SETTING_POWERED):
            logging.warning('Failed to set powered setting')
            return False

        # Fast connectable can only be set once the controller is powered,
        # and only when BR/EDR is enabled.
        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
            # Wait for the device class set event, this happens after the
            # power up "command complete" event when we've pre-set the class
            # even though it's a side-effect of doing that.
            self._control.wait_for_events(
                    self.index,
                    ( bluetooth_socket.MGMT_EV_CLASS_OF_DEV_CHANGED, ))

            if (self._control.set_fast_connectable(
                    self.index,
                    profile_settings &
                    bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE)
                        is None):
                logging.warning('Failed to set fast connectable setting')
                return False

        # Fetch the settings again and make sure they're all set correctly,
        # including the BR/EDR flag.
        ( address, bluetooth_version, manufacturer_id,
          supported_settings, current_settings, class_of_device,
          name, short_name ) = self._control.read_info(self.index)

        # Check generic settings.
        if profile_settings != current_settings:
            logging.warning('Controller settings did not match those set: '
                            '%x != %x', current_settings, profile_settings)
            return False
        if name != profile_name:
            logging.warning('Local name did not match that set: "%s" != "%s"',
                            name, profile_name)
            return False
        if short_name != profile_short_name:
            logging.warning('Short name did not match that set: "%s" != "%s"',
                            short_name, profile_short_name)
            return False

        # Check BR/EDR specific settings.
        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
            if class_of_device != profile_class:
                if class_of_device & 0x00ffff == profile_class & 0x00ffff:
                    logging.warning('Class of device matched that set, but '
                                    'Service Class field did not: %x != %x '
                                    'Reboot Tester? ',
                                    class_of_device, profile_class)
                else:
                    logging.warning('Class of device did not match that set: '
                                    '%x != %x', class_of_device, profile_class)
                return False

        return True


    def set_discoverable(self, discoverable, timeout=0):
        """Set the discoverable state of the controller.

        @param discoverable: Whether controller should be discoverable.
        @param timeout: Timeout in seconds before disabling discovery again,
                ignored when discoverable is False, must not be zero when
                discoverable is True.

        @return True if the settings returned by the controller have the
                discoverable flag set; False if the flag is clear or the
                command failed.

        """
        settings = self._control.set_discoverable(self.index,
                                                  discoverable, timeout)
        # setup() treats a None result from this call as failure; masking
        # None would raise TypeError instead of reporting the failure.
        if settings is None:
            logging.warning('Failed to set discoverable setting')
            return False
        return bool(settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE)


    def read_info(self):
        """Read the adapter information from the Kernel.

        @return the information as a JSON-encoded tuple of:
          ( address, bluetooth_version, manufacturer_id,
            supported_settings, current_settings, class_of_device,
            name, short_name )

        """
        return json.dumps(self._control.read_info(self.index))


    def set_advertising(self, advertising):
        """Set whether the controller is advertising via LE.

        @param advertising: Whether controller should advertise via LE.

        @return True if the settings returned by the controller have the
                advertising flag set; False if the flag is clear or the
                command failed.

        """
        settings = self._control.set_advertising(self.index, advertising)
        # None is treated as failure, matching how setup() handles the other
        # control-socket setting commands.
        if settings is None:
            logging.warning('Failed to set advertising setting')
            return False
        return bool(settings & bluetooth_socket.MGMT_SETTING_ADVERTISING)


    def discover_devices(self, br_edr=True, le_public=True, le_random=True):
        """Discover remote devices.

        Activates device discovery and collects the set of devices found,
        returning them as a list.

        @param br_edr: Whether to detect BR/EDR devices.
        @param le_public: Whether to detect LE Public Address devices.
        @param le_random: Whether to detect LE Random Address devices.

        @return List of devices found as JSON-encoded tuples with the format
                (address, address_type, rssi, flags, base64-encoded eirdata),
                or False if discovery could not be started.

        """
        # Build the address-type bitmask passed to start_discovery().
        address_type = 0
        if br_edr:
            address_type |= 0x1
        if le_public:
            address_type |= 0x2
        if le_random:
            address_type |= 0x4

        set_type = self._control.start_discovery(self.index, address_type)
        if set_type != address_type:
            logging.warning('Discovery address type did not match that set: '
                            '%x != %x', set_type, address_type)
            return False

        devices = self._control.get_discovered_devices(self.index)
        # The loop variables are named dev_* so they do not overwrite the
        # requested address_type above (list-comprehension variables leak
        # into the enclosing scope on Python 2).
        # NOTE(review): base64.encodestring is kept so the wire format does
        # not change; it is deprecated on Python 3.
        return json.dumps([
                (dev_address, dev_address_type, dev_rssi, dev_flags,
                 base64.encodestring(dev_eirdata))
                for (dev_address, dev_address_type, dev_rssi, dev_flags,
                     dev_eirdata) in devices
        ])


    def connect(self, address):
        """Connect to device with the given address

        @param address: Bluetooth address.

        @return True once the SDP socket's connect() call has returned.

        """
        self._sdp.connect(address)
        return True


    def service_search_request(self, uuids, max_rec_cnt, preferred_size=32,
                               forced_pdu_size=None, invalid_request=False):
        """Send a Service Search Request

        @param uuids: List of UUIDs (as integers) to look for.
        @param max_rec_cnt: Maximum count of returned service records.
        @param preferred_size: Preferred size of UUIDs in bits (16, 32, or 128).
        @param forced_pdu_size: Use certain PDU size parameter instead of
               calculating actual length of sequence.
        @param invalid_request: Whether to send request with intentionally
               invalid syntax for testing purposes (bool flag).

        @return JSON-encoded list of found services' service record handles
                or Error Code

        """
        return json.dumps(
                self._sdp.service_search_request(
                        uuids, max_rec_cnt, preferred_size, forced_pdu_size,
                        invalid_request)
        )


    def service_attribute_request(self, handle, max_attr_byte_count, attr_ids,
                                  forced_pdu_size=None, invalid_request=None):
        """Send a Service Attribute Request

        @param handle: service record from which attribute values are to be
               retrieved.
        @param max_attr_byte_count: maximum number of bytes of attribute data to
               be returned in the response to this request.
        @param attr_ids: a list, where each element is either an attribute ID
               or a range of attribute IDs.
        @param forced_pdu_size: Use certain PDU size parameter instead of
               calculating actual length of sequence.
        @param invalid_request: Whether to send request with intentionally
               invalid syntax for testing purposes (string with raw request).

        @return JSON-encoded list of found attributes IDs and their values
                or Error Code

        """
        return json.dumps(
                self._sdp.service_attribute_request(
                        handle, max_attr_byte_count, attr_ids, forced_pdu_size,
                        invalid_request)
        )


    def service_search_attribute_request(self, uuids, max_attr_byte_count,
                                         attr_ids, preferred_size=32,
                                         forced_pdu_size=None,
                                         invalid_request=None):
        """Send a Service Search Attribute Request

        @param uuids: list of UUIDs (as integers) to look for.
        @param max_attr_byte_count: maximum number of bytes of attribute data to
               be returned in the response to this request.
        @param attr_ids: a list, where each element is either an attribute ID
               or a range of attribute IDs.
        @param preferred_size: Preferred size of UUIDs in bits (16, 32, or 128).
        @param forced_pdu_size: Use certain PDU size parameter instead of
               calculating actual length of sequence.
        @param invalid_request: Whether to send request with intentionally
               invalid syntax for testing purposes (string to be prepended
               to correct request).

        @return JSON-encoded list of found attributes IDs and their values
                or Error Code

        """
        return json.dumps(
                self._sdp.service_search_attribute_request(
                        uuids, max_attr_byte_count, attr_ids, preferred_size,
                        forced_pdu_size, invalid_request)
        )
| |
| |
if __name__ == '__main__':
    # Root logger emits DEBUG and above through the default handler.
    logging.basicConfig(level=logging.DEBUG)

    # Additionally forward every record to the local syslog socket, tagged
    # with the server name and the record's level.
    syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
    syslog_handler.setFormatter(logging.Formatter(
            'bluetooth_tester_xmlrpc_server: [%(levelname)s] %(message)s'))
    logging.getLogger().addHandler(syslog_handler)

    logging.debug('bluetooth_tester_xmlrpc_server main...')

    # Serve the tester delegate on localhost at the configured port.
    rpc_server = xmlrpc_server.XmlRpcServer(
            'localhost', constants.BLUETOOTH_TESTER_XMLRPC_SERVER_PORT)
    rpc_server.register_delegate(BluetoothTesterXmlRpcDelegate())
    rpc_server.run()