blob: c46bb358c0e2e12efdd10aeed31c1e22ddc2b6bf [file] [log] [blame] [edit]
#!/usr/bin/python
"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.
Each line represents a task, or R process invocation. The params on each line
are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
"""
import collections
import csv
import errno
import optparse
import os
import pprint
import re
import sys
import util
def _ReadDistMaps(f):
dist_maps = {}
c = csv.reader(f)
for i, row in enumerate(c):
if i == 0:
expected = ['var', 'map_filename']
if row != expected:
raise RuntimeError('Expected CSV header %s' % expected)
continue # skip header
var_name, map_filename = row
dist_maps[var_name] = map_filename
return dist_maps
class DistMapLookup(object):
  """Lookup table from variable name to the map file to analyze against.

  TODO: Support a LIST of maps. Users should be able to specify more than one.
  """

  def __init__(self, f, map_dir):
    # f: open dist-analysis.csv file; map_dir: directory holding map files.
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    # A KeyError here means the variable has no registered map file.
    return os.path.join(self.map_dir, self.dist_maps[var_name])
def CreateFieldIdLookup(f):
  """Create a dictionary that specifies single variable analysis of each var.

  Args:
    f: open file (or iterable of CSV lines) with header
       'metric,field,field_type,params,field_id', as output by
       update_rappor.par.

  Returns:
    A dictionary from field ID -> full field name.

  Raises:
    RuntimeError: if the CSV header row is not the expected one.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  field_id_lookup = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'field', 'field_type', 'params', 'field_id']
      if row != expected:
        raise RuntimeError('Expected CSV header %s' % expected)
      continue

    metric, field, field_type, _, field_id = row

    # Only string variables get single-variable (dist) analysis.
    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup
def _ReadVarSchema(f):
"""Given the rappor-vars.csv file, return a list of metric/var/type."""
# metric -> list of (variable name, type)
assoc_metrics = collections.defaultdict(list)
params_lookup = {}
c = csv.reader(f)
for i, row in enumerate(c):
if i == 0:
expected = ['metric', 'var', 'var_type', 'params']
if row != expected:
raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
continue
metric, var, var_type, params = row
if var == '':
full_var_name = metric
else:
full_var_name = '%s.%s' % (metric, var)
# Also group multi-dimensional reports
assoc_metrics[metric].append((var, var_type))
params_lookup[full_var_name] = params
return assoc_metrics, params_lookup
class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """

  def __init__(self, f, params_dir):
    # f: open rappor-vars.csv file; params_dir: directory of params files.
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    # Params files are stored as <params name>.csv under params_dir.
    return os.path.join(self.params_dir, self.params_lookup[var_name] + '.csv')

  def GetAssocMetrics(self):
    return self.assoc_metrics
def CountReports(f):
  """Sum the first CSV column over every line of a counts file.

  Each line of a counts file starts with the number of reports for that
  cohort, so the total is the overall report count.
  """
  return sum(int(line.split(',')[0]) for line in f)
DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')


def DistInputIter(stdin):
  """Yield (counts_path, date, field_id) for each counts path read on stdin.

  Raises:
    RuntimeError: if a line doesn't look like .../<date>/<field>_counts.csv.
  """
  for line in stdin:
    match = DIST_INPUT_PATH_RE.match(line)
    if match is None:
      raise RuntimeError('Invalid path %r' % line)
    date, field_id = match.group(1), match.group(2)
    yield line.strip(), date, field_id
def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Yield task spec rows for single variable RAPPOR decoding.

  Args:
    input_iter: iterable of (counts_path, date, field_id) tuples.
    field_id_lookup: dict of field ID -> field name; empty if IDs are
        already names.
    var_schema: VarSchema instance, for params paths.
    dist_maps: DistMapLookup instance, for map paths.
    bad_c: optional csv.writer that records inputs whose field ID could not
        be resolved.
  """
  bad_count = 0
  seen_ids = set()
  for counts_path, date, field_id in input_iter:
    seen_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as counts_file:
      num_reports = CountReports(counts_file)

    if not field_id_lookup:
      field_name = field_id
    else:
      # Look up field name from field ID
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name. We can miss some, e.g. due
        # to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
        bad_count += 1
        continue

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag. The var type is there too.
    yield (num_reports, field_name, date, counts_path,
           var_schema.GetParamsPath(field_name),
           dist_maps.GetMapPath(field_name))

  util.log('%d unique field IDs', len(seen_ids))
  if bad_count:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', bad_count)
ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')


def AssocInputIter(stdin):
  """Yield (reports_path, date, metric_name) for each path read on stdin.

  Raises:
    RuntimeError: if a line doesn't look like .../<date>/<metric>_reports.csv.
  """
  for line in stdin:
    match = ASSOC_INPUT_PATH_RE.match(line)
    if match is None:
      raise RuntimeError('Invalid path %r' % line)
    date, metric_name = match.group(1), match.group(2)
    yield line.strip(), date, metric_name
def CreateAssocVarPairs(rappor_metrics):
  """Return a dict of metric -> list of variable pairs to associate.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict of metric -> list of (var name, var type) tuples.

  Returns:
    defaultdict of metric -> list of (string var, boolean var) pairs.
  """
  var_pairs = collections.defaultdict(list)
  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        # Fixed duplicated word in the original message
        # ('Unknown type variable type').
        util.log('Unknown variable type %r', var_type)

    # Cross product: every string variable against every boolean variable.
    var_pairs[metric].extend(
        (s, b) for s in string_vars for b in boolean_vars)
  return var_pairs
def PrintAssocVarPairs(var_pairs):
  """Debug helper: dump each metric and its variable pairs to stdout."""
  out = sys.stdout
  for metric, pair_list in var_pairs.items():
    out.write('%s\n' % metric)
    for left, right in pair_list:
      # Matches the original 'print "\t", left, right' layout exactly.
      out.write('\t %s %s\n' % (left, right))
def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Yield task spec rows for multiple variable (association) RAPPOR.

  Flow:

  Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?

  Short term: update_rappor.py should print every combination of string vs.
  bool?  Or I guess we have it in rappor-vars.csv
  """
  for reports_path, date, metric_name in input_iter:
    for var1, var2 in var_pairs[metric_name]:
      # Assuming var1 is a string. TODO: Use an assoc file, not dist_maps?
      map1_path = dist_maps.GetMapPath('%s.%s' % (metric_name, var1))

      # e.g. domain_X_flags__DID_PROCEED.  Don't use .. in filenames since it
      # could be confusing.
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))

      out_dir = os.path.join(output_base_dir, metric_name, pair_name, date)
      yield metric_name, date, reports_path, var1, var2, map1_path, out_dir
def CreateOptionsParser():
  """Build the optparse parser for this tool's command line flags."""
  # (flag, dest, help).  All five flags are string paths defaulting to ''.
  flag_specs = [
      ('--bad-report-out', 'bad_report',
       'Optionally write a report of input filenames with invalid field '
       'IDs to this file.'),
      ('--config-dir', 'config_dir',
       'Directory with metadata schema and params files to read.'),
      ('--map-dir', 'map_dir',
       'Directory with map files to read.'),
      ('--output-base-dir', 'output_base_dir',
       'Root of the directory tree where analysis output will be placed.'),
      ('--field-ids', 'field_ids',
       'Optional CSV file with field IDs (generally should not be used).'),
  ]

  parser = optparse.OptionParser()
  for flag, dest, help_text in flag_specs:
    parser.add_option(flag, dest=dest, metavar='PATH', type='str',
                      default='', help=help_text)
  return parser
def main(argv):
(opts, argv) = CreateOptionsParser().parse_args(argv)
if opts.bad_report:
bad_f = open(opts.bad_report, 'w')
bad_c = csv.writer(bad_f)
else:
bad_c = None
action = argv[1]
if not opts.config_dir:
raise RuntimeError('--config-dir is required')
if not opts.map_dir:
raise RuntimeError('--map-dir is required')
if not opts.output_base_dir:
raise RuntimeError('--output-base-dir is required')
# This is shared between the two specs.
path = os.path.join(opts.config_dir, 'dist-analysis.csv')
with open(path) as f:
dist_maps = DistMapLookup(f, opts.map_dir)
path = os.path.join(opts.config_dir, 'rappor-vars.csv')
with open(path) as f:
var_schema = VarSchema(f, opts.config_dir)
if action == 'dist':
if opts.field_ids:
with open(opts.field_ids) as f:
field_id_lookup = CreateFieldIdLookup(f)
else:
field_id_lookup = {}
input_iter = DistInputIter(sys.stdin)
for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
bad_c):
# The spec is a series of space-separated tokens.
tokens = row + (opts.output_base_dir,)
print ' '.join(str(t) for t in tokens)
elif action == 'assoc':
# Parse input
input_iter = AssocInputIter(sys.stdin)
# Create M x N association tasks
var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())
# Now add the other constant stuff
for row in AssocTaskSpec(
input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):
num_reports = 0 # placeholder, not filtering yet
tokens = (num_reports,) + row
print ' '.join(str(t) for t in tokens)
else:
raise RuntimeError('Invalid action %r' % action)
if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError as e:
    # A consumer closing the pipe early (e.g. `| head`) is not an error.
    if e.errno != errno.EPIPE:
      raise
  except RuntimeError as e:
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)