Now that we've figured out how to create site_* classes and work them
into the inheritance hierarchy without circular dependencies or three
separate files, apply the same technique to the worst offender:
server_job.
Risk: Medium
Visibility: Internal code change only.
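
For reference, a minimal sketch of the pattern being applied here. The
base_server_job stub below stands in for the full class that this patch
moves into server_job.py; only the shape of the site hook is shown:

    # Python 2-era sketch of the site_* extension hook used by this change.
    class base_server_job(object):
        # Stand-in for the real base class defined earlier in server_job.py.
        pass

    try:
        # Pick up a site-specific mix-in if one is installed.
        from autotest_lib.server.site_server_job import site_server_job
    except ImportError:
        # No site customization installed: fall back to an empty placeholder
        # so the class statement below still resolves.
        class site_server_job(object):
            pass

    class server_job(site_server_job, base_server_job):
        # Site overrides win over the base implementation via Python's MRO,
        # and neither class imports the other's module, which is what avoids
        # the old circular-dependency problem.
        pass

This is the same try/except fallback that the tail of this patch installs
in server_job.py.
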
Signed-off-by: John Admanski <[email protected]>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1978 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/base_server_job.py b/server/base_server_job.py
deleted file mode 100644
index f502c65..0000000
--- a/server/base_server_job.py
+++ /dev/null
@@ -1,690 +0,0 @@
-"""
-The main job wrapper for the server side.
-
-This is the core infrastructure. Derived from the client side job.py
-
-Copyright Martin J. Bligh, Andy Whitcroft 2007
-"""
-
-__author__ = """
-Martin J. Bligh <[email protected]>
-Andy Whitcroft <[email protected]>
-"""
-
-import os, sys, re, time, select, subprocess, traceback
-
-from autotest_lib.client.bin import fd_stack
-from autotest_lib.client.common_lib import error, logging
-from autotest_lib.server import test, subcommand
-from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
-from autotest_lib.client.common_lib import utils, packages
-
-
-# load up a control segment
-# these are all stored in <server_dir>/control_segments
-def load_control_segment(name):
- server_dir = os.path.dirname(os.path.abspath(__file__))
- script_file = os.path.join(server_dir, "control_segments", name)
- if os.path.exists(script_file):
- return file(script_file).read()
- else:
- return ""
-
-
-preamble = """\
-import os, sys
-
-from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
-from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
-from autotest_lib.server import git_kernel
-from autotest_lib.server.subcommand import *
-from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
-from autotest_lib.server.utils import parse_machine
-from autotest_lib.client.common_lib.error import *
-from autotest_lib.client.common_lib import barrier
-
-autotest.Autotest.job = job
-hosts.SSHHost.job = job
-barrier = barrier.barrier
-if len(machines) > 1:
- open('.machines', 'w').write('\\n'.join(machines) + '\\n')
-"""
-
-client_wrapper = """
-at = autotest.Autotest()
-
-def run_client(machine):
- hostname, user, password, port = parse_machine(machine,
- ssh_user, ssh_port, ssh_pass)
-
- host = hosts.SSHHost(hostname, user, port, password=password)
- at.run(control, host=host)
-
-job.parallel_simple(run_client, machines)
-"""
-
-crashdumps = """
-def crashdumps(machine):
- hostname, user, password, port = parse_machine(machine,
- ssh_user, ssh_port, ssh_pass)
-
- host = hosts.SSHHost(hostname, user, port, initialize=False, \
- password=password)
- host.get_crashdumps(test_start_time)
-
-job.parallel_simple(crashdumps, machines, log=False)
-"""
-
-reboot_segment="""\
-def reboot(machine):
- hostname, user, password, port = parse_machine(machine,
- ssh_user, ssh_port, ssh_pass)
-
- host = hosts.SSHHost(hostname, user, port, initialize=False, \
- password=password)
- host.reboot()
-
-job.parallel_simple(reboot, machines, log=False)
-"""
-
-install="""\
-def install(machine):
- hostname, user, password, port = parse_machine(machine,
- ssh_user, ssh_port, ssh_pass)
-
- host = hosts.SSHHost(hostname, user, port, initialize=False, \
- password=password)
- host.machine_install()
-
-job.parallel_simple(install, machines, log=False)
-"""
-
-# load up the verifier control segment, with an optional site-specific hook
-verify = load_control_segment("site_verify")
-verify += load_control_segment("verify")
-
-# load up the repair control segment, with an optional site-specific hook
-repair = load_control_segment("site_repair")
-repair += load_control_segment("repair")
-
-
-# load up site-specific code for generating site-specific job data
-try:
- import site_job
- get_site_job_data = site_job.get_site_job_data
- del site_job
-except ImportError:
- # by default provide a stub that generates no site data
- def get_site_job_data(job):
- return {}
-
-
-class base_server_job(object):
- """The actual job against which we do everything.
-
- Properties:
- autodir
- The top level autotest directory (/usr/local/autotest).
- serverdir
- <autodir>/server/
- clientdir
- <autodir>/client/
- conmuxdir
- <autodir>/conmux/
- testdir
- <autodir>/server/tests/
- site_testdir
- <autodir>/server/site_tests/
- control
- the control file for this job
- """
-
- STATUS_VERSION = 1
-
-
- def __init__(self, control, args, resultdir, label, user, machines,
- client=False, parse_job='',
- ssh_user='root', ssh_port=22, ssh_pass=''):
- """
- control
- The control file (pathname of)
- args
- args to pass to the control file
- resultdir
- where to throw the results
- label
- label for the job
- user
- Username for the job (email address)
- client
- True if a client-side control file
- """
- path = os.path.dirname(__file__)
- self.autodir = os.path.abspath(os.path.join(path, '..'))
- self.serverdir = os.path.join(self.autodir, 'server')
- self.testdir = os.path.join(self.serverdir, 'tests')
- self.site_testdir = os.path.join(self.serverdir, 'site_tests')
- self.tmpdir = os.path.join(self.serverdir, 'tmp')
- self.conmuxdir = os.path.join(self.autodir, 'conmux')
- self.clientdir = os.path.join(self.autodir, 'client')
- self.toolsdir = os.path.join(self.autodir, 'client/tools')
- if control:
- self.control = open(control, 'r').read()
- self.control = re.sub('\r', '', self.control)
- else:
- self.control = None
- self.resultdir = resultdir
- if not os.path.exists(resultdir):
- os.mkdir(resultdir)
- self.debugdir = os.path.join(resultdir, 'debug')
- if not os.path.exists(self.debugdir):
- os.mkdir(self.debugdir)
- self.status = os.path.join(resultdir, 'status')
- self.label = label
- self.user = user
- self.args = args
- self.machines = machines
- self.client = client
- self.record_prefix = ''
- self.warning_loggers = set()
- self.ssh_user = ssh_user
- self.ssh_port = ssh_port
- self.ssh_pass = ssh_pass
-
- self.stdout = fd_stack.fd_stack(1, sys.stdout)
- self.stderr = fd_stack.fd_stack(2, sys.stderr)
-
- if os.path.exists(self.status):
- os.unlink(self.status)
- job_data = {'label' : label, 'user' : user,
- 'hostname' : ','.join(machines),
- 'status_version' : str(self.STATUS_VERSION)}
- job_data.update(get_site_job_data(self))
- utils.write_keyval(self.resultdir, job_data)
-
- self.parse_job = parse_job
- if self.parse_job and len(machines) == 1:
- self.using_parser = True
- self.init_parser(resultdir)
- else:
- self.using_parser = False
- self.pkgmgr = packages.PackageManager(
- self.autodir, run_function_dargs={'timeout':600})
- self.pkgdir = os.path.join(self.autodir, 'packages')
-
-
- def init_parser(self, resultdir):
- """Start the continuous parsing of resultdir. This sets up
- the database connection and inserts the basic job object into
- the database if necessary."""
- # redirect parser debugging to .parse.log
- parse_log = os.path.join(resultdir, '.parse.log')
- parse_log = open(parse_log, 'w', 0)
- tko_utils.redirect_parser_debugging(parse_log)
- # create a job model object and set up the db
- self.results_db = tko_db.db(autocommit=True)
- self.parser = status_lib.parser(self.STATUS_VERSION)
- self.job_model = self.parser.make_job(resultdir)
- self.parser.start(self.job_model)
- # check if a job already exists in the db and insert it if
- # it does not
- job_idx = self.results_db.find_job(self.parse_job)
- if job_idx is None:
- self.results_db.insert_job(self.parse_job,
- self.job_model)
- else:
- machine_idx = self.results_db.lookup_machine(
- self.job_model.machine)
- self.job_model.index = job_idx
- self.job_model.machine_idx = machine_idx
-
-
- def cleanup_parser(self):
- """This should be called after the server job is finished
- to carry out any remaining cleanup (e.g. flushing any
- remaining test results to the results db)"""
- if not self.using_parser:
- return
- final_tests = self.parser.end()
- for test in final_tests:
- self.__insert_test(test)
- self.using_parser = False
-
-
- def verify(self):
- if not self.machines:
- raise error.AutoservError(
- 'No machines specified to verify')
- try:
- namespace = {'machines' : self.machines, 'job' : self, \
- 'ssh_user' : self.ssh_user, \
- 'ssh_port' : self.ssh_port, \
- 'ssh_pass' : self.ssh_pass}
- self._execute_code(preamble + verify, namespace, namespace)
- except Exception, e:
- msg = ('Verify failed\n' + str(e) + '\n'
- + traceback.format_exc())
- self.record('ABORT', None, None, msg)
- raise
-
-
- def repair(self, host_protection):
- if not self.machines:
- raise error.AutoservError('No machines specified to repair')
- namespace = {'machines': self.machines, 'job': self,
- 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
- 'ssh_pass': self.ssh_pass,
- 'protection_level': host_protection}
- # no matter what happens during repair, go on to try to reverify
- try:
- self._execute_code(preamble + repair, namespace, namespace)
- except Exception, exc:
- print 'Exception occured during repair'
- traceback.print_exc()
- self.verify()
-
-
- def precheck(self):
- """
- perform any additional checks in derived classes.
- """
- pass
-
-
- def enable_external_logging(self):
- """Start or restart external logging mechanism.
- """
- pass
-
-
- def disable_external_logging(self):
- """ Pause or stop external logging mechanism.
- """
- pass
-
-
- def use_external_logging(self):
- """Return True if external logging should be used.
- """
- return False
-
-
- def parallel_simple(self, function, machines, log=True, timeout=None):
- """Run 'function' using parallel_simple, with an extra
- wrapper to handle the necessary setup for continuous parsing,
- if possible. If continuous parsing is already properly
- initialized then this should just work."""
- is_forking = not (len(machines) == 1 and
- self.machines == machines)
- if self.parse_job and is_forking:
- def wrapper(machine):
- self.parse_job += "/" + machine
- self.using_parser = True
- self.machines = [machine]
- self.resultdir = os.path.join(self.resultdir,
- machine)
- self.init_parser(self.resultdir)
- result = function(machine)
- self.cleanup_parser()
- return result
- elif len(machines) > 1:
- def wrapper(machine):
- self.resultdir = os.path.join(self.resultdir, machine)
- result = function(machine)
- return result
- else:
- wrapper = function
- subcommand.parallel_simple(wrapper, machines, log, timeout)
-
-
- def run(self, reboot = False, install_before = False,
- install_after = False, collect_crashdumps = True,
- namespace = {}):
- # use a copy so changes don't affect the original dictionary
- namespace = namespace.copy()
- machines = self.machines
-
- self.aborted = False
- namespace['machines'] = machines
- namespace['args'] = self.args
- namespace['job'] = self
- namespace['ssh_user'] = self.ssh_user
- namespace['ssh_port'] = self.ssh_port
- namespace['ssh_pass'] = self.ssh_pass
- test_start_time = int(time.time())
-
- os.chdir(self.resultdir)
-
- self.enable_external_logging()
- status_log = os.path.join(self.resultdir, 'status.log')
- try:
- if install_before and machines:
- self._execute_code(preamble + install, namespace, namespace)
- if self.client:
- namespace['control'] = self.control
- open('control', 'w').write(self.control)
- open('control.srv', 'w').write(client_wrapper)
- server_control = client_wrapper
- else:
- open('control.srv', 'w').write(self.control)
- server_control = self.control
- self._execute_code(preamble + server_control, namespace,
- namespace)
-
- finally:
- if machines and collect_crashdumps:
- namespace['test_start_time'] = test_start_time
- self._execute_code(preamble + crashdumps, namespace,
- namespace)
- self.disable_external_logging()
- if reboot and machines:
- self._execute_code(preamble + reboot_segment,namespace,
- namespace)
- if install_after and machines:
- self._execute_code(preamble + install, namespace, namespace)
-
-
- def run_test(self, url, *args, **dargs):
- """Summon a test object and run it.
-
- tag
- tag to add to testname
- url
- url of the test to run
- """
-
- (group, testname) = self.pkgmgr.get_package_name(url, 'test')
- tag = None
- subdir = testname
-
- tag = dargs.pop('tag', None)
- if tag:
- subdir += '.' + tag
-
- outputdir = os.path.join(self.resultdir, subdir)
- if os.path.exists(outputdir):
- msg = ("%s already exists, test <%s> may have"
- " already run with tag <%s>"
- % (outputdir, testname, tag) )
- raise error.TestError(msg)
- os.mkdir(outputdir)
-
- def group_func():
- try:
- test.runtest(self, url, tag, args, dargs)
- except error.TestBaseException, e:
- self.record(e.exit_status, subdir, testname, str(e))
- raise
- except Exception, e:
- info = str(e) + "\n" + traceback.format_exc()
- self.record('FAIL', subdir, testname, info)
- raise
- else:
- self.record('GOOD', subdir, testname,
- 'completed successfully')
- self._run_group(testname, subdir, group_func)
-
-
- def _run_group(self, name, subdir, function, *args, **dargs):
- """\
- Underlying method for running something inside of a group.
- """
- result = None
- old_record_prefix = self.record_prefix
- try:
- self.record('START', subdir, name)
- self.record_prefix += '\t'
- try:
- result = function(*args, **dargs)
- finally:
- self.record_prefix = old_record_prefix
- except error.TestBaseException, e:
- self.record("END %s" % e.exit_status, subdir, name, str(e))
- except Exception, e:
- err_msg = str(e) + '\n'
- err_msg += traceback.format_exc()
- self.record('END ABORT', subdir, name, err_msg)
- raise error.JobError(name + ' failed\n' + traceback.format_exc())
- else:
- self.record('END GOOD', subdir, name)
-
- return result
-
-
- def run_group(self, function, *args, **dargs):
- """\
- function:
- subroutine to run
- *args:
- arguments for the function
- """
-
- name = function.__name__
-
- # Allow the tag for the group to be specified.
- tag = dargs.pop('tag', None)
- if tag:
- name = tag
-
- return self._run_group(name, None, function, *args, **dargs)
-
-
- def run_reboot(self, reboot_func, get_kernel_func):
- """\
- A specialization of run_group meant specifically for handling
- a reboot. Includes support for capturing the kernel version
- after the reboot.
-
- reboot_func: a function that carries out the reboot
-
- get_kernel_func: a function that returns a string
- representing the kernel version.
- """
-
- old_record_prefix = self.record_prefix
- try:
- self.record('START', None, 'reboot')
- self.record_prefix += '\t'
- reboot_func()
- except Exception, e:
- self.record_prefix = old_record_prefix
- err_msg = str(e) + '\n' + traceback.format_exc()
- self.record('END FAIL', None, 'reboot', err_msg)
- else:
- kernel = get_kernel_func()
- self.record_prefix = old_record_prefix
- self.record('END GOOD', None, 'reboot',
- optional_fields={"kernel": kernel})
-
-
- def record(self, status_code, subdir, operation, status='',
- optional_fields=None):
- """
- Record job-level status
-
- The intent is to make this file both machine parseable and
- human readable. That involves a little more complexity, but
- really isn't all that bad ;-)
-
- Format is <status code>\t<subdir>\t<operation>\t<status>
-
- status code: see common_lib.logging.is_valid_status()
- for valid status definition
-
- subdir: MUST be a relevant subdirectory in the results,
- or None, which will be represented as '----'
-
- operation: description of what you ran (e.g. "dbench", or
- "mkfs -t foobar /dev/sda9")
-
- status: error message or "completed sucessfully"
-
- ------------------------------------------------------------
-
- Initial tabs indicate indent levels for grouping, and is
- governed by self.record_prefix
-
- multiline messages have secondary lines prefaced by a double
- space (' ')
-
- Executing this method will trigger the logging of all new
- warnings to date from the various console loggers.
- """
- # poll all our warning loggers for new warnings
- warnings = self._read_warnings()
- for timestamp, msg in warnings:
- self._record("WARN", None, None, msg, timestamp)
-
- # write out the actual status log line
- self._record(status_code, subdir, operation, status,
- optional_fields=optional_fields)
-
-
- def _read_warnings(self):
- warnings = []
- while True:
- # pull in a line of output from every logger that has
- # output ready to be read
- loggers, _, _ = select.select(self.warning_loggers,
- [], [], 0)
- closed_loggers = set()
- for logger in loggers:
- line = logger.readline()
- # record any broken pipes (aka line == empty)
- if len(line) == 0:
- closed_loggers.add(logger)
- continue
- timestamp, msg = line.split('\t', 1)
- warnings.append((int(timestamp), msg.strip()))
-
- # stop listening to loggers that are closed
- self.warning_loggers -= closed_loggers
-
- # stop if none of the loggers have any output left
- if not loggers:
- break
-
- # sort into timestamp order
- warnings.sort()
- return warnings
-
-
- def _render_record(self, status_code, subdir, operation, status='',
- epoch_time=None, record_prefix=None,
- optional_fields=None):
- """
- Internal Function to generate a record to be written into a
- status log. For use by server_job.* classes only.
- """
- if subdir:
- if re.match(r'[\n\t]', subdir):
- raise ValueError(
- 'Invalid character in subdir string')
- substr = subdir
- else:
- substr = '----'
-
- if not logging.is_valid_status(status_code):
- raise ValueError('Invalid status code supplied: %s' %
- status_code)
- if not operation:
- operation = '----'
- if re.match(r'[\n\t]', operation):
- raise ValueError(
- 'Invalid character in operation string')
- operation = operation.rstrip()
- status = status.rstrip()
- status = re.sub(r"\t", " ", status)
- # Ensure any continuation lines are marked so we can
- # detect them in the status file to ensure it is parsable.
- status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
-
- if not optional_fields:
- optional_fields = {}
-
- # Generate timestamps for inclusion in the logs
- if epoch_time is None:
- epoch_time = int(time.time())
- local_time = time.localtime(epoch_time)
- optional_fields["timestamp"] = str(epoch_time)
- optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
- local_time)
-
- fields = [status_code, substr, operation]
- fields += ["%s=%s" % x for x in optional_fields.iteritems()]
- fields.append(status)
-
- if record_prefix is None:
- record_prefix = self.record_prefix
-
- msg = '\t'.join(str(x) for x in fields)
-
- return record_prefix + msg + '\n'
-
-
- def _record_prerendered(self, msg):
- """
- Record a pre-rendered msg into the status logs. The only
- change this makes to the message is to add on the local
- indentation. Should not be called outside of server_job.*
- classes. Unlike _record, this does not write the message
- to standard output.
- """
- lines = []
- status_file = os.path.join(self.resultdir, 'status.log')
- status_log = open(status_file, 'a')
- for line in msg.splitlines():
- line = self.record_prefix + line + '\n'
- lines.append(line)
- status_log.write(line)
- status_log.close()
- self.__parse_status(lines)
-
-
- def _execute_code(self, code, global_scope, local_scope):
- exec(code, global_scope, local_scope)
-
-
- def _record(self, status_code, subdir, operation, status='',
- epoch_time=None, optional_fields=None):
- """
- Actual function for recording a single line into the status
- logs. Should never be called directly, only by job.record as
- this would bypass the console monitor logging.
- """
-
- msg = self._render_record(status_code, subdir, operation,
- status, epoch_time,
- optional_fields=optional_fields)
-
-
- status_file = os.path.join(self.resultdir, 'status.log')
- sys.stdout.write(msg)
- open(status_file, "a").write(msg)
- if subdir:
- test_dir = os.path.join(self.resultdir, subdir)
- status_file = os.path.join(test_dir, 'status')
- open(status_file, "a").write(msg)
- self.__parse_status(msg.splitlines())
-
-
- def __parse_status(self, new_lines):
- if not self.using_parser:
- return
- new_tests = self.parser.process_lines(new_lines)
- for test in new_tests:
- self.__insert_test(test)
-
-
- def __insert_test(self, test):
- """ An internal method to insert a new test result into the
- database. This method will not raise an exception, even if an
- error occurs during the insert, to avoid failing a test
- simply because of unexpected database issues."""
- try:
- self.results_db.insert_test(self.job_model, test)
- except Exception:
- msg = ("WARNING: An unexpected error occured while "
- "inserting test results into the database. "
- "Ignoring error.\n" + traceback.format_exc())
- print >> sys.stderr, msg
diff --git a/server/server_job.py b/server/server_job.py
index 1e20622..fed468f 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -11,8 +11,684 @@
Andy Whitcroft <[email protected]>
"""
-import re, sys
-from autotest_lib.server import base_server_job
+import os, sys, re, time, select, subprocess, traceback
+
+from autotest_lib.client.bin import fd_stack
+from autotest_lib.client.common_lib import error, logging
+from autotest_lib.server import test, subcommand
+from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
+from autotest_lib.client.common_lib import utils, packages
+
+
+# load up a control segment
+# these are all stored in <server_dir>/control_segments
+def load_control_segment(name):
+ server_dir = os.path.dirname(os.path.abspath(__file__))
+ script_file = os.path.join(server_dir, "control_segments", name)
+ if os.path.exists(script_file):
+ return file(script_file).read()
+ else:
+ return ""
+
+
+preamble = """\
+import os, sys
+
+from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
+from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
+from autotest_lib.server import git_kernel
+from autotest_lib.server.subcommand import *
+from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
+from autotest_lib.server.utils import parse_machine
+from autotest_lib.client.common_lib.error import *
+from autotest_lib.client.common_lib import barrier
+
+autotest.Autotest.job = job
+hosts.SSHHost.job = job
+barrier = barrier.barrier
+if len(machines) > 1:
+ open('.machines', 'w').write('\\n'.join(machines) + '\\n')
+"""
+
+client_wrapper = """
+at = autotest.Autotest()
+
+def run_client(machine):
+ hostname, user, password, port = parse_machine(machine,
+ ssh_user, ssh_port, ssh_pass)
+
+ host = hosts.SSHHost(hostname, user, port, password=password)
+ at.run(control, host=host)
+
+job.parallel_simple(run_client, machines)
+"""
+
+crashdumps = """
+def crashdumps(machine):
+ hostname, user, password, port = parse_machine(machine,
+ ssh_user, ssh_port, ssh_pass)
+
+ host = hosts.SSHHost(hostname, user, port, initialize=False, \
+ password=password)
+ host.get_crashdumps(test_start_time)
+
+job.parallel_simple(crashdumps, machines, log=False)
+"""
+
+reboot_segment="""\
+def reboot(machine):
+ hostname, user, password, port = parse_machine(machine,
+ ssh_user, ssh_port, ssh_pass)
+
+ host = hosts.SSHHost(hostname, user, port, initialize=False, \
+ password=password)
+ host.reboot()
+
+job.parallel_simple(reboot, machines, log=False)
+"""
+
+install="""\
+def install(machine):
+ hostname, user, password, port = parse_machine(machine,
+ ssh_user, ssh_port, ssh_pass)
+
+ host = hosts.SSHHost(hostname, user, port, initialize=False, \
+ password=password)
+ host.machine_install()
+
+job.parallel_simple(install, machines, log=False)
+"""
+
+# load up the verifier control segment, with an optional site-specific hook
+verify = load_control_segment("site_verify")
+verify += load_control_segment("verify")
+
+# load up the repair control segment, with an optional site-specific hook
+repair = load_control_segment("site_repair")
+repair += load_control_segment("repair")
+
+
+# load up site-specific code for generating site-specific job data
+try:
+ import site_job
+ get_site_job_data = site_job.get_site_job_data
+ del site_job
+except ImportError:
+ # by default provide a stub that generates no site data
+ def get_site_job_data(job):
+ return {}
+
+
+class base_server_job(object):
+ """The actual job against which we do everything.
+
+ Properties:
+ autodir
+ The top level autotest directory (/usr/local/autotest).
+ serverdir
+ <autodir>/server/
+ clientdir
+ <autodir>/client/
+ conmuxdir
+ <autodir>/conmux/
+ testdir
+ <autodir>/server/tests/
+ site_testdir
+ <autodir>/server/site_tests/
+ control
+ the control file for this job
+ """
+
+ STATUS_VERSION = 1
+
+
+ def __init__(self, control, args, resultdir, label, user, machines,
+ client=False, parse_job='',
+ ssh_user='root', ssh_port=22, ssh_pass=''):
+ """
+ control
+ The control file (pathname of)
+ args
+ args to pass to the control file
+ resultdir
+ where to throw the results
+ label
+ label for the job
+ user
+ Username for the job (email address)
+ client
+ True if a client-side control file
+ """
+ path = os.path.dirname(__file__)
+ self.autodir = os.path.abspath(os.path.join(path, '..'))
+ self.serverdir = os.path.join(self.autodir, 'server')
+ self.testdir = os.path.join(self.serverdir, 'tests')
+ self.site_testdir = os.path.join(self.serverdir, 'site_tests')
+ self.tmpdir = os.path.join(self.serverdir, 'tmp')
+ self.conmuxdir = os.path.join(self.autodir, 'conmux')
+ self.clientdir = os.path.join(self.autodir, 'client')
+ self.toolsdir = os.path.join(self.autodir, 'client/tools')
+ if control:
+ self.control = open(control, 'r').read()
+ self.control = re.sub('\r', '', self.control)
+ else:
+ self.control = None
+ self.resultdir = resultdir
+ if not os.path.exists(resultdir):
+ os.mkdir(resultdir)
+ self.debugdir = os.path.join(resultdir, 'debug')
+ if not os.path.exists(self.debugdir):
+ os.mkdir(self.debugdir)
+ self.status = os.path.join(resultdir, 'status')
+ self.label = label
+ self.user = user
+ self.args = args
+ self.machines = machines
+ self.client = client
+ self.record_prefix = ''
+ self.warning_loggers = set()
+ self.ssh_user = ssh_user
+ self.ssh_port = ssh_port
+ self.ssh_pass = ssh_pass
+
+ self.stdout = fd_stack.fd_stack(1, sys.stdout)
+ self.stderr = fd_stack.fd_stack(2, sys.stderr)
+
+ if os.path.exists(self.status):
+ os.unlink(self.status)
+ job_data = {'label' : label, 'user' : user,
+ 'hostname' : ','.join(machines),
+ 'status_version' : str(self.STATUS_VERSION)}
+ job_data.update(get_site_job_data(self))
+ utils.write_keyval(self.resultdir, job_data)
+
+ self.parse_job = parse_job
+ if self.parse_job and len(machines) == 1:
+ self.using_parser = True
+ self.init_parser(resultdir)
+ else:
+ self.using_parser = False
+ self.pkgmgr = packages.PackageManager(
+ self.autodir, run_function_dargs={'timeout':600})
+ self.pkgdir = os.path.join(self.autodir, 'packages')
+
+
+ def init_parser(self, resultdir):
+ """Start the continuous parsing of resultdir. This sets up
+ the database connection and inserts the basic job object into
+ the database if necessary."""
+ # redirect parser debugging to .parse.log
+ parse_log = os.path.join(resultdir, '.parse.log')
+ parse_log = open(parse_log, 'w', 0)
+ tko_utils.redirect_parser_debugging(parse_log)
+ # create a job model object and set up the db
+ self.results_db = tko_db.db(autocommit=True)
+ self.parser = status_lib.parser(self.STATUS_VERSION)
+ self.job_model = self.parser.make_job(resultdir)
+ self.parser.start(self.job_model)
+ # check if a job already exists in the db and insert it if
+ # it does not
+ job_idx = self.results_db.find_job(self.parse_job)
+ if job_idx is None:
+ self.results_db.insert_job(self.parse_job,
+ self.job_model)
+ else:
+ machine_idx = self.results_db.lookup_machine(
+ self.job_model.machine)
+ self.job_model.index = job_idx
+ self.job_model.machine_idx = machine_idx
+
+
+ def cleanup_parser(self):
+ """This should be called after the server job is finished
+ to carry out any remaining cleanup (e.g. flushing any
+ remaining test results to the results db)"""
+ if not self.using_parser:
+ return
+ final_tests = self.parser.end()
+ for test in final_tests:
+ self.__insert_test(test)
+ self.using_parser = False
+
+
+ def verify(self):
+ if not self.machines:
+ raise error.AutoservError(
+ 'No machines specified to verify')
+ try:
+ namespace = {'machines' : self.machines, 'job' : self, \
+ 'ssh_user' : self.ssh_user, \
+ 'ssh_port' : self.ssh_port, \
+ 'ssh_pass' : self.ssh_pass}
+ self._execute_code(preamble + verify, namespace, namespace)
+ except Exception, e:
+ msg = ('Verify failed\n' + str(e) + '\n'
+ + traceback.format_exc())
+ self.record('ABORT', None, None, msg)
+ raise
+
+
+ def repair(self, host_protection):
+ if not self.machines:
+ raise error.AutoservError('No machines specified to repair')
+ namespace = {'machines': self.machines, 'job': self,
+ 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
+ 'ssh_pass': self.ssh_pass,
+ 'protection_level': host_protection}
+ # no matter what happens during repair, go on to try to reverify
+ try:
+ self._execute_code(preamble + repair, namespace, namespace)
+ except Exception, exc:
+            print 'Exception occurred during repair'
+ traceback.print_exc()
+ self.verify()
+
+
+ def precheck(self):
+ """
+ perform any additional checks in derived classes.
+ """
+ pass
+
+
+ def enable_external_logging(self):
+ """Start or restart external logging mechanism.
+ """
+ pass
+
+
+ def disable_external_logging(self):
+ """ Pause or stop external logging mechanism.
+ """
+ pass
+
+
+ def use_external_logging(self):
+ """Return True if external logging should be used.
+ """
+ return False
+
+
+ def parallel_simple(self, function, machines, log=True, timeout=None):
+ """Run 'function' using parallel_simple, with an extra
+ wrapper to handle the necessary setup for continuous parsing,
+ if possible. If continuous parsing is already properly
+ initialized then this should just work."""
+ is_forking = not (len(machines) == 1 and
+ self.machines == machines)
+ if self.parse_job and is_forking:
+ def wrapper(machine):
+ self.parse_job += "/" + machine
+ self.using_parser = True
+ self.machines = [machine]
+ self.resultdir = os.path.join(self.resultdir,
+ machine)
+ self.init_parser(self.resultdir)
+ result = function(machine)
+ self.cleanup_parser()
+ return result
+ elif len(machines) > 1:
+ def wrapper(machine):
+ self.resultdir = os.path.join(self.resultdir, machine)
+ result = function(machine)
+ return result
+ else:
+ wrapper = function
+ subcommand.parallel_simple(wrapper, machines, log, timeout)
+
+
+ def run(self, reboot = False, install_before = False,
+ install_after = False, collect_crashdumps = True,
+ namespace = {}):
+ # use a copy so changes don't affect the original dictionary
+ namespace = namespace.copy()
+ machines = self.machines
+
+ self.aborted = False
+ namespace['machines'] = machines
+ namespace['args'] = self.args
+ namespace['job'] = self
+ namespace['ssh_user'] = self.ssh_user
+ namespace['ssh_port'] = self.ssh_port
+ namespace['ssh_pass'] = self.ssh_pass
+ test_start_time = int(time.time())
+
+ os.chdir(self.resultdir)
+
+ self.enable_external_logging()
+ status_log = os.path.join(self.resultdir, 'status.log')
+ try:
+ if install_before and machines:
+ self._execute_code(preamble + install, namespace, namespace)
+ if self.client:
+ namespace['control'] = self.control
+ open('control', 'w').write(self.control)
+ open('control.srv', 'w').write(client_wrapper)
+ server_control = client_wrapper
+ else:
+ open('control.srv', 'w').write(self.control)
+ server_control = self.control
+ self._execute_code(preamble + server_control, namespace,
+ namespace)
+
+ finally:
+ if machines and collect_crashdumps:
+ namespace['test_start_time'] = test_start_time
+ self._execute_code(preamble + crashdumps, namespace,
+ namespace)
+ self.disable_external_logging()
+ if reboot and machines:
+ self._execute_code(preamble + reboot_segment,namespace,
+ namespace)
+ if install_after and machines:
+ self._execute_code(preamble + install, namespace, namespace)
+
+
+ def run_test(self, url, *args, **dargs):
+ """Summon a test object and run it.
+
+ tag
+ tag to add to testname
+ url
+ url of the test to run
+ """
+
+ (group, testname) = self.pkgmgr.get_package_name(url, 'test')
+ tag = None
+ subdir = testname
+
+ tag = dargs.pop('tag', None)
+ if tag:
+ subdir += '.' + tag
+
+ outputdir = os.path.join(self.resultdir, subdir)
+ if os.path.exists(outputdir):
+ msg = ("%s already exists, test <%s> may have"
+ " already run with tag <%s>"
+ % (outputdir, testname, tag) )
+ raise error.TestError(msg)
+ os.mkdir(outputdir)
+
+ def group_func():
+ try:
+ test.runtest(self, url, tag, args, dargs)
+ except error.TestBaseException, e:
+ self.record(e.exit_status, subdir, testname, str(e))
+ raise
+ except Exception, e:
+ info = str(e) + "\n" + traceback.format_exc()
+ self.record('FAIL', subdir, testname, info)
+ raise
+ else:
+ self.record('GOOD', subdir, testname,
+ 'completed successfully')
+ self._run_group(testname, subdir, group_func)
+
+
+ def _run_group(self, name, subdir, function, *args, **dargs):
+ """\
+ Underlying method for running something inside of a group.
+ """
+ result = None
+ old_record_prefix = self.record_prefix
+ try:
+ self.record('START', subdir, name)
+ self.record_prefix += '\t'
+ try:
+ result = function(*args, **dargs)
+ finally:
+ self.record_prefix = old_record_prefix
+ except error.TestBaseException, e:
+ self.record("END %s" % e.exit_status, subdir, name, str(e))
+ except Exception, e:
+ err_msg = str(e) + '\n'
+ err_msg += traceback.format_exc()
+ self.record('END ABORT', subdir, name, err_msg)
+ raise error.JobError(name + ' failed\n' + traceback.format_exc())
+ else:
+ self.record('END GOOD', subdir, name)
+
+ return result
+
+
+ def run_group(self, function, *args, **dargs):
+ """\
+ function:
+ subroutine to run
+ *args:
+ arguments for the function
+ """
+
+ name = function.__name__
+
+ # Allow the tag for the group to be specified.
+ tag = dargs.pop('tag', None)
+ if tag:
+ name = tag
+
+ return self._run_group(name, None, function, *args, **dargs)
+
+
+ def run_reboot(self, reboot_func, get_kernel_func):
+ """\
+ A specialization of run_group meant specifically for handling
+ a reboot. Includes support for capturing the kernel version
+ after the reboot.
+
+ reboot_func: a function that carries out the reboot
+
+ get_kernel_func: a function that returns a string
+ representing the kernel version.
+ """
+
+ old_record_prefix = self.record_prefix
+ try:
+ self.record('START', None, 'reboot')
+ self.record_prefix += '\t'
+ reboot_func()
+ except Exception, e:
+ self.record_prefix = old_record_prefix
+ err_msg = str(e) + '\n' + traceback.format_exc()
+ self.record('END FAIL', None, 'reboot', err_msg)
+ else:
+ kernel = get_kernel_func()
+ self.record_prefix = old_record_prefix
+ self.record('END GOOD', None, 'reboot',
+ optional_fields={"kernel": kernel})
+
+
+ def record(self, status_code, subdir, operation, status='',
+ optional_fields=None):
+ """
+ Record job-level status
+
+ The intent is to make this file both machine parseable and
+ human readable. That involves a little more complexity, but
+ really isn't all that bad ;-)
+
+ Format is <status code>\t<subdir>\t<operation>\t<status>
+
+ status code: see common_lib.logging.is_valid_status()
+ for valid status definition
+
+ subdir: MUST be a relevant subdirectory in the results,
+ or None, which will be represented as '----'
+
+ operation: description of what you ran (e.g. "dbench", or
+ "mkfs -t foobar /dev/sda9")
+
+        status: error message or "completed successfully"
+
+ ------------------------------------------------------------
+
+ Initial tabs indicate indent levels for grouping, and is
+ governed by self.record_prefix
+
+ multiline messages have secondary lines prefaced by a double
+ space (' ')
+
+ Executing this method will trigger the logging of all new
+ warnings to date from the various console loggers.
+ """
+ # poll all our warning loggers for new warnings
+ warnings = self._read_warnings()
+ for timestamp, msg in warnings:
+ self._record("WARN", None, None, msg, timestamp)
+
+ # write out the actual status log line
+ self._record(status_code, subdir, operation, status,
+ optional_fields=optional_fields)
+
+
+ def _read_warnings(self):
+ warnings = []
+ while True:
+ # pull in a line of output from every logger that has
+ # output ready to be read
+ loggers, _, _ = select.select(self.warning_loggers,
+ [], [], 0)
+ closed_loggers = set()
+ for logger in loggers:
+ line = logger.readline()
+ # record any broken pipes (aka line == empty)
+ if len(line) == 0:
+ closed_loggers.add(logger)
+ continue
+ timestamp, msg = line.split('\t', 1)
+ warnings.append((int(timestamp), msg.strip()))
+
+ # stop listening to loggers that are closed
+ self.warning_loggers -= closed_loggers
+
+ # stop if none of the loggers have any output left
+ if not loggers:
+ break
+
+ # sort into timestamp order
+ warnings.sort()
+ return warnings
+
+
+ def _render_record(self, status_code, subdir, operation, status='',
+ epoch_time=None, record_prefix=None,
+ optional_fields=None):
+ """
+ Internal Function to generate a record to be written into a
+ status log. For use by server_job.* classes only.
+ """
+ if subdir:
+ if re.match(r'[\n\t]', subdir):
+ raise ValueError(
+ 'Invalid character in subdir string')
+ substr = subdir
+ else:
+ substr = '----'
+
+ if not logging.is_valid_status(status_code):
+ raise ValueError('Invalid status code supplied: %s' %
+ status_code)
+ if not operation:
+ operation = '----'
+ if re.match(r'[\n\t]', operation):
+ raise ValueError(
+ 'Invalid character in operation string')
+ operation = operation.rstrip()
+ status = status.rstrip()
+ status = re.sub(r"\t", " ", status)
+ # Ensure any continuation lines are marked so we can
+ # detect them in the status file to ensure it is parsable.
+ status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
+
+ if not optional_fields:
+ optional_fields = {}
+
+ # Generate timestamps for inclusion in the logs
+ if epoch_time is None:
+ epoch_time = int(time.time())
+ local_time = time.localtime(epoch_time)
+ optional_fields["timestamp"] = str(epoch_time)
+ optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
+ local_time)
+
+ fields = [status_code, substr, operation]
+ fields += ["%s=%s" % x for x in optional_fields.iteritems()]
+ fields.append(status)
+
+ if record_prefix is None:
+ record_prefix = self.record_prefix
+
+ msg = '\t'.join(str(x) for x in fields)
+
+ return record_prefix + msg + '\n'
+
+
+ def _record_prerendered(self, msg):
+ """
+ Record a pre-rendered msg into the status logs. The only
+ change this makes to the message is to add on the local
+ indentation. Should not be called outside of server_job.*
+ classes. Unlike _record, this does not write the message
+ to standard output.
+ """
+ lines = []
+ status_file = os.path.join(self.resultdir, 'status.log')
+ status_log = open(status_file, 'a')
+ for line in msg.splitlines():
+ line = self.record_prefix + line + '\n'
+ lines.append(line)
+ status_log.write(line)
+ status_log.close()
+ self.__parse_status(lines)
+
+
+ def _execute_code(self, code, global_scope, local_scope):
+ exec(code, global_scope, local_scope)
+
+
+ def _record(self, status_code, subdir, operation, status='',
+ epoch_time=None, optional_fields=None):
+ """
+ Actual function for recording a single line into the status
+ logs. Should never be called directly, only by job.record as
+ this would bypass the console monitor logging.
+ """
+
+ msg = self._render_record(status_code, subdir, operation,
+ status, epoch_time,
+ optional_fields=optional_fields)
+
+
+ status_file = os.path.join(self.resultdir, 'status.log')
+ sys.stdout.write(msg)
+ open(status_file, "a").write(msg)
+ if subdir:
+ test_dir = os.path.join(self.resultdir, subdir)
+ status_file = os.path.join(test_dir, 'status')
+ open(status_file, "a").write(msg)
+ self.__parse_status(msg.splitlines())
+
+
+ def __parse_status(self, new_lines):
+ if not self.using_parser:
+ return
+ new_tests = self.parser.process_lines(new_lines)
+ for test in new_tests:
+ self.__insert_test(test)
+
+
+ def __insert_test(self, test):
+ """ An internal method to insert a new test result into the
+ database. This method will not raise an exception, even if an
+ error occurs during the insert, to avoid failing a test
+ simply because of unexpected database issues."""
+ try:
+ self.results_db.insert_test(self.job_model, test)
+ except Exception:
+ msg = ("WARNING: An unexpected error occured while "
+ "inserting test results into the database. "
+ "Ignoring error.\n" + traceback.format_exc())
+ print >> sys.stderr, msg
+
# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
@@ -153,8 +829,8 @@
try:
from autotest_lib.server.site_server_job import site_server_job
except ImportError:
- class site_server_job(base_server_job.base_server_job):
+ class site_server_job(object):
pass
-class server_job(site_server_job):
+class server_job(site_server_job, base_server_job):
pass
diff --git a/server/base_server_job_unittest.py b/server/server_job_unittest.py
similarity index 91%
rename from server/base_server_job_unittest.py
rename to server/server_job_unittest.py
index 6e259cf..f17d52f 100644
--- a/server/base_server_job_unittest.py
+++ b/server/server_job_unittest.py
@@ -2,7 +2,7 @@
import unittest, os, time
import common
-from autotest_lib.server import base_server_job, test, subcommand
+from autotest_lib.server import server_job, test, subcommand
from autotest_lib.client.common_lib import utils, error, host_protections
from autotest_lib.client.common_lib import packages
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
@@ -28,8 +28,8 @@
self.god.stub_function(os, 'mkdir')
self.god.stub_function(os, 'chdir')
self.god.stub_function(os, 'unlink')
- self.god.stub_function(base_server_job, 'get_site_job_data')
- self.god.stub_function(base_server_job, 'open')
+ self.god.stub_function(server_job, 'get_site_job_data')
+ self.god.stub_function(server_job, 'open')
self.god.stub_function(utils, 'write_keyval')
@@ -42,7 +42,7 @@
def construct_server_job(self):
# setup recording for constructor
file_obj = self.god.create_mock_class(file, "file")
- base_server_job.open.expect_call(self.control,
+ server_job.open.expect_call(self.control,
'r').and_return(file_obj)
file_obj.read.expect_call().and_return('')
os.path.exists.expect_call(
@@ -54,19 +54,19 @@
os.path.exists.expect_call(
mock.is_string_comparator()).and_return(True)
os.unlink.expect_call(mock.is_string_comparator())
- cls = base_server_job.base_server_job
+ cls = server_job.base_server_job
compare = mock.is_instance_comparator(cls)
- base_server_job.get_site_job_data.expect_call(
+ server_job.get_site_job_data.expect_call(
compare).and_return({})
utils.write_keyval.expect_call(mock.is_string_comparator(),
mock.is_instance_comparator(dict))
- self.job = base_server_job.base_server_job(self.control,
- self.args,
- self.resultdir,
- self.label,
- self.user,
- self.machines)
+ self.job = server_job.base_server_job(self.control,
+ self.args,
+ self.resultdir,
+ self.label,
+ self.user,
+ self.machines)
self.god.check_playback()
@@ -91,7 +91,7 @@
# set up recording
file_obj = self.god.create_mock_class(file, "file")
- base_server_job.open.expect_call(log, 'w',
+ server_job.open.expect_call(log, 'w',
0).and_return(file_obj)
tko_utils.redirect_parser_debugging.expect_call(file_obj)
db = self.god.create_mock_class(tko_db.db_sql, "db_sql")
@@ -120,7 +120,7 @@
'ssh_user' : self.job.ssh_user, \
'ssh_port' : self.job.ssh_port, \
'ssh_pass' : self.job.ssh_pass}
- arg = base_server_job.preamble + base_server_job.verify
+ arg = server_job.preamble + server_job.verify
self.job._execute_code.expect_call(arg, namespace,
namespace)
@@ -140,10 +140,10 @@
repair_namespace = verify_namespace.copy()
repair_namespace['protection_level'] = host_protections.default
- arg = base_server_job.preamble + base_server_job.repair
+ arg = server_job.preamble + server_job.repair
self.job._execute_code.expect_call(arg, repair_namespace,
repair_namespace)
- arg = base_server_job.preamble + base_server_job.verify
+ arg = server_job.preamble + server_job.verify
self.job._execute_code.expect_call(arg, verify_namespace,
verify_namespace)
@@ -196,13 +196,13 @@
time.time.expect_call().and_return(my_time)
os.chdir.expect_call(mock.is_string_comparator())
self.job.enable_external_logging.expect_call()
- base_server_job.open.expect_call(
+ server_job.open.expect_call(
'control.srv', 'w').and_return(file_obj)
file_obj.write.expect_call('')
- arg = base_server_job.preamble + ''
+ arg = server_job.preamble + ''
self.job._execute_code.expect_call(arg, namespace,
namespace)
- arg = base_server_job.preamble + base_server_job.crashdumps
+ arg = server_job.preamble + server_job.crashdumps
self.job._execute_code.expect_call(arg, namespace2,
namespace2)
self.job.disable_external_logging.expect_call()