| # Copyright 2013 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Pipeline Process unittest. |
| |
| Part of the Chrome build flags optimization. |
| """ |
| |
| __author__ = "[email protected] (Yuheng Long)" |
| |
| import multiprocessing |
| import unittest |
| |
| from mock_task import MockTask |
| import pipeline_process |
| |
| |
| # Pick an integer at random. |
| ERROR = -334 |
| # Pick an integer at random. |
| TEST_STAGE = -8 |
| |
| |
| def MockHelper(stage, done_dict, helper_queue, _, result_queue): |
| """This method echos input to the output.""" |
| |
| assert stage == TEST_STAGE |
| while True: |
| if not helper_queue.empty(): |
| task = helper_queue.get() |
| if task == pipeline_process.POISONPILL: |
| # Poison pill means shutdown |
| break |
| |
| if task in done_dict: |
| # verify that it does not get duplicate "1"s in the test. |
| result_queue.put(ERROR) |
| else: |
| result_queue.put(("helper", task.GetIdentifier(TEST_STAGE))) |
| |
| |
| def MockWorker(stage, task, _, result_queue): |
| assert stage == TEST_STAGE |
| result_queue.put(("worker", task.GetIdentifier(TEST_STAGE))) |
| |
| |
| class PipelineProcessTest(unittest.TestCase): |
| """This class test the PipelineProcess. |
| |
| All the task inserted into the input queue should be taken out and hand to the |
| actual pipeline handler, except for the POISON_PILL. All these task should |
| also be passed to the next pipeline stage via the output queue. |
| """ |
| |
| def testRun(self): |
| """Test the run method. |
| |
| Ensure that all the tasks inserted into the queue are properly handled. |
| """ |
| |
| manager = multiprocessing.Manager() |
| inp = manager.Queue() |
| output = manager.Queue() |
| |
| process = pipeline_process.PipelineProcess( |
| 2, "testing", {}, TEST_STAGE, inp, MockHelper, MockWorker, output |
| ) |
| |
| process.start() |
| inp.put(MockTask(TEST_STAGE, 1)) |
| inp.put(MockTask(TEST_STAGE, 1)) |
| inp.put(MockTask(TEST_STAGE, 2)) |
| inp.put(pipeline_process.POISONPILL) |
| process.join() |
| |
| # All tasks are processed once and only once. |
| result = [ |
| ("worker", 1), |
| ("helper", 1), |
| ("worker", 2), |
| pipeline_process.POISONPILL, |
| ] |
| while result: |
| task = output.get() |
| |
| # One "1"s is passed to the worker and one to the helper. |
| self.assertNotEqual(task, ERROR) |
| |
| # The messages received should be exactly the same as the result. |
| self.assertTrue(task in result) |
| result.remove(task) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |