| # Lint as: python2, python3 |
| # Copyright 2021 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import subprocess |
| import time |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros.audio import audio_helper |
| from autotest_lib.client.cros.audio import sox_utils |
| |
| |
| class audio_CrasAec(test.test): |
| """Verifies echo cancellation functions well.""" |
| version = 1 |
| |
| INT_SPK_CRAS_NODE_TYPE = 'INTERNAL_SPEAKER' |
| INT_MIC_CRAS_NODE_TYPE = 'INTERNAL_MIC' |
| |
| # (sample rate, channels, rms threshold) |
| # The rms_threshold value is determined by experiments. |
| TEST_DATA = [ |
| (48000, 1, 0.015), |
| (44100, 1, 0.015), |
| (16000, 1, 0.015), |
| (44100, 2, 0.015), |
| (48000, 2, 0.015), |
| (16000, 2, 0.015), |
| ] |
| |
| def play_sound(self): |
| """Plays the given audio content.""" |
| cmd = [ |
| 'cras_test_client', '--playback_file', |
| os.path.join(self.bindir, 'human-voice.raw') |
| ] |
| self._play_sound_proc = subprocess.Popen(cmd) |
| |
| def record_aec(self, rate, channels): |
| """Records the looped audio with AEC processing. """ |
| file_name = os.path.join(self.resultsdir, |
| 'record-%d-ch%d.raw' % (rate, channels)) |
| cmd = [ |
| 'cras_test_client', '--loopback_file', file_name, '--effects', |
| 'aec', '--rate', |
| str(rate), '--post_dsp', '2', '--num_channels', |
| str(channels) |
| ] |
| self._record_aec_proc = subprocess.Popen(cmd) |
| return file_name |
| |
| def aecdump(self, stream_id, rate, channels): |
| """Do the AEC dump parallelly.""" |
| |
| file_name = os.path.join(self.resultsdir, |
| 'aecdump-%d-ch%d.raw' % (rate, channels)) |
| cmd = [ |
| 'cras_test_client', '--aecdump', file_name, '--stream_id', |
| str(stream_id), '--duration', |
| str(10) |
| ] |
| self._dump_aec_proc = subprocess.Popen(cmd) |
| |
| def setup_test_procs(self): |
| """Initializes process variables for this test.""" |
| self._dump_aec_proc = None |
| self._record_aec_proc = None |
| self._play_sound_proc = None |
| |
| def cleanup_test_procs(self): |
| """Cleans up all cras_test_client processes used in test.""" |
| if self._dump_aec_proc: |
| self._dump_aec_proc.kill() |
| if self._record_aec_proc: |
| self._record_aec_proc.kill() |
| if self._play_sound_proc: |
| self._play_sound_proc.kill() |
| |
| def get_aec_stream_id(self): |
| """Gets the first AEC stream id in decimal. """ |
| proc = subprocess.Popen(['cras_test_client', '--dump_a'], |
| stdout=subprocess.PIPE) |
| output, err = proc.communicate() |
| lines = output.decode().split('\n') |
| # Filter through the summary lines by effects 0x0001 to find |
| # the stream id. |
| for line in lines: |
| words = line.split(' ') |
| if words[0] != 'Summary:': |
| continue |
| |
| logging.debug("audio dump summaries: %s", line) |
| if words[8] == '0x0001': |
| return int(words[3], 16) |
| |
| return None |
| |
| def test_sample_rate_and_channels(self, rate, channels): |
| """ |
| Configures CRAS to use aloop as input and output option. |
| Plays the given audio content then record through aloop. |
| Expects the AEC cancels well because the two-way data |
| are the same except scaling and time shift. |
| |
| @param rarte: the sample rate to create capture stream |
| @param channels: the number of channels to create capture stream |
| |
| @returns: the rms value reported by sox util. |
| """ |
| self.setup_test_procs() |
| |
| try: |
| self.play_sound() |
| recorded_file = self.record_aec(rate, channels) |
| |
| # Wait at most 2 seconds for AEC stream to be ready for aecdump. |
| stream_id = utils.poll_for_condition(self.get_aec_stream_id, |
| timeout=2, |
| sleep_interval=0.1) |
| |
| self.aecdump(stream_id, rate, channels) |
| time.sleep(3) |
| except utils.TimeoutError: |
| # Possibly error has occurred in capture proess. |
| audio_helper.dump_audio_diagnostics( |
| os.path.join(self.resultsdir, "audio_diagnostics.txt")) |
| raise error.TestFail("Fail to find aec stream's id") |
| finally: |
| self.cleanup_test_procs() |
| |
| sox_stat = sox_utils.get_stat(recorded_file, |
| channels=channels, |
| rate=rate) |
| return sox_stat.rms |
| |
| def run_once(self): |
| """Entry point of this test.""" |
| rms_results = [] |
| test_pass = True |
| try: |
| for sample_rate, channels, rms_threshold in self.TEST_DATA: |
| rms = self.test_sample_rate_and_channels(sample_rate, channels) |
| if rms > rms_threshold: |
| test_pass = False |
| rms_results.append(rms) |
| finally: |
| logging.debug("rms results: %s", rms_results) |
| |
| if not test_pass: |
| raise error.TestFail("rms too high in at least one case %s" % |
| rms_results) |