aboutsummaryrefslogtreecommitdiff
path: root/pipeline/task_spec.py
blob: c46bb358c0e2e12efdd10aeed31c1e22ddc2b6bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#!/usr/bin/python
"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.

Each line represents a task, or R process invocation.  The params on each line
are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
"""

import collections
import csv
import errno
import optparse
import os
import pprint
import re
import sys

import util


def _ReadDistMaps(f):
  dist_maps = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['var', 'map_filename']
      if row != expected:
        raise RuntimeError('Expected CSV header %s' % expected)
      continue  # skip header

    var_name, map_filename = row
    dist_maps[var_name] = map_filename
  return dist_maps


class DistMapLookup(object):
  """Create a dictionary of var -> map to analyze against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """
  def __init__(self, f, map_dir):
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    filename = self.dist_maps[var_name]
    return os.path.join(self.map_dir, filename)


def CreateFieldIdLookup(f):
  """Create a dictionary that specifies single variable analysis each var.

  Args:
    config_dir: directory of metadata, output by update_rappor.par

  Returns:
    A dictionary from field ID -> full field name

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  field_id_lookup = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'field', 'field_type', 'params', 'field_id']
      if row != expected:
        raise RuntimeError('Expected CSV header %s' % expected)
      continue

    metric, field, field_type, _, field_id = row

    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup


def _ReadVarSchema(f):
  """Given the rappor-vars.csv file, return a list of metric/var/type."""
  # metric -> list of (variable name, type)
  assoc_metrics = collections.defaultdict(list)
  params_lookup = {}

  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'var', 'var_type', 'params']
      if row != expected:
        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
      continue

    metric, var, var_type, params = row
    if var == '':
      full_var_name = metric
    else:
      full_var_name = '%s.%s' % (metric, var)
      # Also group multi-dimensional reports
      assoc_metrics[metric].append((var, var_type))

    params_lookup[full_var_name] = params

  return assoc_metrics, params_lookup


class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """
  def __init__(self, f, params_dir):
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    filename = self.params_lookup[var_name]
    return os.path.join(self.params_dir, filename + '.csv')

  def GetAssocMetrics(self):
    return self.assoc_metrics


def CountReports(f):
  num_reports = 0
  for line in f:
    first_col = line.split(',')[0]
    num_reports += int(first_col)
  return num_reports


DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')


def DistInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks."""
  for line in stdin:
    m = DIST_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    counts_path = line.strip()
    date, field_id = m.groups()

    yield counts_path, date, field_id


def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Print task spec for single variable RAPPOR to stdout."""

  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    # Look up field name from field ID
    if field_id_lookup:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g. due
        # to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
          num_bad += 1
        continue
    else:
      field_name = field_id

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path= dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)


ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')


def AssocInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks."""
  for line in stdin:
    m = ASSOC_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    reports_path = line.strip()
    date, metric_name = m.groups()

    yield reports_path, date, metric_name


def CreateAssocVarPairs(rappor_metrics):
  """Yield a list of pairs of variables that should be associated.

  For now just do all (string x boolean) analysis.
  """
  var_pairs = collections.defaultdict(list)

  for metric, var_list in rappor_metrics.iteritems():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown type variable type %r', var_type)

    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs


# For debugging
def PrintAssocVarPairs(var_pairs):
  for metric, var_list in var_pairs.iteritems():
    print metric
    for var_name, var_type in var_list:
      print '\t', var_name, var_type


def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Print the task spec for multiple variable RAPPOR to stdout."""
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    pairs = var_pairs[metric_name]
    for var1, var2 in pairs:
      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
      field1_name = '%s.%s' % (metric_name, var1)
      map1_path = dist_maps.GetMapPath(field1_name)

      # e.g. domain_X_flags__DID_PROCEED
      # Don't use .. in filenames since it could be confusing.
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)

      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir


def CreateOptionsParser():
  p = optparse.OptionParser()

  p.add_option(
      '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
      default='',
      help='Optionally write a report of input filenames with invalid field '
           'IDs to this file.')
  p.add_option(
      '--config-dir', dest='config_dir', metavar='PATH', type='str',
      default='',
      help='Directory with metadata schema and params files to read.')
  p.add_option(
      '--map-dir', dest='map_dir', metavar='PATH', type='str',
      default='',
      help='Directory with map files to read.')
  p.add_option(
      '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
      default='',
      help='Root of the directory tree where analysis output will be placed.')
  p.add_option(
      '--field-ids', dest='field_ids', metavar='PATH', type='str',
      default='',
      help='Optional CSV file with field IDs (generally should not be used).')

  return p


def main(argv):
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  if opts.bad_report:
    bad_f = open(opts.bad_report, 'w')
    bad_c = csv.writer(bad_f)
  else:
    bad_c = None

  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')

  # This is shared between the two specs.
  path = os.path.join(opts.config_dir, 'dist-analysis.csv')
  with open(path) as f:
    dist_maps = DistMapLookup(f, opts.map_dir)

  path = os.path.join(opts.config_dir, 'rappor-vars.csv')
  with open(path) as f:
    var_schema = VarSchema(f, opts.config_dir)

  if action == 'dist':
    if opts.field_ids:
      with open(opts.field_ids) as f:
        field_id_lookup = CreateFieldIdLookup(f)
    else:
      field_id_lookup = {}

    input_iter = DistInputIter(sys.stdin)
    for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
                            bad_c):
      # The spec is a series of space-separated tokens.
      tokens = row + (opts.output_base_dir,)
      print ' '.join(str(t) for t in tokens)

  elif action == 'assoc':
    # Parse input
    input_iter = AssocInputIter(sys.stdin)

    # Create M x N association tasks
    var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

    # Now add the other constant stuff
    for row in AssocTaskSpec(
        input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

      num_reports = 0  # placeholder, not filtering yet
      tokens = (num_reports,) + row
      print ' '.join(str(t) for t in tokens)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError, e:
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError, e:
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)