| #!/usr/bin/python |
| """Summarize the results of many RAPPOR analysis runs. |
| |
| Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt |
| and log.txt files. Writes a CSV to stdout. Row key is (metric, date). |
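
The first argument is an action: 'dist' or 'assoc'.  An optional second
argument names a memory CSV (sampled over 5 second intervals; see
ParseMemCsv); its stats are joined into the 'dist' output by PID.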
| """ |
| |
| import collections |
| import csv |
| import json |
| import os |
| import re |
| import sys |
| |
| |
| # Parse bash 'time' output: |
| # real 0m11.578s |
| |
| # TODO: Parse the time from metrics.json instead. |
| TIMING_RE = re.compile( |
| r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE) |
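# e.g. TIMING_RE.search('real    0m11.578s').groups() == ('0', '11.578')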
| |
| # TODO: Could have decode-dist and decode-assoc output the PID? |
| PID_RE = re.compile( |
| r'write_pid.py: PID (\d+)') # not VERBOSE, spaces are literal |
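# e.g. a log line like 'write_pid.py: PID 1234' yields group(1) == '1234'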
| |
| |
| def ParseMemCsv(f): |
| """Compute summary stats for memory. |
| |
  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since the
  peak comes from the kernel, it's accurate except for tasks that spike in
  their last 4 seconds.
| |
| vm5_mean_kib -> mean(vm_size_kib) # over 5 second intervals |
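
  Example input (column names and values hypothetical):

    timestamp,pid,rss_kib,peak_kib,size_kib
    1431112801,1234,9000,20000,15000
    1431112806,1234,9500,21000,16000

  would yield {'1234': (21000, 15500.0)}.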
| """ |
| peak_by_pid = collections.defaultdict(list) |
| size_by_pid = collections.defaultdict(list) |
| |
| # Parse columns we care about, by PID |
| c = csv.reader(f) |
| for i, row in enumerate(c): |
| if i == 0: |
| continue # skip header |
    # Columns are (timestamp, pid, rss, peak, size); we only need pid, peak,
    # and size.
| _, pid, _, peak, size = row |
| if peak != '': |
| peak_by_pid[pid].append(int(peak)) |
| if size != '': |
| size_by_pid[pid].append(int(size)) |
| |
| mem_by_pid = {} |
| |
  # Now compute summaries
  for pid in peak_by_pid:
    peaks = peak_by_pid[pid]
| vm5_peak_kib = max(peaks) |
| |
    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / float(len(sizes))  # float: avoid truncation
| |
| mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib) |
| |
| return mem_by_pid |
| |
| |
| def CheckJobId(job_id, parts): |
| """Sanity check for date or smoke test.""" |
| if not job_id.startswith('201') and not job_id.startswith('smoke'): |
| raise RuntimeError( |
| "Expected job ID to start with '201' or 'smoke': got %r (%s)" % |
| (job_id, parts)) |
| |
| |
| def ReadStatus(f): |
| status_line = f.readline().strip() |
| return status_line.split()[0] # OK, TIMEOUT, FAIL |
| |
| |
| def CombineDistTaskStatus(stdin, c_out, mem_by_pid): |
| """Read status task paths from stdin, write CSV summary to c_out'.""" |
| |
  # Parses:
  # - the input path, for the metric name and date
  # - spec.txt, for task params
  # - STATUS.txt, for task success/failure
  # - metrics.json, for output metrics
  # - log.txt, for timing (if the task ran to completion), the failure
  #   reason, and the PID, which is joined against the memory stats
| |
| header = ( |
| 'job_id', 'params_file', 'map_file', |
| 'metric', 'date', |
| 'vm5_peak_kib', 'vm5_mean_kib', # set when not skipped |
| 'seconds', 'status', |
| # only set when OK |
| 'num_reports', 'num_rappor', 'allocated_mass', |
| # only set when failed |
| 'fail_reason') |
| c_out.writerow(header) |
| |
| for line in stdin: |
| # |
| # Receive a STATUS.txt path on each line of stdin, and parse it. |
| # |
| status_path = line.strip() |
| |
| with open(status_path) as f: |
| status = ReadStatus(f) |
| |
| # Path should look like this: |
| # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt |
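    # For the example above, parts[-5] is the job ID '2015-05-20__19-22-01'.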
| parts = status_path.split('/') |
| job_id = parts[-5] |
| CheckJobId(job_id, parts) |
| |
| # |
| # Parse the job spec |
| # |
| result_dir = os.path.dirname(status_path) |
| spec_file = os.path.join(result_dir, 'spec.txt') |
| with open(spec_file) as f: |
| spec_line = f.readline() |
| # See backfill.sh analyze-one for the order of these 7 fields. |
| # There are 3 job constants on the front. |
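      # Example line (values hypothetical):
      #   1000 Settings.NewTabPage 2015-05-19 <counts> <params> <map> <results dir>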
| (num_reports, metric_name, date, counts_path, params_path, |
| map_path, _) = spec_line.split() |
| |
| # NOTE: These are all constant per metric. Could have another CSV and |
| # join. But denormalizing is OK for now. |
| params_file = os.path.basename(params_path) |
| map_file = os.path.basename(map_path) |
| |
| # remove extension |
| params_file, _ = os.path.splitext(params_file) |
| map_file, _ = os.path.splitext(map_file) |
| |
| # |
| # Read the log |
| # |
| log_file = os.path.join(result_dir, 'log.txt') |
| with open(log_file) as f: |
| lines = f.readlines() |
| |
    # Search lines in reverse order for the total time.  The log could have
    # output from multiple 'time' statements, and we want the last one.
    seconds = None  # remains None if the task was skipped
    for log_line in reversed(lines):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(log_line)
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break
| |
    # Extract the stack trace
    if status == 'FAIL':
      # Stack trace lines look like: "Calls: main -> RunOne ..."
      fail_reason = ' '.join(l.strip() for l in lines if 'Calls' in l)
    else:
      fail_reason = None
| |
    # Extract the PID and join it with the memory results
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:  # otherwise we weren't passed memory.csv
      for log_line in lines:
        m = PID_RE.match(log_line)
        if m:
          pid = m.group(1)
          # The PID may be missing from the memory stats if the process ran
          # for less than 5 seconds, or if mem-track wasn't put on the front
          # of the command.
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:
            vm5_peak_kib, vm5_mean_kib = None, None
          break
| |
| # |
| # Read the metrics |
| # |
| metrics = {} |
| metrics_file = os.path.join(result_dir, 'metrics.json') |
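    # Keys read below from metrics.json (example values hypothetical):
    #   {"num_detected": 50, "allocated_mass": 0.92}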
| if os.path.isfile(metrics_file): |
| with open(metrics_file) as f: |
| metrics = json.load(f) |
| |
| num_rappor = metrics.get('num_detected') |
| allocated_mass = metrics.get('allocated_mass') |
| |
| # Construct and write row |
| row = ( |
| job_id, params_file, map_file, |
| metric_name, date, |
| vm5_peak_kib, vm5_mean_kib, |
| seconds, status, |
| num_reports, num_rappor, allocated_mass, |
| fail_reason) |
| |
| c_out.writerow(row) |
| |
| |
| def CombineAssocTaskStatus(stdin, c_out): |
| """Read status task paths from stdin, write CSV summary to c_out'.""" |
| |
| header = ( |
| 'job_id', 'metric', 'date', 'status', 'num_reports', |
| 'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1', |
| 'd2') |
| |
| c_out.writerow(header) |
| |
| for line in stdin: |
| status_path = line.strip() |
| |
| with open(status_path) as f: |
| status = ReadStatus(f) |
| |
| parts = status_path.split('/') |
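    # Assoc results are nested one level deeper than dist results, hence
    # parts[-6] rather than parts[-5].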
| job_id = parts[-6] |
| CheckJobId(job_id, parts) |
| |
| # |
| # Parse the job spec |
| # |
| result_dir = os.path.dirname(status_path) |
| spec_file = os.path.join(result_dir, 'assoc-spec.txt') |
| with open(spec_file) as f: |
| spec_line = f.readline() |
      # See backfill.sh for the order of these fields.  The first 5 are job
      # params, which we skip.
| (_, _, _, _, _, |
| dummy_num_reports, metric_name, date, reports, var1, var2, map1, |
| output_dir) = spec_line.split() |
| |
| # |
| # Parse decode-assoc metrics |
| # |
| metrics = {} |
| metrics_file = os.path.join(result_dir, 'assoc-metrics.json') |
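    # Keys read below from assoc-metrics.json (example values hypothetical):
    #   {"num_reports": 10000, "total_elapsed_time": 12.5,
    #    "em_elapsed_time": 3.2, "estimate_dimensions": [10, 5]}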
| if os.path.isfile(metrics_file): |
| with open(metrics_file) as f: |
| metrics = json.load(f) |
| |
    # After the task has run we have the actual number of reports (the
    # spec's count is a dummy; see dummy_num_reports above).
| num_reports = metrics.get('num_reports') |
| total_elapsed_seconds = metrics.get('total_elapsed_time') |
| em_elapsed_seconds = metrics.get('em_elapsed_time') |
| estimate_dimensions = metrics.get('estimate_dimensions') |
| if estimate_dimensions: |
| d1, d2 = estimate_dimensions |
| else: |
| d1, d2 = (0, 0) # unknown |
| |
| row = ( |
| job_id, metric_name, date, status, num_reports, total_elapsed_seconds, |
| em_elapsed_seconds, var1, var2, d1, d2) |
| c_out.writerow(row) |
| |
| |
def main(argv):
  try:
    action = argv[1]
  except IndexError:
    raise RuntimeError("Expected an action: 'dist' or 'assoc'")
| |
| try: |
| mem_csv = argv[2] |
| except IndexError: |
| mem_by_pid = None |
| else: |
| with open(mem_csv) as f: |
| mem_by_pid = ParseMemCsv(f) |
| |
| if action == 'dist': |
| c_out = csv.writer(sys.stdout) |
| CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid) |
| |
| elif action == 'assoc': |
| c_out = csv.writer(sys.stdout) |
| CombineAssocTaskStatus(sys.stdin, c_out) |
| |
| else: |
| raise RuntimeError('Invalid action %r' % action) |
| |
| |
| if __name__ == '__main__': |
| try: |
| main(sys.argv) |
  except RuntimeError as e:
| print >>sys.stderr, 'FATAL: %s' % e |
| sys.exit(1) |