Now that we've finally figured out how to properly create site_*
classes and work them into the inheritance hierarchy without creating
circular dependencies or three separate files, apply this technique
to the worst offender, server_job.

Risk: Medium
Visibility: Internal code change only.

Signed-off-by: John Admanski <[email protected]>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1978 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/server_job.py b/server/server_job.py
index 1e20622..fed468f 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -11,8 +11,684 @@
 Andy Whitcroft <[email protected]>
 """
 
-import re, sys
-from autotest_lib.server import base_server_job
+import os, sys, re, time, select, subprocess, traceback
+
+from autotest_lib.client.bin import fd_stack
+from autotest_lib.client.common_lib import error, logging
+from autotest_lib.server import test, subcommand
+from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
+from autotest_lib.client.common_lib import utils, packages
+
+
+# load up a control segment
+# these are all stored in <server_dir>/control_segments
+def load_control_segment(name):
+    server_dir = os.path.dirname(os.path.abspath(__file__))
+    script_file = os.path.join(server_dir, "control_segments", name)
+    if os.path.exists(script_file):
+        return file(script_file).read()
+    else:
+        return ""
+
+
+preamble = """\
+import os, sys
+
+from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
+from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
+from autotest_lib.server import git_kernel
+from autotest_lib.server.subcommand import *
+from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
+from autotest_lib.server.utils import parse_machine
+from autotest_lib.client.common_lib.error import *
+from autotest_lib.client.common_lib import barrier
+
+autotest.Autotest.job = job
+hosts.SSHHost.job = job
+barrier = barrier.barrier
+if len(machines) > 1:
+        open('.machines', 'w').write('\\n'.join(machines) + '\\n')
+"""
+
+client_wrapper = """
+at = autotest.Autotest()
+
+def run_client(machine):
+        hostname, user, password, port = parse_machine(machine,
+                ssh_user, ssh_port, ssh_pass)
+
+        host = hosts.SSHHost(hostname, user, port, password=password)
+        at.run(control, host=host)
+
+job.parallel_simple(run_client, machines)
+"""
+
+crashdumps = """
+def crashdumps(machine):
+        hostname, user, password, port = parse_machine(machine,
+                ssh_user, ssh_port, ssh_pass)
+
+        host = hosts.SSHHost(hostname, user, port, initialize=False, \
+            password=password)
+        host.get_crashdumps(test_start_time)
+
+job.parallel_simple(crashdumps, machines, log=False)
+"""
+
+reboot_segment="""\
+def reboot(machine):
+        hostname, user, password, port = parse_machine(machine,
+                ssh_user, ssh_port, ssh_pass)
+
+        host = hosts.SSHHost(hostname, user, port, initialize=False, \
+            password=password)
+        host.reboot()
+
+job.parallel_simple(reboot, machines, log=False)
+"""
+
+install="""\
+def install(machine):
+        hostname, user, password, port = parse_machine(machine,
+                ssh_user, ssh_port, ssh_pass)
+
+        host = hosts.SSHHost(hostname, user, port, initialize=False, \
+            password=password)
+        host.machine_install()
+
+job.parallel_simple(install, machines, log=False)
+"""
+
+# load up the verifier control segment, with an optional site-specific hook
+verify = load_control_segment("site_verify")
+verify += load_control_segment("verify")
+
+# load up the repair control segment, with an optional site-specific hook
+repair = load_control_segment("site_repair")
+repair += load_control_segment("repair")
+
+
+# load up site-specific code for generating site-specific job data
+try:
+    import site_job
+    get_site_job_data = site_job.get_site_job_data
+    del site_job
+except ImportError:
+    # by default provide a stub that generates no site data
+    def get_site_job_data(job):
+        return {}
+
+
+class base_server_job(object):
+    """The actual job against which we do everything.
+
+    Properties:
+            autodir
+                    The top level autotest directory (/usr/local/autotest).
+            serverdir
+                    <autodir>/server/
+            clientdir
+                    <autodir>/client/
+            conmuxdir
+                    <autodir>/conmux/
+            testdir
+                    <autodir>/server/tests/
+            site_testdir
+                    <autodir>/server/site_tests/
+            control
+                    the control file for this job
+    """
+
+    STATUS_VERSION = 1
+
+
+    def __init__(self, control, args, resultdir, label, user, machines,
+                 client=False, parse_job='',
+                 ssh_user='root', ssh_port=22, ssh_pass=''):
+        """
+                control
+                        The control file (pathname of)
+                args
+                        args to pass to the control file
+                resultdir
+                        where to throw the results
+                label
+                        label for the job
+                user
+                        Username for the job (email address)
+                client
+                        True if a client-side control file
+        """
+        path = os.path.dirname(__file__)
+        self.autodir = os.path.abspath(os.path.join(path, '..'))
+        self.serverdir = os.path.join(self.autodir, 'server')
+        self.testdir   = os.path.join(self.serverdir, 'tests')
+        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
+        self.tmpdir    = os.path.join(self.serverdir, 'tmp')
+        self.conmuxdir = os.path.join(self.autodir, 'conmux')
+        self.clientdir = os.path.join(self.autodir, 'client')
+        self.toolsdir = os.path.join(self.autodir, 'client/tools')
+        if control:
+            self.control = open(control, 'r').read()
+            self.control = re.sub('\r', '', self.control)
+        else:
+            self.control = None
+        self.resultdir = resultdir
+        if not os.path.exists(resultdir):
+            os.mkdir(resultdir)
+        self.debugdir = os.path.join(resultdir, 'debug')
+        if not os.path.exists(self.debugdir):
+            os.mkdir(self.debugdir)
+        self.status = os.path.join(resultdir, 'status')
+        self.label = label
+        self.user = user
+        self.args = args
+        self.machines = machines
+        self.client = client
+        self.record_prefix = ''
+        self.warning_loggers = set()
+        self.ssh_user = ssh_user
+        self.ssh_port = ssh_port
+        self.ssh_pass = ssh_pass
+
+        self.stdout = fd_stack.fd_stack(1, sys.stdout)
+        self.stderr = fd_stack.fd_stack(2, sys.stderr)
+
+        if os.path.exists(self.status):
+            os.unlink(self.status)
+        job_data = {'label' : label, 'user' : user,
+                    'hostname' : ','.join(machines),
+                    'status_version' : str(self.STATUS_VERSION)}
+        job_data.update(get_site_job_data(self))
+        utils.write_keyval(self.resultdir, job_data)
+
+        self.parse_job = parse_job
+        if self.parse_job and len(machines) == 1:
+            self.using_parser = True
+            self.init_parser(resultdir)
+        else:
+            self.using_parser = False
+        self.pkgmgr = packages.PackageManager(
+            self.autodir, run_function_dargs={'timeout':600})
+        self.pkgdir = os.path.join(self.autodir, 'packages')
+
+
+    def init_parser(self, resultdir):
+        """Start the continuous parsing of resultdir. This sets up
+        the database connection and inserts the basic job object into
+        the database if necessary."""
+        # redirect parser debugging to .parse.log
+        parse_log = os.path.join(resultdir, '.parse.log')
+        parse_log = open(parse_log, 'w', 0)
+        tko_utils.redirect_parser_debugging(parse_log)
+        # create a job model object and set up the db
+        self.results_db = tko_db.db(autocommit=True)
+        self.parser = status_lib.parser(self.STATUS_VERSION)
+        self.job_model = self.parser.make_job(resultdir)
+        self.parser.start(self.job_model)
+        # check if a job already exists in the db and insert it if
+        # it does not
+        job_idx = self.results_db.find_job(self.parse_job)
+        if job_idx is None:
+            self.results_db.insert_job(self.parse_job,
+                                       self.job_model)
+        else:
+            machine_idx = self.results_db.lookup_machine(
+                self.job_model.machine)
+            self.job_model.index = job_idx
+            self.job_model.machine_idx = machine_idx
+
+
+    def cleanup_parser(self):
+        """This should be called after the server job is finished
+        to carry out any remaining cleanup (e.g. flushing any
+        remaining test results to the results db)"""
+        if not self.using_parser:
+            return
+        final_tests = self.parser.end()
+        for test in final_tests:
+            self.__insert_test(test)
+        self.using_parser = False
+
+
+    def verify(self):
+        if not self.machines:
+            raise error.AutoservError(
+                'No machines specified to verify')
+        try:
+            namespace = {'machines' : self.machines, 'job' : self, \
+                                     'ssh_user' : self.ssh_user, \
+                                     'ssh_port' : self.ssh_port, \
+                                     'ssh_pass' : self.ssh_pass}
+            self._execute_code(preamble + verify, namespace, namespace)
+        except Exception, e:
+            msg = ('Verify failed\n' + str(e) + '\n'
+                    + traceback.format_exc())
+            self.record('ABORT', None, None, msg)
+            raise
+
+
+    def repair(self, host_protection):
+        if not self.machines:
+            raise error.AutoservError('No machines specified to repair')
+        namespace = {'machines': self.machines, 'job': self,
+                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
+                     'ssh_pass': self.ssh_pass,
+                     'protection_level': host_protection}
+        # no matter what happens during repair, go on to try to reverify
+        try:
+            self._execute_code(preamble + repair, namespace, namespace)
+        except Exception, exc:
+            print 'Exception occured during repair'
+            traceback.print_exc()
+        self.verify()
+
+
+    def precheck(self):
+        """
+        perform any additional checks in derived classes.
+        """
+        pass
+
+
+    def enable_external_logging(self):
+        """Start or restart external logging mechanism.
+        """
+        pass
+
+
+    def disable_external_logging(self):
+        """ Pause or stop external logging mechanism.
+        """
+        pass
+
+
+    def use_external_logging(self):
+        """Return True if external logging should be used.
+        """
+        return False
+
+
+    def parallel_simple(self, function, machines, log=True, timeout=None):
+        """Run 'function' using parallel_simple, with an extra
+        wrapper to handle the necessary setup for continuous parsing,
+        if possible. If continuous parsing is already properly
+        initialized then this should just work."""
+        is_forking = not (len(machines) == 1 and
+                          self.machines == machines)
+        if self.parse_job and is_forking:
+            def wrapper(machine):
+                self.parse_job += "/" + machine
+                self.using_parser = True
+                self.machines = [machine]
+                self.resultdir = os.path.join(self.resultdir,
+                                              machine)
+                self.init_parser(self.resultdir)
+                result = function(machine)
+                self.cleanup_parser()
+                return result
+        elif len(machines) > 1:
+            def wrapper(machine):
+                self.resultdir = os.path.join(self.resultdir, machine)
+                result = function(machine)
+                return result
+        else:
+            wrapper = function
+        subcommand.parallel_simple(wrapper, machines, log, timeout)
+
+
+    def run(self, reboot = False, install_before = False,
+            install_after = False, collect_crashdumps = True,
+            namespace = {}):
+        # use a copy so changes don't affect the original dictionary
+        namespace = namespace.copy()
+        machines = self.machines
+
+        self.aborted = False
+        namespace['machines'] = machines
+        namespace['args'] = self.args
+        namespace['job'] = self
+        namespace['ssh_user'] = self.ssh_user
+        namespace['ssh_port'] = self.ssh_port
+        namespace['ssh_pass'] = self.ssh_pass
+        test_start_time = int(time.time())
+
+        os.chdir(self.resultdir)
+
+        self.enable_external_logging()
+        status_log = os.path.join(self.resultdir, 'status.log')
+        try:
+            if install_before and machines:
+                self._execute_code(preamble + install, namespace, namespace)
+            if self.client:
+                namespace['control'] = self.control
+                open('control', 'w').write(self.control)
+                open('control.srv', 'w').write(client_wrapper)
+                server_control = client_wrapper
+            else:
+                open('control.srv', 'w').write(self.control)
+                server_control = self.control
+            self._execute_code(preamble + server_control, namespace,
+                                   namespace)
+
+        finally:
+            if machines and collect_crashdumps:
+                namespace['test_start_time'] = test_start_time
+                self._execute_code(preamble + crashdumps, namespace,
+                                       namespace)
+            self.disable_external_logging()
+            if reboot and machines:
+                self._execute_code(preamble + reboot_segment,namespace,
+                                       namespace)
+            if install_after and machines:
+                self._execute_code(preamble + install, namespace, namespace)
+
+
+    def run_test(self, url, *args, **dargs):
+        """Summon a test object and run it.
+
+        tag
+                tag to add to testname
+        url
+                url of the test to run
+        """
+
+        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
+        tag = None
+        subdir = testname
+
+        tag = dargs.pop('tag', None)
+        if tag:
+            subdir += '.' + tag
+
+        outputdir = os.path.join(self.resultdir, subdir)
+        if os.path.exists(outputdir):
+            msg = ("%s already exists, test <%s> may have"
+                   " already run with tag <%s>"
+                   % (outputdir, testname, tag) )
+            raise error.TestError(msg)
+        os.mkdir(outputdir)
+
+        def group_func():
+            try:
+                test.runtest(self, url, tag, args, dargs)
+            except error.TestBaseException, e:
+                self.record(e.exit_status, subdir, testname, str(e))
+                raise
+            except Exception, e:
+                info = str(e) + "\n" + traceback.format_exc()
+                self.record('FAIL', subdir, testname, info)
+                raise
+            else:
+                self.record('GOOD', subdir, testname,
+                            'completed successfully')
+        self._run_group(testname, subdir, group_func)
+
+
+    def _run_group(self, name, subdir, function, *args, **dargs):
+        """\
+        Underlying method for running something inside of a group.
+        """
+        result = None
+        old_record_prefix = self.record_prefix
+        try:
+            self.record('START', subdir, name)
+            self.record_prefix += '\t'
+            try:
+                result = function(*args, **dargs)
+            finally:
+                self.record_prefix = old_record_prefix
+        except error.TestBaseException, e:
+            self.record("END %s" % e.exit_status, subdir, name, str(e))
+        except Exception, e:
+            err_msg = str(e) + '\n'
+            err_msg += traceback.format_exc()
+            self.record('END ABORT', subdir, name, err_msg)
+            raise error.JobError(name + ' failed\n' + traceback.format_exc())
+        else:
+            self.record('END GOOD', subdir, name)
+
+        return result
+
+
+    def run_group(self, function, *args, **dargs):
+        """\
+        function:
+                subroutine to run
+        *args:
+                arguments for the function
+        """
+
+        name = function.__name__
+
+        # Allow the tag for the group to be specified.
+        tag = dargs.pop('tag', None)
+        if tag:
+            name = tag
+
+        return self._run_group(name, None, function, *args, **dargs)
+
+
+    def run_reboot(self, reboot_func, get_kernel_func):
+        """\
+        A specialization of run_group meant specifically for handling
+        a reboot. Includes support for capturing the kernel version
+        after the reboot.
+
+        reboot_func: a function that carries out the reboot
+
+        get_kernel_func: a function that returns a string
+        representing the kernel version.
+        """
+
+        old_record_prefix = self.record_prefix
+        try:
+            self.record('START', None, 'reboot')
+            self.record_prefix += '\t'
+            reboot_func()
+        except Exception, e:
+            self.record_prefix = old_record_prefix
+            err_msg = str(e) + '\n' + traceback.format_exc()
+            self.record('END FAIL', None, 'reboot', err_msg)
+        else:
+            kernel = get_kernel_func()
+            self.record_prefix = old_record_prefix
+            self.record('END GOOD', None, 'reboot',
+                        optional_fields={"kernel": kernel})
+
+
+    def record(self, status_code, subdir, operation, status='',
+               optional_fields=None):
+        """
+        Record job-level status
+
+        The intent is to make this file both machine parseable and
+        human readable. That involves a little more complexity, but
+        really isn't all that bad ;-)
+
+        Format is <status code>\t<subdir>\t<operation>\t<status>
+
+        status code: see common_lib.logging.is_valid_status()
+                     for valid status definition
+
+        subdir: MUST be a relevant subdirectory in the results,
+        or None, which will be represented as '----'
+
+        operation: description of what you ran (e.g. "dbench", or
+                                        "mkfs -t foobar /dev/sda9")
+
+        status: error message or "completed successfully"
+
+        ------------------------------------------------------------
+
+        Initial tabs indicate indent levels for grouping, and are
+        governed by self.record_prefix
+
+        multiline messages have secondary lines prefaced by a double
+        space ('  ')
+
+        Executing this method will trigger the logging of all new
+        warnings to date from the various console loggers.
+        """
+        # poll all our warning loggers for new warnings
+        warnings = self._read_warnings()
+        for timestamp, msg in warnings:
+            self._record("WARN", None, None, msg, timestamp)
+
+        # write out the actual status log line
+        self._record(status_code, subdir, operation, status,
+                      optional_fields=optional_fields)
+
+
+    def _read_warnings(self):
+        warnings = []
+        while True:
+            # pull in a line of output from every logger that has
+            # output ready to be read
+            loggers, _, _ = select.select(self.warning_loggers,
+                                          [], [], 0)
+            closed_loggers = set()
+            for logger in loggers:
+                line = logger.readline()
+                # record any broken pipes (aka line == empty)
+                if len(line) == 0:
+                    closed_loggers.add(logger)
+                    continue
+                timestamp, msg = line.split('\t', 1)
+                warnings.append((int(timestamp), msg.strip()))
+
+            # stop listening to loggers that are closed
+            self.warning_loggers -= closed_loggers
+
+            # stop if none of the loggers have any output left
+            if not loggers:
+                break
+
+        # sort into timestamp order
+        warnings.sort()
+        return warnings
+
+
+    def _render_record(self, status_code, subdir, operation, status='',
+                       epoch_time=None, record_prefix=None,
+                       optional_fields=None):
+        """
+        Internal function to generate a record to be written into a
+        status log. For use by server_job.* classes only.
+        """
+        if subdir:
+            if re.match(r'[\n\t]', subdir):
+                raise ValueError(
+                    'Invalid character in subdir string')
+            substr = subdir
+        else:
+            substr = '----'
+
+        if not logging.is_valid_status(status_code):
+            raise ValueError('Invalid status code supplied: %s' %
+                             status_code)
+        if not operation:
+            operation = '----'
+        if re.match(r'[\n\t]', operation):
+            raise ValueError(
+                'Invalid character in operation string')
+        operation = operation.rstrip()
+        status = status.rstrip()
+        status = re.sub(r"\t", "  ", status)
+        # Ensure any continuation lines are marked so we can
+        # detect them in the status file to ensure it is parsable.
+        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)
+
+        if not optional_fields:
+            optional_fields = {}
+
+        # Generate timestamps for inclusion in the logs
+        if epoch_time is None:
+            epoch_time = int(time.time())
+        local_time = time.localtime(epoch_time)
+        optional_fields["timestamp"] = str(epoch_time)
+        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
+                                                     local_time)
+
+        fields = [status_code, substr, operation]
+        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
+        fields.append(status)
+
+        if record_prefix is None:
+            record_prefix = self.record_prefix
+
+        msg = '\t'.join(str(x) for x in fields)
+
+        return record_prefix + msg + '\n'
+
+
+    def _record_prerendered(self, msg):
+        """
+        Record a pre-rendered msg into the status logs. The only
+        change this makes to the message is to add on the local
+        indentation. Should not be called outside of server_job.*
+        classes. Unlike _record, this does not write the message
+        to standard output.
+        """
+        lines = []
+        status_file = os.path.join(self.resultdir, 'status.log')
+        status_log = open(status_file, 'a')
+        for line in msg.splitlines():
+            line = self.record_prefix + line + '\n'
+            lines.append(line)
+            status_log.write(line)
+        status_log.close()
+        self.__parse_status(lines)
+
+
+    def _execute_code(self, code, global_scope, local_scope):
+        exec(code, global_scope, local_scope)
+
+
+    def _record(self, status_code, subdir, operation, status='',
+                 epoch_time=None, optional_fields=None):
+        """
+        Actual function for recording a single line into the status
+        logs. Should never be called directly, only by job.record as
+        this would bypass the console monitor logging.
+        """
+
+        msg = self._render_record(status_code, subdir, operation,
+                                  status, epoch_time,
+                                  optional_fields=optional_fields)
+
+
+        status_file = os.path.join(self.resultdir, 'status.log')
+        sys.stdout.write(msg)
+        open(status_file, "a").write(msg)
+        if subdir:
+            test_dir = os.path.join(self.resultdir, subdir)
+            status_file = os.path.join(test_dir, 'status')
+            open(status_file, "a").write(msg)
+        self.__parse_status(msg.splitlines())
+
+
+    def __parse_status(self, new_lines):
+        if not self.using_parser:
+            return
+        new_tests = self.parser.process_lines(new_lines)
+        for test in new_tests:
+            self.__insert_test(test)
+
+
+    def __insert_test(self, test):
+        """ An internal method to insert a new test result into the
+        database. This method will not raise an exception, even if an
+        error occurs during the insert, to avoid failing a test
+        simply because of unexpected database issues."""
+        try:
+            self.results_db.insert_test(self.job_model, test)
+        except Exception:
+            msg = ("WARNING: An unexpected error occured while "
+                   "inserting test results into the database. "
+                   "Ignoring error.\n" + traceback.format_exc())
+            print >> sys.stderr, msg
+
 
 # a file-like object for catching stderr from an autotest client and
 # extracting status logs from it
@@ -153,8 +829,8 @@
 try:
     from autotest_lib.server.site_server_job import site_server_job
 except ImportError:
-    class site_server_job(base_server_job.base_server_job):
+    class site_server_job(object):
         pass
 
-class server_job(site_server_job):
+class server_job(site_server_job, base_server_job):
     pass