This depends on Martin's "run autoserv without a results dir" patch
being applied first in order to apply cleanly.
A rather large refactoring and stubbing change working towards
supporting client-side profilers from the server side. It makes a few
major changes:
- refactors almost all of client/bin/profilers.py into a common-lib
  profiler_manager class; apart from the code that actually loads a
  profiler object, all of that code is already completely generic
- adds a server-side profiler_proxy class that will act as a proxy
  on the server for using a client-side profiler; it doesn't do
  anything useful yet and is basically just a stub for later changes
- adds a server-side profiler_manager implementation that creates a
  profiler_proxy instance instead of actually loading a real profiler
  (see the sketch below)
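
As a rough sketch of the intended split (illustrative only; apart from
load_profiler, which the new server-side manager implements, the method
names here are assumptions rather than the final code):

    # common lib: all of the generic profiler bookkeeping lives here
    class profiler_manager(object):
        def __init__(self, job):
            self.job = job
            self.profilers = []

        def load_profiler(self, profiler, args, dargs):
            # the only non-generic piece; the client and server
            # subclasses each override this hook
            raise NotImplementedError

        def add(self, profiler, *args, **dargs):
            self.profilers.append(self.load_profiler(profiler,
                                                     args, dargs))

    # server side: "loading" a profiler just builds a stub proxy
    class profilers(profiler_manager):
        def load_profiler(self, profiler_name, args, dargs):
            return profiler.profiler_proxy(self.job, profiler_name)
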
The intended changes still in the pipeline that will build on this are:
- add code to the profiler_proxy for actually making sure the
profiler is set up and installed on the remote host(s)
- add a mechanism for client-side profilers to deal with reboots
Risk: Medium
Visibility: Adds a bunch of stubs that don't actually do anything yet
but will do things soon.
Signed-off-by: John Admanski <[email protected]>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2447 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/autotest.py b/server/autotest.py
index 6910f99..ba45f86 100644
--- a/server/autotest.py
+++ b/server/autotest.py
@@ -1,7 +1,7 @@
# Copyright 2007 Google Inc. Released under the GPL v2
import re, os, sys, traceback, subprocess, tempfile, shutil, time, pickle
-from autotest_lib.server import installable_object, utils, server_job
+from autotest_lib.server import installable_object, utils
from autotest_lib.client.common_lib import log, error, debug
from autotest_lib.client.common_lib import global_config, packages
@@ -251,7 +251,7 @@
try:
atrun.execute_control(timeout=timeout)
finally:
- collector = server_job.log_collector(host, atrun.tag, results_dir)
+ collector = log_collector(host, atrun.tag, results_dir)
collector.collect_client_job_results()
self._process_client_state_file(host, atrun, results_dir)
@@ -436,8 +436,7 @@
section = 0
start_time = time.time()
- client_logger = server_job.client_logger(self.host, self.tag,
- self.results_dir)
+ logger = client_logger(self.host, self.tag, self.results_dir)
try:
while not timeout or time.time() < start_time + timeout:
if timeout:
@@ -445,7 +444,7 @@
else:
section_timeout = None
last = self.execute_section(section, section_timeout,
- client_logger)
+ logger)
section += 1
if re.match(r'^END .*\t----\t----\t.*$', last):
print "Client complete"
@@ -454,8 +453,8 @@
try:
self._wait_for_reboot()
except error.AutotestRunError, e:
- job.record("ABORT", None, "reboot", str(e))
- job.record("END ABORT", None, None, str(e))
+ self.host.job.record("ABORT", None, "reboot", str(e))
+ self.host.job.record("END ABORT", None, None, str(e))
raise
continue
@@ -469,7 +468,7 @@
"client: %s\n") % last
raise error.AutotestRunError(msg)
finally:
- client_logger.close()
+ logger.close()
# should only get here if we timed out
assert timeout
@@ -498,6 +497,248 @@
raise error.AutotestRunError("Cannot figure out autotest directory")
+class log_collector(object):
+ def __init__(self, host, client_tag, results_dir):
+ self.host = host
+ if not client_tag:
+ client_tag = "default"
+ self.client_results_dir = os.path.join(host.get_autodir(), "results",
+ client_tag)
+ self.server_results_dir = results_dir
+
+
+ def collect_client_job_results(self):
+ """ A method that collects all the current results of a running
+ client job into the results dir. By default does nothing as no
+ client job is running, but when running a client job you can override
+ this with something that will actually do something. """
+
+ # make an effort to wait for the machine to come up
+ try:
+ self.host.wait_up(timeout=30)
+ except error.AutoservError:
+ # don't worry about any errors, we'll try and
+ # get the results anyway
+ pass
+
+
+ # Copy all dirs in default to results_dir
+ try:
+ keyval_path = self._prepare_for_copying_logs()
+ self.host.get_file(self.client_results_dir + '/',
+ self.server_results_dir)
+ self._process_copied_logs(keyval_path)
+ self._postprocess_copied_logs()
+ except Exception:
+ # well, don't stop running just because we couldn't get logs
+ print "Unexpected error copying test result logs, continuing ..."
+ traceback.print_exc(file=sys.stdout)
+
+
+ def _prepare_for_copying_logs(self):
+ server_keyval = os.path.join(self.server_results_dir, 'keyval')
+ if not os.path.exists(server_keyval):
+ # Client-side keyval file can be copied directly
+ return
+
+ # Copy client-side keyval to temporary location
+ suffix = '.keyval_%s' % self.host.hostname
+ fd, keyval_path = tempfile.mkstemp(suffix)
+ os.close(fd)
+ try:
+ client_keyval = os.path.join(self.client_results_dir, 'keyval')
+ try:
+ self.host.get_file(client_keyval, keyval_path)
+ finally:
+ # squirrel the client-side keyval away in a
+ # temporary location and move it back when we are done
+ remote_temp_dir = self.host.get_tmp_dir()
+ self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
+ self.host.run('mv %s %s' % (client_keyval,
+ self.temp_keyval_path))
+ except (error.AutoservRunError, error.AutoservSSHTimeout):
+ print "Prepare for copying logs failed"
+ return keyval_path
+
+
+ def _process_copied_logs(self, keyval_path):
+ if not keyval_path:
+ # Client-side keyval file was copied directly
+ return
+
+ # Append contents of keyval_<host> file to keyval file
+ try:
+ # Read in new and old keyval files
+ new_keyval = utils.read_keyval(keyval_path)
+ old_keyval = utils.read_keyval(self.server_results_dir)
+ # drop entries from the new keyval that already exist in the old one
+ tmp_keyval = {}
+ for key, val in new_keyval.iteritems():
+ if key not in old_keyval:
+ tmp_keyval[key] = val
+ # Append new info to keyval file
+ utils.write_keyval(self.server_results_dir, tmp_keyval)
+ # Delete keyval_<host> file
+ os.remove(keyval_path)
+ except IOError:
+ print "Process copied logs failed"
+
+
+ def _postprocess_copied_logs(self):
+ # we can now put our keyval file back
+ client_keyval = os.path.join(self.client_results_dir, 'keyval')
+ try:
+ self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
+ except Exception:
+ pass
+
+
+
+# a file-like object for catching stderr from an autotest client and
+# extracting status logs from it
+class client_logger(object):
+ """Partial file object to write to both stdout and
+ the status log file. We only implement those methods
+ utils.run() actually calls.
+
+ Note that this class is fairly closely coupled with server_job, as it
+ uses special job._ methods to actually carry out the logging.
+ """
+ status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
+ test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
+ extract_indent = re.compile(r"^(\t*).*$")
+
+ def __init__(self, host, tag, server_results_dir):
+ self.host = host
+ self.job = host.job
+ self.log_collector = log_collector(host, tag, server_results_dir)
+ self.leftover = ""
+ self.last_line = ""
+ self.logs = {}
+
+
+ def _process_log_dict(self, log_dict):
+ log_list = log_dict.pop("logs", [])
+ for key in sorted(log_dict.iterkeys()):
+ log_list += self._process_log_dict(log_dict.pop(key))
+ return log_list
+
+
+ def _process_logs(self):
+ """Go through the accumulated logs in self.log and print them
+ out to stdout and the status log. Note that this processes
+ logs in an ordering where:
+
+ 1) logs to different tags are never interleaved
+ 2) logs to x.y come before logs to x.y.z for all z
+ 3) logs to x.y come before x.z whenever y < z
+
+ Note that this will in general not be the same as the
+ chronological ordering of the logs. However, if a chronological
+ ordering is desired, one can be reconstructed from the
+ status log by looking at timestamp lines."""
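+ # illustrative example (made-up tags): entries logged under tags
+ # "1", "1.1", "1.2" and then "2" come out in exactly that order,
+ # even if some of the "2" entries were logged first chronologically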
+ log_list = self._process_log_dict(self.logs)
+ for line in log_list:
+ self.job._record_prerendered(line + '\n')
+ if log_list:
+ self.last_line = log_list[-1]
+
+
+ def _process_quoted_line(self, tag, line):
+ """Process a line quoted with an AUTOTEST_STATUS flag. If the
+ tag is blank then we want to push out all the data we've been
+ building up in self.logs, and then the newest line. If the
+ tag is not blank, then push the line into the logs for handling
+ later."""
+ print line
+ if tag == "":
+ self._process_logs()
+ self.job._record_prerendered(line + '\n')
+ self.last_line = line
+ else:
+ tag_parts = [int(x) for x in tag.split(".")]
+ log_dict = self.logs
+ for part in tag_parts:
+ log_dict = log_dict.setdefault(part, {})
+ log_list = log_dict.setdefault("logs", [])
+ log_list.append(line)
+
+
+ def _process_line(self, line):
+ """Write out a line of data to the appropriate stream. Status
+ lines sent by autotest will be prepended with
+ "AUTOTEST_STATUS", and all other lines are ssh error
+ messages."""
+ status_match = self.status_parser.search(line)
+ test_complete_match = self.test_complete_parser.search(line)
+ if status_match:
+ tag, line = status_match.groups()
+ self._process_quoted_line(tag, line)
+ elif test_complete_match:
+ fifo_path, = test_complete_match.groups()
+ self.log_collector.collect_client_job_results()
+ self.host.run("echo A > %s" % fifo_path)
+ else:
+ print line
+
+
+ def _format_warnings(self, last_line, warnings):
+ # use the indentation of whatever the last log line was
+ indent = self.extract_indent.match(last_line).group(1)
+ # if the last line starts a new group, add an extra indent
+ if last_line.lstrip('\t').startswith("START\t"):
+ indent += '\t'
+ return [self.job._render_record("WARN", None, None, msg,
+ timestamp, indent).rstrip('\n')
+ for timestamp, msg in warnings]
+
+
+ def _process_warnings(self, last_line, log_dict, warnings):
+ if log_dict.keys() in ([], ["logs"]):
+ # there are no sub-jobs, just append the warnings here
+ warnings = self._format_warnings(last_line, warnings)
+ log_list = log_dict.setdefault("logs", [])
+ log_list += warnings
+ for warning in warnings:
+ sys.stdout.write(warning + '\n')
+ else:
+ # there are sub-jobs, so put the warnings in there
+ log_list = log_dict.get("logs", [])
+ if log_list:
+ last_line = log_list[-1]
+ for key in sorted(log_dict.iterkeys()):
+ if key != "logs":
+ self._process_warnings(last_line,
+ log_dict[key],
+ warnings)
+
+
+ def write(self, data):
+ # first check for any new console warnings
+ warnings = self.job._read_warnings()
+ self._process_warnings(self.last_line, self.logs, warnings)
+ # now process the newest data written out
+ data = self.leftover + data
+ lines = data.split("\n")
+ # process every line but the last one
+ for line in lines[:-1]:
+ self._process_line(line)
+ # save the last line for later processing
+ # since we may not have the whole line yet
+ self.leftover = lines[-1]
+
+
+ def flush(self):
+ sys.stdout.flush()
+
+
+ def close(self):
+ if self.leftover:
+ self._process_line(self.leftover)
+ self._process_logs()
+ self.flush()
+
+
# site_autotest.py may be non-existant or empty, make sure that an appropriate
# SiteAutotest class is created nevertheless
try:
diff --git a/server/autotest_unittest.py b/server/autotest_unittest.py
index c7247ff..619106d 100644
--- a/server/autotest_unittest.py
+++ b/server/autotest_unittest.py
@@ -46,7 +46,7 @@
self.god.stub_function(autotest.global_config.global_config,
"get_config_value")
self.god.stub_class(autotest, "_Run")
- self.god.stub_class(server_job, "log_collector")
+ self.god.stub_class(autotest, "log_collector")
def tearDown(self):
@@ -190,7 +190,7 @@
os.path.abspath.expect_call('control').and_return('control')
os.remove.expect_call("temp")
run_obj.execute_control.expect_call(timeout=30)
- collector = server_job.log_collector.expect_new(self.host, tag, '.')
+ collector = autotest.log_collector.expect_new(self.host, tag, '.')
collector.collect_client_job_results.expect_call()
autotest.open.expect_call('./control.None.autoserv.state').and_raises(
@@ -203,5 +203,63 @@
self.god.check_playback()
+class CopyLogsTest(unittest.TestCase):
+ def setUp(self):
+ self.god = mock.mock_god()
+
+ self.host = self.god.create_mock_class(hosts.RemoteHost, "host")
+ self.host.hostname = "testhost"
+
+ self.god.stub_function(os.path, "exists")
+ self.god.stub_function(os, "close")
+ self.god.stub_function(os, "remove")
+ self.god.stub_function(tempfile, "mkstemp")
+ self.god.stub_function(utils, "read_keyval")
+ self.god.stub_function(utils, "write_keyval")
+
+
+ def tearDown(self):
+ self.god.unstub_all()
+
+
+ def test_prepare_for_copying_logs(self):
+ self.host.get_autodir.expect_call().and_return("/autodir")
+ collector = autotest.log_collector(self.host, None, "/resultsdir")
+ self.god.check_playback()
+
+ os.path.exists.expect_call("/resultsdir/keyval").and_return(True)
+ tempfile.mkstemp.expect_call(".keyval_testhost").and_return(
+ (10, "tmp.keyval_testhost"))
+ os.close.expect_call(10)
+ self.host.get_file.expect_call("/autodir/results/default/keyval",
+ "tmp.keyval_testhost")
+ self.host.get_tmp_dir.expect_call().and_return("/autotmp")
+ self.host.run.expect_call(
+ "mv /autodir/results/default/keyval /autotmp/keyval")
+
+ # run and check
+ keyval = collector._prepare_for_copying_logs()
+ self.assertEquals(keyval, "tmp.keyval_testhost")
+ self.god.check_playback()
+
+
+ def test_process_copied_logs(self):
+ self.host.get_autodir.expect_call().and_return("/autodir")
+ collector = autotest.log_collector(self.host, None, "/resultsdir")
+ self.god.check_playback()
+
+ utils.read_keyval.expect_call("tmp.keyval_testhost").and_return(
+ {"field1": "new thing", "field3": "other new thing"})
+ utils.read_keyval.expect_call("/resultsdir").and_return(
+ {"field1": "thing", "field2": "otherthing"})
+ utils.write_keyval.expect_call("/resultsdir",
+ {"field3": "other new thing"})
+ os.remove.expect_call("tmp.keyval_testhost")
+
+ # run and check
+ collector._process_copied_logs("tmp.keyval_testhost")
+ self.god.check_playback()
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/server/profiler.py b/server/profiler.py
new file mode 100644
index 0000000..a23c655
--- /dev/null
+++ b/server/profiler.py
@@ -0,0 +1,84 @@
+import itertools
+from autotest_lib.server import autotest
+
+
+
+def get_unpassable_types(arg):
+ """ Given an argument, returns a set of types contained in arg that are
+ unpassable. If arg is an atomic type (e.g. int) it either returns an
+ empty set (if the type is passable) or a singleton of the type (if the
+ type is not passable). """
+ if isinstance(arg, (basestring, int, long)):
+ return set()
+ elif isinstance(arg, (list, tuple, set, frozenset, dict)):
+ if isinstance(arg, dict):
+ # keys and values must both be passable
+ parts = itertools.chain(arg.iterkeys(), arg.itervalues())
+ else:
+ # for all other containers we just iterate
+ parts = iter(arg)
+ types = set()
+ for part in parts:
+ types |= get_unpassable_types(part)  # recurse into each element
+ return types
+ else:
+ return set([type(arg)])
+
+
+def validate_args(args):
+ """ Validates arguments. Lists and dictionaries are valid argument types,
+ so you can pass *args and **dargs in directly, rather than having to
+ iterate over them yourself. """
+ unpassable_types = get_unpassable_types(args)
+ if unpassable_types:
+ msg = "arguments of type '%s' cannot be passed to remote profilers"
+ msg %= ", ".join(t.__name__ for t in unpassable_types)
+ raise TypeError(msg)
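+
+# Illustrative usage (made-up values):
+#   validate_args(([1, "two"], {"three": 4}))  # fine, all atoms passable
+#   validate_args((object(),))                 # raises TypeError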
+
+
+class profiler_proxy(object):
+ """ This is a server-side class that acts as a proxy to a real client-side
+ profiler class."""
+
+ def __init__(self, job, profiler_name):
+ self.job = job
+ self.name = profiler_name
+ self.installed_hosts = {}
+
+
+ def _install(self):
+ """ Install autotest on any current job hosts. """
+ current_job_hosts = self.job.hosts
+ current_profiler_hosts = set(self.installed_hosts.keys())
+ # install autotest on any new hosts in job.hosts
+ for host in current_job_hosts - current_profiler_hosts:
+ tmp_dir = host.get_tmp_dir(parent="/tmp/profilers")
+ at = autotest.Autotest(host)
+ at.install(autodir=tmp_dir)
+ self.installed_hosts[host] = at
+ # drop any installs from hosts no longer in job.hosts
+ for host in current_profiler_hosts - current_job_hosts:
+ del self.installed_hosts[host]
+
+
+ def setup(self, *args, **dargs):
+ validate_args(args)
+ validate_args(dargs)
+ self._install()
+
+
+ def initialize(self, *args, **dargs):
+ validate_args(args)
+ validate_args(dargs)
+
+
+ def start(self, test):
+ pass
+
+
+ def stop(self, test):
+ pass
+
+
+ def report(self, test):
+ pass
diff --git a/server/profilers.py b/server/profilers.py
new file mode 100644
index 0000000..62122ae
--- /dev/null
+++ b/server/profilers.py
@@ -0,0 +1,13 @@
+import os, sys
+import common
+
+from autotest_lib.client.common_lib import utils, packages, profiler_manager
+from autotest_lib.server import profiler
+
+
+class profilers(profiler_manager.profiler_manager):
+ def load_profiler(self, profiler_name, args, dargs):
+ newprofiler = profiler.profiler_proxy(self.job, profiler_name)
+ newprofiler.initialize(*args, **dargs)
+ newprofiler.setup(*args, **dargs) # lazy setup is done client-side
+ return newprofiler
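+
+# Illustrative usage (assuming profiler_manager keeps the client-side
+# add() entry point): a server-side control file can now call e.g.
+#   job.profilers.add('oprofile')
+# and get back a profiler_proxy stub instead of a real client profiler.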
diff --git a/server/server_job.py b/server/server_job.py
index cf404ad..9ddf7b9 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -10,7 +10,7 @@
import shutil, warnings
from autotest_lib.client.bin import fd_stack, sysinfo
from autotest_lib.client.common_lib import error, log, utils, packages
-from autotest_lib.server import test, subcommand
+from autotest_lib.server import test, subcommand, profilers
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
@@ -128,6 +128,7 @@
if resultdir:
self.sysinfo = sysinfo.sysinfo(self.resultdir)
+ self.profilers = profilers.profilers(self)
if not os.access(self.tmpdir, os.W_OK):
try:
@@ -802,245 +803,6 @@
print >> sys.stderr, msg
-
-class log_collector(object):
- def __init__(self, host, client_tag, results_dir):
- self.host = host
- if not client_tag:
- client_tag = "default"
- self.client_results_dir = os.path.join(host.get_autodir(), "results",
- client_tag)
- self.server_results_dir = results_dir
-
-
- def collect_client_job_results(self):
- """ A method that collects all the current results of a running
- client job into the results dir. By default does nothing as no
- client job is running, but when running a client job you can override
- this with something that will actually do something. """
-
- # make an effort to wait for the machine to come up
- try:
- self.host.wait_up(timeout=30)
- except error.AutoservError:
- # don't worry about any errors, we'll try and
- # get the results anyway
- pass
-
-
- # Copy all dirs in default to results_dir
- try:
- keyval_path = self._prepare_for_copying_logs()
- self.host.get_file(self.client_results_dir + '/',
- self.server_results_dir)
- self._process_copied_logs(keyval_path)
- self._postprocess_copied_logs()
- except Exception:
- # well, don't stop running just because we couldn't get logs
- print "Unexpected error copying test result logs, continuing ..."
- traceback.print_exc(file=sys.stdout)
-
-
- def _prepare_for_copying_logs(self):
- server_keyval = os.path.join(self.server_results_dir, 'keyval')
- if not os.path.exists(server_keyval):
- # Client-side keyval file can be copied directly
- return
-
- # Copy client-side keyval to temporary location
- suffix = '.keyval_%s' % self.host.hostname
- fd, keyval_path = tempfile.mkstemp(suffix)
- os.close(fd)
- try:
- client_keyval = os.path.join(self.client_results_dir, 'keyval')
- try:
- self.host.get_file(client_keyval, keyval_path)
- finally:
- # We will squirrel away the client side keyval
- # away and move it back when we are done
- remote_temp_dir = self.host.get_tmp_dir()
- self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
- self.host.run('mv %s %s' % (client_keyval,
- self.temp_keyval_path))
- except (error.AutoservRunError, error.AutoservSSHTimeout):
- print "Prepare for copying logs failed"
- return keyval_path
-
-
- def _process_copied_logs(self, keyval_path):
- if not keyval_path:
- # Client-side keyval file was copied directly
- return
-
- # Append contents of keyval_<host> file to keyval file
- try:
- # Read in new and old keyval files
- new_keyval = utils.read_keyval(keyval_path)
- old_keyval = utils.read_keyval(self.server_results_dir)
- # 'Delete' from new keyval entries that are in both
- tmp_keyval = {}
- for key, val in new_keyval.iteritems():
- if key not in old_keyval:
- tmp_keyval[key] = val
- # Append new info to keyval file
- utils.write_keyval(self.server_results_dir, tmp_keyval)
- # Delete keyval_<host> file
- os.remove(keyval_path)
- except IOError:
- print "Process copied logs failed"
-
-
- def _postprocess_copied_logs(self):
- # we can now put our keyval file back
- client_keyval = os.path.join(self.client_results_dir, 'keyval')
- try:
- self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
- except Exception:
- pass
-
-
-# a file-like object for catching stderr from an autotest client and
-# extracting status logs from it
-class client_logger(object):
- """Partial file object to write to both stdout and
- the status log file. We only implement those methods
- utils.run() actually calls.
- """
- status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
- test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
- extract_indent = re.compile(r"^(\t*).*$")
-
- def __init__(self, host, tag, server_results_dir):
- self.host = host
- self.job = host.job
- self.log_collector = log_collector(host, tag, server_results_dir)
- self.leftover = ""
- self.last_line = ""
- self.logs = {}
-
-
- def _process_log_dict(self, log_dict):
- log_list = log_dict.pop("logs", [])
- for key in sorted(log_dict.iterkeys()):
- log_list += self._process_log_dict(log_dict.pop(key))
- return log_list
-
-
- def _process_logs(self):
- """Go through the accumulated logs in self.log and print them
- out to stdout and the status log. Note that this processes
- logs in an ordering where:
-
- 1) logs to different tags are never interleaved
- 2) logs to x.y come before logs to x.y.z for all z
- 3) logs to x.y come before x.z whenever y < z
-
- Note that this will in general not be the same as the
- chronological ordering of the logs. However, if a chronological
- ordering is desired that one can be reconstructed from the
- status log by looking at timestamp lines."""
- log_list = self._process_log_dict(self.logs)
- for line in log_list:
- self.job._record_prerendered(line + '\n')
- if log_list:
- self.last_line = log_list[-1]
-
-
- def _process_quoted_line(self, tag, line):
- """Process a line quoted with an AUTOTEST_STATUS flag. If the
- tag is blank then we want to push out all the data we've been
- building up in self.logs, and then the newest line. If the
- tag is not blank, then push the line into the logs for handling
- later."""
- print line
- if tag == "":
- self._process_logs()
- self.job._record_prerendered(line + '\n')
- self.last_line = line
- else:
- tag_parts = [int(x) for x in tag.split(".")]
- log_dict = self.logs
- for part in tag_parts:
- log_dict = log_dict.setdefault(part, {})
- log_list = log_dict.setdefault("logs", [])
- log_list.append(line)
-
-
- def _process_line(self, line):
- """Write out a line of data to the appropriate stream. Status
- lines sent by autotest will be prepended with
- "AUTOTEST_STATUS", and all other lines are ssh error
- messages."""
- status_match = self.status_parser.search(line)
- test_complete_match = self.test_complete_parser.search(line)
- if status_match:
- tag, line = status_match.groups()
- self._process_quoted_line(tag, line)
- elif test_complete_match:
- fifo_path, = test_complete_match.groups()
- self.log_collector.collect_client_job_results()
- self.host.run("echo A > %s" % fifo_path)
- else:
- print line
-
-
- def _format_warnings(self, last_line, warnings):
- # use the indentation of whatever the last log line was
- indent = self.extract_indent.match(last_line).group(1)
- # if the last line starts a new group, add an extra indent
- if last_line.lstrip('\t').startswith("START\t"):
- indent += '\t'
- return [self.job._render_record("WARN", None, None, msg,
- timestamp, indent).rstrip('\n')
- for timestamp, msg in warnings]
-
-
- def _process_warnings(self, last_line, log_dict, warnings):
- if log_dict.keys() in ([], ["logs"]):
- # there are no sub-jobs, just append the warnings here
- warnings = self._format_warnings(last_line, warnings)
- log_list = log_dict.setdefault("logs", [])
- log_list += warnings
- for warning in warnings:
- sys.stdout.write(warning + '\n')
- else:
- # there are sub-jobs, so put the warnings in there
- log_list = log_dict.get("logs", [])
- if log_list:
- last_line = log_list[-1]
- for key in sorted(log_dict.iterkeys()):
- if key != "logs":
- self._process_warnings(last_line,
- log_dict[key],
- warnings)
-
-
- def write(self, data):
- # first check for any new console warnings
- warnings = self.job._read_warnings()
- self._process_warnings(self.last_line, self.logs, warnings)
- # now process the newest data written out
- data = self.leftover + data
- lines = data.split("\n")
- # process every line but the last one
- for line in lines[:-1]:
- self._process_line(line)
- # save the last line for later processing
- # since we may not have the whole line yet
- self.leftover = lines[-1]
-
-
- def flush(self):
- sys.stdout.flush()
-
-
- def close(self):
- if self.leftover:
- self._process_line(self.leftover)
- self._process_logs()
- self.flush()
-
-
# site_server_job.py may be non-existant or empty, make sure that an
# appropriate site_server_job class is created nevertheless
try:
diff --git a/server/server_job_unittest.py b/server/server_job_unittest.py
index 96af5c6..15f952d 100644
--- a/server/server_job_unittest.py
+++ b/server/server_job_unittest.py
@@ -198,10 +198,6 @@
'ssh_user' : self.job.ssh_user, \
'ssh_port' : self.job.ssh_port, \
'ssh_pass' : self.job.ssh_pass}
- os.path.exists.expect_call(
- server_job.SITE_VERIFY_CONTROL_FILE).and_return(True)
- self.job._execute_code.expect_call(server_job.SITE_VERIFY_CONTROL_FILE,
- namespace, protect=False)
self.job._execute_code.expect_call(server_job.VERIFY_CONTROL_FILE,
namespace, protect=False)
@@ -221,17 +217,9 @@
repair_namespace = verify_namespace.copy()
repair_namespace['protection_level'] = host_protections.default
- os.path.exists.expect_call(
- server_job.SITE_REPAIR_CONTROL_FILE).and_return(True)
- self.job._execute_code.expect_call(server_job.SITE_REPAIR_CONTROL_FILE,
- repair_namespace, protect=False)
self.job._execute_code.expect_call(server_job.REPAIR_CONTROL_FILE,
repair_namespace, protect=False)
- os.path.exists.expect_call(
- server_job.SITE_VERIFY_CONTROL_FILE).and_return(True)
- self.job._execute_code.expect_call(server_job.SITE_VERIFY_CONTROL_FILE,
- verify_namespace, protect=False)
self.job._execute_code.expect_call(server_job.VERIFY_CONTROL_FILE,
verify_namespace, protect=False)
@@ -450,63 +438,5 @@
self.god.check_playback()
-class CopyLogsTest(unittest.TestCase):
- def setUp(self):
- self.god = mock.mock_god()
-
- self.host = self.god.create_mock_class(hosts.RemoteHost, "host")
- self.host.hostname = "testhost"
-
- self.god.stub_function(os.path, "exists")
- self.god.stub_function(os, "close")
- self.god.stub_function(os, "remove")
- self.god.stub_function(tempfile, "mkstemp")
- self.god.stub_function(utils, "read_keyval")
- self.god.stub_function(utils, "write_keyval")
-
-
- def tearDown(self):
- self.god.unstub_all()
-
-
- def test_prepare_for_copying_logs(self):
- self.host.get_autodir.expect_call().and_return("/autodir")
- collector = server_job.log_collector(self.host, None, "/resultsdir")
- self.god.check_playback()
-
- os.path.exists.expect_call("/resultsdir/keyval").and_return(True)
- tempfile.mkstemp.expect_call(".keyval_testhost").and_return(
- (10, "tmp.keyval_testhost"))
- os.close.expect_call(10)
- self.host.get_file.expect_call("/autodir/results/default/keyval",
- "tmp.keyval_testhost")
- self.host.get_tmp_dir.expect_call().and_return("/autotmp")
- self.host.run.expect_call(
- "mv /autodir/results/default/keyval /autotmp/keyval")
-
- # run and check
- keyval = collector._prepare_for_copying_logs()
- self.assertEquals(keyval, "tmp.keyval_testhost")
- self.god.check_playback()
-
-
- def test_process_copied_logs(self):
- self.host.get_autodir.expect_call().and_return("/autodir")
- collector = server_job.log_collector(self.host, None, "/resultsdir")
- self.god.check_playback()
-
- utils.read_keyval.expect_call("tmp.keyval_testhost").and_return(
- {"field1": "new thing", "field3": "other new thing"})
- utils.read_keyval.expect_call("/resultsdir").and_return(
- {"field1": "thing", "field2": "otherthing"})
- utils.write_keyval.expect_call("/resultsdir",
- {"field3": "other new thing"})
- os.remove.expect_call("tmp.keyval_testhost")
-
- # run and check
- collector._process_copied_logs("tmp.keyval_testhost")
- self.god.check_playback()
-
-
if __name__ == "__main__":
unittest.main()