path: root/pipeline/task_spec.py
diff options
Diffstat (limited to 'pipeline/task_spec.py')
1 files changed, 364 insertions, 0 deletions
diff --git a/pipeline/task_spec.py b/pipeline/task_spec.py
new file mode 100755
index 0000000..c46bb35
--- /dev/null
+++ b/pipeline/task_spec.py
@@ -0,0 +1,364 @@
+"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.
+Each line represents a task, or R process invocation. The params on each line
+are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
+import collections
+import csv
+import errno
+import optparse
+import os
+import pprint
+import re
+import sys
+import util
+def _ReadDistMaps(f):
+ dist_maps = {}
+ c = csv.reader(f)
+ for i, row in enumerate(c):
+ if i == 0:
+ expected = ['var', 'map_filename']
+ if row != expected:
+ raise RuntimeError('Expected CSV header %s' % expected)
+ continue # skip header
+ var_name, map_filename = row
+ dist_maps[var_name] = map_filename
+ return dist_maps
+class DistMapLookup(object):
+ """Create a dictionary of var -> map to analyze against.
+ TODO: Support a LIST of maps. Users should be able to specify more than one.
+ """
+ def __init__(self, f, map_dir):
+ self.dist_maps = _ReadDistMaps(f)
+ self.map_dir = map_dir
+ def GetMapPath(self, var_name):
+ filename = self.dist_maps[var_name]
+ return os.path.join(self.map_dir, filename)
+def CreateFieldIdLookup(f):
+ """Create a dictionary that specifies single variable analysis each var.
+ Args:
+ config_dir: directory of metadata, output by update_rappor.par
+ Returns:
+ A dictionary from field ID -> full field name
+ NOTE: Right now we're only doing single variable analysis for strings, so we
+ don't have the "type".
+ """
+ field_id_lookup = {}
+ c = csv.reader(f)
+ for i, row in enumerate(c):
+ if i == 0:
+ expected = ['metric', 'field', 'field_type', 'params', 'field_id']
+ if row != expected:
+ raise RuntimeError('Expected CSV header %s' % expected)
+ continue
+ metric, field, field_type, _, field_id = row
+ if field_type != 'string':
+ continue
+ # Paper over the difference between plain metrics (single variable) and
+ # metrics with fields (multiple variables, for association analysis).
+ if field:
+ full_field_name = '%s.%s' % (metric, field)
+ else:
+ full_field_name = metric
+ field_id_lookup[field_id] = full_field_name
+ return field_id_lookup
+def _ReadVarSchema(f):
+ """Given the rappor-vars.csv file, return a list of metric/var/type."""
+ # metric -> list of (variable name, type)
+ assoc_metrics = collections.defaultdict(list)
+ params_lookup = {}
+ c = csv.reader(f)
+ for i, row in enumerate(c):
+ if i == 0:
+ expected = ['metric', 'var', 'var_type', 'params']
+ if row != expected:
+ raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
+ continue
+ metric, var, var_type, params = row
+ if var == '':
+ full_var_name = metric
+ else:
+ full_var_name = '%s.%s' % (metric, var)
+ # Also group multi-dimensional reports
+ assoc_metrics[metric].append((var, var_type))
+ params_lookup[full_var_name] = params
+ return assoc_metrics, params_lookup
+class VarSchema(object):
+ """Object representing rappor-vars.csv.
+ Right now we use it for slightly different purposes for dist and assoc
+ analysis.
+ """
+ def __init__(self, f, params_dir):
+ self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
+ self.params_dir = params_dir
+ def GetParamsPath(self, var_name):
+ filename = self.params_lookup[var_name]
+ return os.path.join(self.params_dir, filename + '.csv')
+ def GetAssocMetrics(self):
+ return self.assoc_metrics
+def CountReports(f):
+ num_reports = 0
+ for line in f:
+ first_col = line.split(',')[0]
+ num_reports += int(first_col)
+ return num_reports
+DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')
+def DistInputIter(stdin):
+ """Read lines from stdin and extract fields to construct analysis tasks."""
+ for line in stdin:
+ m = DIST_INPUT_PATH_RE.match(line)
+ if not m:
+ raise RuntimeError('Invalid path %r' % line)
+ counts_path = line.strip()
+ date, field_id = m.groups()
+ yield counts_path, date, field_id
+def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
+ """Print task spec for single variable RAPPOR to stdout."""
+ num_bad = 0
+ unique_ids = set()
+ for counts_path, date, field_id in input_iter:
+ unique_ids.add(field_id)
+ # num_reports is used for filtering
+ with open(counts_path) as f:
+ num_reports = CountReports(f)
+ # Look up field name from field ID
+ if field_id_lookup:
+ field_name = field_id_lookup.get(field_id)
+ if field_name is None:
+ # The metric id is the md5 hash of the name. We can miss some, e.g. due
+ # to debug builds.
+ if bad_c:
+ bad_c.writerow((date, field_id, num_reports))
+ num_bad += 1
+ continue
+ else:
+ field_name = field_id
+ # NOTE: We could remove the params from the spec if decode_dist.R took the
+ # --schema flag. The var type is there too.
+ params_path = var_schema.GetParamsPath(field_name)
+ map_path= dist_maps.GetMapPath(field_name)
+ yield num_reports, field_name, date, counts_path, params_path, map_path
+ util.log('%d unique field IDs', len(unique_ids))
+ if num_bad:
+ util.log('Failed field ID -> field name lookup on %d files '
+ '(check --field-ids file)', num_bad)
+ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')
+def AssocInputIter(stdin):
+ """Read lines from stdin and extract fields to construct analysis tasks."""
+ for line in stdin:
+ m = ASSOC_INPUT_PATH_RE.match(line)
+ if not m:
+ raise RuntimeError('Invalid path %r' % line)
+ reports_path = line.strip()
+ date, metric_name = m.groups()
+ yield reports_path, date, metric_name
+def CreateAssocVarPairs(rappor_metrics):
+ """Yield a list of pairs of variables that should be associated.
+ For now just do all (string x boolean) analysis.
+ """
+ var_pairs = collections.defaultdict(list)
+ for metric, var_list in rappor_metrics.iteritems():
+ string_vars = []
+ boolean_vars = []
+ # Separate variables into strings and booleans
+ for var_name, var_type in var_list:
+ if var_type == 'string':
+ string_vars.append(var_name)
+ elif var_type == 'boolean':
+ boolean_vars.append(var_name)
+ else:
+ util.log('Unknown type variable type %r', var_type)
+ for s in string_vars:
+ for b in boolean_vars:
+ var_pairs[metric].append((s, b))
+ return var_pairs
+# For debugging
+def PrintAssocVarPairs(var_pairs):
+ for metric, var_list in var_pairs.iteritems():
+ print metric
+ for var_name, var_type in var_list:
+ print '\t', var_name, var_type
+def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
+ """Print the task spec for multiple variable RAPPOR to stdout."""
+ # Flow:
+ #
+ # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
+ #
+ # Short term: update_rappor.py should print every combination of string vs.
+ # bool? Or I guess we have it in rappor-vars.csv
+ for reports_path, date, metric_name in input_iter:
+ pairs = var_pairs[metric_name]
+ for var1, var2 in pairs:
+ # Assuming var1 is a string. TODO: Use an assoc file, not dist_maps?
+ field1_name = '%s.%s' % (metric_name, var1)
+ map1_path = dist_maps.GetMapPath(field1_name)
+ # e.g. domain_X_flags__DID_PROCEED
+ # Don't use .. in filenames since it could be confusing.
+ pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
+ output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)
+ yield metric_name, date, reports_path, var1, var2, map1_path, output_dir
+def CreateOptionsParser():
+ p = optparse.OptionParser()
+ p.add_option(
+ '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
+ default='',
+ help='Optionally write a report of input filenames with invalid field '
+ 'IDs to this file.')
+ p.add_option(
+ '--config-dir', dest='config_dir', metavar='PATH', type='str',
+ default='',
+ help='Directory with metadata schema and params files to read.')
+ p.add_option(
+ '--map-dir', dest='map_dir', metavar='PATH', type='str',
+ default='',
+ help='Directory with map files to read.')
+ p.add_option(
+ '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
+ default='',
+ help='Root of the directory tree where analysis output will be placed.')
+ p.add_option(
+ '--field-ids', dest='field_ids', metavar='PATH', type='str',
+ default='',
+ help='Optional CSV file with field IDs (generally should not be used).')
+ return p
+def main(argv):
+ (opts, argv) = CreateOptionsParser().parse_args(argv)
+ if opts.bad_report:
+ bad_f = open(opts.bad_report, 'w')
+ bad_c = csv.writer(bad_f)
+ else:
+ bad_c = None
+ action = argv[1]
+ if not opts.config_dir:
+ raise RuntimeError('--config-dir is required')
+ if not opts.map_dir:
+ raise RuntimeError('--map-dir is required')
+ if not opts.output_base_dir:
+ raise RuntimeError('--output-base-dir is required')
+ # This is shared between the two specs.
+ path = os.path.join(opts.config_dir, 'dist-analysis.csv')
+ with open(path) as f:
+ dist_maps = DistMapLookup(f, opts.map_dir)
+ path = os.path.join(opts.config_dir, 'rappor-vars.csv')
+ with open(path) as f:
+ var_schema = VarSchema(f, opts.config_dir)
+ if action == 'dist':
+ if opts.field_ids:
+ with open(opts.field_ids) as f:
+ field_id_lookup = CreateFieldIdLookup(f)
+ else:
+ field_id_lookup = {}
+ input_iter = DistInputIter(sys.stdin)
+ for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
+ bad_c):
+ # The spec is a series of space-separated tokens.
+ tokens = row + (opts.output_base_dir,)
+ print ' '.join(str(t) for t in tokens)
+ elif action == 'assoc':
+ # Parse input
+ input_iter = AssocInputIter(sys.stdin)
+ # Create M x N association tasks
+ var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())
+ # Now add the other constant stuff
+ for row in AssocTaskSpec(
+ input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):
+ num_reports = 0 # placeholder, not filtering yet
+ tokens = (num_reports,) + row
+ print ' '.join(str(t) for t in tokens)
+ else:
+ raise RuntimeError('Invalid action %r' % action)
+if __name__ == '__main__':
+ try:
+ main(sys.argv)
+ except IOError, e:
+ if e.errno != errno.EPIPE: # ignore broken pipe
+ raise
+ except RuntimeError, e:
+ print >>sys.stderr, 'FATAL: %s' % e
+ sys.exit(1)