#!/usr/bin/python
"""Summarize the results of many RAPPOR analysis runs.
Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
"""
import collections
import csv
import json
import os
import re
import sys
# Parse bash 'time' output:
# real 0m11.578s
# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
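# For example, the 'real 0m11.578s' line above yields groups ('0', '11.578'),
# i.e. 0 minutes and 11.578 seconds.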
# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
r'write_pid.py: PID (\d+)') # not VERBOSE, spaces are literal
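# Matches log lines like 'write_pid.py: PID 1234', capturing '1234'.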
def ParseMemCsv(f):
"""Compute summary stats for memory.
vm5_peak_kib -> max(vm_peak_kib) # over 5 second intervals. Since it uses
the kernel, it's accurate except for takes that spike in their last 4
seconds.
vm5_mean_kib -> mean(vm_size_kib) # over 5 second intervals
"""
peak_by_pid = collections.defaultdict(list)
size_by_pid = collections.defaultdict(list)
# Parse columns we care about, by PID
c = csv.reader(f)
for i, row in enumerate(c):
if i == 0:
continue # skip header
# looks like timestamp, pid, then (rss, peak, size)
_, pid, _, peak, size = row
if peak != '':
peak_by_pid[pid].append(int(peak))
if size != '':
size_by_pid[pid].append(int(size))
mem_by_pid = {}
# Now compute summaries
pids = peak_by_pid.keys()
for pid in pids:
peaks = peak_by_pid[pid]
vm5_peak_kib = max(peaks)
sizes = size_by_pid[pid]
vm5_mean_kib = sum(sizes) / len(sizes)
mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)
return mem_by_pid
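
# Usage sketch for ParseMemCsv (file name hypothetical):
#
#   with open('memory.csv') as f:
#     mem_by_pid = ParseMemCsv(f)
#   # mem_by_pid maps PID strings to (vm5_peak_kib, vm5_mean_kib) tuples,
#   # e.g. {'1234': (12000, 10500)}  (values illustrative).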
def CheckJobId(job_id, parts):
"""Sanity check for date or smoke test."""
if not job_id.startswith('201') and not job_id.startswith('smoke'):
raise RuntimeError(
"Expected job ID to start with '201' or 'smoke': got %r (%s)" %
(job_id, parts))
def ReadStatus(f):
status_line = f.readline().strip()
return status_line.split()[0] # OK, TIMEOUT, FAIL
def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
"""Read status task paths from stdin, write CSV summary to c_out'."""
#util.log('%s', mem_by_pid)
  # Parses:
  # - the input path, for the metric name and date
  # - spec.txt, for the task params
  # - STATUS.txt, for task success/failure
  # - metrics.json, for the output metrics
  # - log.txt, for timing (if the task ran to completion) and for structured
  #   data: the PID, which is joined with the memory stats, and any stack trace
header = (
'job_id', 'params_file', 'map_file',
'metric', 'date',
'vm5_peak_kib', 'vm5_mean_kib', # set when not skipped
'seconds', 'status',
# only set when OK
'num_reports', 'num_rappor', 'allocated_mass',
# only set when failed
'fail_reason')
c_out.writerow(header)
for line in stdin:
#
# Receive a STATUS.txt path on each line of stdin, and parse it.
#
status_path = line.strip()
with open(status_path) as f:
status = ReadStatus(f)
# Path should look like this:
# ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
parts = status_path.split('/')
job_id = parts[-5]
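    # With the example path above, parts[-5] is '2015-05-20__19-22-01',
    # i.e. the timestamped job directory.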
CheckJobId(job_id, parts)
#
# Parse the job spec
#
result_dir = os.path.dirname(status_path)
spec_file = os.path.join(result_dir, 'spec.txt')
with open(spec_file) as f:
spec_line = f.readline()
# See backfill.sh analyze-one for the order of these 7 fields.
# There are 3 job constants on the front.
(num_reports, metric_name, date, counts_path, params_path,
map_path, _) = spec_line.split()
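    # So a spec.txt line has the whitespace-separated form (field names from
    # the unpacking above; the last field is unused here):
    #   <num_reports> <metric> <date> <counts_path> <params_path> <map_path> <...>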
# NOTE: These are all constant per metric. Could have another CSV and
# join. But denormalizing is OK for now.
params_file = os.path.basename(params_path)
map_file = os.path.basename(map_path)
# remove extension
params_file, _ = os.path.splitext(params_file)
map_file, _ = os.path.splitext(map_file)
#
# Read the log
#
log_file = os.path.join(result_dir, 'log.txt')
with open(log_file) as f:
lines = f.readlines()
# Search lines in reverse order for total time. It could have output from
# multiple 'time' statements, and we want the last one.
seconds = None # for skipped
for i in xrange(len(lines) - 1, -1, -1):
# TODO: Parse the R timing too. Could use LOG_RECORD_RE.
m = TIMING_RE.search(lines[i])
if m:
min_part, sec_part = m.groups()
seconds = float(min_part) * 60 + float(sec_part)
break
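    # For example, 'real  1m11.578s' gives 1 * 60 + 11.578 = 71.578 seconds.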
# Extract stack trace
if status == 'FAIL':
# Stack trace looks like: "Calls: main -> RunOne ..."
fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
else:
fail_reason = None
# Extract PID and join with memory results
pid = None
vm5_peak_kib = None
vm5_mean_kib = None
if mem_by_pid:
for line in lines:
m = PID_RE.match(line)
if m:
pid = m.group(1)
          # The PID may be missing from memory.csv if the process was very
          # short-lived (less than the 5-second sampling interval).
try:
vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
except KeyError: # sometimes we don't add mem-track on the front
vm5_peak_kib, vm5_mean_kib = None, None
break
else:
pass # we weren't passed memory.csv
#
# Read the metrics
#
metrics = {}
metrics_file = os.path.join(result_dir, 'metrics.json')
if os.path.isfile(metrics_file):
with open(metrics_file) as f:
metrics = json.load(f)
num_rappor = metrics.get('num_detected')
allocated_mass = metrics.get('allocated_mass')
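    # metrics.json is expected to contain at least these keys, e.g. (values
    # hypothetical): {"num_detected": 50, "allocated_mass": 0.92}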
# Construct and write row
row = (
job_id, params_file, map_file,
metric_name, date,
vm5_peak_kib, vm5_mean_kib,
seconds, status,
num_reports, num_rappor, allocated_mass,
fail_reason)
c_out.writerow(row)
def CombineAssocTaskStatus(stdin, c_out):
"""Read status task paths from stdin, write CSV summary to c_out'."""
header = (
'job_id', 'metric', 'date', 'status', 'num_reports',
'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
'd2')
c_out.writerow(header)
for line in stdin:
status_path = line.strip()
with open(status_path) as f:
status = ReadStatus(f)
parts = status_path.split('/')
job_id = parts[-6]
CheckJobId(job_id, parts)
#
# Parse the job spec
#
result_dir = os.path.dirname(status_path)
spec_file = os.path.join(result_dir, 'assoc-spec.txt')
with open(spec_file) as f:
spec_line = f.readline()
      # See backfill.sh for the order of these fields.  The first 5 are job
      # constants, which we skip; the remaining 8 are the task params.
(_, _, _, _, _,
dummy_num_reports, metric_name, date, reports, var1, var2, map1,
output_dir) = spec_line.split()
#
# Parse decode-assoc metrics
#
metrics = {}
metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
if os.path.isfile(metrics_file):
with open(metrics_file) as f:
metrics = json.load(f)
# After we run it we have the actual number of reports
num_reports = metrics.get('num_reports')
total_elapsed_seconds = metrics.get('total_elapsed_time')
em_elapsed_seconds = metrics.get('em_elapsed_time')
estimate_dimensions = metrics.get('estimate_dimensions')
if estimate_dimensions:
d1, d2 = estimate_dimensions
else:
d1, d2 = (0, 0) # unknown
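    # assoc-metrics.json is expected to look roughly like (values
    # hypothetical):
    #   {"num_reports": 10000, "total_elapsed_time": 12.3,
    #    "em_elapsed_time": 4.5, "estimate_dimensions": [10, 3]}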
row = (
job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
em_elapsed_seconds, var1, var2, d1, d2)
c_out.writerow(row)
def main(argv):
action = argv[1]
try:
mem_csv = argv[2]
except IndexError:
mem_by_pid = None
else:
with open(mem_csv) as f:
mem_by_pid = ParseMemCsv(f)
if action == 'dist':
c_out = csv.writer(sys.stdout)
CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)
elif action == 'assoc':
c_out = csv.writer(sys.stdout)
CombineAssocTaskStatus(sys.stdin, c_out)
else:
raise RuntimeError('Invalid action %r' % action)
if __name__ == '__main__':
try:
main(sys.argv)
  except RuntimeError as e:
print >>sys.stderr, 'FATAL: %s' % e
sys.exit(1)