| # Lint as: python2, python3 |
| # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module provides cras audio utilities.""" |
| |
| import logging |
| import re |
| import subprocess |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.cros.audio import cmd_utils |
| |
| _CRAS_TEST_CLIENT = '/usr/bin/cras_test_client' |
| |
| |
| class CrasUtilsError(Exception): |
| """Error in CrasUtils.""" |
| pass |
| |
| |
| def playback(blocking=True, stdin=None, *args, **kargs): |
| """A helper function to execute the playback_cmd. |
| |
| @param blocking: Blocks this call until playback finishes. |
| @param stdin: the standard input of playback process |
| @param args: args passed to playback_cmd. |
| @param kargs: kargs passed to playback_cmd. |
| |
| @returns: The process running the playback command. Note that if the |
| blocking parameter is true, this will return a finished process. |
| """ |
| process = cmd_utils.popen(playback_cmd(*args, **kargs), stdin=stdin) |
| if blocking: |
| cmd_utils.wait_and_check_returncode(process) |
| return process |
| |
| |
| def capture(*args, **kargs): |
| """A helper function to execute the capture_cmd. |
| |
| @param args: args passed to capture_cmd. |
| @param kargs: kargs passed to capture_cmd. |
| |
| """ |
| cmd_utils.execute(capture_cmd(*args, **kargs)) |
| |
| |
| def playback_cmd(playback_file, block_size=None, duration=None, |
| pin_device=None, channels=2, rate=48000): |
| """Gets a command to playback a file with given settings. |
| |
| @param playback_file: the name of the file to play. '-' indicates to |
| playback raw audio from the stdin. |
| @param pin_device: the device id to playback on. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to playback. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--playback_file', playback_file] |
| if pin_device is not None: |
| args += ['--pin_device', str(pin_device)] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def capture_cmd(capture_file, block_size=None, duration=10, |
| sample_format='S16_LE', |
| pin_device=None, channels=1, rate=48000): |
| """Gets a command to capture the audio into the file with given settings. |
| |
| @param capture_file: the name of file the audio to be stored in. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to record. If it is None, duration is not set, |
| and command will keep capturing audio until it is |
| terminated. |
| @param sample_format: the sample format; |
| possible choices: 'S16_LE', 'S24_LE', and 'S32_LE' |
| default to S16_LE: signed 16 bits/sample, |
| little endian |
| @param pin_device: the device id to record from. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--capture_file', capture_file] |
| if pin_device is not None: |
| args += ['--pin_device', str(pin_device)] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| args += ['--format', str(sample_format)] |
| return args |
| |
| |
| def listen_cmd( |
| capture_file, block_size=None, duration=10, channels=1, rate=48000): |
| """Gets a command to listen on hotword and record audio into the file with |
| given settings. |
| |
| @param capture_file: the name of file the audio to be stored in. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to record. If it is None, duration is not set, |
| and command will keep capturing audio until it is |
| terminated. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--listen_for_hotword', capture_file] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def loopback(*args, **kargs): |
| """A helper function to execute loopback_cmd. |
| |
| @param args: args passed to loopback_cmd. |
| @param kargs: kargs passed to loopback_cmd. |
| |
| """ |
| |
| cmd_utils.execute(loopback_cmd(*args, **kargs)) |
| |
| |
| def loopback_cmd(output_file, duration=10, channels=2, rate=48000): |
| """Gets a command to record the loopback. |
| |
| @param output_file: The name of the file the loopback to be stored in. |
| @param channels: The number of channels of the recorded audio. |
| @param duration: seconds to record. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--loopback_file', output_file] |
| args += ['--duration_seconds', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def get_cras_nodes_cmd(): |
| """Gets a command to query the nodes from Cras. |
| |
| @returns: The command to query nodes information from Cras using dbus-send. |
| |
| """ |
| return ('dbus-send --system --type=method_call --print-reply ' |
| '--dest=org.chromium.cras /org/chromium/cras ' |
| 'org.chromium.cras.Control.GetNodes') |
| |
| |
| def set_system_volume(volume): |
| """Set the system volume. |
| |
| @param volume: the system output vlume to be set(0 - 100). |
| |
| """ |
| get_cras_control_interface().SetOutputVolume(volume) |
| |
| |
| def set_node_volume(node_id, volume): |
| """Set the volume of the given output node. |
| |
| @param node_id: the id of the output node to be set the volume. |
| @param volume: the volume to be set(0-100). |
| |
| """ |
| get_cras_control_interface().SetOutputNodeVolume(node_id, volume) |
| |
| |
| def get_cras_control_interface(private=False): |
| """Gets Cras DBus control interface. |
| |
| @param private: Set to True to use a new instance for dbus.SystemBus |
| instead of the shared instance. |
| |
| @returns: A dBus.Interface object with Cras Control interface. |
| |
| @raises: ImportError if this is not called on Cros device. |
| |
| """ |
| try: |
| import dbus |
| except ImportError as e: |
| logging.exception( |
| 'Can not import dbus: %s. This method should only be ' |
| 'called on Cros device.', e) |
| raise |
| bus = dbus.SystemBus(private=private) |
| cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras') |
| return dbus.Interface(cras_object, 'org.chromium.cras.Control') |
| |
| |
| def get_cras_nodes(): |
| """Gets nodes information from Cras. |
| |
| @returns: A dict containing information of each node. |
| |
| """ |
| return get_cras_control_interface().GetNodes() |
| |
| |
| def get_selected_nodes(): |
| """Gets selected output nodes and input nodes. |
| |
| @returns: A tuple (output_nodes, input_nodes) where each |
| field is a list of selected node IDs returned from Cras DBus API. |
| Note that there may be multiple output/input nodes being selected |
| at the same time. |
| |
| """ |
| output_nodes = [] |
| input_nodes = [] |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active']: |
| if node['IsInput']: |
| input_nodes.append(node['Id']) |
| else: |
| output_nodes.append(node['Id']) |
| return (output_nodes, input_nodes) |
| |
| |
| def set_selected_output_node_volume(volume): |
| """Sets the selected output node volume. |
| |
| @param volume: the volume to be set (0-100). |
| |
| """ |
| selected_output_node_ids, _ = get_selected_nodes() |
| for node_id in selected_output_node_ids: |
| set_node_volume(node_id, volume) |
| |
| |
| def get_active_stream_count(): |
| """Gets the number of active streams. |
| |
| @returns: The number of active streams. |
| |
| """ |
| return int(get_cras_control_interface().GetNumberOfActiveStreams()) |
| |
| |
| def set_system_mute(is_mute): |
| """Sets the system mute switch. |
| |
| @param is_mute: Set True to mute the system playback. |
| |
| """ |
| get_cras_control_interface().SetOutputMute(is_mute) |
| |
| |
| def set_capture_mute(is_mute): |
| """Sets the capture mute switch. |
| |
| @param is_mute: Set True to mute the capture. |
| |
| """ |
| get_cras_control_interface().SetInputMute(is_mute) |
| |
| |
| def node_type_is_plugged(node_type, nodes_info): |
| """Determine if there is any node of node_type plugged. |
| |
| This method is used in the AudioLoopbackDongleLabel class, where the |
| call is executed on autotest server. Use get_cras_nodes instead if |
| the call can be executed on Cros device. |
| |
| Since Cras only reports the plugged node in GetNodes, we can |
| parse the return value to see if there is any node with the given type. |
| For example, if INTERNAL_MIC is of intereset, the pattern we are |
| looking for is: |
| |
| dict entry( |
| string "Type" |
| variant string "INTERNAL_MIC" |
| ) |
| |
| @param node_type: A str representing node type defined in CRAS_NODE_TYPES. |
| @param nodes_info: A str containing output of command get_nodes_cmd. |
| |
| @returns: True if there is any node of node_type plugged. False otherwise. |
| |
| """ |
| match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type, |
| nodes_info) |
| return True if match else False |
| |
| |
| # Cras node types reported from Cras DBus control API. |
| CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB', |
| 'BLUETOOTH', 'LINEOUT', 'UNKNOWN', 'ALSA_LOOPBACK'] |
| CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH', |
| 'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN', |
| 'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC', |
| 'ECHO_REFERENCE'] |
| CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES |
| |
| |
| def get_filtered_node_types(callback): |
| """Returns the pair of filtered output node types and input node types. |
| |
| @param callback: A callback function which takes a node as input parameter |
| and filter the node based on its return value. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of node types defined in CRAS_NODE_TYPES, |
| and their 'attribute_name' is True. |
| |
| """ |
| output_node_types = [] |
| input_node_types = [] |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if callback(node): |
| node_type = str(node['Type']) |
| if node_type not in CRAS_NODE_TYPES: |
| logging.warning('node type %s is not in known CRAS_NODE_TYPES', |
| node_type) |
| if node['IsInput']: |
| input_node_types.append(node_type) |
| else: |
| output_node_types.append(node_type) |
| return (output_node_types, input_node_types) |
| |
| |
| def get_selected_node_types(): |
| """Returns the pair of active output node types and input node types. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of selected node types defined in CRAS_NODE_TYPES. |
| |
| """ |
| def is_selected(node): |
| """Checks if a node is selected. |
| |
| A node is selected if its Active attribute is True. |
| |
| @returns: True is a node is selected, False otherwise. |
| |
| """ |
| return node['Active'] |
| |
| return get_filtered_node_types(is_selected) |
| |
| |
| def get_selected_input_device_name(): |
| """Returns the device name of the active input node. |
| |
| @returns: device name string. E.g. kbl_r5514_5663_max: :0,1 |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and node['IsInput']: |
| return node['DeviceName'] |
| return None |
| |
| |
| def get_selected_input_device_type(): |
| """Returns the device type of the active input node. |
| |
| @returns: device type string. E.g. INTERNAL_MICROPHONE |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and node['IsInput']: |
| return node['Type'] |
| return None |
| |
| |
| def get_selected_output_device_name(): |
| """Returns the device name of the active output node. |
| |
| @returns: device name string. E.g. mtk-rt5650: :0,0 |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and not node['IsInput']: |
| return node['DeviceName'] |
| return None |
| |
| |
| def get_selected_output_device_type(): |
| """Returns the device type of the active output node. |
| |
| @returns: device type string. E.g. INTERNAL_SPEAKER |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and not node['IsInput']: |
| return node['Type'] |
| return None |
| |
| |
| def get_plugged_node_types(): |
| """Returns the pair of plugged output node types and input node types. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of plugged node types defined in CRAS_NODE_TYPES. |
| |
| """ |
| def is_plugged(node): |
| """Checks if a node is plugged and is not unknown node. |
| |
| Cras DBus API only reports plugged node, so every node reported by Cras |
| DBus API is plugged. However, we filter out UNKNOWN node here because |
| the existence of unknown node depends on the number of redundant |
| playback/record audio device created on audio card. Also, the user of |
| Cras will ignore unknown nodes. |
| |
| @returns: True if a node is plugged and is not an UNKNOWN node. |
| |
| """ |
| return node['Type'] != 'UNKNOWN' |
| |
| return get_filtered_node_types(is_plugged) |
| |
| |
| def set_selected_node_types(output_node_types, input_node_types): |
| """Sets selected node types. |
| |
| @param output_node_types: A list of output node types. None to skip setting. |
| @param input_node_types: A list of input node types. None to skip setting. |
| |
| """ |
| if output_node_types is not None and len(output_node_types) == 1: |
| set_single_selected_output_node(output_node_types[0]) |
| elif output_node_types: |
| set_selected_output_nodes(output_node_types) |
| if input_node_types is not None and len(input_node_types) == 1: |
| set_single_selected_input_node(input_node_types[0]) |
| elif input_node_types: |
| set_selected_input_nodes(input_node_types) |
| |
| |
| def set_single_selected_output_node(node_type): |
| """Sets one selected output node. |
| |
| Note that Chrome UI uses SetActiveOutputNode of Cras DBus API |
| to select one output node. |
| |
| @param node_type: A node type. |
| |
| @returns: True if the output node type is found and set active. |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['IsInput']: |
| continue |
| if node['Type'] == node_type: |
| set_active_output_node(node['Id']) |
| return True |
| return False |
| |
| |
| def set_single_selected_input_node(node_type): |
| """Sets one selected input node. |
| |
| Note that Chrome UI uses SetActiveInputNode of Cras DBus API |
| to select one input node. |
| |
| @param node_type: A node type. |
| |
| @returns: True if the input node type is found and set active. |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if not node['IsInput']: |
| continue |
| if node['Type'] == node_type: |
| set_active_input_node(node['Id']) |
| return True |
| return False |
| |
| |
| def set_selected_output_nodes(types): |
| """Sets selected output node types. |
| |
| Note that Chrome UI uses SetActiveOutputNode of Cras DBus API |
| to select one output node. Here we use add/remove active output node |
| to support multiple nodes. |
| |
| @param types: A list of output node types. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['IsInput']: |
| continue |
| if node['Type'] in types: |
| add_active_output_node(node['Id']) |
| elif node['Active']: |
| remove_active_output_node(node['Id']) |
| |
| |
| def set_selected_input_nodes(types): |
| """Sets selected input node types. |
| |
| Note that Chrome UI uses SetActiveInputNode of Cras DBus API |
| to select one input node. Here we use add/remove active input node |
| to support multiple nodes. |
| |
| @param types: A list of input node types. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if not node['IsInput']: |
| continue |
| if node['Type'] in types: |
| add_active_input_node(node['Id']) |
| elif node['Active']: |
| remove_active_input_node(node['Id']) |
| |
| |
| def set_active_input_node(node_id): |
| """Sets one active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().SetActiveInputNode(node_id) |
| |
| |
| def set_active_output_node(node_id): |
| """Sets one active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().SetActiveOutputNode(node_id) |
| |
| |
| def add_active_output_node(node_id): |
| """Adds an active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().AddActiveOutputNode(node_id) |
| |
| |
| def add_active_input_node(node_id): |
| """Adds an active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().AddActiveInputNode(node_id) |
| |
| |
| def remove_active_output_node(node_id): |
| """Removes an active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().RemoveActiveOutputNode(node_id) |
| |
| |
| def remove_active_input_node(node_id): |
| """Removes an active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().RemoveActiveInputNode(node_id) |
| |
| |
| def get_node_id_from_node_type(node_type, is_input): |
| """Gets node id from node type. |
| |
| @param types: A node type defined in CRAS_NODE_TYPES. |
| @param is_input: True if the node is input. False otherwise. |
| |
| @returns: A string for node id. |
| |
| @raises: CrasUtilsError: if unique node id can not be found. |
| |
| """ |
| nodes = get_cras_nodes() |
| find_ids = [] |
| for node in nodes: |
| if node['Type'] == node_type and node['IsInput'] == is_input: |
| find_ids.append(node['Id']) |
| if len(find_ids) != 1: |
| raise CrasUtilsError( |
| 'Can not find unique node id from node type %s' % node_type) |
| return find_ids[0] |
| |
| |
| def get_device_id_of(node_id): |
| """Gets the device id of the node id. |
| |
| The conversion logic is replicated from the CRAS's type definition at |
| third_party/adhd/cras/src/common/cras_types.h. |
| |
| @param node_id: A string for node id. |
| |
| @returns: A string for device id. |
| |
| @raise: CrasUtilsError: if device id is invalid. |
| """ |
| device_id = str(int(node_id) >> 32) |
| if device_id == "0": |
| raise CrasUtilsError('Got invalid device_id: 0') |
| return device_id |
| |
| |
| def get_device_id_from_node_type(node_type, is_input): |
| """Gets device id from node type. |
| |
| @param types: A node type defined in CRAS_NODE_TYPES. |
| @param is_input: True if the node is input. False otherwise. |
| |
| @returns: A string for device id. |
| |
| """ |
| node_id = get_node_id_from_node_type(node_type, is_input) |
| return get_device_id_of(node_id) |
| |
| |
| def get_active_node_volume(): |
| """Returns volume from active node. |
| |
| @returns: int for volume |
| |
| @raises: CrasUtilsError: if node volume cannot be found. |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] == 1 and node['IsInput'] == 0: |
| return int(node['NodeVolume']) |
| raise CrasUtilsError('Cannot find active node volume from nodes.') |
| |
| |
| def get_active_output_node_max_supported_channels(): |
| """Returns max supported channels from active output node. |
| |
| @returns: int for max supported channels. |
| |
| @raises: CrasUtilsError: if node cannot be found. |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] == 1 and node['IsInput'] == 0: |
| return int(node['MaxSupportedChannels']) |
| raise CrasUtilsError('Cannot find active output node.') |
| |
| |
| class CrasTestClient(object): |
| """An object to perform cras_test_client functions.""" |
| |
| BLOCK_SIZE = None |
| PIN_DEVICE = None |
| SAMPLE_FORMAT = 'S16_LE' |
| DURATION = 10 |
| CHANNELS = 2 |
| RATE = 48000 |
| |
| |
| def __init__(self): |
| self._proc = None |
| self._capturing_proc = None |
| self._playing_proc = None |
| self._capturing_msg = 'capturing audio file' |
| self._playing_msg = 'playing audio file' |
| self._wbs_cmd = '%s --set_wbs_enabled ' % _CRAS_TEST_CLIENT |
| self._enable_wbs_cmd = ('%s 1' % self._wbs_cmd).split() |
| self._disable_wbs_cmd = ('%s 0' % self._wbs_cmd).split() |
| self._info_cmd = [_CRAS_TEST_CLIENT,] |
| self._select_input_cmd = '%s --select_input ' % _CRAS_TEST_CLIENT |
| |
| |
| def start_subprocess(self, proc, proc_cmd, filename, proc_msg): |
| """Start a capture or play subprocess |
| |
| @param proc: the process |
| @param proc_cmd: the process command and its arguments |
| @param filename: the file name to capture or play |
| @param proc_msg: the message to display in logging |
| |
| @returns: True if the process is started successfully |
| """ |
| if proc is None: |
| try: |
| self._proc = subprocess.Popen(proc_cmd) |
| logging.debug('Start %s %s on the DUT', proc_msg, filename) |
| except Exception as e: |
| logging.error('Failed to popen: %s (%s)', proc_msg, e) |
| return False |
| else: |
| logging.error('cannot run the command twice: %s', proc_msg) |
| return False |
| return True |
| |
| |
| def stop_subprocess(self, proc, proc_msg): |
| """Stop a subprocess |
| |
| @param proc: the process to stop |
| @param proc_msg: the message to display in logging |
| |
| @returns: True if the process is stopped successfully |
| """ |
| if proc is None: |
| logging.error('cannot run stop %s before starting it.', proc_msg) |
| return False |
| |
| proc.terminate() |
| try: |
| utils.poll_for_condition( |
| condition=lambda: proc.poll() is not None, |
| exception=CrasUtilsError, |
| timeout=10, |
| sleep_interval=0.5, |
| desc='Waiting for subprocess to terminate') |
| except Exception: |
| logging.warn('Killing subprocess due to timeout') |
| proc.kill() |
| proc.wait() |
| |
| logging.debug('stop %s on the DUT', proc_msg) |
| return True |
| |
| |
| def start_capturing_subprocess(self, capture_file, block_size=BLOCK_SIZE, |
| duration=DURATION, pin_device=PIN_DEVICE, |
| sample_format=SAMPLE_FORMAT, |
| channels=CHANNELS, rate=RATE): |
| """Start capturing in a subprocess. |
| |
| @param capture_file: the name of file the audio to be stored in |
| @param block_size: the number of frames per callback(dictates latency) |
| @param duration: seconds to record. If it is None, duration is not set, |
| and will keep capturing audio until terminated |
| @param sample_format: the sample format |
| @param pin_device: the device id to record from |
| @param channels: number of channels |
| @param rate: the sampling rate |
| |
| @returns: True if the process is started successfully |
| """ |
| proc_cmd = capture_cmd(capture_file, block_size=block_size, |
| duration=duration, sample_format=sample_format, |
| pin_device=pin_device, channels=channels, |
| rate=rate) |
| result = self.start_subprocess(self._capturing_proc, proc_cmd, |
| capture_file, self._capturing_msg) |
| if result: |
| self._capturing_proc = self._proc |
| return result |
| |
| |
| def stop_capturing_subprocess(self): |
| """Stop the capturing subprocess.""" |
| result = self.stop_subprocess(self._capturing_proc, self._capturing_msg) |
| if result: |
| self._capturing_proc = None |
| return result |
| |
| |
| def start_playing_subprocess(self, audio_file, block_size=BLOCK_SIZE, |
| duration=DURATION, pin_device=PIN_DEVICE, |
| channels=CHANNELS, rate=RATE): |
| """Start playing the audio file in a subprocess. |
| |
| @param audio_file: the name of audio file to play |
| @param block_size: the number of frames per callback(dictates latency) |
| @param duration: seconds to play. If it is None, duration is not set, |
| and will keep playing audio until terminated |
| @param pin_device: the device id to play to |
| @param channels: number of channels |
| @param rate: the sampling rate |
| |
| @returns: True if the process is started successfully |
| """ |
| proc_cmd = playback_cmd(audio_file, block_size, duration, pin_device, |
| channels, rate) |
| result = self.start_subprocess(self._playing_proc, proc_cmd, |
| audio_file, self._playing_msg) |
| if result: |
| self._playing_proc = self._proc |
| return result |
| |
| |
| def stop_playing_subprocess(self): |
| """Stop the playing subprocess.""" |
| result = self.stop_subprocess(self._playing_proc, self._playing_msg) |
| if result: |
| self._playing_proc = None |
| return result |
| |
| |
| def play(self, audio_file, block_size=BLOCK_SIZE, duration=DURATION, |
| pin_device=PIN_DEVICE, channels=CHANNELS, rate=RATE): |
| """Play the audio file. |
| |
| This method will get blocked until it has completed playing back. |
| If you do not want to get blocked, use start_playing_subprocess() |
| above instead. |
| |
| @param audio_file: the name of audio file to play |
| @param block_size: the number of frames per callback(dictates latency) |
| @param duration: seconds to play. If it is None, duration is not set, |
| and will keep playing audio until terminated |
| @param pin_device: the device id to play to |
| @param channels: number of channels |
| @param rate: the sampling rate |
| |
| @returns: True if the process is started successfully |
| """ |
| proc_cmd = playback_cmd(audio_file, block_size, duration, pin_device, |
| channels, rate) |
| try: |
| self._proc = subprocess.call(proc_cmd) |
| logging.debug('call "%s" on the DUT', proc_cmd) |
| except Exception as e: |
| logging.error('Failed to call: %s (%s)', proc_cmd, e) |
| return False |
| return True |
| |
| |
| def enable_wbs(self, value): |
| """Enable or disable wideband speech (wbs) per the value. |
| |
| @param value: True to enable wbs. |
| |
| @returns: True if the operation succeeds. |
| """ |
| cmd = self._enable_wbs_cmd if value else self._disable_wbs_cmd |
| logging.debug('call "%s" on the DUT', cmd) |
| if subprocess.call(cmd): |
| logging.error('Failed to call: %s (%s)', cmd) |
| return False |
| return True |
| |
| |
| def select_input_device(self, device_name): |
| """Select the audio input device. |
| |
| @param device_name: the name of the Bluetooth peer device |
| |
| @returns: True if the operation succeeds. |
| """ |
| logging.debug('to select input device for device_name: %s', device_name) |
| try: |
| info = subprocess.check_output(self._info_cmd) |
| logging.debug('info: %s', info) |
| except Exception as e: |
| logging.error('Failed to call: %s (%s)', self._info_cmd, e) |
| return False |
| |
| flag_input_nodes = False |
| audio_input_node = None |
| for line in info.decode().splitlines(): |
| if 'Input Nodes' in line: |
| flag_input_nodes = True |
| elif 'Attached clients' in line: |
| flag_input_nodes = False |
| |
| if flag_input_nodes: |
| if device_name in line: |
| audio_input_node = line.split()[1] |
| logging.debug('%s', audio_input_node) |
| break |
| |
| if audio_input_node is None: |
| logging.error('Failed to find audio input node: %s', device_name) |
| return False |
| |
| select_input_cmd = (self._select_input_cmd + audio_input_node).split() |
| if subprocess.call(select_input_cmd): |
| logging.error('Failed to call: %s (%s)', select_input_cmd, e) |
| return False |
| |
| logging.debug('call "%s" on the DUT', select_input_cmd) |
| return True |
| |
| |
| def set_player_playback_status(self, status): |
| """Set playback status for the registered media player. |
| |
| @param status: playback status in string. |
| |
| """ |
| try: |
| get_cras_control_interface().SetPlayerPlaybackStatus(status) |
| except Exception as e: |
| logging.error('Failed to set player playback status: %s', e) |
| return False |
| |
| return True |
| |
| |
| def set_player_position(self, position): |
| """Set media position for the registered media player. |
| |
| @param position: position in micro seconds. |
| |
| """ |
| try: |
| get_cras_control_interface().SetPlayerPosition(position) |
| except Exception as e: |
| logging.error('Failed to set player position: %s', e) |
| return False |
| |
| return True |
| |
| |
| def set_player_metadata(self, metadata): |
| """Set title, artist, and album for the registered media player. |
| |
| @param metadata: dictionary of media metadata. |
| |
| """ |
| try: |
| get_cras_control_interface().SetPlayerMetadata(metadata) |
| except Exception as e: |
| logging.error('Failed to set player metadata: %s', e) |
| return False |
| |
| return True |
| |
| |
| def set_player_length(self, length): |
| """Set metadata length for the registered media player. |
| |
| Media length is a part of metadata information. However, without |
| specify its type to int64. dbus-python will guess the variant type to |
| be int32 by default. Separate it from the metadata function to help |
| prepare the data differently. |
| |
| @param metadata: DBUS dictionary that contains a variant of int64. |
| |
| """ |
| try: |
| get_cras_control_interface().SetPlayerMetadata(length) |
| except Exception as e: |
| logging.error('Failed to set player length: %s', e) |
| return False |
| |
| return True |