| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # pylint: disable-msg=C0111 |
| |
| """Unit tests for server/cros/dynamic_suite/job_status.py.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| import shutil |
| from six.moves import map |
| from six.moves import range |
| import tempfile |
| import unittest |
| from unittest import mock |
| from unittest.mock import patch |
| |
| import common |
| |
| from autotest_lib.server import frontend |
| from autotest_lib.server.cros.dynamic_suite import job_status |
| from autotest_lib.server.cros.dynamic_suite.fakes import FakeJob |
| from autotest_lib.server.cros.dynamic_suite.fakes import FakeStatus |
| |
| |
| DEFAULT_WAITTIMEOUT_MINS = 60 * 4 |
| |
| |
| class StatusTest(unittest.TestCase): |
| """Unit tests for job_status.Status. |
| """ |
| |
| |
| def setUp(self): |
| super(StatusTest, self).setUp() |
| afe_patcher = patch.object(frontend, 'AFE') |
| self.afe = afe_patcher.start() |
| self.addCleanup(afe_patcher.stop) |
| tko_patcher = patch.object(frontend, 'TKO') |
| self.tko = tko_patcher.start() |
| self.addCleanup(tko_patcher.stop) |
| self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__) |
| # These are called a few times, so we need to return via side_effect. |
| # for some reason side_effect doesn't like appending, so just keeping |
| # a list to then be added at once. |
| self.tko.get_job_test_statuses_from_db.side_effect = [] |
| self.afe.run.side_effect = [] |
| self.run_list = [] |
| self.run_call_list = [] |
| self.job_statuses = [] |
| self.job_statuses_call_list = [] |
| |
| def tearDown(self): |
| super(StatusTest, self).tearDown() |
| shutil.rmtree(self.tmpdir, ignore_errors=True) |
| |
| def expect_yield_job_entries(self, job): |
| entries = [s.entry for s in job.statuses] |
| self.run_list.append(entries) |
| self.run_call_list.append( |
| mock.call('get_host_queue_entries', job=job.id)) |
| |
| if True not in ['aborted' in e and e['aborted'] for e in entries]: |
| self.job_statuses.append(job.statuses) |
| self.job_statuses_call_list.append(mock.call(job.id)) |
| |
| @patch('autotest_lib.server.cros.dynamic_suite.job_status.JobResultWaiter._sleep' |
| ) |
| def testJobResultWaiter(self, mock_sleep): |
| """Should gather status and return records for job summaries.""" |
| jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''), |
| FakeStatus('GOOD', 'T1', '')]), |
| FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False), |
| FakeStatus('GOOD', 'T1', '')]), |
| FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')]), |
| FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')]), |
| FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'), |
| FakeStatus('GOOD', 'T0', '')]),] |
| # TODO: Write a better test for the case where we yield |
| # results for aborts vs cannot yield results because of |
| # a premature abort. Currently almost all client aborts |
| # have been converted to failures, and when aborts do happen |
| # they result in server job failures for which we always |
| # want results. |
| # FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)]), |
| # The next job shouldn't be recorded in the results. |
| # FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')])] |
| for status in jobs[4].statuses: |
| status.entry['job'] = {'name': 'broken_infra_job'} |
| |
| job_id_set = set([job.id for job in jobs]) |
| yield_values = [ |
| [jobs[1]], |
| [jobs[0], jobs[2]], |
| jobs[3:6] |
| ] |
| |
| yield_list = [] |
| called_list = [] |
| |
| for yield_this in yield_values: |
| yield_list.append(yield_this) |
| |
| # Expected list of calls... |
| called_list.append( |
| mock.call(id__in=list(job_id_set), finished=True)) |
| for job in yield_this: |
| self.expect_yield_job_entries(job) |
| job_id_set.remove(job.id) |
| self.afe.get_jobs.side_effect = yield_list |
| self.afe.run.side_effect = self.run_list |
| self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses |
| |
| waiter = job_status.JobResultWaiter(self.afe, self.tko) |
| waiter.add_jobs(jobs) |
| results = [result for result in waiter.wait_for_results()] |
| for job in jobs[:6]: # the 'GOOD' SERVER_JOB shouldn't be there. |
| for status in job.statuses: |
| self.assertTrue(True in list(map(status.equals_record, results))) |
| |
| self.afe.get_jobs.assert_has_calls(called_list) |
| self.afe.run.assert_has_calls(self.run_call_list) |
| self.tko.get_job_test_statuses_from_db.assert_has_calls( |
| self.job_statuses_call_list) |
| |
| def testYieldSubdir(self): |
| """Make sure subdir are properly set for test and non-test status.""" |
| job_tag = '0-owner/172.33.44.55' |
| job_name = 'broken_infra_job' |
| job = FakeJob(0, [FakeStatus('ERROR', 'SERVER_JOB', 'server error', |
| subdir='---', job_tag=job_tag), |
| FakeStatus('GOOD', 'T0', '', |
| subdir='T0.subdir', job_tag=job_tag)], |
| parent_job_id=54321) |
| for status in job.statuses: |
| status.entry['job'] = {'name': job_name} |
| |
| self.expect_yield_job_entries(job) |
| self.afe.run.side_effect = self.run_list |
| self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses |
| results = list(job_status._yield_job_results(self.afe, self.tko, job)) |
| for i in range(len(results)): |
| result = results[i] |
| if result.test_name.endswith('SERVER_JOB'): |
| expected_name = '%s_%s' % (job_name, job.statuses[i].test_name) |
| expected_subdir = job_tag |
| else: |
| expected_name = job.statuses[i].test_name |
| expected_subdir = os.path.join(job_tag, job.statuses[i].subdir) |
| self.assertEqual(results[i].test_name, expected_name) |
| self.assertEqual(results[i].subdir, expected_subdir) |
| |
| self.afe.run.assert_has_calls(self.run_call_list) |
| self.tko.get_job_test_statuses_from_db.assert_has_calls( |
| self.job_statuses_call_list) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |