| """The main job wrapper |
| |
| This is the core infrastructure. |
| |
| Copyright Andy Whitcroft, Martin J. Bligh 2006 |
| """ |
| |
| import copy, os, platform, re, shutil, sys, time, traceback, types, glob |
| import logging, logging.config |
| import cPickle as pickle |
| from autotest_lib.client.bin import utils, parallel, kernel, xen |
| from autotest_lib.client.bin import profilers, fd_stack, boottool, harness |
| from autotest_lib.client.bin import config, sysinfo, test |
| from autotest_lib.client.bin import partition as partition_lib |
| from autotest_lib.client.common_lib import error, barrier, log |
| from autotest_lib.client.common_lib import packages |
| |
| LAST_BOOT_TAG = object() |
| NO_DEFAULT = object() |
| JOB_PREAMBLE = """ |
| from autotest_lib.client.common_lib.error import * |
| from autotest_lib.client.bin.utils import * |
| """ |
| |
| |
| class StepError(error.AutotestError): |
| pass |
| |
| class NotAvailableError(error.AutotestError): |
| pass |
| |
| |
| |
| def _run_test_complete_on_exit(f): |
| """Decorator for job methods that automatically calls |
| self.harness.run_test_complete when the method exits, if appropriate.""" |
| def wrapped(self, *args, **dargs): |
| try: |
| return f(self, *args, **dargs) |
| finally: |
| if self.log_filename == self.DEFAULT_LOG_FILENAME: |
| self.harness.run_test_complete() |
| if self.drop_caches: |
| logging.debug("Dropping caches") |
| utils.drop_caches() |
| wrapped.__name__ = f.__name__ |
| wrapped.__doc__ = f.__doc__ |
| wrapped.__dict__.update(f.__dict__) |
| return wrapped |
| |
| |
| class base_job(object): |
| """The actual job against which we do everything. |
| |
| Properties: |
| autodir |
| The top level autotest directory (/usr/local/autotest). |
| Comes from os.environ['AUTODIR']. |
| bindir |
| <autodir>/bin/ |
| libdir |
| <autodir>/lib/ |
| testdir |
| <autodir>/tests/ |
| configdir |
| <autodir>/config/ |
| site_testdir |
| <autodir>/site_tests/ |
| profdir |
| <autodir>/profilers/ |
| tmpdir |
| <autodir>/tmp/ |
| pkgdir |
| <autodir>/packages/ |
| resultdir |
| <autodir>/results/<jobtag> |
| toolsdir |
| <autodir>/tools/ |
| stdout |
| fd_stack object for stdout |
| stderr |
| fd_stack object for stderr |
| profilers |
| the profilers object for this job |
| harness |
| the server harness object for this job |
| config |
| the job configuration for this job |
| drop_caches_between_iterations |
| drop the pagecache between each iteration |
| """ |
| |
| DEFAULT_LOG_FILENAME = "status" |
| |
| def __init__(self, control, jobtag, cont, harness_type=None, |
| use_external_logging=False, drop_caches=True): |
| """ |
| Prepare a client side job object. |
| |
| Args: |
| control: The control file (pathname of). |
| jobtag: The job tag string (eg "default"). |
| cont: If this is the continuation of this job. |
| harness_type: An alternative server harness. [None] |
| use_external_logging: If true, the enable_external_logging |
| method will be called during construction. [False] |
| drop_caches: If true, utils.drop_caches() is |
| called before and between all tests. [True] |
| """ |
| self.autodir = os.environ['AUTODIR'] |
| self.bindir = os.path.join(self.autodir, 'bin') |
| self.libdir = os.path.join(self.autodir, 'lib') |
| self.testdir = os.path.join(self.autodir, 'tests') |
| self.configdir = os.path.join(self.autodir, 'config') |
| self.site_testdir = os.path.join(self.autodir, 'site_tests') |
| self.profdir = os.path.join(self.autodir, 'profilers') |
| self.tmpdir = os.path.join(self.autodir, 'tmp') |
| self.toolsdir = os.path.join(self.autodir, 'tools') |
| self.resultdir = os.path.join(self.autodir, 'results', jobtag) |
| |
| if not os.path.exists(self.resultdir): |
| os.makedirs(self.resultdir) |
| |
        # Export this environment variable so that the logging config file
        # knows where to put the client log file.
| os.environ['AUTOTEST_RESULTS'] = self.resultdir |
| logging.config.fileConfig('%s/debug_client.ini' % self.autodir) |
| |
| self.drop_caches_between_iterations = False |
| self.drop_caches = drop_caches |
| if self.drop_caches: |
| logging.debug("Dropping caches") |
| utils.drop_caches() |
| |
| self.control = os.path.realpath(control) |
| self._is_continuation = cont |
| self.state_file = self.control + '.state' |
| self.current_step_ancestry = [] |
| self.next_step_index = 0 |
| self.testtag = '' |
| self._test_tag_prefix = '' |
| |
| self._load_state() |
| self.pkgmgr = packages.PackageManager( |
| self.autodir, run_function_dargs={'timeout':3600}) |
| self.pkgdir = os.path.join(self.autodir, 'packages') |
| self.run_test_cleanup = self.get_state("__run_test_cleanup", |
| default=True) |
| |
| self.sysinfo = sysinfo.sysinfo(self.resultdir) |
| self._load_sysinfo_state() |
| |
| self.last_boot_tag = self.get_state("__last_boot_tag", default=None) |
| |
        if not cont:
            # Don't clean up the tmp dir (which contains the lockfile) in the
            # constructor; that would be a problem for multiple jobs starting
            # at the same time on the same client.  Instead the delete is done
            # on the server side.  We simply create the tmp directory here if
            # it does not already exist.
| if not os.path.exists(self.tmpdir): |
| os.mkdir(self.tmpdir) |
| |
| if not os.path.exists(self.pkgdir): |
| os.mkdir(self.pkgdir) |
| |
| results = os.path.join(self.autodir, 'results') |
| if not os.path.exists(results): |
| os.mkdir(results) |
| |
| download = os.path.join(self.testdir, 'download') |
| if not os.path.exists(download): |
| os.mkdir(download) |
| |
            # Clean up the results directory, preserving the client.log file
            # that was already initialized when the job was instantiated.
| client_logfile = os.path.join(self.resultdir, 'client.log') |
| if os.path.exists(self.resultdir): |
| list_files = glob.glob('%s/*' % self.resultdir) |
| for f in list_files: |
| if f != client_logfile: |
| if os.path.isdir(f): |
| shutil.rmtree(f) |
| elif os.path.isfile(f): |
| os.remove(f) |
| |
| os.makedirs(os.path.join(self.resultdir, 'debug')) |
| os.makedirs(os.path.join(self.resultdir, 'analysis')) |
| |
| shutil.copyfile(self.control, |
| os.path.join(self.resultdir, 'control')) |
| |
| |
| self.control = control |
| self.jobtag = jobtag |
| self.log_filename = self.DEFAULT_LOG_FILENAME |
| |
| self.stdout = fd_stack.fd_stack(1, sys.stdout) |
| self.stderr = fd_stack.fd_stack(2, sys.stderr) |
| |
| self._init_group_level() |
| |
| self.config = config.config(self) |
| self.harness = harness.select(harness_type, self) |
| self.profilers = profilers.profilers(self) |
| |
| try: |
| tool = self.config_get('boottool.executable') |
| self.bootloader = boottool.boottool(tool) |
| except: |
| pass |
| |
| self.sysinfo.log_per_reboot_data() |
| |
| if not cont: |
| self.record('START', None, None) |
| self._increment_group_level() |
| |
| self.harness.run_start() |
| |
| if use_external_logging: |
| self.enable_external_logging() |
| |
| # load the max disk usage rate - default to no monitoring |
| self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) |
| |
| def disable_warnings(self, warning_type): |
| self.record("INFO", None, None, |
| "disabling %s warnings" % warning_type, |
| {"warnings.disable": warning_type}) |
| |
| |
| def enable_warnings(self, warning_type): |
| self.record("INFO", None, None, |
| "enabling %s warnings" % warning_type, |
| {"warnings.enable": warning_type}) |
| |
| |
| def monitor_disk_usage(self, max_rate): |
| """\ |
| Signal that the job should monitor disk space usage on / |
| and generate a warning if a test uses up disk space at a |
| rate exceeding 'max_rate'. |
| |
| Parameters: |
             max_rate - the maximum allowed rate of disk consumption
| during a test, in MB/hour, or 0 to indicate |
| no limit. |
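
        Example (illustrative): job.monitor_disk_usage(100) makes run_test()
        warn whenever a test consumes space on / faster than 100 MB/hour.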
| """ |
| self.set_state('__monitor_disk', max_rate) |
| self.max_disk_usage_rate = max_rate |
| |
| |
| def relative_path(self, path): |
| """\ |
        Return a path relative to the job results directory
| """ |
        head = len(self.resultdir) + 1     # remove the '/' in between
| return path[head:] |
| |
| |
| def control_get(self): |
| return self.control |
| |
| |
| def control_set(self, control): |
| self.control = os.path.abspath(control) |
| |
| |
| def harness_select(self, which): |
| self.harness = harness.select(which, self) |
| |
| |
| def config_set(self, name, value): |
| self.config.set(name, value) |
| |
| |
| def config_get(self, name): |
| return self.config.get(name) |
| |
| |
| def setup_dirs(self, results_dir, tmp_dir): |
| if not tmp_dir: |
| tmp_dir = os.path.join(self.tmpdir, 'build') |
| if not os.path.exists(tmp_dir): |
| os.mkdir(tmp_dir) |
| if not os.path.isdir(tmp_dir): |
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
| raise ValueError(e_msg) |
| |
| # We label the first build "build" and then subsequent ones |
| # as "build.2", "build.3", etc. Whilst this is a little bit |
| # inconsistent, 99.9% of jobs will only have one build |
| # (that's not done as kernbench, sparse, or buildtest), |
        # so it works out much cleaner. One of life's compromises.
| if not results_dir: |
| results_dir = os.path.join(self.resultdir, 'build') |
| i = 2 |
| while os.path.exists(results_dir): |
| results_dir = os.path.join(self.resultdir, 'build.%d' % i) |
| i += 1 |
| if not os.path.exists(results_dir): |
| os.mkdir(results_dir) |
| |
| return (results_dir, tmp_dir) |
| |
| |
    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
| """Summon a xen object""" |
| (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) |
| build_dir = 'xen' |
| return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, |
| leave, kjob) |
| |
| |
    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
| """Summon a kernel object""" |
| (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) |
| build_dir = 'linux' |
| return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, |
| build_dir, leave) |
| |
| |
| def barrier(self, *args, **kwds): |
| """Create a barrier object""" |
| return barrier.barrier(*args, **kwds) |
| |
| |
| def install_pkg(self, name, pkg_type, install_dir): |
| ''' |
| This method is a simple wrapper around the actual package |
| installation method in the Packager class. This is used |
| internally by the profilers, deps and tests code. |
| name : name of the package (ex: sleeptest, dbench etc.) |
| pkg_type : Type of the package (ex: test, dep etc.) |
        install_dir : The directory into which the source is untarred
                      (ex: client/profilers/<name> for profilers).
| ''' |
| if len(self.pkgmgr.repo_urls) > 0: |
| self.pkgmgr.install_pkg(name, pkg_type, |
| self.pkgdir, install_dir) |
| |
| |
| def add_repository(self, repo_urls): |
| ''' |
| Adds the repository locations to the job so that packages |
        can be fetched from them when needed. The repository list
        must be a list of URL strings.
| Ex: job.add_repository(['http://blah1','http://blah2']) |
| ''' |
| # TODO(aganti): Validate the list of the repository URLs. |
| repositories = repo_urls + self.pkgmgr.repo_urls |
| self.pkgmgr = packages.PackageManager( |
| self.autodir, repo_urls=repositories, |
| run_function_dargs={'timeout':1800}) |
| # Fetch the packages' checksum file that contains the checksums |
| # of all the packages if it is not already fetched. The checksum |
| # is always fetched whenever a job is first started. This |
| # is not done in the job's constructor as we don't have the list of |
| # the repositories there (and obviously don't care about this file |
| # if we are not using the repos) |
| try: |
| checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, |
| packages.CHECKSUM_FILE) |
| self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path, |
| use_checksum=False) |
| except packages.PackageFetchError, e: |
| # packaging system might not be working in this case |
| # Silently fall back to the normal case |
| pass |
| |
| |
| def require_gcc(self): |
| """ |
        Check that gcc is installed on the machine; raise NotAvailableError
        if it is not.
| """ |
| # check if gcc is installed on the system. |
| try: |
| utils.system('which gcc') |
| except error.CmdError, e: |
| raise NotAvailableError('gcc is required by this job and is ' |
| 'not available on the system') |
| |
| |
| def setup_dep(self, deps): |
| """Set up the dependencies for this test. |
| deps is a list of libraries required for this test. |
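
        Example (illustrative): job.setup_dep(['libaio']) fetches or locates
        <autodir>/deps/libaio and runs its libaio.py setup script.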
| """ |
| # Fetch the deps from the repositories and set them up. |
| for dep in deps: |
| dep_dir = os.path.join(self.autodir, 'deps', dep) |
| # Search for the dependency in the repositories if specified, |
| # else check locally. |
| try: |
| self.install_pkg(dep, 'dep', dep_dir) |
| except packages.PackageInstallError: |
| # see if the dep is there locally |
| pass |
| |
| # dep_dir might not exist if it is not fetched from the repos |
| if not os.path.exists(dep_dir): |
| raise error.TestError("Dependency %s does not exist" % dep) |
| |
| os.chdir(dep_dir) |
| utils.system('./' + dep + '.py') |
| |
| |
| def _runtest(self, url, tag, args, dargs): |
| try: |
| l = lambda : test.runtest(self, url, tag, args, dargs) |
| pid = parallel.fork_start(self.resultdir, l) |
| parallel.fork_waitfor(self.resultdir, pid) |
| except error.TestBaseException: |
| # These are already classified with an error type (exit_status) |
| raise |
| except error.JobError: |
| raise # Caught further up and turned into an ABORT. |
| except Exception, e: |
| # Converts all other exceptions thrown by the test regardless |
| # of phase into a TestError(TestBaseException) subclass that |
| # reports them with their full stack trace. |
| raise error.UnhandledTestError(e) |
| |
| |
| @_run_test_complete_on_exit |
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag
                optional tag to append to the reported test name
                (passed in via dargs)
        """
| |
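        # Illustrative usage from a control file (test name and tag are
        # examples): job.run_test('sleeptest', tag='quick') reports the result
        # as "sleeptest.quick", assuming no run number, kernel version tag,
        # prefix or job-level tag is in effect.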
| if not url: |
| raise TypeError("Test name is invalid. " |
| "Switched arguments?") |
| (group, testname) = self.pkgmgr.get_package_name(url, 'test') |
| namelen = len(testname) |
| dargs = dargs.copy() |
| tntag = dargs.pop('tag', None) |
| if tntag: # per-test tag is included in reported test name |
| testname += '.' + str(tntag) |
| run_number = self.get_run_number() |
| if run_number: |
| testname += '._%02d_' % run_number |
| self.set_run_number(run_number + 1) |
| if self._is_kernel_in_test_tag(): |
| testname += '.' + platform.release() |
| if self._test_tag_prefix: |
| testname += '.' + self._test_tag_prefix |
| if self.testtag: # job-level tag is included in reported test name |
| testname += '.' + self.testtag |
| subdir = testname |
| sdtag = dargs.pop('subdir_tag', None) |
| if sdtag: # subdir-only tag is not included in reports |
| subdir = subdir + '.' + str(sdtag) |
| tag = subdir[namelen+1:] # '' if none |
| |
| outputdir = os.path.join(self.resultdir, subdir) |
| if os.path.exists(outputdir): |
| msg = ("%s already exists, test <%s> may have" |
| " already run with tag <%s>" |
| % (outputdir, testname, tag) ) |
| raise error.TestError(msg) |
        # NOTE: client/common_lib/test.py runtest() depends on directory names
        # being constructed the same way as in this code.
| os.mkdir(outputdir) |
| |
| def log_warning(reason): |
| self.record("WARN", subdir, testname, reason) |
| @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate) |
| def group_func(): |
| try: |
| self._runtest(url, tag, args, dargs) |
| except error.TestBaseException, detail: |
| # The error is already classified, record it properly. |
| self.record(detail.exit_status, subdir, testname, str(detail)) |
| raise |
| else: |
| self.record('GOOD', subdir, testname, 'completed successfully') |
| |
| try: |
| self._rungroup(subdir, testname, group_func) |
| return True |
| except error.TestBaseException: |
| return False |
| # Any other exception here will be given to the caller |
| # |
| # NOTE: The only exception possible from the control file here |
| # is error.JobError as _runtest() turns all others into an |
| # UnhandledTestError that is caught above. |
| |
| |
| def _rungroup(self, subdir, testname, function, *args, **dargs): |
| """\ |
| subdir: |
| name of the group |
| testname: |
| name of the test to run, or support step |
| function: |
| subroutine to run |
| *args: |
| arguments for the function |
| |
        Returns the result of the passed-in function
| """ |
| |
| try: |
| self.record('START', subdir, testname) |
| self._increment_group_level() |
| result = function(*args, **dargs) |
| self._decrement_group_level() |
| self.record('END GOOD', subdir, testname) |
| return result |
| except error.TestBaseException, e: |
| self._decrement_group_level() |
| self.record('END %s' % e.exit_status, subdir, testname) |
| raise |
| except error.JobError, e: |
| self._decrement_group_level() |
| self.record('END ABORT', subdir, testname) |
| raise |
| except Exception, e: |
| # This should only ever happen due to a bug in the given |
| # function's code. The common case of being called by |
| # run_test() will never reach this. If a control file called |
| # run_group() itself, bugs in its function will be caught |
| # here. |
| self._decrement_group_level() |
| err_msg = str(e) + '\n' + traceback.format_exc() |
| self.record('END ERROR', subdir, testname, err_msg) |
| raise |
| |
| |
| def run_group(self, function, tag=None, **dargs): |
| """ |
| Run a function nested within a group level. |
| |
| function: |
| Callable to run. |
| tag: |
| An optional tag name for the group. If None (default) |
| function.__name__ will be used. |
| **dargs: |
| Named arguments for the function. |
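
        Example (illustrative; the function is made up):
            def netperf_pair():
                ...
            job.run_group(netperf_pair, tag='netperf')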
| """ |
| if tag: |
| name = tag |
| else: |
| name = function.__name__ |
| |
| try: |
| return self._rungroup(subdir=None, testname=name, |
| function=function, **dargs) |
| except (SystemExit, error.TestBaseException): |
| raise |
| # If there was a different exception, turn it into a TestError. |
| # It will be caught by step_engine or _run_step_fn. |
| except Exception, e: |
| raise error.UnhandledTestError(e) |
| |
| |
| _RUN_NUMBER_STATE = '__run_number' |
| def get_run_number(self): |
| """Get the run number or 0 if no run number has been set.""" |
| return self.get_state(self._RUN_NUMBER_STATE, default=0) |
| |
| |
| def set_run_number(self, number): |
| """If the run number is non-zero it will be in the output dir name.""" |
| self.set_state(self._RUN_NUMBER_STATE, number) |
| |
| |
| _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' |
| def _is_kernel_in_test_tag(self): |
| """Boolean: should the kernel version be included in the test tag.""" |
| return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) |
| |
| |
| def show_kernel_in_test_tag(self, value=True): |
| """If True, the kernel version at test time will prefix test tags.""" |
| self.set_state(self._KERNEL_IN_TAG_STATE, value) |
| |
| |
| def set_test_tag(self, tag=''): |
| """Set tag to be added to test name of all following run_test steps.""" |
| self.testtag = tag |
| |
| |
| def set_test_tag_prefix(self, prefix): |
| """Set a prefix string to prepend to all future run_test steps. |
| |
| Args: |
| prefix: A string to prepend to any run_test() step tags separated by |
| a '.'; use the empty string '' to clear it. |
| """ |
| self._test_tag_prefix = prefix |
| |
| |
| def cpu_count(self): |
| return utils.count_cpus() # use total system count |
| |
| |
| def start_reboot(self): |
| self.record('START', None, 'reboot') |
| self._increment_group_level() |
| self.record('GOOD', None, 'reboot.start') |
| |
| |
| def end_reboot(self, subdir, kernel, patches): |
| kernel_info = {"kernel": kernel} |
| for i, patch in enumerate(patches): |
| kernel_info["patch%d" % i] = patch |
| self._decrement_group_level() |
| self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) |
| |
| |
    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, aborting the job on mismatch. """
| |
| print (("POST BOOT: checking booted kernel " + |
| "mark=%d identity='%s' type='%s'") % |
| (expected_when, expected_id, type)) |
| |
| running_id = utils.running_os_ident() |
| |
| cmdline = utils.read_one_line("/proc/cmdline") |
| |
| find_sum = re.compile(r'.*IDENT=(\d+)') |
| m = find_sum.match(cmdline) |
| cmdline_when = -1 |
| if m: |
| cmdline_when = int(m.groups()[0]) |
| |
| # We have all the facts, see if they indicate we |
| # booted the requested kernel or not. |
| bad = False |
        if ((type == 'src' and expected_id != running_id) or
                (type == 'rpm' and
                 not running_id.startswith(expected_id + '::'))):
| print "check_kernel_ident: kernel identifier mismatch" |
| bad = True |
| if expected_when != cmdline_when: |
| print "check_kernel_ident: kernel command line mismatch" |
| bad = True |
| |
| if bad: |
| print " Expected Ident: " + expected_id |
| print " Running Ident: " + running_id |
| print " Expected Mark: %d" % (expected_when) |
| print "Command Line Mark: %d" % (cmdline_when) |
| print " Command Line: " + cmdline |
| |
| self.record("ABORT", subdir, "reboot.verify", "boot failure") |
| self._decrement_group_level() |
| kernel = {"kernel": running_id.split("::")[0]} |
| self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) |
| raise error.JobError("reboot returned with the wrong kernel") |
| |
| self.record('GOOD', subdir, 'reboot.verify', |
| utils.running_os_full_version()) |
| self.end_reboot(subdir, expected_id, patches) |
| |
| |
| def partition(self, device, loop_size=0, mountpoint=None): |
| """ |
| Work with a machine partition |
| |
| @param device: e.g. /dev/sda2, /dev/sdb1 etc... |
        @param mountpoint: Specify a directory to mount to. If not specified,
                the autotest tmp directory will be used.
| @param loop_size: Size of loopback device (in MB). Defaults to 0. |
| |
| @return: A L{client.bin.partition.partition} object |
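
        Example (illustrative; the device path is made up):
            part = job.partition('/dev/sdb1', mountpoint='/mnt/scratch')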
| """ |
| |
| if not mountpoint: |
| mountpoint = self.tmpdir |
| return partition_lib.partition(self, device, loop_size, mountpoint) |
| |
| @utils.deprecated |
| def filesystem(self, device, mountpoint=None, loop_size=0): |
| """ Same as partition |
| |
| @deprecated: Use partition method instead |
| """ |
| return self.partition(device, loop_size, mountpoint) |
| |
| |
| def enable_external_logging(self): |
| pass |
| |
| |
| def disable_external_logging(self): |
| pass |
| |
| |
    def enable_test_cleanup(self):
        """ Make each test run its test.cleanup method (this is the default). """
| self.set_state("__run_test_cleanup", True) |
| self.run_test_cleanup = True |
| |
| |
    def disable_test_cleanup(self):
        """ Make tests skip their test.cleanup method. """
| self.set_state("__run_test_cleanup", False) |
| self.run_test_cleanup = False |
| |
| |
| def default_test_cleanup(self, val): |
| if not self._is_continuation: |
| self.set_state("__run_test_cleanup", val) |
| self.run_test_cleanup = val |
| |
| |
| def default_boot_tag(self, tag): |
| if not self._is_continuation: |
| self.set_state("__last_boot_tag", tag) |
| self.last_boot_tag = tag |
| |
| |
| def reboot_setup(self): |
| pass |
| |
| |
| def reboot(self, tag=LAST_BOOT_TAG): |
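        # Typical control-file usage (illustrative): job.reboot('mykernel')
        # boots the entry installed under boot tag 'mykernel', while a bare
        # job.reboot() reuses the last boot tag.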
| if tag == LAST_BOOT_TAG: |
| tag = self.last_boot_tag |
| else: |
| self.set_state("__last_boot_tag", tag) |
| self.last_boot_tag = tag |
| |
| self.reboot_setup() |
| self.harness.run_reboot() |
| default = self.config_get('boot.set_default') |
| if default: |
| self.bootloader.set_default(tag) |
| else: |
| self.bootloader.boot_once(tag) |
| |
| # HACK: using this as a module sometimes hangs shutdown, so if it's |
| # installed unload it first |
| utils.system("modprobe -r netconsole", ignore_status=True) |
| |
| # sync first, so that a sync during shutdown doesn't time out |
| utils.system("sync; sync", ignore_status=True) |
| |
| utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") |
| self.quit() |
| |
| |
| def noop(self, text): |
| print "job: noop: " + text |
| |
| |
| @_run_test_complete_on_exit |
| def parallel(self, *tasklist): |
| """Run tasks in parallel""" |
| |
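        # Each task is a sequence whose first element is a callable and whose
        # remaining elements are its positional arguments.  Illustrative usage
        # (the functions are hypothetical):
        #     job.parallel([run_server_side], [run_client_side, '10.0.0.2'])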
| pids = [] |
| old_log_filename = self.log_filename |
| for i, task in enumerate(tasklist): |
| assert isinstance(task, (tuple, list)) |
| self.log_filename = old_log_filename + (".%d" % i) |
| task_func = lambda: task[0](*task[1:]) |
| pids.append(parallel.fork_start(self.resultdir, task_func)) |
| |
| old_log_path = os.path.join(self.resultdir, old_log_filename) |
| old_log = open(old_log_path, "a") |
| exceptions = [] |
| for i, pid in enumerate(pids): |
| # wait for the task to finish |
| try: |
| parallel.fork_waitfor(self.resultdir, pid) |
| except Exception, e: |
| exceptions.append(e) |
| # copy the logs from the subtask into the main log |
| new_log_path = old_log_path + (".%d" % i) |
| if os.path.exists(new_log_path): |
| new_log = open(new_log_path) |
| old_log.write(new_log.read()) |
| new_log.close() |
| old_log.flush() |
| os.remove(new_log_path) |
| old_log.close() |
| |
| self.log_filename = old_log_filename |
| |
| # handle any exceptions raised by the parallel tasks |
| if exceptions: |
| msg = "%d task(s) failed in job.parallel" % len(exceptions) |
| raise error.JobError(msg) |
| |
| |
| def quit(self): |
| # XXX: should have a better name. |
| self.harness.run_pause() |
| raise error.JobContinue("more to come") |
| |
| |
| def complete(self, status): |
| """Clean up and exit""" |
| # We are about to exit 'complete' so clean up the control file. |
| dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) |
| os.rename(self.state_file, dest) |
| |
| self.harness.run_complete() |
| self.disable_external_logging() |
| sys.exit(status) |
| |
| |
| def set_state(self, var, val): |
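        # Persist a single named value in the pickled state file so that it
        # survives job restarts, e.g. (illustrative):
        #     self.set_state('__monitor_disk', 10.0)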
| # Deep copies make sure that the state can't be altered |
| # without it being re-written. Perf wise, deep copies |
| # are overshadowed by pickling/loading. |
| self.state[var] = copy.deepcopy(val) |
| outfile = open(self.state_file, 'w') |
| try: |
| pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL) |
| finally: |
| outfile.close() |
| logging.debug("Persistent state variable %s now set to %r", var, val) |
| |
| |
| def _load_state(self): |
| if hasattr(self, 'state'): |
| raise RuntimeError('state already exists') |
| infile = None |
| try: |
| try: |
| infile = open(self.state_file, 'rb') |
| self.state = pickle.load(infile) |
| except IOError: |
| self.state = {} |
| initialize = True |
| finally: |
| if infile: |
| infile.close() |
| |
        initialize = '__steps' not in self.state
        if not (self._is_continuation or initialize):
            raise RuntimeError('Loaded state already contains __steps, but '
                               'this job is not a continuation.')
| |
| if initialize: |
| logging.info('Initializing the state engine') |
| self.set_state('__steps', []) # writes pickle file |
| |
| |
| def get_state(self, var, default=NO_DEFAULT): |
        if var in self.state or default is NO_DEFAULT:
| val = self.state[var] |
| else: |
| val = default |
| return copy.deepcopy(val) |
| |
| |
| def __create_step_tuple(self, fn, args, dargs): |
| # Legacy code passes in an array where the first arg is |
| # the function or its name. |
| if isinstance(fn, list): |
| assert(len(args) == 0) |
| assert(len(dargs) == 0) |
| args = fn[1:] |
| fn = fn[0] |
| # Pickling actual functions is hairy, thus we have to call |
| # them by name. Unfortunately, this means only functions |
| # defined globally can be used as a next step. |
| if callable(fn): |
| fn = fn.__name__ |
| if not isinstance(fn, types.StringTypes): |
| raise StepError("Next steps must be functions or " |
| "strings containing the function name") |
| ancestry = copy.copy(self.current_step_ancestry) |
| return (ancestry, fn, args, dargs) |
| |
| |
| def next_step_append(self, fn, *args, **dargs): |
| """Define the next step and place it at the end""" |
| steps = self.get_state('__steps') |
| steps.append(self.__create_step_tuple(fn, args, dargs)) |
| self.set_state('__steps', steps) |
| |
| |
| def next_step(self, fn, *args, **dargs): |
| """Create a new step and place it after any steps added |
| while running the current step but before any steps added in |
| previous steps""" |
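        # Illustrative usage (step names are hypothetical):
        #     def step_init():
        #         job.next_step('build_step')
        #         job.next_step('test_step')
        # Because the step list is pickled into the state file, execution can
        # resume at the next unfinished step after a reboot.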
| steps = self.get_state('__steps') |
| steps.insert(self.next_step_index, |
| self.__create_step_tuple(fn, args, dargs)) |
| self.next_step_index += 1 |
| self.set_state('__steps', steps) |
| |
| |
| def next_step_prepend(self, fn, *args, **dargs): |
| """Insert a new step, executing first""" |
| steps = self.get_state('__steps') |
| steps.insert(0, self.__create_step_tuple(fn, args, dargs)) |
| self.next_step_index += 1 |
| self.set_state('__steps', steps) |
| |
| |
| def _run_step_fn(self, local_vars, fn, args, dargs): |
| """Run a (step) function within the given context""" |
| |
| local_vars['__args'] = args |
| local_vars['__dargs'] = dargs |
| try: |
| exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) |
| return local_vars['__ret'] |
| except SystemExit: |
| raise # Send error.JobContinue and JobComplete on up to runjob. |
| except error.TestNAError, detail: |
| self.record(detail.exit_status, None, fn, str(detail)) |
| except Exception, detail: |
| raise error.UnhandledJobError(detail) |
| |
| |
| def _create_frame(self, global_vars, ancestry, fn_name): |
| """Set up the environment like it would have been when this |
| function was first defined. |
| |
| Child step engine 'implementations' must have 'return locals()' |
        at the end of their steps. Because of this, we can call the
| parent function and get back all child functions (i.e. those |
| defined within it). |
| |
| Unfortunately, the call stack of the function calling |
| job.next_step might have been deeper than the function it |
| added. In order to make sure that the environment is what it |
| should be, we need to then pop off the frames we built until |
| we find the frame where the function was first defined.""" |
| |
| # The copies ensure that the parent frames are not modified |
| # while building child frames. This matters if we then |
| # pop some frames in the next part of this function. |
| current_frame = copy.copy(global_vars) |
| frames = [current_frame] |
| for steps_fn_name in ancestry: |
| ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) |
| current_frame = copy.copy(ret) |
| frames.append(current_frame) |
| |
| # Walk up the stack frames until we find the place fn_name was defined. |
| while len(frames) > 2: |
| if fn_name not in frames[-2]: |
| break |
| if frames[-2][fn_name] != frames[-1][fn_name]: |
| break |
| frames.pop() |
| ancestry.pop() |
| |
| return (frames[-1], ancestry) |
| |
| |
| def _add_step_init(self, local_vars, current_function): |
| """If the function returned a dictionary that includes a |
| function named 'step_init', prepend it to our list of steps. |
| This will only get run the first time a function with a nested |
| use of the step engine is run.""" |
| |
| if (isinstance(local_vars, dict) and |
| 'step_init' in local_vars and |
| callable(local_vars['step_init'])): |
| # The init step is a child of the function |
| # we were just running. |
| self.current_step_ancestry.append(current_function) |
| self.next_step_prepend('step_init') |
| |
| |
    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Runs the remaining steps, one at a time, persisting progress in the
        state file between them.
        """
| |
| # Set up the environment and then interpret the control file. |
| # Some control files will have code outside of functions, |
| # which means we need to have our state engine initialized |
| # before reading in the file. |
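        # A minimal stepped control file might look like this (illustrative;
        # step functions must be defined at the top level so they can be
        # re-resolved by name after a reboot):
        #     def step_one():
        #         job.run_test('sleeptest')
        #     def step_init():
        #         job.next_step(step_one)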
| global_control_vars = {'job': self} |
| exec(JOB_PREAMBLE, global_control_vars, global_control_vars) |
| try: |
| execfile(self.control, global_control_vars, global_control_vars) |
| except error.TestNAError, detail: |
| self.record(detail.exit_status, None, self.control, str(detail)) |
| except SystemExit: |
| raise # Send error.JobContinue and JobComplete on up to runjob. |
| except Exception, detail: |
| # Syntax errors or other general Python exceptions coming out of |
| # the top level of the control file itself go through here. |
| raise error.UnhandledJobError(detail) |
| |
| # If we loaded in a mid-job state file, then we presumably |
| # know what steps we have yet to run. |
| if not self._is_continuation: |
| if 'step_init' in global_control_vars: |
| self.next_step(global_control_vars['step_init']) |
| |
| # Iterate through the steps. If we reboot, we'll simply |
| # continue iterating on the next step. |
| while len(self.get_state('__steps')) > 0: |
| steps = self.get_state('__steps') |
| (ancestry, fn_name, args, dargs) = steps.pop(0) |
| self.set_state('__steps', steps) |
| |
| self.next_step_index = 0 |
| ret = self._create_frame(global_control_vars, ancestry, fn_name) |
| local_vars, self.current_step_ancestry = ret |
| local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) |
| self._add_step_init(local_vars, fn_name) |
| |
| |
| def add_sysinfo_command(self, command, logfile=None, on_every_test=False): |
| self._add_sysinfo_loggable(sysinfo.command(command, logfile), |
| on_every_test) |
| |
| |
| def add_sysinfo_logfile(self, file, on_every_test=False): |
| self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) |
| |
| |
| def _add_sysinfo_loggable(self, loggable, on_every_test): |
| if on_every_test: |
| self.sysinfo.test_loggables.add(loggable) |
| else: |
| self.sysinfo.boot_loggables.add(loggable) |
| self._save_sysinfo_state() |
| |
| |
| def _load_sysinfo_state(self): |
| state = self.get_state("__sysinfo", None) |
| if state: |
| self.sysinfo.deserialize(state) |
| |
| |
| def _save_sysinfo_state(self): |
| state = self.sysinfo.serialize() |
| self.set_state("__sysinfo", state) |
| |
| |
| def _init_group_level(self): |
| self.group_level = self.get_state("__group_level", default=0) |
| |
| |
| def _increment_group_level(self): |
| self.group_level += 1 |
| self.set_state("__group_level", self.group_level) |
| |
| |
| def _decrement_group_level(self): |
| self.group_level -= 1 |
| self.set_state("__group_level", self.group_level) |
| |
| |
| def record(self, status_code, subdir, operation, status = '', |
| optional_fields=None): |
| """ |
| Record job-level status |
| |
| The intent is to make this file both machine parseable and |
| human readable. That involves a little more complexity, but |
| really isn't all that bad ;-) |
| |
| Format is <status code>\t<subdir>\t<operation>\t<status> |
| |
| status code: (GOOD|WARN|FAIL|ABORT) |
| or START |
| or END (GOOD|WARN|FAIL|ABORT) |
| |
| subdir: MUST be a relevant subdirectory in the results, |
| or None, which will be represented as '----' |
| |
| operation: description of what you ran (e.g. "dbench", or |
| "mkfs -t foobar /dev/sda9") |
| |
        status: error message or "completed successfully"
| |
| ------------------------------------------------------------ |
| |
        Initial tabs indicate indent levels for grouping and are
        governed by self.group_level.

        Multiline messages have secondary lines prefixed by a double
        space ('  ').
| """ |
| |
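        # An illustrative rendered entry at group_level 0 (values made up,
        # fields tab-separated):
        #     GOOD    dbench    dbench    timestamp=1234567890    completed successfully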
| if subdir: |
            if re.search(r'[\n\t]', subdir):
| raise ValueError("Invalid character in subdir string") |
| substr = subdir |
| else: |
| substr = '----' |
| |
| if not log.is_valid_status(status_code): |
| raise ValueError("Invalid status code supplied: %s" % status_code) |
| if not operation: |
| operation = '----' |
| |
        if re.search(r'[\n\t]', operation):
| raise ValueError("Invalid character in operation string") |
| operation = operation.rstrip() |
| |
| if not optional_fields: |
| optional_fields = {} |
| |
| status = status.rstrip() |
| status = re.sub(r"\t", " ", status) |
| # Ensure any continuation lines are marked so we can |
| # detect them in the status file to ensure it is parsable. |
| status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) |
| |
| # Generate timestamps for inclusion in the logs |
| epoch_time = int(time.time()) # seconds since epoch, in UTC |
| local_time = time.localtime(epoch_time) |
| optional_fields["timestamp"] = str(epoch_time) |
| optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", |
| local_time) |
| |
| fields = [status_code, substr, operation] |
| fields += ["%s=%s" % x for x in optional_fields.iteritems()] |
| fields.append(status) |
| |
| msg = '\t'.join(str(x) for x in fields) |
| msg = '\t' * self.group_level + msg |
| |
| msg_tag = "" |
| if "." in self.log_filename: |
| msg_tag = self.log_filename.split(".", 1)[1] |
| |
| self.harness.test_status_detail(status_code, substr, operation, status, |
| msg_tag) |
| self.harness.test_status(msg, msg_tag) |
| |
        # log to stdout
        logging.info(msg)
| |
| # log to the "root" status log |
| status_file = os.path.join(self.resultdir, self.log_filename) |
| open(status_file, "a").write(msg + "\n") |
| |
| # log to the subdir status log (if subdir is set) |
| if subdir: |
            status_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(status_dir, self.DEFAULT_LOG_FILENAME)
| open(status_file, "a").write(msg + "\n") |
| |
| |
| class disk_usage_monitor: |
| def __init__(self, logging_func, device, max_mb_per_hour): |
| self.func = logging_func |
| self.device = device |
| self.max_mb_per_hour = max_mb_per_hour |
| |
| |
| def start(self): |
| self.initial_space = utils.freespace(self.device) |
| self.start_time = time.time() |
| |
| |
| def stop(self): |
| # if no maximum usage rate was set, we don't need to |
| # generate any warnings |
| if not self.max_mb_per_hour: |
| return |
| |
| final_space = utils.freespace(self.device) |
| used_space = self.initial_space - final_space |
| stop_time = time.time() |
| total_time = stop_time - self.start_time |
| # round up the time to one minute, to keep extremely short |
| # tests from generating false positives due to short, badly |
| # timed bursts of activity |
| total_time = max(total_time, 60.0) |
| |
| # determine the usage rate |
| bytes_per_sec = used_space / total_time |
| mb_per_sec = bytes_per_sec / 1024**2 |
| mb_per_hour = mb_per_sec * 60 * 60 |
| |
| if mb_per_hour > self.max_mb_per_hour: |
| msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") |
| msg %= (self.device, mb_per_hour) |
| self.func(msg) |
| |
| |
| @classmethod |
| def watch(cls, *monitor_args, **monitor_dargs): |
| """ Generic decorator to wrap a function call with the |
| standard create-monitor -> start -> call -> stop idiom.""" |
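        # This mirrors how run_test() wraps its group function above, e.g.:
        #     @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        #     def group_func():
        #         ...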
| def decorator(func): |
| def watched_func(*args, **dargs): |
| monitor = cls(*monitor_args, **monitor_dargs) |
| monitor.start() |
| try: |
| func(*args, **dargs) |
| finally: |
| monitor.stop() |
| return watched_func |
| return decorator |
| |
| |
| def runjob(control, cont=False, tag="default", harness_type='', |
| use_external_logging=False): |
| """ |
| Run a job using the given control file. |
| |
| This is the main interface to this module. |
| |
| Args: |
| control: The control file to use for this job. |
| cont: Whether this is the continuation of a previously started job. |
| tag: The job tag string. ['default'] |
| harness_type: An alternative server harness. [None] |
| use_external_logging: Should external logging be enabled? [False] |
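
    Example (illustrative): the autotest client entry point typically calls
    runjob('/usr/local/autotest/control', cont=False).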
| """ |
| control = os.path.abspath(control) |
| state = control + '.state' |
| |
| # instantiate the job object ready for the control file. |
| myjob = None |
| try: |
| # Check that the control file is valid |
| if not os.path.exists(control): |
| raise error.JobError(control + ": control file not found") |
| |
        # When continuing, the job is complete when there is no state file;
        # make sure we don't try to continue in that case.
| if cont and not os.path.exists(state): |
| raise error.JobComplete("all done") |
| |
| myjob = job(control, tag, cont, harness_type=harness_type, |
| use_external_logging=use_external_logging) |
| |
        # Load in the user's control file; it may do any one of:
| # 1) execute in toto |
| # 2) define steps, and select the first via next_step() |
| myjob.step_engine() |
| |
| except error.JobContinue: |
| sys.exit(5) |
| |
| except error.JobComplete: |
| sys.exit(1) |
| |
| except error.JobError, instance: |
| print "JOB ERROR: " + instance.args[0] |
| if myjob: |
| command = None |
| if len(instance.args) > 1: |
| command = instance.args[1] |
| myjob.record('ABORT', None, command, instance.args[0]) |
| myjob._decrement_group_level() |
| myjob.record('END ABORT', None, None, instance.args[0]) |
| assert (myjob.group_level == 0), ('myjob.group_level must be 0,' |
| ' not %d' % myjob.group_level) |
| myjob.complete(1) |
| else: |
| sys.exit(1) |
| |
| except Exception, e: |
| # NOTE: job._run_step_fn and job.step_engine will turn things into |
        # a JobError for us. If we get here, it's likely an autotest bug.
| msg = str(e) + '\n' + traceback.format_exc() |
| print "JOB ERROR (autotest bug?): " + msg |
| if myjob: |
| myjob._decrement_group_level() |
| myjob.record('END ABORT', None, None, msg) |
| assert(myjob.group_level == 0) |
| myjob.complete(1) |
| else: |
| sys.exit(1) |
| |
| # If we get here, then we assume the job is complete and good. |
| myjob._decrement_group_level() |
| myjob.record('END GOOD', None, None) |
| assert(myjob.group_level == 0) |
| |
| myjob.complete(0) |
| |
| |
| site_job = utils.import_site_class( |
| __file__, "autotest_lib.client.bin.site_job", "site_job", base_job) |
| |
| class job(site_job, base_job): |
| pass |