diff options
Diffstat (limited to 'pipeline/task_spec.py')
-rwxr-xr-x | pipeline/task_spec.py | 364 |
1 files changed, 364 insertions, 0 deletions
#!/usr/bin/python
"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.

Each line represents a task, or R process invocation.  The params on each line
are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
"""

import collections
import csv
import errno
import optparse
import os
import pprint
import re
import sys

import util


def _ReadCsvRows(f, expected_header):
  """Yield the data rows of a CSV file after validating its header row.

  Args:
    f: open file object containing CSV text
    expected_header: list of column names the first row must equal

  Yields:
    Each non-empty row after the header, as a list of strings.

  Raises:
    RuntimeError: if the first row is not expected_header.
  """
  for i, row in enumerate(csv.reader(f)):
    if i == 0:
      if row != expected_header:
        raise RuntimeError(
            'Expected CSV header %s, got %s' % (expected_header, row))
      continue  # skip header
    if not row:
      # csv.reader returns [] for a blank line (e.g. a trailing newline);
      # there is nothing to unpack, so skip it instead of crashing.
      continue
    yield row


def _ReadDistMaps(f):
  """Parse a var,map_filename CSV into a dict of var -> map filename."""
  dist_maps = {}
  for var_name, map_filename in _ReadCsvRows(f, ['var', 'map_filename']):
    dist_maps[var_name] = map_filename
  return dist_maps


class DistMapLookup(object):
  """Create a dictionary of var -> map to analyze against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """
  def __init__(self, f, map_dir):
    """
    Args:
      f: open file with the CSV header 'var,map_filename'
      map_dir: directory that the map filenames are joined onto
    """
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    """Return map_dir joined with the map filename registered for var_name.

    Raises:
      KeyError: if var_name was not listed in the CSV.
    """
    filename = self.dist_maps[var_name]
    return os.path.join(self.map_dir, filename)


def CreateFieldIdLookup(f):
  """Create a dictionary that specifies single variable analysis each var.

  Args:
    f: open CSV file with the header
       'metric,field,field_type,params,field_id'

  Returns:
    A dictionary from field ID -> full field name

  Raises:
    RuntimeError: if the CSV header is not the expected one.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  expected = ['metric', 'field', 'field_type', 'params', 'field_id']
  field_id_lookup = {}
  for metric, field, field_type, _, field_id in _ReadCsvRows(f, expected):
    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup


def _ReadVarSchema(f):
  """Given the rappor-vars.csv file, return a list of metric/var/type.

  Args:
    f: open CSV file with the header 'metric,var,var_type,params'

  Returns:
    A pair (assoc_metrics, params_lookup):
      assoc_metrics: defaultdict of metric -> list of (var, var_type)
      params_lookup: dict of full variable name -> params value

  Raises:
    RuntimeError: if the CSV header is not the expected one.
  """
  # metric -> list of (variable name, type)
  assoc_metrics = collections.defaultdict(list)
  params_lookup = {}

  expected = ['metric', 'var', 'var_type', 'params']
  for metric, var, var_type, params in _ReadCsvRows(f, expected):
    if var == '':
      full_var_name = metric
    else:
      full_var_name = '%s.%s' % (metric, var)
      # Also group multi-dimensional reports
      # NOTE(review): grouping is done only for rows with a non-empty var;
      # confirm against the upstream file if the layout is in doubt.
      assoc_metrics[metric].append((var, var_type))

    params_lookup[full_var_name] = params

  return assoc_metrics, params_lookup


class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """
  def __init__(self, f, params_dir):
    """
    Args:
      f: open rappor-vars.csv file
      params_dir: directory that params filenames are joined onto
    """
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    """Return params_dir/<params>.csv for the given full variable name.

    Raises:
      KeyError: if var_name is not in the schema.
    """
    filename = self.params_lookup[var_name]
    return os.path.join(self.params_dir, filename + '.csv')

  def GetAssocMetrics(self):
    """Return the metric -> [(var, var_type), ...] mapping."""
    return self.assoc_metrics


def CountReports(f):
  """Sum the integer in the first comma-separated column of every line.

  Args:
    f: iterable of text lines

  Returns:
    The total as an int; 0 for empty input.

  Raises:
    ValueError: if a non-blank line does not start with an integer column.
  """
  num_reports = 0
  for line in f:
    if not line.strip():
      continue  # tolerate blank lines, e.g. a trailing newline
    num_reports += int(line.split(',', 1)[0])
  return num_reports


def _PathInputIter(stdin, path_re):
  """Yield (stripped path, group 1, group 2) for each input line.

  Args:
    stdin: iterable of lines, one path per line
    path_re: compiled regex with exactly two groups

  Raises:
    RuntimeError: on the first line that path_re does not match.
  """
  for line in stdin:
    m = path_re.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    first, second = m.groups()
    yield line.strip(), first, second


# The '.' before 'csv' is escaped so that it only matches a literal dot.
DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts\.csv')


def DistInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Yields:
    (counts_path, date, field_id) tuples.

  Raises:
    RuntimeError: if a line doesn't look like .../<date>/<id>_counts.csv.
  """
  return _PathInputIter(stdin, DIST_INPUT_PATH_RE)


def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Generate the task spec rows for single variable RAPPOR.

  Args:
    input_iter: iterable of (counts_path, date, field_id)
    field_id_lookup: dict of field ID -> field name.  If empty, the field ID
        is used directly as the field name.
    var_schema: VarSchema, used to find the params file
    dist_maps: DistMapLookup, used to find the map file
    bad_c: csv writer that receives (date, field_id, num_reports) for each
        field ID that could not be looked up, or None

  Yields:
    (num_reports, field_name, date, counts_path, params_path, map_path)
  """
  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    # Look up field name from field ID
    if field_id_lookup:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g.
        # due to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
        num_bad += 1
        continue
    else:
      field_name = field_id

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path = dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)


# The '.' before 'csv' is escaped so that it only matches a literal dot.
ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports\.csv')


def AssocInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Yields:
    (reports_path, date, metric_name) tuples.

  Raises:
    RuntimeError: if a line doesn't look like .../<date>/<metric>_reports.csv.
  """
  return _PathInputIter(stdin, ASSOC_INPUT_PATH_RE)


def CreateAssocVarPairs(rappor_metrics):
  """Return the pairs of variables that should be associated, per metric.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict of metric -> list of (var_name, var_type)

  Returns:
    defaultdict of metric -> list of (string var, boolean var).  Metrics
    without at least one variable of each type get no entry.
  """
  var_pairs = collections.defaultdict(list)

  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown type variable type %r', var_type)

    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs


# For debugging
def PrintAssocVarPairs(var_pairs):
  """Print each metric followed by its tab-indented variable pairs."""
  for metric, pair_list in var_pairs.items():
    print(metric)
    for var1, var2 in pair_list:
      # Single pre-formatted argument: same output on Python 2 and 3.
      print('\t%s %s' % (var1, var2))


def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Generate the task spec rows for multiple variable RAPPOR.

  Args:
    input_iter: iterable of (reports_path, date, metric_name)
    var_pairs: mapping of metric -> list of (var1, var2), indexed with [].
        Passing the defaultdict from CreateAssocVarPairs means metrics with
        no pairs simply produce no rows.
    dist_maps: DistMapLookup, used to find the map for var1
    output_base_dir: root of the output directory tree
    bad_c: currently unused; accepted for symmetry with DistTaskSpec

  Yields:
    (metric_name, date, reports_path, var1, var2, map1_path, output_dir)
  """
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    pairs = var_pairs[metric_name]
    for var1, var2 in pairs:
      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
      field1_name = '%s.%s' % (metric_name, var1)
      map1_path = dist_maps.GetMapPath(field1_name)

      # Don't use .. in filenames since it could be confusing.
      # e.g. ('domain', 'flags..DID_PROCEED') -> domain_X_flags_DID_PROCEED
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)

      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir


def CreateOptionsParser():
  """Return the optparse.OptionParser for this script's flags."""
  p = optparse.OptionParser()

  p.add_option(
      '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
      default='',
      help='Optionally write a report of input filenames with invalid field '
           'IDs to this file.')
  p.add_option(
      '--config-dir', dest='config_dir', metavar='PATH', type='str',
      default='',
      help='Directory with metadata schema and params files to read.')
  p.add_option(
      '--map-dir', dest='map_dir', metavar='PATH', type='str',
      default='',
      help='Directory with map files to read.')
  p.add_option(
      '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
      default='',
      help='Root of the directory tree where analysis output will be placed.')
  p.add_option(
      '--field-ids', dest='field_ids', metavar='PATH', type='str',
      default='',
      help='Optional CSV file with field IDs (generally should not be used).')

  return p


def main(argv):
  """Write the 'dist' or 'assoc' task spec for the paths on stdin to stdout.

  Args:
    argv: full argument vector including the program name, so the action is
        the first positional argument, argv[1].

  Raises:
    RuntimeError: on a missing/invalid action or a missing required flag.
  """
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  # Validate everything before touching the filesystem, so that bad usage
  # produces a FATAL message rather than a traceback or a stray empty file.
  if len(argv) < 2:
    raise RuntimeError("Expected an action: 'dist' or 'assoc'")
  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')
  if action not in ('dist', 'assoc'):
    raise RuntimeError('Invalid action %r' % action)

  if opts.bad_report:
    bad_f = open(opts.bad_report, 'w')
    bad_c = csv.writer(bad_f)
  else:
    bad_f = None
    bad_c = None

  try:
    # This is shared between the two specs.
    path = os.path.join(opts.config_dir, 'dist-analysis.csv')
    with open(path) as f:
      dist_maps = DistMapLookup(f, opts.map_dir)

    path = os.path.join(opts.config_dir, 'rappor-vars.csv')
    with open(path) as f:
      var_schema = VarSchema(f, opts.config_dir)

    if action == 'dist':
      if opts.field_ids:
        with open(opts.field_ids) as f:
          field_id_lookup = CreateFieldIdLookup(f)
      else:
        field_id_lookup = {}

      input_iter = DistInputIter(sys.stdin)
      for row in DistTaskSpec(input_iter, field_id_lookup, var_schema,
                              dist_maps, bad_c):
        # The spec is a series of space-separated tokens.
        tokens = row + (opts.output_base_dir,)
        print(' '.join(str(t) for t in tokens))

    else:  # action == 'assoc', validated above
      # Parse input
      input_iter = AssocInputIter(sys.stdin)

      # Create M x N association tasks
      var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

      # Now add the other constant stuff
      for row in AssocTaskSpec(
          input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

        num_reports = 0  # placeholder, not filtering yet
        tokens = (num_reports,) + row
        print(' '.join(str(t) for t in tokens))
  finally:
    # The task spec generators are fully consumed above, so every bad-report
    # row has been written by now; close to flush it to disk.
    if bad_f:
      bad_f.close()


if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError as e:
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError as e:
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)