This must be applied on top of Martin's "run autoserv without a results
dir" patch in order to apply cleanly.


A rather large refactoring and stubbing change working towards
client-side profilers from the server side. This makes a few major
changes:
  - refactors almost all of client/bin/profilers.py into a common lib
    profiler_manager class; except for the code for actually loading
    a profiler object, all of that code is already completely generic
  - add a server-side profiler_proxy class that will act as a proxy
    on the server for using a client-side profiler; this doesn't
    actually do anything useful right now, it is basically just a
    stub for later changes
  - add a server-side profiler_manager implementation that creates a
    profiler_proxy class instead of actually loading a real profiler

The intended changes still in the pipeline that will build on this are:
  - add code to the profiler_proxy for actually making sure the
    profiler is set up and installed on the remote host(s)
  - add a mechanism for client-side profilers to deal with reboots

Risk: Medium
Visibility: Adds a bunch of stubs that don't actually do anything yet
but will do things soon.

Signed-off-by: John Admanski <[email protected]>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@2447 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/autotest.py b/server/autotest.py
index 6910f99..ba45f86 100644
--- a/server/autotest.py
+++ b/server/autotest.py
@@ -1,7 +1,7 @@
 # Copyright 2007 Google Inc. Released under the GPL v2
 
 import re, os, sys, traceback, subprocess, tempfile, shutil, time, pickle
-from autotest_lib.server import installable_object, utils, server_job
+from autotest_lib.server import installable_object, utils
 from autotest_lib.client.common_lib import log, error, debug
 from autotest_lib.client.common_lib import global_config, packages
 
@@ -251,7 +251,7 @@
         try:
             atrun.execute_control(timeout=timeout)
         finally:
-            collector = server_job.log_collector(host, atrun.tag, results_dir)
+            collector = log_collector(host, atrun.tag, results_dir)
             collector.collect_client_job_results()
             self._process_client_state_file(host, atrun, results_dir)
 
@@ -436,8 +436,7 @@
         section = 0
         start_time = time.time()
 
-        client_logger = server_job.client_logger(self.host, self.tag,
-                                                 self.results_dir)
+        logger = client_logger(self.host, self.tag, self.results_dir)
         try:
             while not timeout or time.time() < start_time + timeout:
                 if timeout:
@@ -445,7 +444,7 @@
                 else:
                     section_timeout = None
                 last = self.execute_section(section, section_timeout,
-                                            client_logger)
+                                            logger)
                 section += 1
                 if re.match(r'^END .*\t----\t----\t.*$', last):
                     print "Client complete"
@@ -454,8 +453,8 @@
                     try:
                         self._wait_for_reboot()
                     except error.AutotestRunError, e:
-                        job.record("ABORT", None, "reboot", str(e))
-                        job.record("END ABORT", None, None, str(e))
+                        self.host.job.record("ABORT", None, "reboot", str(e))
+                        self.host.job.record("END ABORT", None, None, str(e))
                         raise
                     continue
 
@@ -469,7 +468,7 @@
                        "client: %s\n") % last
                 raise error.AutotestRunError(msg)
         finally:
-            client_logger.close()
+            logger.close()
 
         # should only get here if we timed out
         assert timeout
@@ -498,6 +497,248 @@
     raise error.AutotestRunError("Cannot figure out autotest directory")
 
 
+class log_collector(object):
+    def __init__(self, host, client_tag, results_dir):
+        self.host = host
+        if not client_tag:
+            client_tag = "default"
+        self.client_results_dir = os.path.join(host.get_autodir(), "results",
+                                               client_tag)
+        self.server_results_dir = results_dir
+
+
+    def collect_client_job_results(self):
+        """ A method that collects all the current results of a running
+        client job into the results dir. By default does nothing as no
+        client job is running, but when running a client job you can override
+        this with something that will actually do something. """
+
+        # make an effort to wait for the machine to come up
+        try:
+            self.host.wait_up(timeout=30)
+        except error.AutoservError:
+            # don't worry about any errors, we'll try and
+            # get the results anyway
+            pass
+
+
+        # Copy all dirs in default to results_dir
+        try:
+            keyval_path = self._prepare_for_copying_logs()
+            self.host.get_file(self.client_results_dir + '/',
+                               self.server_results_dir)
+            self._process_copied_logs(keyval_path)
+            self._postprocess_copied_logs()
+        except Exception:
+            # well, don't stop running just because we couldn't get logs
+            print "Unexpected error copying test result logs, continuing ..."
+            traceback.print_exc(file=sys.stdout)
+
+
+    def _prepare_for_copying_logs(self):
+        server_keyval = os.path.join(self.server_results_dir, 'keyval')
+        if not os.path.exists(server_keyval):
+            # Client-side keyval file can be copied directly
+            return
+
+        # Copy client-side keyval to temporary location
+        suffix = '.keyval_%s' % self.host.hostname
+        fd, keyval_path = tempfile.mkstemp(suffix)
+        os.close(fd)
+        try:
+            client_keyval = os.path.join(self.client_results_dir, 'keyval')
+            try:
+                self.host.get_file(client_keyval, keyval_path)
+            finally:
+                # We will squirrel the client side keyval away
+                # and move it back when we are done
+                remote_temp_dir = self.host.get_tmp_dir()
+                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
+                self.host.run('mv %s %s' % (client_keyval,
+                                            self.temp_keyval_path))
+        except (error.AutoservRunError, error.AutoservSSHTimeout):
+            print "Prepare for copying logs failed"
+        return keyval_path
+
+
+    def _process_copied_logs(self, keyval_path):
+        if not keyval_path:
+            # Client-side keyval file was copied directly
+            return
+
+        # Append contents of keyval_<host> file to keyval file
+        try:
+            # Read in new and old keyval files
+            new_keyval = utils.read_keyval(keyval_path)
+            old_keyval = utils.read_keyval(self.server_results_dir)
+            # 'Delete' from new keyval entries that are in both
+            tmp_keyval = {}
+            for key, val in new_keyval.iteritems():
+                if key not in old_keyval:
+                    tmp_keyval[key] = val
+            # Append new info to keyval file
+            utils.write_keyval(self.server_results_dir, tmp_keyval)
+            # Delete keyval_<host> file
+            os.remove(keyval_path)
+        except IOError:
+            print "Process copied logs failed"
+
+
+    def _postprocess_copied_logs(self):
+        # we can now put our keyval file back
+        client_keyval = os.path.join(self.client_results_dir, 'keyval')
+        try:
+            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
+        except Exception:
+            pass
+
+
+
+# a file-like object for catching stderr from an autotest client and
+# extracting status logs from it
+class client_logger(object):
+    """Partial file object to write to both stdout and
+    the status log file.  We only implement those methods
+    utils.run() actually calls.
+
+    Note that this class is fairly closely coupled with server_job, as it
+    uses special job._ methods to actually carry out the logging.
+    """
+    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
+    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
+    extract_indent = re.compile(r"^(\t*).*$")
+
+    def __init__(self, host, tag, server_results_dir):
+        self.host = host
+        self.job = host.job
+        self.log_collector = log_collector(host, tag, server_results_dir)
+        self.leftover = ""
+        self.last_line = ""
+        self.logs = {}
+
+
+    def _process_log_dict(self, log_dict):
+        log_list = log_dict.pop("logs", [])
+        for key in sorted(log_dict.iterkeys()):
+            log_list += self._process_log_dict(log_dict.pop(key))
+        return log_list
+
+
+    def _process_logs(self):
+        """Go through the accumulated logs in self.logs and print them
+        out to stdout and the status log. Note that this processes
+        logs in an ordering where:
+
+        1) logs to different tags are never interleaved
+        2) logs to x.y come before logs to x.y.z for all z
+        3) logs to x.y come before x.z whenever y < z
+
+        Note that this will in general not be the same as the
+        chronological ordering of the logs. However, if a chronological
+        ordering is desired that one can be reconstructed from the
+        status log by looking at timestamp lines."""
+        log_list = self._process_log_dict(self.logs)
+        for line in log_list:
+            self.job._record_prerendered(line + '\n')
+        if log_list:
+            self.last_line = log_list[-1]
+
+
+    def _process_quoted_line(self, tag, line):
+        """Process a line quoted with an AUTOTEST_STATUS flag. If the
+        tag is blank then we want to push out all the data we've been
+        building up in self.logs, and then the newest line. If the
+        tag is not blank, then push the line into the logs for handling
+        later."""
+        print line
+        if tag == "":
+            self._process_logs()
+            self.job._record_prerendered(line + '\n')
+            self.last_line = line
+        else:
+            tag_parts = [int(x) for x in tag.split(".")]
+            log_dict = self.logs
+            for part in tag_parts:
+                log_dict = log_dict.setdefault(part, {})
+            log_list = log_dict.setdefault("logs", [])
+            log_list.append(line)
+
+
+    def _process_line(self, line):
+        """Write out a line of data to the appropriate stream. Status
+        lines sent by autotest will be prepended with
+        "AUTOTEST_STATUS", and all other lines are ssh error
+        messages."""
+        status_match = self.status_parser.search(line)
+        test_complete_match = self.test_complete_parser.search(line)
+        if status_match:
+            tag, line = status_match.groups()
+            self._process_quoted_line(tag, line)
+        elif test_complete_match:
+            fifo_path, = test_complete_match.groups()
+            self.log_collector.collect_client_job_results()
+            self.host.run("echo A > %s" % fifo_path)
+        else:
+            print line
+
+
+    def _format_warnings(self, last_line, warnings):
+        # use the indentation of whatever the last log line was
+        indent = self.extract_indent.match(last_line).group(1)
+        # if the last line starts a new group, add an extra indent
+        if last_line.lstrip('\t').startswith("START\t"):
+            indent += '\t'
+        return [self.job._render_record("WARN", None, None, msg,
+                                        timestamp, indent).rstrip('\n')
+                for timestamp, msg in warnings]
+
+
+    def _process_warnings(self, last_line, log_dict, warnings):
+        if log_dict.keys() in ([], ["logs"]):
+            # there are no sub-jobs, just append the warnings here
+            warnings = self._format_warnings(last_line, warnings)
+            log_list = log_dict.setdefault("logs", [])
+            log_list += warnings
+            for warning in warnings:
+                sys.stdout.write(warning + '\n')
+        else:
+            # there are sub-jobs, so put the warnings in there
+            log_list = log_dict.get("logs", [])
+            if log_list:
+                last_line = log_list[-1]
+            for key in sorted(log_dict.iterkeys()):
+                if key != "logs":
+                    self._process_warnings(last_line,
+                                           log_dict[key],
+                                           warnings)
+
+
+    def write(self, data):
+        # first check for any new console warnings
+        warnings = self.job._read_warnings()
+        self._process_warnings(self.last_line, self.logs, warnings)
+        # now process the newest data written out
+        data = self.leftover + data
+        lines = data.split("\n")
+        # process every line but the last one
+        for line in lines[:-1]:
+            self._process_line(line)
+        # save the last line for later processing
+        # since we may not have the whole line yet
+        self.leftover = lines[-1]
+
+
+    def flush(self):
+        sys.stdout.flush()
+
+
+    def close(self):
+        if self.leftover:
+            self._process_line(self.leftover)
+        self._process_logs()
+        self.flush()
+
+
 # site_autotest.py may be non-existant or empty, make sure that an appropriate
 # SiteAutotest class is created nevertheless
 try:
diff --git a/server/autotest_unittest.py b/server/autotest_unittest.py
index c7247ff..619106d 100644
--- a/server/autotest_unittest.py
+++ b/server/autotest_unittest.py
@@ -46,7 +46,7 @@
         self.god.stub_function(autotest.global_config.global_config,
                                "get_config_value")
         self.god.stub_class(autotest, "_Run")
-        self.god.stub_class(server_job, "log_collector")
+        self.god.stub_class(autotest, "log_collector")
 
 
     def tearDown(self):
@@ -190,7 +190,7 @@
         os.path.abspath.expect_call('control').and_return('control')
         os.remove.expect_call("temp")
         run_obj.execute_control.expect_call(timeout=30)
-        collector = server_job.log_collector.expect_new(self.host, tag, '.')
+        collector = autotest.log_collector.expect_new(self.host, tag, '.')
         collector.collect_client_job_results.expect_call()
 
         autotest.open.expect_call('./control.None.autoserv.state').and_raises(
@@ -203,5 +203,63 @@
         self.god.check_playback()
 
 
+class CopyLogsTest(unittest.TestCase):
+    def setUp(self):
+        self.god = mock.mock_god()
+
+        self.host = self.god.create_mock_class(hosts.RemoteHost, "host")
+        self.host.hostname = "testhost"
+
+        self.god.stub_function(os.path, "exists")
+        self.god.stub_function(os, "close")
+        self.god.stub_function(os, "remove")
+        self.god.stub_function(tempfile, "mkstemp")
+        self.god.stub_function(utils, "read_keyval")
+        self.god.stub_function(utils, "write_keyval")
+
+
+    def tearDown(self):
+        self.god.unstub_all()
+
+
+    def test_prepare_for_copying_logs(self):
+        self.host.get_autodir.expect_call().and_return("/autodir")
+        collector = autotest.log_collector(self.host, None, "/resultsdir")
+        self.god.check_playback()
+
+        os.path.exists.expect_call("/resultsdir/keyval").and_return(True)
+        tempfile.mkstemp.expect_call(".keyval_testhost").and_return(
+            (10, "tmp.keyval_testhost"))
+        os.close.expect_call(10)
+        self.host.get_file.expect_call("/autodir/results/default/keyval",
+                                       "tmp.keyval_testhost")
+        self.host.get_tmp_dir.expect_call().and_return("/autotmp")
+        self.host.run.expect_call(
+            "mv /autodir/results/default/keyval /autotmp/keyval")
+
+        # run and check
+        keyval = collector._prepare_for_copying_logs()
+        self.assertEquals(keyval, "tmp.keyval_testhost")
+        self.god.check_playback()
+
+
+    def test_process_copied_logs(self):
+        self.host.get_autodir.expect_call().and_return("/autodir")
+        collector = autotest.log_collector(self.host, None, "/resultsdir")
+        self.god.check_playback()
+
+        utils.read_keyval.expect_call("tmp.keyval_testhost").and_return(
+            {"field1": "new thing", "field3": "other new thing"})
+        utils.read_keyval.expect_call("/resultsdir").and_return(
+            {"field1": "thing", "field2": "otherthing"})
+        utils.write_keyval.expect_call("/resultsdir",
+                                       {"field3": "other new thing"})
+        os.remove.expect_call("tmp.keyval_testhost")
+
+        # run and check
+        collector._process_copied_logs("tmp.keyval_testhost")
+        self.god.check_playback()
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/server/profiler.py b/server/profiler.py
new file mode 100644
index 0000000..a23c655
--- /dev/null
+++ b/server/profiler.py
@@ -0,0 +1,84 @@
+import itertools
+from autotest_lib.server import autotest
+
+
+
+def get_unpassable_types(arg):
+    """ Given an argument, returns the set of unpassable types contained in
+    arg. Strings and integers are passable; list, tuple, set, frozenset and
+    dict are checked recursively, element by element (for a dict, both keys
+    and values); any other type is returned as a singleton set. """
+    if isinstance(arg, (basestring, int, long)):
+        return set()
+    elif isinstance(arg, (list, tuple, set, frozenset, dict)):
+        if isinstance(arg, dict):
+            # keys and values must both be passable
+            parts = itertools.chain(arg.iterkeys(), arg.itervalues())
+        else:
+            parts = iter(arg)
+        types = set()
+        for part in parts:
+            # recurse into each element; recursing on arg itself never ends
+            types |= get_unpassable_types(part)
+        return types
+    else:
+        return set([type(arg)])
+
+
+def validate_args(args):
+    """ Validates arguments. Lists and dictionaries are valid argument types,
+    so you can pass *args and **dargs in directly, rather than having to
+    iterate over them yourself. """
+    unpassable_types = get_unpassable_types(args)
+    if unpassable_types:
+        msg = "arguments of type '%s' cannot be passed to remote profilers"
+        msg %= ", ".join(t.__name__ for t in unpassable_types)
+        raise TypeError(msg)
+
+
+class profiler_proxy(object):
+    """ This is a server-side class that acts as a proxy to a real client-side
+    profiler class."""
+
+    def __init__(self, job, profiler_name):
+        self.job = job
+        self.name = profiler_name
+        self.installed_hosts = {}
+
+
+    def _install(self):
+        """ Install autotest on any current job hosts. """
+        current_job_hosts = self.job.hosts
+        current_profiler_hosts = set(self.installed_hosts.keys())
+        # install autotest on any new hosts in job.hosts
+        for host in current_job_hosts - current_profiler_hosts:
+            tmp_dir = host.get_tmp_dir(parent="/tmp/profilers")
+            at = autotest.Autotest(host)
+            at.install(autodir=tmp_dir)
+            self.installed_hosts[host] = at
+        # drop any installs from hosts no longer in job.hosts
+        for host in current_profiler_hosts - current_job_hosts:
+            del self.installed_hosts[host]
+
+
+    def setup(self, *args, **dargs):
+        validate_args(args)
+        validate_args(dargs)
+        self._install()
+
+
+    def initialize(self, *args, **dargs):
+        validate_args(args)
+        validate_args(dargs)
+
+
+    def start(self, test):
+        pass
+
+
+    def stop(self, test):
+        pass
+
+
+    def report(self, test):
+        pass
diff --git a/server/profilers.py b/server/profilers.py
new file mode 100644
index 0000000..62122ae
--- /dev/null
+++ b/server/profilers.py
@@ -0,0 +1,13 @@
+import os, sys
+import common
+
+from autotest_lib.client.common_lib import utils, packages, profiler_manager
+from autotest_lib.server import profiler
+
+
+class profilers(profiler_manager.profiler_manager):
+    def load_profiler(self, profiler_name, args, dargs):
+        newprofiler = profiler.profiler_proxy(self.job, profiler_name)
+        newprofiler.initialize(*args, **dargs)
+        newprofiler.setup(*args, **dargs) # lazy setup is done client-side
+        return newprofiler
diff --git a/server/server_job.py b/server/server_job.py
index cf404ad..9ddf7b9 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -10,7 +10,7 @@
 import shutil, warnings
 from autotest_lib.client.bin import fd_stack, sysinfo
 from autotest_lib.client.common_lib import error, log, utils, packages
-from autotest_lib.server import test, subcommand
+from autotest_lib.server import test, subcommand, profilers
 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
 
 
@@ -128,6 +128,7 @@
 
         if resultdir:
             self.sysinfo = sysinfo.sysinfo(self.resultdir)
+        self.profilers = profilers.profilers(self)
 
         if not os.access(self.tmpdir, os.W_OK):
             try:
@@ -802,245 +803,6 @@
             print >> sys.stderr, msg
 
 
-
-class log_collector(object):
-    def __init__(self, host, client_tag, results_dir):
-        self.host = host
-        if not client_tag:
-            client_tag = "default"
-        self.client_results_dir = os.path.join(host.get_autodir(), "results",
-                                               client_tag)
-        self.server_results_dir = results_dir
-
-
-    def collect_client_job_results(self):
-        """ A method that collects all the current results of a running
-        client job into the results dir. By default does nothing as no
-        client job is running, but when running a client job you can override
-        this with something that will actually do something. """
-
-        # make an effort to wait for the machine to come up
-        try:
-            self.host.wait_up(timeout=30)
-        except error.AutoservError:
-            # don't worry about any errors, we'll try and
-            # get the results anyway
-            pass
-
-
-        # Copy all dirs in default to results_dir
-        try:
-            keyval_path = self._prepare_for_copying_logs()
-            self.host.get_file(self.client_results_dir + '/',
-                               self.server_results_dir)
-            self._process_copied_logs(keyval_path)
-            self._postprocess_copied_logs()
-        except Exception:
-            # well, don't stop running just because we couldn't get logs
-            print "Unexpected error copying test result logs, continuing ..."
-            traceback.print_exc(file=sys.stdout)
-
-
-    def _prepare_for_copying_logs(self):
-        server_keyval = os.path.join(self.server_results_dir, 'keyval')
-        if not os.path.exists(server_keyval):
-            # Client-side keyval file can be copied directly
-            return
-
-        # Copy client-side keyval to temporary location
-        suffix = '.keyval_%s' % self.host.hostname
-        fd, keyval_path = tempfile.mkstemp(suffix)
-        os.close(fd)
-        try:
-            client_keyval = os.path.join(self.client_results_dir, 'keyval')
-            try:
-                self.host.get_file(client_keyval, keyval_path)
-            finally:
-                # We will squirrel away the client side keyval
-                # away and move it back when we are done
-                remote_temp_dir = self.host.get_tmp_dir()
-                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
-                self.host.run('mv %s %s' % (client_keyval,
-                                            self.temp_keyval_path))
-        except (error.AutoservRunError, error.AutoservSSHTimeout):
-            print "Prepare for copying logs failed"
-        return keyval_path
-
-
-    def _process_copied_logs(self, keyval_path):
-        if not keyval_path:
-            # Client-side keyval file was copied directly
-            return
-
-        # Append contents of keyval_<host> file to keyval file
-        try:
-            # Read in new and old keyval files
-            new_keyval = utils.read_keyval(keyval_path)
-            old_keyval = utils.read_keyval(self.server_results_dir)
-            # 'Delete' from new keyval entries that are in both
-            tmp_keyval = {}
-            for key, val in new_keyval.iteritems():
-                if key not in old_keyval:
-                    tmp_keyval[key] = val
-            # Append new info to keyval file
-            utils.write_keyval(self.server_results_dir, tmp_keyval)
-            # Delete keyval_<host> file
-            os.remove(keyval_path)
-        except IOError:
-            print "Process copied logs failed"
-
-
-    def _postprocess_copied_logs(self):
-        # we can now put our keyval file back
-        client_keyval = os.path.join(self.client_results_dir, 'keyval')
-        try:
-            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
-        except Exception:
-            pass
-
-
-# a file-like object for catching stderr from an autotest client and
-# extracting status logs from it
-class client_logger(object):
-    """Partial file object to write to both stdout and
-    the status log file.  We only implement those methods
-    utils.run() actually calls.
-    """
-    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
-    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
-    extract_indent = re.compile(r"^(\t*).*$")
-
-    def __init__(self, host, tag, server_results_dir):
-        self.host = host
-        self.job = host.job
-        self.log_collector = log_collector(host, tag, server_results_dir)
-        self.leftover = ""
-        self.last_line = ""
-        self.logs = {}
-
-
-    def _process_log_dict(self, log_dict):
-        log_list = log_dict.pop("logs", [])
-        for key in sorted(log_dict.iterkeys()):
-            log_list += self._process_log_dict(log_dict.pop(key))
-        return log_list
-
-
-    def _process_logs(self):
-        """Go through the accumulated logs in self.log and print them
-        out to stdout and the status log. Note that this processes
-        logs in an ordering where:
-
-        1) logs to different tags are never interleaved
-        2) logs to x.y come before logs to x.y.z for all z
-        3) logs to x.y come before x.z whenever y < z
-
-        Note that this will in general not be the same as the
-        chronological ordering of the logs. However, if a chronological
-        ordering is desired that one can be reconstructed from the
-        status log by looking at timestamp lines."""
-        log_list = self._process_log_dict(self.logs)
-        for line in log_list:
-            self.job._record_prerendered(line + '\n')
-        if log_list:
-            self.last_line = log_list[-1]
-
-
-    def _process_quoted_line(self, tag, line):
-        """Process a line quoted with an AUTOTEST_STATUS flag. If the
-        tag is blank then we want to push out all the data we've been
-        building up in self.logs, and then the newest line. If the
-        tag is not blank, then push the line into the logs for handling
-        later."""
-        print line
-        if tag == "":
-            self._process_logs()
-            self.job._record_prerendered(line + '\n')
-            self.last_line = line
-        else:
-            tag_parts = [int(x) for x in tag.split(".")]
-            log_dict = self.logs
-            for part in tag_parts:
-                log_dict = log_dict.setdefault(part, {})
-            log_list = log_dict.setdefault("logs", [])
-            log_list.append(line)
-
-
-    def _process_line(self, line):
-        """Write out a line of data to the appropriate stream. Status
-        lines sent by autotest will be prepended with
-        "AUTOTEST_STATUS", and all other lines are ssh error
-        messages."""
-        status_match = self.status_parser.search(line)
-        test_complete_match = self.test_complete_parser.search(line)
-        if status_match:
-            tag, line = status_match.groups()
-            self._process_quoted_line(tag, line)
-        elif test_complete_match:
-            fifo_path, = test_complete_match.groups()
-            self.log_collector.collect_client_job_results()
-            self.host.run("echo A > %s" % fifo_path)
-        else:
-            print line
-
-
-    def _format_warnings(self, last_line, warnings):
-        # use the indentation of whatever the last log line was
-        indent = self.extract_indent.match(last_line).group(1)
-        # if the last line starts a new group, add an extra indent
-        if last_line.lstrip('\t').startswith("START\t"):
-            indent += '\t'
-        return [self.job._render_record("WARN", None, None, msg,
-                                        timestamp, indent).rstrip('\n')
-                for timestamp, msg in warnings]
-
-
-    def _process_warnings(self, last_line, log_dict, warnings):
-        if log_dict.keys() in ([], ["logs"]):
-            # there are no sub-jobs, just append the warnings here
-            warnings = self._format_warnings(last_line, warnings)
-            log_list = log_dict.setdefault("logs", [])
-            log_list += warnings
-            for warning in warnings:
-                sys.stdout.write(warning + '\n')
-        else:
-            # there are sub-jobs, so put the warnings in there
-            log_list = log_dict.get("logs", [])
-            if log_list:
-                last_line = log_list[-1]
-            for key in sorted(log_dict.iterkeys()):
-                if key != "logs":
-                    self._process_warnings(last_line,
-                                           log_dict[key],
-                                           warnings)
-
-
-    def write(self, data):
-        # first check for any new console warnings
-        warnings = self.job._read_warnings()
-        self._process_warnings(self.last_line, self.logs, warnings)
-        # now process the newest data written out
-        data = self.leftover + data
-        lines = data.split("\n")
-        # process every line but the last one
-        for line in lines[:-1]:
-            self._process_line(line)
-        # save the last line for later processing
-        # since we may not have the whole line yet
-        self.leftover = lines[-1]
-
-
-    def flush(self):
-        sys.stdout.flush()
-
-
-    def close(self):
-        if self.leftover:
-            self._process_line(self.leftover)
-        self._process_logs()
-        self.flush()
-
-
 # site_server_job.py may be non-existant or empty, make sure that an
 # appropriate site_server_job class is created nevertheless
 try:
diff --git a/server/server_job_unittest.py b/server/server_job_unittest.py
index 96af5c6..15f952d 100644
--- a/server/server_job_unittest.py
+++ b/server/server_job_unittest.py
@@ -198,10 +198,6 @@
                      'ssh_user' : self.job.ssh_user, \
                      'ssh_port' : self.job.ssh_port, \
                      'ssh_pass' : self.job.ssh_pass}
-        os.path.exists.expect_call(
-                server_job.SITE_VERIFY_CONTROL_FILE).and_return(True)
-        self.job._execute_code.expect_call(server_job.SITE_VERIFY_CONTROL_FILE,
-                                           namespace, protect=False)
         self.job._execute_code.expect_call(server_job.VERIFY_CONTROL_FILE,
                                            namespace, protect=False)
 
@@ -221,17 +217,9 @@
         repair_namespace = verify_namespace.copy()
         repair_namespace['protection_level'] = host_protections.default
 
-        os.path.exists.expect_call(
-                server_job.SITE_REPAIR_CONTROL_FILE).and_return(True)
-        self.job._execute_code.expect_call(server_job.SITE_REPAIR_CONTROL_FILE,
-                                           repair_namespace, protect=False)
         self.job._execute_code.expect_call(server_job.REPAIR_CONTROL_FILE,
                                            repair_namespace, protect=False)
 
-        os.path.exists.expect_call(
-                server_job.SITE_VERIFY_CONTROL_FILE).and_return(True)
-        self.job._execute_code.expect_call(server_job.SITE_VERIFY_CONTROL_FILE,
-                                           verify_namespace, protect=False)
         self.job._execute_code.expect_call(server_job.VERIFY_CONTROL_FILE,
                                            verify_namespace, protect=False)
 
@@ -450,63 +438,5 @@
         self.god.check_playback()
 
 
-class CopyLogsTest(unittest.TestCase):
-    def setUp(self):
-        self.god = mock.mock_god()
-
-        self.host = self.god.create_mock_class(hosts.RemoteHost, "host")
-        self.host.hostname = "testhost"
-
-        self.god.stub_function(os.path, "exists")
-        self.god.stub_function(os, "close")
-        self.god.stub_function(os, "remove")
-        self.god.stub_function(tempfile, "mkstemp")
-        self.god.stub_function(utils, "read_keyval")
-        self.god.stub_function(utils, "write_keyval")
-
-
-    def tearDown(self):
-        self.god.unstub_all()
-
-
-    def test_prepare_for_copying_logs(self):
-        self.host.get_autodir.expect_call().and_return("/autodir")
-        collector = server_job.log_collector(self.host, None, "/resultsdir")
-        self.god.check_playback()
-
-        os.path.exists.expect_call("/resultsdir/keyval").and_return(True)
-        tempfile.mkstemp.expect_call(".keyval_testhost").and_return(
-            (10, "tmp.keyval_testhost"))
-        os.close.expect_call(10)
-        self.host.get_file.expect_call("/autodir/results/default/keyval",
-                                       "tmp.keyval_testhost")
-        self.host.get_tmp_dir.expect_call().and_return("/autotmp")
-        self.host.run.expect_call(
-            "mv /autodir/results/default/keyval /autotmp/keyval")
-
-        # run and check
-        keyval = collector._prepare_for_copying_logs()
-        self.assertEquals(keyval, "tmp.keyval_testhost")
-        self.god.check_playback()
-
-
-    def test_process_copied_logs(self):
-        self.host.get_autodir.expect_call().and_return("/autodir")
-        collector = server_job.log_collector(self.host, None, "/resultsdir")
-        self.god.check_playback()
-
-        utils.read_keyval.expect_call("tmp.keyval_testhost").and_return(
-            {"field1": "new thing", "field3": "other new thing"})
-        utils.read_keyval.expect_call("/resultsdir").and_return(
-            {"field1": "thing", "field2": "otherthing"})
-        utils.write_keyval.expect_call("/resultsdir",
-                                       {"field3": "other new thing"})
-        os.remove.expect_call("tmp.keyval_testhost")
-
-        # run and check
-        collector._process_copied_logs("tmp.keyval_testhost")
-        self.god.check_playback()
-
-
 if __name__ == "__main__":
     unittest.main()