[autotest] test_droid V1.

This CL introduces test_droid, a script to launch autotest
tests against local Android DUTs connected to the host
computer via USB.

Much of the logic is similar to test_that, so this has all
been pulled into a new script test_utils.py

Pending work:

* Moving of generate_test_report to somewhere it is accesible
  outside of the chroot.
* Add in support for remote adb hosts (i.e. the DUT is
  connected to a remote computer).
* Sequences support.

BUG=b:25748759
TEST=./test_droid.py 7d52318 brillo_WhitelistedGtests
CQ-DEPEND=CL:315230

Change-Id: Idab3805f7921173fbc706354b322f37244240fa2
Reviewed-on: https://chromium-review.googlesource.com/314870
Commit-Ready: Simran Basi <[email protected]>
Tested-by: Simran Basi <[email protected]>
Reviewed-by: Aviv Keshet <[email protected]>
diff --git a/server/autoserv_utils.py b/server/autoserv_utils.py
index 3a11cca..679418c 100644
--- a/server/autoserv_utils.py
+++ b/server/autoserv_utils.py
@@ -29,7 +29,8 @@
                              no_console_prefix=False,
                              ssh_options=None,
                              use_packaging=True,
-                             in_lab=False):
+                             in_lab=False,
+                             host_attributes=None):
     """
     Construct an autoserv command from a job or host queue entry.
 
@@ -61,6 +62,7 @@
                    environment. This information is useful as autoserv knows
                    the database is available and can make database calls such
                    as looking up host attributes at runtime.
+    @param host_attributes: Dict of host attributes to pass into autoserv.
 
     @returns The autoserv command line as a list of executable + parameters.
 
@@ -109,6 +111,9 @@
             elif control_type_value == control_data.CONTROL_TYPE.SERVER:
                 command.append('-s')
 
+    if host_attributes:
+        command += ['--host_attributes', repr(host_attributes)]
+
     if verbose:
         command.append('--verbose')
 
diff --git a/server/constants.py b/server/constants.py
index 2840370..18319c1 100644
--- a/server/constants.py
+++ b/server/constants.py
@@ -8,4 +8,4 @@
 CRASHLOGS_MARKER = '.crashjob'
 
 # Flag file to indicate the host is an adb tester.
-ANDROID_TESTER_FILEFLAG = '/mnt/stateful_partition/.android_tester'
+ANDROID_TESTER_FILEFLAG = '/mnt/stateful_partition/.android_tester'
\ No newline at end of file
diff --git a/server/hosts/adb_host.py b/server/hosts/adb_host.py
index cce602b..238cbdd 100644
--- a/server/hosts/adb_host.py
+++ b/server/hosts/adb_host.py
@@ -115,6 +115,10 @@
         """
         Check if the given host is an adb host.
 
+        If SSH connectivity can't be established, check_host will try to use
+        user 'adb' as well. If SSH connectivity still can't be established
+        then the original SSH user is restored.
+
         @param host: An ssh host representing a device.
         @param timeout: The timeout for the run command.
 
@@ -124,13 +128,16 @@
         @raises AutoservRunError: If the command failed.
         @raises AutoservSSHTimeout: Ssh connection has timed out.
         """
+        ssh_user = host.user
         try:
-            if not host.verify_ssh_user_access():
+            if not (host.hostname == 'localhost' or
+                    host.verify_ssh_user_access()):
                 host.user = 'adb'
             result = host.run(
                     'test -f %s' % server_constants.ANDROID_TESTER_FILEFLAG,
                     timeout=timeout)
         except (error.AutoservRunError, error.AutoservSSHTimeout):
+            host.user = ssh_user
             return False
         return result.exit_status == 0
 
diff --git a/server/hosts/factory.py b/server/hosts/factory.py
index 9e87012..2382df9 100644
--- a/server/hosts/factory.py
+++ b/server/hosts/factory.py
@@ -3,6 +3,7 @@
 import logging
 from contextlib import closing
 
+from autotest_lib.client.bin import local_host
 from autotest_lib.client.common_lib import error, global_config
 from autotest_lib.server import utils as server_utils
 from autotest_lib.server.hosts import cros_host, ssh_host
@@ -29,6 +30,8 @@
 # overhead in checking for less common host types.
 host_types = [cros_host.CrosHost, moblab_host.MoblabHost, sonic_host.SonicHost,
               adb_host.ADBHost,]
+OS_HOST_DICT = {'cros' : cros_host.CrosHost,
+                'android': adb_host.ADBHost}
 
 
 def _get_host_arguments():
@@ -113,8 +116,10 @@
     args['ssh_verbosity_flag'] = ssh_verbosity_flag
     args['ssh_options'] = ssh_options
 
+    if hostname == 'localhost':
+        connectivity_class = local_host.LocalHost
     # by default assume we're using SSH support
-    if SSH_ENGINE == 'paramiko':
+    elif SSH_ENGINE == 'paramiko':
         from autotest_lib.server.hosts import paramiko_host
         connectivity_class = paramiko_host.ParamikoHost
     elif SSH_ENGINE == 'raw_ssh':
@@ -124,12 +129,13 @@
                                   "value of the configuration key 'ssh_engine' "
                                   "on autotest's global_config.ini file." %
                                   SSH_ENGINE)
-
-    if not host_class:
+    host_attributes = args.get('host_attributes', {})
+    host_class = host_class or OS_HOST_DICT.get(host_attributes.get('os_type'))
+    if host_class:
+        classes = [host_class, connectivity_class]
+    else:
         classes = [_detect_host(connectivity_class, hostname, **args),
                    connectivity_class]
-    else:
-        classes = [host_class, connectivity_class]
 
     # create a custom host class for this machine and return an instance of it
     host_class = type("%s_host" % hostname, tuple(classes), {})
diff --git a/site_utils/test_droid.py b/site_utils/test_droid.py
new file mode 100755
index 0000000..f28e91c
--- /dev/null
+++ b/site_utils/test_droid.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import argparse
+import os
+import sys
+
+import logging
+# Turn the logging level to INFO before importing other autotest
+# code, to avoid having failed import logging messages confuse the
+# test_droid user.
+logging.basicConfig(level=logging.INFO)
+
+import common
+from autotest_lib.site_utils import test_runner_utils
+
+
+_TEST_REPORT_SCRIPTNAME = '/usr/bin/generate_test_report'
+
+
+def parse_arguments(argv):
+    """
+    Parse command line arguments
+
+    @param argv: argument list to parse
+
+    @returns:    parsed arguments
+
+    @raises SystemExit if arguments are malformed, or required arguments
+            are not present.
+    """
+    return _parse_arguments_internal(argv)[0]
+
+
+def _parse_arguments_internal(argv):
+    """
+    Parse command line arguments
+
+    @param argv: argument list to parse
+
+    @returns:    tuple of parsed arguments and argv suitable for remote runs
+
+    @raises SystemExit if arguments are malformed, or required arguments
+            are not present.
+    """
+
+    parser = argparse.ArgumentParser(description='Run remote tests.')
+
+    parser.add_argument('serials', metavar='SERIALS',
+                        help='Comma separate list of device serials under '
+                             'test.')
+    test_runner_utils.add_common_args(parser)
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    """
+    Entry point for test_droid script.
+
+    @param argv: arguments list
+    """
+    arguments = _parse_arguments_internal(argv)
+
+    results_directory = test_runner_utils.create_results_directory(
+            arguments.results_dir)
+    arguments.results_dir = results_directory
+
+    autotest_path = os.path.dirname(os.path.dirname(
+            os.path.realpath(__file__)))
+    site_utils_path = os.path.join(autotest_path, 'site_utils')
+    realpath = os.path.realpath(__file__)
+    site_utils_path = os.path.realpath(site_utils_path)
+    host_attributes = {'serials' : arguments.serials,
+                       'os_type' : 'android'}
+
+    return test_runner_utils.perform_run_from_autotest_root(
+                autotest_path, argv, arguments.tests, 'localhost',
+                args=arguments.args, ignore_deps=not arguments.enforce_deps,
+                results_directory=results_directory,
+                iterations=arguments.iterations,
+                fast_mode=arguments.fast_mode, debug=arguments.debug,
+                host_attributes=host_attributes)
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv[1:]))
diff --git a/site_utils/test_runner_utils.py b/site_utils/test_runner_utils.py
new file mode 100755
index 0000000..6708590
--- /dev/null
+++ b/site_utils/test_runner_utils.py
@@ -0,0 +1,705 @@
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import errno
+import os
+import re
+import shutil
+import signal
+import stat
+import subprocess
+import sys
+import tempfile
+import threading
+
+import logging
+# Turn the logging level to INFO before importing other autotest
+# code, to avoid having failed import logging messages confuse the
+# test_that user.
+logging.basicConfig(level=logging.INFO)
+
+import common
+from autotest_lib.client.common_lib.cros import dev_server, retry
+from autotest_lib.client.common_lib import logging_manager
+from autotest_lib.server.cros.dynamic_suite import suite, constants
+from autotest_lib.server.cros import provision
+from autotest_lib.server.hosts import factory
+from autotest_lib.server import autoserv_utils
+from autotest_lib.server import server_logging_config
+from autotest_lib.server import utils
+
+
+_autoserv_proc = None
+_sigint_handler_lock = threading.Lock()
+
+_AUTOSERV_SIGINT_TIMEOUT_SECONDS = 5
+_NO_BOARD = 'ad_hoc_board'
+NO_BUILD = 'ad_hoc_build'
+_SUITE_REGEX = r'suite:(.*)'
+
+_TEST_KEY_FILENAME = 'testing_rsa'
+TEST_KEY_PATH = ('/mnt/host/source/src/scripts/mod_for_test_scripts/'
+                  'ssh_keys/%s' % _TEST_KEY_FILENAME)
+
+_TEST_REPORT_SCRIPTNAME = '/usr/bin/generate_test_report'
+
+_LATEST_RESULTS_DIRECTORY = '/tmp/test_that_latest'
+
+
+class TestThatRunError(Exception):
+    """Raised if test_that encounters something unexpected while running."""
+
+
+class TestThatProvisioningError(Exception):
+    """Raised when it fails to provision the DUT to the requested build."""
+
+
+def add_common_args(parser):
+    """
+    Add common arguments for both test_that and test_droid to their parser.
+
+    @param parser: argparse.ArgumentParser object to add arguments to.
+    """
+    parser.add_argument('tests', nargs='+', metavar='TEST',
+                        help='Run given test(s). Use suite:SUITE to specify '
+                             'test suite. Use e:[NAME_PATTERN] to specify a '
+                             'NAME-matching regular expression. Use '
+                             'f:[FILE_PATTERN] to specify a filename matching '
+                             'regular expression. Specified regular '
+                             'expressions will be implicitly wrapped in '
+                             '^ and $.')
+    parser.add_argument('--fast', action='store_true', dest='fast_mode',
+                        default=False,
+                        help='Enable fast mode.  This will cause test_droid '
+                             'to skip time consuming steps like sysinfo and '
+                             'collecting crash information.')
+    parser.add_argument('--args', metavar='ARGS',
+                        help='Whitespace separated argument string to pass '
+                             'through to test. Only supported for runs '
+                             'against a local DUT.')
+    parser.add_argument('--results_dir', metavar='RESULTS_DIR', default=None,
+                        help='Instead of storing results in a new subdirectory'
+                             ' of /tmp , store results in RESULTS_DIR. If '
+                             'RESULTS_DIR already exists, it will be deleted.')
+    parser.add_argument('--pretend', action='store_true', default=False,
+                        help='Print autoserv commands that would be run, '
+                             'rather than running them.')
+    parser.add_argument('--no-experimental', action='store_true',
+                        default=False, dest='no_experimental',
+                        help='When scheduling a suite, skip any tests marked '
+                             'as experimental. Applies only to tests scheduled'
+                             ' via suite:[SUITE].')
+    parser.add_argument('--enforce-deps', action='store_true',
+                        default=False, dest='enforce_deps',
+                        help='Skip tests whose DEPENDENCIES can not '
+                             'be satisfied.')
+    parser.add_argument('--debug', action='store_true',
+                        help='Include DEBUG level messages in stdout. Note: '
+                             'these messages will be included in output log '
+                             'file regardless. In addition, turn on autoserv '
+                             'verbosity.')
+    parser.add_argument('--iterations', action='store', type=int, default=1,
+                        help='Number of times to run the tests specified.')
+
+
+
+def fetch_local_suite(autotest_path, suite_predicate, afe, test_arg, remote,
+                      build=NO_BUILD, board=_NO_BOARD,
+                      results_directory=None, no_experimental=False,
+                      ignore_deps=True):
+    """Create a suite from the given suite predicate.
+
+    Satisfaction of dependencies is enforced by Suite.schedule() if
+    ignore_deps is False. Note that this method assumes only one host,
+    i.e. |remote|, was added to afe. Suite.schedule() will not
+    schedule a job if none of the hosts in the afe (in our case,
+    just one host |remote|) has a label that matches a requested
+    test dependency.
+
+    @param autotest_path: Absolute path to autotest (in sysroot or
+                          custom autotest directory set by --autotest_dir).
+    @param suite_predicate: callable that takes ControlData objects, and
+                            returns True on those that should be in suite
+    @param afe: afe object to schedule against (typically a directAFE)
+    @param test_arg: String. An individual TEST command line argument, e.g.
+                     'login_CryptohomeMounted' or 'suite:smoke'.
+    @param remote: String representing the IP of the remote host.
+    @param build: Build to schedule suite for.
+    @param board: Board to schedule suite for.
+    @param results_directory: Absolute path of directory to store results in.
+                              (results will be stored in subdirectory of this).
+    @param no_experimental: Skip experimental tests when scheduling a suite.
+    @param ignore_deps: If True, test dependencies will be ignored.
+
+    @returns: A suite.Suite object.
+
+    """
+    fs_getter = suite.Suite.create_fs_getter(autotest_path)
+    devserver = dev_server.ImageServer('')
+    my_suite = suite.Suite.create_from_predicates([suite_predicate],
+            {provision.CROS_VERSION_PREFIX: build},
+            constants.BOARD_PREFIX + board,
+            devserver, fs_getter, afe=afe,
+            ignore_deps=ignore_deps,
+            results_dir=results_directory, forgiving_parser=False)
+    if len(my_suite.tests) == 0:
+        (similarity_predicate, similarity_description) = (
+                get_predicate_for_possible_test_arg(test_arg))
+        logging.error('No test found, searching for possible tests with %s',
+                      similarity_description)
+        possible_tests = suite.Suite.find_possible_tests(fs_getter,
+                                                         similarity_predicate)
+        raise ValueError('Found no tests. Check your suite name, test name, '
+                         'or test matching wildcard.\nDid you mean any of '
+                         'following tests?\n  %s' % '\n  '.join(possible_tests))
+
+    if not ignore_deps:
+        # Log tests whose dependencies can't be satisfied.
+        labels = [label.name for label in
+                  afe.get_labels(host__hostname=remote)]
+        for test in my_suite.tests:
+            if test.experimental and no_experimental:
+                continue
+            unsatisfiable_deps = set(test.dependencies).difference(labels)
+            if unsatisfiable_deps:
+                logging.warning('%s will be skipped, unsatisfiable '
+                             'test dependencies: %s', test.name,
+                             unsatisfiable_deps)
+    return my_suite
+
+
+def _run_autoserv(command, pretend=False):
+    """Run autoserv command.
+
+    Run the autoserv command and wait on it. Log the stdout.
+    Ensure that SIGINT signals are passed along to autoserv.
+
+    @param command: the autoserv command to run.
+    @returns: exit code of the command.
+
+    """
+    if not pretend:
+        logging.debug('Running autoserv command: %s', command)
+        global _autoserv_proc
+        _autoserv_proc = subprocess.Popen(command,
+                                          stdout=subprocess.PIPE,
+                                          stderr=subprocess.STDOUT)
+        # This incantation forces unbuffered reading from stdout,
+        # so that autoserv output can be displayed to the user
+        # immediately.
+        for message in iter(_autoserv_proc.stdout.readline, b''):
+            logging.info('autoserv| %s', message.strip())
+
+        _autoserv_proc.wait()
+        returncode = _autoserv_proc.returncode
+        _autoserv_proc = None
+    else:
+        logging.info('Pretend mode. Would run autoserv command: %s',
+                     command)
+        returncode = 0
+    return returncode
+
+
+def run_provisioning_job(provision_label, host, autotest_path,
+                         results_directory, fast_mode,
+                         ssh_verbosity=0, ssh_options=None,
+                         pretend=False, autoserv_verbose=False):
+    """Shell out to autoserv to run provisioning job.
+
+    @param provision_label: Label to provision the machine to.
+    @param host: Hostname of DUT.
+    @param autotest_path: Absolute path of autotest directory.
+    @param results_directory: Absolute path of directory to store results in.
+                              (results will be stored in subdirectory of this).
+    @param fast_mode: bool to use fast mode (disables slow autotest features).
+    @param ssh_verbosity: SSH verbosity level, passed along to autoserv_utils
+    @param ssh_options: Additional ssh options to be passed to autoserv_utils
+    @param pretend: If True, will print out autoserv commands rather than
+                    running them.
+    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
+
+    @returns: Absolute path of directory where results were stored.
+
+    """
+    # TODO(fdeng): When running against a local DUT, autoserv
+    # is still hitting the AFE in the lab.
+    # provision_AutoUpdate checks the current build of DUT by
+    # retrieving build info from AFE. crosbug.com/295178
+    results_directory = os.path.join(results_directory, 'results-provision')
+    command = autoserv_utils.autoserv_run_job_command(
+            os.path.join(autotest_path, 'server'),
+            machines=host, job=None, verbose=autoserv_verbose,
+            results_directory=results_directory,
+            fast_mode=fast_mode, ssh_verbosity=ssh_verbosity,
+            ssh_options=ssh_options,
+            extra_args=['--provision', '--job-labels', provision_label],
+            no_console_prefix=True)
+    if _run_autoserv(command, pretend) != 0:
+        raise TestThatProvisioningError('Command returns non-zero code: %s ' %
+                                        command)
+    return results_directory
+
+
+def run_job(job, host, autotest_path, results_directory, fast_mode,
+            id_digits=1, ssh_verbosity=0, ssh_options=None,
+            args=None, pretend=False,
+            autoserv_verbose=False, host_attributes={}):
+    """
+    Shell out to autoserv to run an individual test job.
+
+    @param job: A Job object containing the control file contents and other
+                relevent metadata for this test.
+    @param host: Hostname of DUT to run test against.
+    @param autotest_path: Absolute path of autotest directory.
+    @param results_directory: Absolute path of directory to store results in.
+                              (results will be stored in subdirectory of this).
+    @param fast_mode: bool to use fast mode (disables slow autotest features).
+    @param id_digits: The minimum number of digits that job ids should be
+                      0-padded to when formatting as a string for results
+                      directory.
+    @param ssh_verbosity: SSH verbosity level, passed along to autoserv_utils
+    @param ssh_options: Additional ssh options to be passed to autoserv_utils
+    @param args: String that should be passed as args parameter to autoserv,
+                 and then ultimitely to test itself.
+    @param pretend: If True, will print out autoserv commands rather than
+                    running them.
+    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
+    @param host_attributes: Dict of host attributes to pass into autoserv.
+
+    @returns: a tuple, return code of the job and absolute path of directory
+              where results were stored.
+    """
+    with tempfile.NamedTemporaryFile() as temp_file:
+        temp_file.write(job.control_file)
+        temp_file.flush()
+        name_tail = job.name.split('/')[-1]
+        results_directory = os.path.join(results_directory,
+                                         'results-%0*d-%s' % (id_digits, job.id,
+                                                              name_tail))
+        # Drop experimental keyval in the keval file in the job result folder.
+        os.makedirs(results_directory)
+        utils.write_keyval(results_directory,
+                           {constants.JOB_EXPERIMENTAL_KEY: job.keyvals[
+                                   constants.JOB_EXPERIMENTAL_KEY]})
+        extra_args = [temp_file.name]
+        if args:
+            extra_args.extend(['--args', args])
+
+        command = autoserv_utils.autoserv_run_job_command(
+                os.path.join(autotest_path, 'server'),
+                machines=host, job=job, verbose=autoserv_verbose,
+                results_directory=results_directory,
+                fast_mode=fast_mode, ssh_verbosity=ssh_verbosity,
+                ssh_options=ssh_options,
+                extra_args=extra_args,
+                no_console_prefix=True,
+                use_packaging=False,
+                host_attributes=host_attributes)
+
+        code = _run_autoserv(command, pretend)
+        return code, results_directory
+
+
+def setup_local_afe():
+    """
+    Setup a local afe database and return a direct_afe object to access it.
+
+    @returns: A autotest_lib.frontend.afe.direct_afe instance.
+    """
+    # This import statement is delayed until now rather than running at
+    # module load time, because it kicks off a local sqlite :memory: backed
+    # database, and we don't need that unless we are doing a local run.
+    from autotest_lib.frontend import setup_django_lite_environment
+    from autotest_lib.frontend.afe import direct_afe
+    return direct_afe.directAFE()
+
+
+def get_predicate_for_test_arg(test):
+    """
+    Gets a suite predicte function for a given command-line argument.
+
+    @param test: String. An individual TEST command line argument, e.g.
+                         'login_CryptohomeMounted' or 'suite:smoke'
+    @returns: A (predicate, string) tuple with the necessary suite
+              predicate, and a description string of the suite that
+              this predicate will produce.
+    """
+    suitematch = re.match(_SUITE_REGEX, test)
+    name_pattern_match = re.match(r'e:(.*)', test)
+    file_pattern_match = re.match(r'f:(.*)', test)
+    if suitematch:
+        suitename = suitematch.group(1)
+        return (suite.Suite.name_in_tag_predicate(suitename),
+                'suite named %s' % suitename)
+    if name_pattern_match:
+        pattern = '^%s$' % name_pattern_match.group(1)
+        return (suite.Suite.test_name_matches_pattern_predicate(pattern),
+                'suite to match name pattern %s' % pattern)
+    if file_pattern_match:
+        pattern = '^%s$' % file_pattern_match.group(1)
+        return (suite.Suite.test_file_matches_pattern_predicate(pattern),
+                'suite to match file name pattern %s' % pattern)
+    return (suite.Suite.test_name_equals_predicate(test),
+            'job named %s' % test)
+
+
+def get_predicate_for_possible_test_arg(test):
+    """
+    Gets a suite predicte function to calculate the similarity of given test
+    and possible tests.
+
+    @param test: String. An individual TEST command line argument, e.g.
+                         'login_CryptohomeMounted' or 'suite:smoke'
+    @returns: A (predicate, string) tuple with the necessary suite
+              predicate, and a description string of the suite that
+              this predicate will produce.
+    """
+    suitematch = re.match(_SUITE_REGEX, test)
+    name_pattern_match = re.match(r'e:(.*)', test)
+    file_pattern_match = re.match(r'f:(.*)', test)
+    if suitematch:
+        suitename = suitematch.group(1)
+        return (suite.Suite.name_in_tag_similarity_predicate(suitename),
+                'suite name similar to %s' % suitename)
+    if name_pattern_match:
+        pattern = '^%s$' % name_pattern_match.group(1)
+        return (suite.Suite.test_name_similarity_predicate(pattern),
+                'job name similar to %s' % pattern)
+    if file_pattern_match:
+        pattern = '^%s$' % file_pattern_match.group(1)
+        return (suite.Suite.test_file_similarity_predicate(pattern),
+                'suite to match file name similar to %s' % pattern)
+    return (suite.Suite.test_name_similarity_predicate(test),
+            'job name similar to %s' % test)
+
+
+def add_ssh_identity(temp_directory, ssh_private_key=TEST_KEY_PATH):
+    """Add an ssh identity to the agent.
+
+    TODO (sbasi) b/26186193: Add support for test_droid and make TEST_KEY_PATH
+    not Chrome OS specific.
+
+    @param temp_directory: A directory to copy the |private key| into.
+    @param ssh_private_key: Path to the ssh private key to use for testing.
+    """
+    # Add the testing key to the current ssh agent.
+    if os.environ.has_key('SSH_AGENT_PID'):
+        # Copy the testing key to the temp directory and make it NOT
+        # world-readable. Otherwise, ssh-add complains.
+        shutil.copy(ssh_private_key, temp_directory)
+        key_copy_path = os.path.join(temp_directory,
+                                     os.path.basename(ssh_private_key))
+        os.chmod(key_copy_path, stat.S_IRUSR | stat.S_IWUSR)
+        p = subprocess.Popen(['ssh-add', key_copy_path],
+                             stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        p_out, _ = p.communicate()
+        for line in p_out.splitlines():
+            logging.info(line)
+    else:
+        logging.warning('There appears to be no running ssh-agent. Attempting '
+                        'to continue without running ssh-add, but ssh commands '
+                        'may fail.')
+
+
+def _auto_detect_labels(afe, remote):
+    """Automatically detect host labels and add them to the host in afe.
+
+    Note that the label of board will not be auto-detected.
+    This method assumes the host |remote| has already been added to afe.
+
+    @param afe: A direct_afe object used to interact with local afe database.
+    @param remote: The hostname of the remote device.
+
+    """
+    cros_host = factory.create_host(remote)
+    labels_to_create = [label for label in cros_host.get_labels()
+                        if not label.startswith(constants.BOARD_PREFIX)]
+    labels_to_add_to_afe_host = []
+    for label in labels_to_create:
+        new_label = afe.create_label(label)
+        labels_to_add_to_afe_host.append(new_label.name)
+    hosts = afe.get_hosts(hostname=remote)
+    if not hosts:
+        raise TestThatRunError('Unexpected error: %s has not '
+                               'been added to afe.' % remote)
+    afe_host = hosts[0]
+    afe_host.add_labels(labels_to_add_to_afe_host)
+
+
+def perform_local_run(afe, autotest_path, tests, remote, fast_mode,
+                      build=NO_BUILD, board=_NO_BOARD, args=None,
+                      pretend=False, no_experimental=False,
+                      ignore_deps=True,
+                      results_directory=None, ssh_verbosity=0,
+                      ssh_options=None,
+                      autoserv_verbose=False,
+                      iterations=1,
+                      host_attributes={}):
+    """Perform local run of tests.
+
+    This method enforces satisfaction of test dependencies for tests that are
+    run as a part of a suite.
+
+    @param afe: A direct_afe object used to interact with local afe database.
+    @param autotest_path: Absolute path of autotest installed in sysroot or
+                          custom autotest path set by --autotest_dir.
+    @param tests: List of strings naming tests and suites to run. Suite strings
+                  should be formed like "suite:smoke".
+    @param remote: Remote hostname.
+    @param fast_mode: bool to use fast mode (disables slow autotest features).
+    @param build: String specifying build for local run.
+    @param board: String specifyinb board for local run.
+    @param args: String that should be passed as args parameter to autoserv,
+                 and then ultimitely to test itself.
+    @param pretend: If True, will print out autoserv commands rather than
+                    running them.
+    @param no_experimental: Skip experimental tests when scheduling a suite.
+    @param ignore_deps: If True, test dependencies will be ignored.
+    @param results_directory: Directory to store results in. Defaults to None,
+                              in which case results will be stored in a new
+                              subdirectory of /tmp
+    @param ssh_verbosity: SSH verbosity level, passed through to
+                          autoserv_utils.
+    @param ssh_options: Additional ssh options to be passed to autoserv_utils
+    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
+    @param iterations: int number of times to schedule tests.
+    @param host_attributes: Dict of host attributes to pass into autoserv.
+
+    @returns: A list of return codes each job that has run.
+    """
+    # Create host in afe, add board and build labels.
+    cros_version_label = provision.cros_version_to_label(build)
+    build_label = afe.create_label(cros_version_label)
+    board_label = afe.create_label(constants.BOARD_PREFIX + board)
+    new_host = afe.create_host(remote)
+    new_host.add_labels([build_label.name, board_label.name])
+    if not ignore_deps:
+        logging.info('Auto-detecting labels for %s', remote)
+        _auto_detect_labels(afe, remote)
+    # Provision the host to |build|.
+    if build != NO_BUILD:
+        logging.info('Provisioning %s...', cros_version_label)
+        try:
+            run_provisioning_job(cros_version_label, remote, autotest_path,
+                                 results_directory, fast_mode,
+                                 ssh_verbosity, ssh_options,
+                                 pretend, autoserv_verbose)
+        except TestThatProvisioningError as e:
+            logging.error('Provisioning %s to %s failed, tests are aborted, '
+                          'failure reason: %s',
+                          remote, cros_version_label, e)
+            return
+
+    # Create suites that will be scheduled.
+    suites_and_descriptions = []
+    for test in tests:
+        (predicate, description) = get_predicate_for_test_arg(test)
+        logging.info('Fetching suite for %s...', description)
+        suite = fetch_local_suite(autotest_path, predicate, afe, test_arg=test,
+                                  remote=remote,
+                                  build=build, board=board,
+                                  results_directory=results_directory,
+                                  no_experimental=no_experimental,
+                                  ignore_deps=ignore_deps)
+        suites_and_descriptions.append((suite, description))
+
+    # Schedule the suites, looping over iterations if necessary.
+    for iteration in range(iterations):
+        if iteration > 0:
+            logging.info('Repeating scheduling for iteration %d:', iteration)
+
+        for suite, description in suites_and_descriptions:
+            logging.info('Scheduling suite for %s...', description)
+            ntests = suite.schedule(
+                    lambda log_entry, log_in_subdir=False: None,
+                    add_experimental=not no_experimental)
+            logging.info('... scheduled %s job(s).', ntests)
+
+    if not afe.get_jobs():
+        logging.info('No jobs scheduled. End of local run.')
+        return
+
+    last_job_id = afe.get_jobs()[-1].id
+    job_id_digits = len(str(last_job_id))
+    codes = []
+    for job in afe.get_jobs():
+        code, _ = run_job(job, remote, autotest_path, results_directory,
+                fast_mode, job_id_digits, ssh_verbosity, ssh_options, args,
+                pretend, autoserv_verbose, host_attributes)
+        codes.append(code)
+    return codes
+
+
+def sigint_handler(signum, stack_frame):
+    #pylint: disable-msg=C0111
+    """Handle SIGINT or SIGTERM to a local test_that run.
+
+    This handler sends a SIGINT to the running autoserv process,
+    if one is running, giving it up to 5 seconds to clean up and exit. After
+    the timeout elapses, autoserv is killed. In either case, after autoserv
+    exits then this process exits with status 1.
+    """
+    # If multiple signals arrive before handler is unset, ignore duplicates
+    if not _sigint_handler_lock.acquire(False):
+        return
+    try:
+        # Ignore future signals by unsetting handler.
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+        logging.warning('Received SIGINT or SIGTERM. Cleaning up and exiting.')
+        if _autoserv_proc:
+            logging.warning('Sending SIGINT to autoserv process. Waiting up '
+                            'to %s seconds for cleanup.',
+                            _AUTOSERV_SIGINT_TIMEOUT_SECONDS)
+            _autoserv_proc.send_signal(signal.SIGINT)
+            timed_out, _ = retry.timeout(_autoserv_proc.wait,
+                    timeout_sec=_AUTOSERV_SIGINT_TIMEOUT_SECONDS)
+            if timed_out:
+                _autoserv_proc.kill()
+                logging.warning('Timed out waiting for autoserv to handle '
+                                'SIGINT. Killed autoserv.')
+    finally:
+        _sigint_handler_lock.release() # this is not really necessary?
+        sys.exit(1)
+
+
+def create_results_directory(results_directory=None):
+    """Create a results directory.
+
+    If no directory is specified this method will create and return a
+    temp directory to hold results. If a directory name is specified this
+    method will create a directory at the given path, provided it doesn't
+    already exist.
+
+    @param results_directory: The path to the results_directory to create.
+
+    @return results_directory: A path to the results_directory, ready for use.
+    """
+    if results_directory is None:
+        # Create a results_directory as subdir of /tmp
+        results_directory = tempfile.mkdtemp(prefix='test_that_results_')
+    else:
+        # Delete results_directory if it already exists.
+        try:
+            shutil.rmtree(results_directory)
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
+
+        # Create results_directory if it does not exist
+        try:
+            os.makedirs(results_directory)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+    return results_directory
+
+
+def perform_run_from_autotest_root(autotest_path, argv, tests, remote,
+                                   build=NO_BUILD, board=_NO_BOARD, args=None,
+                                   pretend=False, no_experimental=False,
+                                   ignore_deps=True,
+                                   results_directory=None, ssh_verbosity=0,
+                                   ssh_options=None,
+                                   iterations=1, fast_mode=False, debug=False,
+                                   whitelist_chrome_crashes=False,
+                                   host_attributes={}):
+    """
+    Perform a test_that run, from the |autotest_path|.
+
+    This function is to be called from test_that/test_droid's main() script,
+    when tests are executed from the |autotest_path|. It handles all stages
+    of a test run that come after the bootstrap into |autotest_path|.
+
+    @param autotest_path: Full absolute path to the autotest root directory.
+    @param argv: The arguments list, as passed to main(...)
+    @param tests: List of strings naming tests and suites to run. Suite strings
+                  should be formed like "suite:smoke".
+    @param remote: Remote hostname.
+    @param build: String specifying build for local run.
+    @param board: String specifyinb board for local run.
+    @param args: String that should be passed as args parameter to autoserv,
+                 and then ultimitely to test itself.
+    @param pretend: If True, will print out autoserv commands rather than
+                    running them.
+    @param no_experimental: Skip experimental tests when scheduling a suite.
+    @param ignore_deps: If True, test dependencies will be ignored.
+    @param results_directory: Directory to store results in. Defaults to None,
+                              in which case results will be stored in a new
+                              subdirectory of /tmp
+    @param ssh_verbosity: SSH verbosity level, passed through to
+                          autoserv_utils.
+    @param ssh_options: Additional ssh options to be passed to autoserv_utils
+    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
+    @param iterations: int number of times to schedule tests.
+    @param fast_mode: bool to use fast mode (disables slow autotest features).
+    @param debug: Logging and autoserv verbosity.
+    @param whitelist_chrome_crashes: If True, whitelist chrome crashes.
+    @param host_attributes: Dict of host attributes to pass into autoserv.
+
+    @returns: A return code that test_that should exit with.
+    """
+    if results_directory is None or not os.path.exists(results_directory):
+        raise ValueError('Expected valid results directory, got %s' %
+                          results_directory)
+
+    logging_manager.configure_logging(
+            server_logging_config.ServerLoggingConfig(),
+            results_dir=results_directory,
+            use_console=True,
+            verbose=debug,
+            debug_log_name='test_that')
+    logging.info('Began logging to %s', results_directory)
+
+    logging.debug('test_that command line was: %s', argv)
+
+    signal.signal(signal.SIGINT, sigint_handler)
+    signal.signal(signal.SIGTERM, sigint_handler)
+
+    afe = setup_local_afe()
+    codes = perform_local_run(afe, autotest_path, tests, remote, fast_mode,
+                      build, board,
+                      args=args,
+                      pretend=pretend,
+                      no_experimental=no_experimental,
+                      ignore_deps=ignore_deps,
+                      results_directory=results_directory,
+                      ssh_verbosity=ssh_verbosity,
+                      ssh_options=ssh_options,
+                      autoserv_verbose=debug,
+                      iterations=iterations,
+                      host_attributes=host_attributes)
+    if pretend:
+        logging.info('Finished pretend run. Exiting.')
+        return 0
+
+    # TODO b/25929635 (sbasi/wiley) Move the generate test report script.
+    if not os.path.exists(_TEST_REPORT_SCRIPTNAME):
+        logging.info('%s does not exist. Exiting.', _TEST_REPORT_SCRIPTNAME)
+        return 0
+    test_report_command = [_TEST_REPORT_SCRIPTNAME]
+    # Experimental test results do not influence the exit code.
+    test_report_command.append('--ignore_experimental_tests')
+    if whitelist_chrome_crashes:
+        test_report_command.append('--whitelist_chrome_crashes')
+    test_report_command.append(results_directory)
+    final_result = subprocess.call(test_report_command)
+    with open(os.path.join(results_directory, 'test_report.log'),
+              'w') as report_log:
+        subprocess.call(test_report_command, stdout=report_log)
+    try:
+        os.unlink(_LATEST_RESULTS_DIRECTORY)
+    except OSError:
+        pass
+    link_target = os.path.relpath(results_directory,
+                                  os.path.dirname(_LATEST_RESULTS_DIRECTORY))
+    if any(codes):
+        logging.error('Autoserv encountered unexpected errors '
+                      'when executing jobs.')
+        final_result = final_result or 1
+    os.symlink(link_target, _LATEST_RESULTS_DIRECTORY)
+    logging.info('Finished running tests. Results can be found in %s or %s',
+                 results_directory, _LATEST_RESULTS_DIRECTORY)
+    return final_result
diff --git a/site_utils/test_runner_utils_unittest.py b/site_utils/test_runner_utils_unittest.py
new file mode 100755
index 0000000..f1a0a01
--- /dev/null
+++ b/site_utils/test_runner_utils_unittest.py
@@ -0,0 +1,228 @@
+#!/usr/bin/python
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+# pylint: disable-msg=C0111
+
+import os, unittest
+import mox
+import common
+import subprocess
+import types
+from autotest_lib.server import utils
+from autotest_lib.server.cros.dynamic_suite import constants
+from autotest_lib.site_utils import test_runner_utils
+
+
+class StartsWithList(mox.Comparator):
+    def __init__(self, start_of_list):
+        """Mox comparator which returns True if the argument
+        to the mocked function is a list that begins with the elements
+        in start_of_list.
+        """
+        self._lhs = start_of_list
+
+    def equals(self, rhs):
+        if len(rhs)<len(self._lhs):
+            return False
+        for (x, y) in zip(self._lhs, rhs):
+            if x != y:
+                return False
+        return True
+
+
+class ContainsSublist(mox.Comparator):
+    def __init__(self, sublist):
+        """Mox comparator which returns True if the argument
+        to the mocked function is a list that contains sublist
+        as a sub-list.
+        """
+        self._sublist = sublist
+
+    def equals(self, rhs):
+        n = len(self._sublist)
+        if len(rhs)<n:
+            return False
+        return any((self._sublist == rhs[i:i+n])
+                   for i in xrange(len(rhs) - n + 1))
+
+
+class TestRunnerUnittests(unittest.TestCase):
+
+    def test_fetch_local_suite(self):
+        # Deferred until fetch_local_suite knows about non-local builds.
+        pass
+
+    def test_get_predicate_for_test_arg(self):
+        # Assert the type signature of get_predicate_for_test(...)
+        # Because control.test_utils_wrapper calls this function,
+        # it is imperative for backwards compatilbility that
+        # the return type of the tested function does not change.
+        tests = ['dummy_test', 'e:name_expression', 'f:expression',
+                 'suite:suitename']
+        for test in tests:
+            pred, desc = test_runner_utils.get_predicate_for_test_arg(test)
+            self.assertTrue(isinstance(pred, types.FunctionType))
+            self.assertTrue(isinstance(desc, str))
+
+    def test_run_job(self):
+        class Object():
+            pass
+
+        autotest_path = 'htap_tsetotua'
+        autoserv_command = os.path.join(autotest_path, 'server', 'autoserv')
+        remote = 'etomer'
+        results_dir = '/tmp/fakeresults'
+        fast_mode = False
+        job1_results_dir = '/tmp/fakeresults/results-1-gilbert'
+        job2_results_dir = '/tmp/fakeresults/results-2-sullivan'
+        args = 'matey'
+        expected_args_sublist = ['--args', args]
+        experimental_keyval = {constants.JOB_EXPERIMENTAL_KEY: False}
+        self.mox = mox.Mox()
+
+        # Create some dummy job objects.
+        job1 = Object()
+        job2 = Object()
+        setattr(job1, 'control_type', 'cLiEnT')
+        setattr(job1, 'control_file', 'c1')
+        setattr(job1, 'id', 1)
+        setattr(job1, 'name', 'gilbert')
+        setattr(job1, 'keyvals', experimental_keyval)
+
+        setattr(job2, 'control_type', 'Server')
+        setattr(job2, 'control_file', 'c2')
+        setattr(job2, 'id', 2)
+        setattr(job2, 'name', 'sullivan')
+        setattr(job2, 'keyvals', experimental_keyval)
+
+        id_digits = 1
+
+        # Stub out subprocess.Popen and wait calls.
+        # Make them expect correct arguments.
+        def fake_readline():
+            return b''
+        mock_process_1 = self.mox.CreateMock(subprocess.Popen)
+        mock_process_2 = self.mox.CreateMock(subprocess.Popen)
+        fake_stdout = self.mox.CreateMock(file)
+        fake_returncode = 0
+        mock_process_1.stdout = fake_stdout
+        mock_process_1.returncode = fake_returncode
+        mock_process_2.stdout = fake_stdout
+        mock_process_2.returncode = fake_returncode
+
+        self.mox.StubOutWithMock(os, 'makedirs')
+        self.mox.StubOutWithMock(utils, 'write_keyval')
+        self.mox.StubOutWithMock(subprocess, 'Popen')
+
+        os.makedirs(job1_results_dir)
+        utils.write_keyval(job1_results_dir, experimental_keyval)
+        arglist_1 = [autoserv_command, '-p', '-r', job1_results_dir,
+                     '-m', remote, '--no_console_prefix', '-l', 'gilbert',
+                     '-c']
+        subprocess.Popen(mox.And(StartsWithList(arglist_1),
+                                 ContainsSublist(expected_args_sublist)),
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT
+                        ).AndReturn(mock_process_1)
+        mock_process_1.stdout.readline().AndReturn(b'')
+        mock_process_1.wait().AndReturn(0)
+
+        os.makedirs(job2_results_dir)
+        utils.write_keyval(job2_results_dir, experimental_keyval)
+        arglist_2 = [autoserv_command, '-p', '-r', job2_results_dir,
+                     '-m', remote,  '--no_console_prefix', '-l', 'sullivan',
+                     '-s']
+        subprocess.Popen(mox.And(StartsWithList(arglist_2),
+                                 ContainsSublist(expected_args_sublist)),
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT
+                        ).AndReturn(mock_process_2)
+        mock_process_2.stdout.readline().AndReturn(b'')
+        mock_process_2.wait().AndReturn(0)
+
+        # Test run_job.
+        self.mox.ReplayAll()
+        code, job_res = test_runner_utils.run_job(
+                job1, remote, autotest_path,results_dir, fast_mode, id_digits,
+                0, None, args)
+        self.assertEqual(job_res, job1_results_dir)
+        self.assertEqual(code, 0)
+        code, job_res = test_runner_utils.run_job(
+                job2, remote, autotest_path, results_dir, fast_mode, id_digits,
+                0, None, args)
+
+        self.assertEqual(job_res, job2_results_dir)
+        self.assertEqual(code, 0)
+        self.mox.UnsetStubs()
+        self.mox.VerifyAll()
+        self.mox.ResetAll()
+
+    def test_perform_local_run(self):
+        afe = test_runner_utils.setup_local_afe()
+        autotest_path = 'ottotest_path'
+        suite_name = 'sweet_name'
+        test_arg = 'suite:' + suite_name
+        remote = 'remoat'
+        build = 'bild'
+        board = 'bored'
+        fast_mode = False
+        suite_control_files = ['c1', 'c2', 'c3', 'c4']
+        results_dir = '/tmp/test_that_results_fake'
+        id_digits = 1
+        ssh_verbosity = 2
+        ssh_options = '-F /dev/null -i /dev/null'
+        args = 'matey'
+        ignore_deps = False
+
+        # Fake suite objects that will be returned by fetch_local_suite
+        class fake_suite(object):
+            def __init__(self, suite_control_files, hosts):
+                self._suite_control_files = suite_control_files
+                self._hosts = hosts
+
+            def schedule(self, *args, **kwargs):
+                for control_file in self._suite_control_files:
+                    afe.create_job(control_file, hosts=self._hosts)
+
+        # Mock out scheduling of suite and running of jobs.
+        self.mox = mox.Mox()
+
+        self.mox.StubOutWithMock(test_runner_utils, 'fetch_local_suite')
+        test_runner_utils.fetch_local_suite(autotest_path, mox.IgnoreArg(),
+                afe, test_arg=test_arg, remote=remote, build=build,
+                board=board, results_directory=results_dir,
+                no_experimental=False,
+                ignore_deps=ignore_deps
+                ).AndReturn(fake_suite(suite_control_files, [remote]))
+        self.mox.StubOutWithMock(test_runner_utils, 'run_job')
+        self.mox.StubOutWithMock(test_runner_utils, 'run_provisioning_job')
+        self.mox.StubOutWithMock(test_runner_utils, '_auto_detect_labels')
+
+        test_runner_utils._auto_detect_labels(afe, remote)
+        # Test perform_local_run. Enforce that run_provisioning_job,
+        # run_job and _auto_detect_labels are called correctly.
+        test_runner_utils.run_provisioning_job(
+                'cros-version:' + build, remote, autotest_path,
+                 results_dir, fast_mode,
+                 ssh_verbosity, ssh_options,
+                 False, False)
+
+        for control_file in suite_control_files:
+            test_runner_utils.run_job(
+                    mox.ContainsAttributeValue('control_file', control_file),
+                    remote, autotest_path, results_dir, fast_mode,id_digits,
+                    ssh_verbosity, ssh_options,args, False,
+                    False, {}).AndReturn((0, '/fake/dir'))
+        self.mox.ReplayAll()
+        test_runner_utils.perform_local_run(
+                afe, autotest_path, ['suite:'+suite_name], remote, fast_mode,
+                build=build, board=board, ignore_deps=False,
+                ssh_verbosity=ssh_verbosity, ssh_options=ssh_options,
+                args=args, results_directory=results_dir)
+        self.mox.UnsetStubs()
+        self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/site_utils/test_that.py b/site_utils/test_that.py
index df0d689..ae553fc 100755
--- a/site_utils/test_that.py
+++ b/site_utils/test_that.py
@@ -4,17 +4,11 @@
 # found in the LICENSE file.
 
 import argparse
-import errno
 import os
 import pipes
-import re
-import shutil
 import signal
-import stat
 import subprocess
 import sys
-import tempfile
-import threading
 
 import logging
 # Turn the logging level to INFO before importing other autotest
@@ -23,14 +17,11 @@
 logging.basicConfig(level=logging.INFO)
 
 import common
-from autotest_lib.client.common_lib.cros import dev_server, retry
 from autotest_lib.client.common_lib import error, logging_manager
-from autotest_lib.server.cros.dynamic_suite import suite, constants
-from autotest_lib.server.cros import provision
-from autotest_lib.server.hosts import factory
-from autotest_lib.server import autoserv_utils
 from autotest_lib.server import server_logging_config
-from autotest_lib.server import utils
+from autotest_lib.server.cros.dynamic_suite import constants
+from autotest_lib.server.hosts import factory
+from autotest_lib.site_utils import test_runner_utils
 
 
 try:
@@ -42,322 +33,7 @@
     print '  - (not yet supported) be run after running '
     print '    ../utils/build_externals.py'
 
-_autoserv_proc = None
-_sigint_handler_lock = threading.Lock()
-
-_AUTOSERV_SIGINT_TIMEOUT_SECONDS = 5
-_NO_BOARD = 'ad_hoc_board'
-_NO_BUILD = 'ad_hoc_build'
-_SUITE_REGEX = r'suite:(.*)'
-
 _QUICKMERGE_SCRIPTNAME = '/mnt/host/source/chromite/bin/autotest_quickmerge'
-_TEST_KEY_FILENAME = 'testing_rsa'
-_TEST_KEY_PATH = ('/mnt/host/source/src/scripts/mod_for_test_scripts/'
-                  'ssh_keys/%s' % _TEST_KEY_FILENAME)
-
-_TEST_REPORT_SCRIPTNAME = '/usr/bin/generate_test_report'
-
-_LATEST_RESULTS_DIRECTORY = '/tmp/test_that_latest'
-
-
-class TestThatRunError(Exception):
-    """Raised if test_that encounters something unexpected while running."""
-
-
-class TestThatProvisioningError(Exception):
-    """Raised when it fails to provision the DUT to the requested build."""
-
-
-def fetch_local_suite(autotest_path, suite_predicate, afe, test_arg, remote,
-                      build=_NO_BUILD, board=_NO_BOARD,
-                      results_directory=None, no_experimental=False,
-                      ignore_deps=True):
-    """Create a suite from the given suite predicate.
-
-    Satisfaction of dependencies is enforced by Suite.schedule() if
-    ignore_deps is False. Note that this method assumes only one host,
-    i.e. |remote|, was added to afe. Suite.schedule() will not
-    schedule a job if none of the hosts in the afe (in our case,
-    just one host |remote|) has a label that matches a requested
-    test dependency.
-
-    @param autotest_path: Absolute path to autotest (in sysroot or
-                          custom autotest directory set by --autotest_dir).
-    @param suite_predicate: callable that takes ControlData objects, and
-                            returns True on those that should be in suite
-    @param afe: afe object to schedule against (typically a directAFE)
-    @param test_arg: String. An individual TEST command line argument, e.g.
-                     'login_CryptohomeMounted' or 'suite:smoke'.
-    @param remote: String representing the IP of the remote host.
-    @param build: Build to schedule suite for.
-    @param board: Board to schedule suite for.
-    @param results_directory: Absolute path of directory to store results in.
-                              (results will be stored in subdirectory of this).
-    @param no_experimental: Skip experimental tests when scheduling a suite.
-    @param ignore_deps: If True, test dependencies will be ignored.
-
-    @returns: A suite.Suite object.
-
-    """
-    fs_getter = suite.Suite.create_fs_getter(autotest_path)
-    devserver = dev_server.ImageServer('')
-    my_suite = suite.Suite.create_from_predicates([suite_predicate],
-            {provision.CROS_VERSION_PREFIX: build},
-            constants.BOARD_PREFIX + board,
-            devserver, fs_getter, afe=afe,
-            ignore_deps=ignore_deps,
-            results_dir=results_directory, forgiving_parser=False)
-    if len(my_suite.tests) == 0:
-        (similarity_predicate, similarity_description) = (
-                get_predicate_for_possible_test_arg(test_arg))
-        logging.error('No test found, searching for possible tests with %s',
-                      similarity_description)
-        possible_tests = suite.Suite.find_possible_tests(fs_getter,
-                                                         similarity_predicate)
-        raise ValueError('Found no tests. Check your suite name, test name, '
-                         'or test matching wildcard.\nDid you mean any of '
-                         'following tests?\n  %s' % '\n  '.join(possible_tests))
-
-    if not ignore_deps:
-        # Log tests whose dependencies can't be satisfied.
-        labels = [label.name for label in
-                  afe.get_labels(host__hostname=remote)]
-        for test in my_suite.tests:
-            if test.experimental and no_experimental:
-                continue
-            unsatisfiable_deps = set(test.dependencies).difference(labels)
-            if unsatisfiable_deps:
-                logging.warning('%s will be skipped, unsatisfiable '
-                             'test dependencies: %s', test.name,
-                             unsatisfiable_deps)
-    return my_suite
-
-
-def _run_autoserv(command, pretend=False):
-    """Run autoserv command.
-
-    Run the autoserv command and wait on it. Log the stdout.
-    Ensure that SIGINT signals are passed along to autoserv.
-
-    @param command: the autoserv command to run.
-    @returns: exit code of the command.
-
-    """
-    if not pretend:
-        logging.debug('Running autoserv command: %s', command)
-        global _autoserv_proc
-        _autoserv_proc = subprocess.Popen(command,
-                                          stdout=subprocess.PIPE,
-                                          stderr=subprocess.STDOUT)
-        # This incantation forces unbuffered reading from stdout,
-        # so that autoserv output can be displayed to the user
-        # immediately.
-        for message in iter(_autoserv_proc.stdout.readline, b''):
-            logging.info('autoserv| %s', message.strip())
-
-        _autoserv_proc.wait()
-        returncode = _autoserv_proc.returncode
-        _autoserv_proc = None
-    else:
-        logging.info('Pretend mode. Would run autoserv command: %s',
-                     command)
-        returncode = 0
-    return returncode
-
-
-def run_provisioning_job(provision_label, host, autotest_path,
-                         results_directory, fast_mode,
-                         ssh_verbosity=0, ssh_options=None,
-                         pretend=False, autoserv_verbose=False):
-    """Shell out to autoserv to run provisioning job.
-
-    @param provision_label: Label to provision the machine to.
-    @param host: Hostname of DUT.
-    @param autotest_path: Absolute path of autotest directory.
-    @param results_directory: Absolute path of directory to store results in.
-                              (results will be stored in subdirectory of this).
-    @param fast_mode: bool to use fast mode (disables slow autotest features).
-    @param ssh_verbosity: SSH verbosity level, passed along to autoserv_utils
-    @param ssh_options: Additional ssh options to be passed to autoserv_utils
-    @param pretend: If True, will print out autoserv commands rather than
-                    running them.
-    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
-
-    @returns: Absolute path of directory where results were stored.
-
-    """
-    # TODO(fdeng): When running against a local DUT, autoserv
-    # is still hitting the AFE in the lab.
-    # provision_AutoUpdate checks the current build of DUT by
-    # retrieving build info from AFE. crosbug.com/295178
-    results_directory = os.path.join(results_directory, 'results-provision')
-    command = autoserv_utils.autoserv_run_job_command(
-            os.path.join(autotest_path, 'server'),
-            machines=host, job=None, verbose=autoserv_verbose,
-            results_directory=results_directory,
-            fast_mode=fast_mode, ssh_verbosity=ssh_verbosity,
-            ssh_options=ssh_options,
-            extra_args=['--provision', '--job-labels', provision_label],
-            no_console_prefix=True)
-    if _run_autoserv(command, pretend) != 0:
-        raise TestThatProvisioningError('Command returns non-zero code: %s ' %
-                                        command)
-    return results_directory
-
-
-def run_job(job, host, autotest_path, results_directory, fast_mode,
-            id_digits=1, ssh_verbosity=0, ssh_options=None,
-            args=None, pretend=False,
-            autoserv_verbose=False):
-    """
-    Shell out to autoserv to run an individual test job.
-
-    @param job: A Job object containing the control file contents and other
-                relevent metadata for this test.
-    @param host: Hostname of DUT to run test against.
-    @param autotest_path: Absolute path of autotest directory.
-    @param results_directory: Absolute path of directory to store results in.
-                              (results will be stored in subdirectory of this).
-    @param fast_mode: bool to use fast mode (disables slow autotest features).
-    @param id_digits: The minimum number of digits that job ids should be
-                      0-padded to when formatting as a string for results
-                      directory.
-    @param ssh_verbosity: SSH verbosity level, passed along to autoserv_utils
-    @param ssh_options: Additional ssh options to be passed to autoserv_utils
-    @param args: String that should be passed as args parameter to autoserv,
-                 and then ultimitely to test itself.
-    @param pretend: If True, will print out autoserv commands rather than
-                    running them.
-    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
-    @returns: a tuple, return code of the job and absolute path of directory
-              where results were stored.
-    """
-    with tempfile.NamedTemporaryFile() as temp_file:
-        temp_file.write(job.control_file)
-        temp_file.flush()
-        name_tail = job.name.split('/')[-1]
-        results_directory = os.path.join(results_directory,
-                                         'results-%0*d-%s' % (id_digits, job.id,
-                                                              name_tail))
-        # Drop experimental keyval in the keval file in the job result folder.
-        os.makedirs(results_directory)
-        utils.write_keyval(results_directory,
-                           {constants.JOB_EXPERIMENTAL_KEY: job.keyvals[
-                                   constants.JOB_EXPERIMENTAL_KEY]})
-        extra_args = [temp_file.name]
-        if args:
-            extra_args.extend(['--args', args])
-
-        command = autoserv_utils.autoserv_run_job_command(
-                os.path.join(autotest_path, 'server'),
-                machines=host, job=job, verbose=autoserv_verbose,
-                results_directory=results_directory,
-                fast_mode=fast_mode, ssh_verbosity=ssh_verbosity,
-                ssh_options=ssh_options,
-                extra_args=extra_args,
-                no_console_prefix=True,
-                use_packaging=False)
-
-        code = _run_autoserv(command, pretend)
-        return code, results_directory
-
-
-def setup_local_afe():
-    """
-    Setup a local afe database and return a direct_afe object to access it.
-
-    @returns: A autotest_lib.frontend.afe.direct_afe instance.
-    """
-    # This import statement is delayed until now rather than running at
-    # module load time, because it kicks off a local sqlite :memory: backed
-    # database, and we don't need that unless we are doing a local run.
-    from autotest_lib.frontend import setup_django_lite_environment
-    from autotest_lib.frontend.afe import direct_afe
-    return direct_afe.directAFE()
-
-
-def get_predicate_for_test_arg(test):
-    """
-    Gets a suite predicte function for a given command-line argument.
-
-    @param test: String. An individual TEST command line argument, e.g.
-                         'login_CryptohomeMounted' or 'suite:smoke'
-    @returns: A (predicate, string) tuple with the necessary suite
-              predicate, and a description string of the suite that
-              this predicate will produce.
-    """
-    suitematch = re.match(_SUITE_REGEX, test)
-    name_pattern_match = re.match(r'e:(.*)', test)
-    file_pattern_match = re.match(r'f:(.*)', test)
-    if suitematch:
-        suitename = suitematch.group(1)
-        return (suite.Suite.name_in_tag_predicate(suitename),
-                'suite named %s' % suitename)
-    if name_pattern_match:
-        pattern = '^%s$' % name_pattern_match.group(1)
-        return (suite.Suite.test_name_matches_pattern_predicate(pattern),
-                'suite to match name pattern %s' % pattern)
-    if file_pattern_match:
-        pattern = '^%s$' % file_pattern_match.group(1)
-        return (suite.Suite.test_file_matches_pattern_predicate(pattern),
-                'suite to match file name pattern %s' % pattern)
-    return (suite.Suite.test_name_equals_predicate(test),
-            'job named %s' % test)
-
-
-def get_predicate_for_possible_test_arg(test):
-    """
-    Gets a suite predicte function to calculate the similarity of given test
-    and possible tests.
-
-    @param test: String. An individual TEST command line argument, e.g.
-                         'login_CryptohomeMounted' or 'suite:smoke'
-    @returns: A (predicate, string) tuple with the necessary suite
-              predicate, and a description string of the suite that
-              this predicate will produce.
-    """
-    suitematch = re.match(_SUITE_REGEX, test)
-    name_pattern_match = re.match(r'e:(.*)', test)
-    file_pattern_match = re.match(r'f:(.*)', test)
-    if suitematch:
-        suitename = suitematch.group(1)
-        return (suite.Suite.name_in_tag_similarity_predicate(suitename),
-                'suite name similar to %s' % suitename)
-    if name_pattern_match:
-        pattern = '^%s$' % name_pattern_match.group(1)
-        return (suite.Suite.test_name_similarity_predicate(pattern),
-                'job name similar to %s' % pattern)
-    if file_pattern_match:
-        pattern = '^%s$' % file_pattern_match.group(1)
-        return (suite.Suite.test_file_similarity_predicate(pattern),
-                'suite to match file name similar to %s' % pattern)
-    return (suite.Suite.test_name_similarity_predicate(test),
-            'job name similar to %s' % test)
-
-
-def _add_ssh_identity(temp_directory, ssh_private_key=_TEST_KEY_PATH):
-    """Add an ssh identity to the agent.
-
-    @param temp_directory: A directory to copy the |private key| into.
-    @param ssh_private_key: Path to the ssh private key to use for testing.
-    """
-    # Add the testing key to the current ssh agent.
-    if os.environ.has_key('SSH_AGENT_PID'):
-        # Copy the testing key to the temp directory and make it NOT
-        # world-readable. Otherwise, ssh-add complains.
-        shutil.copy(ssh_private_key, temp_directory)
-        key_copy_path = os.path.join(temp_directory,
-                                     os.path.basename(ssh_private_key))
-        os.chmod(key_copy_path, stat.S_IRUSR | stat.S_IWUSR)
-        p = subprocess.Popen(['ssh-add', key_copy_path],
-                             stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
-        p_out, _ = p.communicate()
-        for line in p_out.splitlines():
-            logging.info(line)
-    else:
-        logging.warning('There appears to be no running ssh-agent. Attempting '
-                        'to continue without running ssh-add, but ssh commands '
-                        'may fail.')
 
 
 def _get_board_from_host(remote):
@@ -372,139 +48,12 @@
     try:
         board = host.get_board().replace(constants.BOARD_PREFIX, '')
     except error.AutoservRunError:
-        raise TestThatRunError('Cannot determine board, please specify '
-                               'a --board option.')
+        raise test_runner_utils.TestThatRunError(
+                'Cannot determine board, please specify a --board option.')
     logging.info('Detected host board: %s', board)
     return board
 
 
-def _auto_detect_labels(afe, remote):
-    """Automatically detect host labels and add them to the host in afe.
-
-    Note that the label of board will not be auto-detected.
-    This method assumes the host |remote| has already been added to afe.
-
-    @param afe: A direct_afe object used to interact with local afe database.
-    @param remote: The hostname of the remote device.
-
-    """
-    cros_host = factory.create_host(remote)
-    labels_to_create = [label for label in cros_host.get_labels()
-                        if not label.startswith(constants.BOARD_PREFIX)]
-    labels_to_add_to_afe_host = []
-    for label in labels_to_create:
-        new_label = afe.create_label(label)
-        labels_to_add_to_afe_host.append(new_label.name)
-    hosts = afe.get_hosts(hostname=remote)
-    if not hosts:
-        raise TestThatRunError('Unexpected error: %s has not '
-                               'been added to afe.' % remote)
-    afe_host = hosts[0]
-    afe_host.add_labels(labels_to_add_to_afe_host)
-
-
-def perform_local_run(afe, autotest_path, tests, remote, fast_mode,
-                      build=_NO_BUILD, board=_NO_BOARD, args=None,
-                      pretend=False, no_experimental=False,
-                      ignore_deps=True,
-                      results_directory=None, ssh_verbosity=0,
-                      ssh_options=None,
-                      autoserv_verbose=False,
-                      iterations=1):
-    """Perform local run of tests.
-
-    This method enforces satisfaction of test dependencies for tests that are
-    run as a part of a suite.
-
-    @param afe: A direct_afe object used to interact with local afe database.
-    @param autotest_path: Absolute path of autotest installed in sysroot or
-                          custom autotest path set by --autotest_dir.
-    @param tests: List of strings naming tests and suites to run. Suite strings
-                  should be formed like "suite:smoke".
-    @param remote: Remote hostname.
-    @param fast_mode: bool to use fast mode (disables slow autotest features).
-    @param build: String specifying build for local run.
-    @param board: String specifyinb board for local run.
-    @param args: String that should be passed as args parameter to autoserv,
-                 and then ultimitely to test itself.
-    @param pretend: If True, will print out autoserv commands rather than
-                    running them.
-    @param no_experimental: Skip experimental tests when scheduling a suite.
-    @param ignore_deps: If True, test dependencies will be ignored.
-    @param results_directory: Directory to store results in. Defaults to None,
-                              in which case results will be stored in a new
-                              subdirectory of /tmp
-    @param ssh_verbosity: SSH verbosity level, passed through to
-                          autoserv_utils.
-    @param ssh_options: Additional ssh options to be passed to autoserv_utils
-    @param autoserv_verbose: If true, pass the --verbose flag to autoserv.
-    @param iterations: int number of times to schedule tests.
-
-    @returns: A list of return codes each job that has run.
-    """
-    # Create host in afe, add board and build labels.
-    cros_version_label = provision.cros_version_to_label(build)
-    build_label = afe.create_label(cros_version_label)
-    board_label = afe.create_label(constants.BOARD_PREFIX + board)
-    new_host = afe.create_host(remote)
-    new_host.add_labels([build_label.name, board_label.name])
-    if not ignore_deps:
-        logging.info('Auto-detecting labels for %s', remote)
-        _auto_detect_labels(afe, remote)
-    # Provision the host to |build|.
-    if build != _NO_BUILD:
-        logging.info('Provisioning %s...', cros_version_label)
-        try:
-            run_provisioning_job(cros_version_label, remote, autotest_path,
-                                 results_directory, fast_mode,
-                                 ssh_verbosity, ssh_options,
-                                 pretend, autoserv_verbose)
-        except TestThatProvisioningError as e:
-            logging.error('Provisioning %s to %s failed, tests are aborted, '
-                          'failure reason: %s',
-                          remote, cros_version_label, e)
-            return
-
-    # Create suites that will be scheduled.
-    suites_and_descriptions = []
-    for test in tests:
-        (predicate, description) = get_predicate_for_test_arg(test)
-        logging.info('Fetching suite for %s...', description)
-        suite = fetch_local_suite(autotest_path, predicate, afe, test_arg=test,
-                                  remote=remote,
-                                  build=build, board=board,
-                                  results_directory=results_directory,
-                                  no_experimental=no_experimental,
-                                  ignore_deps=ignore_deps)
-        suites_and_descriptions.append((suite, description))
-
-    # Schedule the suites, looping over iterations if necessary.
-    for iteration in range(iterations):
-        if iteration > 0:
-            logging.info('Repeating scheduling for iteration %d:', iteration)
-
-        for suite, description in suites_and_descriptions:
-            logging.info('Scheduling suite for %s...', description)
-            ntests = suite.schedule(
-                    lambda log_entry, log_in_subdir=False: None,
-                    add_experimental=not no_experimental)
-            logging.info('... scheduled %s job(s).', ntests)
-
-    if not afe.get_jobs():
-        logging.info('No jobs scheduled. End of local run.')
-        return
-
-    last_job_id = afe.get_jobs()[-1].id
-    job_id_digits = len(str(last_job_id))
-    codes = []
-    for job in afe.get_jobs():
-        code, _ = run_job(job, remote, autotest_path, results_directory, fast_mode,
-                job_id_digits, ssh_verbosity, ssh_options, args, pretend,
-                autoserv_verbose)
-        codes.append(code)
-    return codes
-
-
 def validate_arguments(arguments):
     """
     Validates parsed arguments.
@@ -527,7 +76,7 @@
                              'running against :lab:')
     else:
         if arguments.build is None:
-            arguments.build = _NO_BUILD
+            arguments.build = test_runner_utils.NO_BUILD
         if arguments.web:
             raise ValueError('--web flag not supported when running locally')
 
@@ -565,20 +114,14 @@
                              'package for the build specified with --build, '
                              'and the lab server code rather than local '
                              'changes.')
-    parser.add_argument('tests', nargs='+', metavar='TEST',
-                        help='Run given test(s). Use suite:SUITE to specify '
-                             'test suite. Use e:[NAME_PATTERN] to specify a '
-                             'NAME-matching regular expression. Use '
-                             'f:[FILE_PATTERN] to specify a filename matching '
-                             'regular expression. Specified regular '
-                             'expressions will be implicitly wrapped in '
-                             '^ and $.')
+    test_runner_utils.add_common_args(parser)
     default_board = cros_build_lib.GetDefaultBoard()
     parser.add_argument('-b', '--board', metavar='BOARD', default=default_board,
                         action='store',
                         help='Board for which the test will run. Default: %s' %
                              (default_board or 'Not configured'))
-    parser.add_argument('-i', '--build', metavar='BUILD', default=None,
+    parser.add_argument('-i', '--build', metavar='BUILD',
+                        default=test_runner_utils.NO_BUILD,
                         help='Build to test. Device will be reimaged if '
                              'necessary. Omit flag to skip reimage and test '
                              'against already installed DUT image. Examples: '
@@ -587,25 +130,9 @@
     parser.add_argument('-p', '--pool', metavar='POOL', default='suites',
                         help='Pool to use when running tests in the lab. '
                              'Default is "suites"')
-    parser.add_argument('--fast', action='store_true', dest='fast_mode',
-                        default=False,
-                        help='Enable fast mode.  This will cause test_that to '
-                             'skip time consuming steps like sysinfo and '
-                             'collecting crash information.')
-    parser.add_argument('--args', metavar='ARGS',
-                        help='Whitespace separated argument string to pass '
-                             'through to test. Only supported for runs '
-                             'against a local DUT.')
     parser.add_argument('--autotest_dir', metavar='AUTOTEST_DIR',
                         help='Use AUTOTEST_DIR instead of normal board sysroot '
                              'copy of autotest, and skip the quickmerge step.')
-    parser.add_argument('--results_dir', metavar='RESULTS_DIR', default=None,
-                        help='Instead of storing results in a new subdirectory'
-                             ' of /tmp , store results in RESULTS_DIR. If '
-                             'RESULTS_DIR already exists, it will be deleted.')
-    parser.add_argument('--pretend', action='store_true', default=False,
-                        help='Print autoserv commands that would be run, '
-                             'rather than running them.')
     parser.add_argument('--no-quickmerge', action='store_true', default=False,
                         dest='no_quickmerge',
                         help='Skip the quickmerge step and use the sysroot '
@@ -613,20 +140,11 @@
                              'source tree changes not being reflected in the '
                              'run. If using --autotest_dir, this flag is '
                              'automatically applied.')
-    parser.add_argument('--no-experimental', action='store_true',
-                        default=False, dest='no_experimental',
-                        help='When scheduling a suite, skip any tests marked '
-                             'as experimental. Applies only to tests scheduled'
-                             ' via suite:[SUITE].')
     parser.add_argument('--whitelist-chrome-crashes', action='store_true',
                         default=False, dest='whitelist_chrome_crashes',
                         help='Ignore chrome crashes when producing test '
                              'report. This flag gets passed along to the '
                              'report generation tool.')
-    parser.add_argument('--enforce-deps', action='store_true',
-                        default=False, dest='enforce_deps',
-                        help='Skip tests whose DEPENDENCIES can not '
-                             'be satisfied.')
     parser.add_argument('--ssh_verbosity', action='store', type=int,
                         choices=[0, 1, 2, 3], default=0,
                         help='Verbosity level for ssh, between 0 and 3 '
@@ -635,15 +153,8 @@
                         help='A string giving additional options to be '
                         'added to ssh commands.')
     parser.add_argument('--ssh_private_key', action='store',
-                        default=_TEST_KEY_PATH, help='Path to the private ssh '
-                        'key.')
-    parser.add_argument('--debug', action='store_true',
-                        help='Include DEBUG level messages in stdout. Note: '
-                             'these messages will be included in output log '
-                             'file regardless. In addition, turn on autoserv '
-                             'verbosity.')
-    parser.add_argument('--iterations', action='store', type=int, default=1,
-                        help='Number of times to run the tests specified.')
+                        default=test_runner_utils.TEST_KEY_PATH,
+                        help='Path to the private ssh key.')
     return parser.parse_args(argv), remote_argv
 
 
@@ -663,73 +174,7 @@
     return parser, remaining_argv
 
 
-def sigint_handler(signum, stack_frame):
-    #pylint: disable-msg=C0111
-    """Handle SIGINT or SIGTERM to a local test_that run.
-
-    This handler sends a SIGINT to the running autoserv process,
-    if one is running, giving it up to 5 seconds to clean up and exit. After
-    the timeout elapses, autoserv is killed. In either case, after autoserv
-    exits then this process exits with status 1.
-    """
-    # If multiple signals arrive before handler is unset, ignore duplicates
-    if not _sigint_handler_lock.acquire(False):
-        return
-    try:
-        # Ignore future signals by unsetting handler.
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-        signal.signal(signal.SIGTERM, signal.SIG_IGN)
-
-        logging.warning('Received SIGINT or SIGTERM. Cleaning up and exiting.')
-        if _autoserv_proc:
-            logging.warning('Sending SIGINT to autoserv process. Waiting up '
-                            'to %s seconds for cleanup.',
-                            _AUTOSERV_SIGINT_TIMEOUT_SECONDS)
-            _autoserv_proc.send_signal(signal.SIGINT)
-            timed_out, _ = retry.timeout(_autoserv_proc.wait,
-                    timeout_sec=_AUTOSERV_SIGINT_TIMEOUT_SECONDS)
-            if timed_out:
-                _autoserv_proc.kill()
-                logging.warning('Timed out waiting for autoserv to handle '
-                                'SIGINT. Killed autoserv.')
-    finally:
-        _sigint_handler_lock.release() # this is not really necessary?
-        sys.exit(1)
-
-
-def _create_results_directory(results_directory=None):
-    """Create a results directory.
-
-    If no directory is specified this method will create and return a
-    temp directory to hold results. If a directory name is specified this
-    method will create a directory at the given path, provided it doesn't
-    already exist.
-
-    @param results_directory: The path to the results_directory to create.
-
-    @return results_directory: A path to the results_directory, ready for use.
-    """
-    if results_directory is None:
-        # Create a results_directory as subdir of /tmp
-        results_directory = tempfile.mkdtemp(prefix='test_that_results_')
-    else:
-        # Delete results_directory if it already exists.
-        try:
-            shutil.rmtree(results_directory)
-        except OSError as e:
-            if e.errno != errno.ENOENT:
-                raise
-
-        # Create results_directory if it does not exist
-        try:
-            os.makedirs(results_directory)
-        except OSError as e:
-            if e.errno != errno.EEXIST:
-                raise
-    return results_directory
-
-
-def _perform_bootstrap_into_autotest_root(arguments, autotest_path, argv):
+def perform_bootstrap_into_autotest_root(arguments, autotest_path, argv):
     """
     Perfoms a bootstrap to run test_that from the |autotest_path|.
 
@@ -740,7 +185,7 @@
     in |autotest_path|.
 
     @param arguments: A parsed arguments object, as returned from
-                      parse_arguments(...).
+                      test_that.parse_arguments(...).
     @param autotest_path: Full absolute path to the autotest root directory.
     @param argv: The arguments list, as passed to main(...)
 
@@ -763,16 +208,18 @@
             logging.info('quickmerge| %s', message.strip())
         return_code = s.wait()
         if return_code:
-            raise TestThatRunError('autotest_quickmerge failed with error code'
-                                   ' %s.' % return_code)
+            raise test_runner_utils.TestThatRunError(
+                    'autotest_quickmerge failed with error code %s.' %
+                    return_code)
 
     logging.info('Re-running test_that script in %s copy of autotest.',
                  autotest_path)
     script_command = os.path.join(autotest_path, 'site_utils',
-                                  os.path.basename(__file__))
+                                  'test_that.py')
     if not os.path.exists(script_command):
-        raise TestThatRunError('Unable to bootstrap to autotest root, '
-                               '%s not found.' % script_command)
+        raise test_runner_utils.TestThatRunError(
+            'Unable to bootstrap to autotest root, %s not found.' %
+            script_command)
     proc = None
     def resend_sig(signum, stack_frame):
         #pylint: disable-msg=C0111
@@ -786,82 +233,6 @@
     return proc.wait()
 
 
-def _perform_run_from_autotest_root(arguments, autotest_path, argv):
-    """
-    Perform a test_that run, from the |autotest_path|.
-
-    This function is to be called from test_that's main() script, when
-    test_that is executed from the |autotest_path|. It handles all stages
-    of a test_that run that come after the bootstrap into |autotest_path|.
-
-    @param arguments: A parsed arguments object, as returned from
-                      parse_arguments(...).
-    @param autotest_path: Full absolute path to the autotest root directory.
-    @param argv: The arguments list, as passed to main(...)
-
-    @returns: A return code that test_that should exit with.
-    """
-    results_directory = arguments.results_dir
-    if results_directory is None or not os.path.exists(results_directory):
-        raise ValueError('Expected valid results directory, got %s' %
-                          results_directory)
-
-    logging_manager.configure_logging(
-            server_logging_config.ServerLoggingConfig(),
-            results_dir=results_directory,
-            use_console=True,
-            verbose=arguments.debug,
-            debug_log_name='test_that')
-    logging.info('Began logging to %s', results_directory)
-
-    logging.debug('test_that command line was: %s', argv)
-
-    signal.signal(signal.SIGINT, sigint_handler)
-    signal.signal(signal.SIGTERM, sigint_handler)
-
-    afe = setup_local_afe()
-    codes = perform_local_run(afe, autotest_path, arguments.tests,
-                      arguments.remote, arguments.fast_mode,
-                      arguments.build, arguments.board,
-                      args=arguments.args,
-                      pretend=arguments.pretend,
-                      no_experimental=arguments.no_experimental,
-                      ignore_deps=not arguments.enforce_deps,
-                      results_directory=results_directory,
-                      ssh_verbosity=arguments.ssh_verbosity,
-                      ssh_options=arguments.ssh_options,
-                      autoserv_verbose=arguments.debug,
-                      iterations=arguments.iterations)
-    if arguments.pretend:
-        logging.info('Finished pretend run. Exiting.')
-        return 0
-
-    test_report_command = [_TEST_REPORT_SCRIPTNAME]
-    # Experimental test results do not influence the exit code.
-    test_report_command.append('--ignore_experimental_tests')
-    if arguments.whitelist_chrome_crashes:
-        test_report_command.append('--whitelist_chrome_crashes')
-    test_report_command.append(results_directory)
-    final_result = subprocess.call(test_report_command)
-    with open(os.path.join(results_directory, 'test_report.log'),
-              'w') as report_log:
-        subprocess.call(test_report_command, stdout=report_log)
-    try:
-        os.unlink(_LATEST_RESULTS_DIRECTORY)
-    except OSError:
-        pass
-    link_target = os.path.relpath(results_directory,
-                                  os.path.dirname(_LATEST_RESULTS_DIRECTORY))
-    os.symlink(link_target, _LATEST_RESULTS_DIRECTORY)
-    logging.info('Finished running tests. Results can be found in %s or %s',
-                 results_directory, _LATEST_RESULTS_DIRECTORY)
-    if any(codes):
-        logging.error("Autoserv encountered unexpected errors "
-                      "when executing jobs.")
-        final_result = final_result or 1
-    return final_result
-
-
 def _main_for_local_run(argv, arguments):
     """
     Effective entry point for local test_that runs.
@@ -873,8 +244,10 @@
         print >> sys.stderr, 'For local runs, script must be run inside chroot.'
         return 1
 
-    results_directory = _create_results_directory(arguments.results_dir)
-    _add_ssh_identity(results_directory, arguments.ssh_private_key)
+    results_directory = test_runner_utils.create_results_directory(
+            arguments.results_dir)
+    test_runner_utils.add_ssh_identity(results_directory,
+                                       arguments.ssh_private_key)
     arguments.results_dir = results_directory
 
     # If the board has not been specified through --board, and is not set in the
@@ -915,11 +288,19 @@
     # a quickmerge if necessary and then re-execute
     # the sysroot version of script with the same arguments.
     if os.path.dirname(realpath) != site_utils_path:
-        return _perform_bootstrap_into_autotest_root(
+        return perform_bootstrap_into_autotest_root(
                 arguments, autotest_path, argv)
     else:
-        return _perform_run_from_autotest_root(
-                arguments, autotest_path, argv)
+        return test_runner_utils.perform_run_from_autotest_root(
+                autotest_path, argv, arguments.tests, arguments.remote,
+                build=arguments.build, board=arguments.board,
+                args=arguments.args, ignore_deps=not arguments.enforce_deps,
+                results_directory=results_directory,
+                ssh_verbosity=arguments.ssh_verbosity,
+                ssh_options=arguments.ssh_options,
+                iterations=arguments.iterations,
+                fast_mode=arguments.fast_mode, debug=arguments.debug,
+                whitelist_chrome_crashes=arguments.whitelist_chrome_crashes)
 
 
 def _main_for_lab_run(argv, arguments):
diff --git a/site_utils/test_that_unittest.py b/site_utils/test_that_unittest.py
index cdc797c..b46fca3 100755
--- a/site_utils/test_that_unittest.py
+++ b/site_utils/test_that_unittest.py
@@ -4,49 +4,11 @@
 # found in the LICENSE file.
 # pylint: disable-msg=C0111
 
-import os, unittest
-import mox
+import unittest
 import common
-import subprocess
-import types
-from autotest_lib.server import utils
-from autotest_lib.server.cros.dynamic_suite import constants
 from autotest_lib.site_utils import test_that
 
 
-class StartsWithList(mox.Comparator):
-    def __init__(self, start_of_list):
-        """Mox comparator which returns True if the argument
-        to the mocked function is a list that begins with the elements
-        in start_of_list.
-        """
-        self._lhs = start_of_list
-
-    def equals(self, rhs):
-        if len(rhs)<len(self._lhs):
-            return False
-        for (x, y) in zip(self._lhs, rhs):
-            if x != y:
-                return False
-        return True
-
-
-class ContainsSublist(mox.Comparator):
-    def __init__(self, sublist):
-        """Mox comparator which returns True if the argument
-        to the mocked function is a list that contains sublist
-        as a sub-list.
-        """
-        self._sublist = sublist
-
-    def equals(self, rhs):
-        n = len(self._sublist)
-        if len(rhs)<n:
-            return False
-        return any((self._sublist == rhs[i:i+n])
-                   for i in xrange(len(rhs) - n + 1))
-
-
 class TestThatUnittests(unittest.TestCase):
     def test_validate_arguments(self):
         # Deferred until validate_arguments allows for lab runs.
@@ -104,177 +66,6 @@
         # Deferred until fetch_local_suite knows about non-local builds.
         pass
 
-    def test_get_predicate_for_test_arg(self):
-        # Assert the type signature of get_predicate_for_test(...)
-        # Because control.test_that_wrapper calls this function,
-        # it is imperative for backwards compatilbility that
-        # the return type of the tested function does not change.
-        tests = ['dummy_test', 'e:name_expression', 'f:expression',
-                 'suite:suitename']
-        for test in tests:
-            pred, desc = test_that.get_predicate_for_test_arg(test)
-            self.assertTrue(isinstance(pred, types.FunctionType))
-            self.assertTrue(isinstance(desc, str))
-
-    def test_run_job(self):
-        class Object():
-            pass
-
-        autotest_path = 'htap_tsetotua'
-        autoserv_command = os.path.join(autotest_path, 'server', 'autoserv')
-        remote = 'etomer'
-        results_dir = '/tmp/fakeresults'
-        fast_mode = False
-        job1_results_dir = '/tmp/fakeresults/results-1-gilbert'
-        job2_results_dir = '/tmp/fakeresults/results-2-sullivan'
-        args = 'matey'
-        expected_args_sublist = ['--args', args]
-        experimental_keyval = {constants.JOB_EXPERIMENTAL_KEY: False}
-        self.mox = mox.Mox()
-
-        # Create some dummy job objects.
-        job1 = Object()
-        job2 = Object()
-        setattr(job1, 'control_type', 'cLiEnT')
-        setattr(job1, 'control_file', 'c1')
-        setattr(job1, 'id', 1)
-        setattr(job1, 'name', 'gilbert')
-        setattr(job1, 'keyvals', experimental_keyval)
-
-        setattr(job2, 'control_type', 'Server')
-        setattr(job2, 'control_file', 'c2')
-        setattr(job2, 'id', 2)
-        setattr(job2, 'name', 'sullivan')
-        setattr(job2, 'keyvals', experimental_keyval)
-
-        id_digits = 1
-
-        # Stub out subprocess.Popen and wait calls.
-        # Make them expect correct arguments.
-        def fake_readline():
-            return b''
-        mock_process_1 = self.mox.CreateMock(subprocess.Popen)
-        mock_process_2 = self.mox.CreateMock(subprocess.Popen)
-        fake_stdout = self.mox.CreateMock(file)
-        fake_returncode = 0
-        mock_process_1.stdout = fake_stdout
-        mock_process_1.returncode = fake_returncode
-        mock_process_2.stdout = fake_stdout
-        mock_process_2.returncode = fake_returncode
-
-        self.mox.StubOutWithMock(os, 'makedirs')
-        self.mox.StubOutWithMock(utils, 'write_keyval')
-        self.mox.StubOutWithMock(subprocess, 'Popen')
-
-        os.makedirs(job1_results_dir)
-        utils.write_keyval(job1_results_dir, experimental_keyval)
-        arglist_1 = [autoserv_command, '-p', '-r', job1_results_dir,
-                     '-m', remote, '--no_console_prefix', '-l', 'gilbert',
-                     '-c']
-        subprocess.Popen(mox.And(StartsWithList(arglist_1),
-                                 ContainsSublist(expected_args_sublist)),
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT
-                        ).AndReturn(mock_process_1)
-        mock_process_1.stdout.readline().AndReturn(b'')
-        mock_process_1.wait().AndReturn(0)
-
-        os.makedirs(job2_results_dir)
-        utils.write_keyval(job2_results_dir, experimental_keyval)
-        arglist_2 = [autoserv_command, '-p', '-r', job2_results_dir,
-                     '-m', remote,  '--no_console_prefix', '-l', 'sullivan',
-                     '-s']
-        subprocess.Popen(mox.And(StartsWithList(arglist_2),
-                                 ContainsSublist(expected_args_sublist)),
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT
-                        ).AndReturn(mock_process_2)
-        mock_process_2.stdout.readline().AndReturn(b'')
-        mock_process_2.wait().AndReturn(0)
-
-        # Test run_job.
-        self.mox.ReplayAll()
-        code, job_res = test_that.run_job(job1, remote, autotest_path, results_dir,
-                                    fast_mode, id_digits, 0, None, args)
-        self.assertEqual(job_res, job1_results_dir)
-        self.assertEqual(code, 0)
-        code, job_res = test_that.run_job(job2, remote, autotest_path, results_dir,
-                                    fast_mode, id_digits, 0, None, args)
-
-        self.assertEqual(job_res, job2_results_dir)
-        self.assertEqual(code, 0)
-        self.mox.UnsetStubs()
-        self.mox.VerifyAll()
-        self.mox.ResetAll()
-
-
-    def test_perform_local_run(self):
-        afe = test_that.setup_local_afe()
-        autotest_path = 'ottotest_path'
-        suite_name = 'sweet_name'
-        test_arg = 'suite:' + suite_name
-        remote = 'remoat'
-        build = 'bild'
-        board = 'bored'
-        fast_mode = False
-        suite_control_files = ['c1', 'c2', 'c3', 'c4']
-        results_dir = '/tmp/test_that_results_fake'
-        id_digits = 1
-        ssh_verbosity = 2
-        ssh_options = '-F /dev/null -i /dev/null'
-        args = 'matey'
-        ignore_deps = False
-
-        # Fake suite objects that will be returned by fetch_local_suite
-        class fake_suite(object):
-            def __init__(self, suite_control_files, hosts):
-                self._suite_control_files = suite_control_files
-                self._hosts = hosts
-
-            def schedule(self, *args, **kwargs):
-                for control_file in self._suite_control_files:
-                    afe.create_job(control_file, hosts=self._hosts)
-
-        # Mock out scheduling of suite and running of jobs.
-        self.mox = mox.Mox()
-
-        self.mox.StubOutWithMock(test_that, 'fetch_local_suite')
-        test_that.fetch_local_suite(autotest_path, mox.IgnoreArg(),
-                afe, test_arg=test_arg, remote=remote, build=build,
-                board=board, results_directory=results_dir,
-                no_experimental=False,
-                ignore_deps=ignore_deps
-                ).AndReturn(fake_suite(suite_control_files, [remote]))
-        self.mox.StubOutWithMock(test_that, 'run_job')
-        self.mox.StubOutWithMock(test_that, 'run_provisioning_job')
-        self.mox.StubOutWithMock(test_that, '_auto_detect_labels')
-
-        test_that._auto_detect_labels(afe, remote)
-        # Test perform_local_run. Enforce that run_provisioning_job,
-        # run_job and _auto_detect_labels are called correctly.
-        test_that.run_provisioning_job(
-                'cros-version:' + build, remote, autotest_path,
-                 results_dir, fast_mode,
-                 ssh_verbosity, ssh_options,
-                 False, False)
-
-        for control_file in suite_control_files:
-            test_that.run_job(mox.ContainsAttributeValue('control_file',
-                                                         control_file),
-                             remote, autotest_path, results_dir, fast_mode,
-                             id_digits, ssh_verbosity, ssh_options,
-                             args, False, False).AndReturn((0, '/fake/dir'))
-        self.mox.ReplayAll()
-        test_that.perform_local_run(afe, autotest_path, ['suite:'+suite_name],
-                                    remote, fast_mode, build=build, board=board,
-                                    ignore_deps=False,
-                                    ssh_verbosity=ssh_verbosity,
-                                    ssh_options=ssh_options,
-                                    args=args,
-                                    results_directory=results_dir)
-        self.mox.UnsetStubs()
-        self.mox.VerifyAll()
-
 
 if __name__ == '__main__':
     unittest.main()