aboutsummaryrefslogtreecommitdiff
path: root/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'pipeline')
-rw-r--r--pipeline/README.md52
-rwxr-xr-xpipeline/alarm-lib.sh124
-rwxr-xr-xpipeline/assoc.sh152
-rwxr-xr-xpipeline/combine_results.py138
-rwxr-xr-xpipeline/combine_results_test.py38
-rwxr-xr-xpipeline/combine_status.py298
-rwxr-xr-xpipeline/combine_status_test.py38
-rwxr-xr-xpipeline/cook.sh147
-rwxr-xr-xpipeline/csv-to-html-test.sh63
-rwxr-xr-xpipeline/csv_to_html.py218
-rwxr-xr-xpipeline/csv_to_html_test.py24
-rwxr-xr-xpipeline/dist.sh135
-rwxr-xr-xpipeline/metric_status.R343
-rwxr-xr-xpipeline/regtest.sh161
-rwxr-xr-xpipeline/task_spec.py364
-rwxr-xr-xpipeline/task_spec_test.py61
-rwxr-xr-xpipeline/tools-lib.sh64
-rwxr-xr-xpipeline/ui.sh322
-rwxr-xr-xpipeline/util.py9
19 files changed, 2751 insertions, 0 deletions
diff --git a/pipeline/README.md b/pipeline/README.md
new file mode 100644
index 0000000..052ea9d
--- /dev/null
+++ b/pipeline/README.md
@@ -0,0 +1,52 @@
+pipeline
+========
+
+This directory contains tools and scripts for running a cron job that does
+RAPPOR analysis and generates an HTML dashboard.
+
+It works like this:
+
+1. `task_spec.py` generates a text file where each line corresponds to a process
+ to be run (a "task"). The process is `bin/decode-dist` or
+ `bin/decode-assoc`. The line contains the task parameters.
+
+2. `xargs -P` is used to run processes in parallel. Our analysis is generally
+ single-threaded (i.e. because R is single-threaded), so this helps utilize
+ the machine fully. Each task places its output in a different subdirectory.
+
+3. `cook.sh` calls `combine_results.py` to combine analysis results into a time
+ series. It also calls `combine_status.py` to keep track of task data for
+ "meta-analysis". `metric_status.R` generates more summary CSV files.
+
+4. `ui.sh` calls `csv_to_html.py` to generate HTML fragments from the CSV
+ files.
+
+5. The JavaScript in `ui/ui.js` is loaded from static HTML, and makes AJAX calls
+ to retrieve the HTML fragments. The page is made interactive with
+ `ui/table-lib.js`.
+
+`dist.sh` and `assoc.sh` contain functions which coordinate this process.
+
+`alarm-lib.sh` is used to kill processes that have been running for too long.
+
+Testing
+-------
+
+`pipeline/regtest.sh` contains end-to-end demos of this process. Right now it
+depends on testdata from elsewhere in the tree:
+
+
+ rappor$ ./demo.sh run # prepare dist testdata
+ rappor$ cd bin
+
+ bin$ ./test.sh write-assoc-testdata # prepare assoc testdata
+ bin$ cd ../pipeline
+
+ pipeline$ ./regtest.sh dist
+ pipeline$ ./regtest.sh assoc
+
+ pipeline$ python -m SimpleHTTPServer # start a static web server
+
+ http://localhost:8000/_tmp/
+
+
diff --git a/pipeline/alarm-lib.sh b/pipeline/alarm-lib.sh
new file mode 100755
index 0000000..90495ce
--- /dev/null
+++ b/pipeline/alarm-lib.sh
@@ -0,0 +1,124 @@
+#!/bin/bash
+#
+# Alarm tool.
+#
+# Usage:
+# ./alarm.sh <function name>
+
+# You can source this file and use the alarm-status function.
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+# Run a command with a timeout, and print its status to a directory.
+#
+# Usage:
+# alarm-status job_dir/STATUS 10 \
+# flaky_command ...
+
+alarm-status() {
+ set +o errexit
+ local status_file=$1
+ shift # everything except the status file goes to perl
+
+ # NOTE: It would be nice to setpgrp() before exec? And then can the signal
+ # be delivered to the entire group, like kill -SIGALRM -PID?
+
+ # NOTE: If we did this in Python, the error message would also be clearer.
+ perl -e 'alarm shift; exec @ARGV or die "ERROR: after exec @ARGV"' "$@"
+ local exit_code=$?
+
+ set -o errexit
+
+ local result=''
+ case $exit_code in
+ 0)
+ # Would be nice to show elapsed time?
+ result='OK'
+ ;;
+ 9)
+ # decode_assoc.R will exit 9 if there are no reports AFTER
+ # --remove-bad-rows. A task can also be marked SKIPPED before running
+ # the child process (see backfill.sh).
+ result='SKIPPED by child process'
+ ;;
+ # exit code 142 means SIGALARM. 128 + 14 = 142. See 'kill -l'.
+ 142)
+ local seconds=$1
+ result="TIMEOUT after $seconds seconds"
+ ;;
+ *)
+ result="FAIL with status $exit_code"
+ ;;
+ esac
+ echo "$result"
+ echo "$result" > $status_file
+}
+
+_work() {
+ local n=10 # 2 seconds
+ for i in $(seq $n); do
+ echo $i - "$@"
+ sleep 0.2
+ done
+}
+
+_succeed() {
+ _work "$@"
+ exit 0
+}
+
+_fail() {
+ _work "$@"
+ exit 1
+}
+
+_skip() {
+ exit 9
+}
+
+# http://perldoc.perl.org/functions/alarm.html
+#
+# Delivers alarm. But how to get the process to have a distinct exit code?
+
+demo() {
+ mkdir -p _tmp
+
+ # timeout
+ alarm-status _tmp/A 1 $0 _succeed foo
+ echo
+
+ # ok
+ alarm-status _tmp/B 3 $0 _succeed bar
+ echo
+
+ # fail
+ alarm-status _tmp/C 3 $0 _fail baz
+ echo
+
+ # skip
+ alarm-status _tmp/D 3 $0 _skip baz
+ echo
+
+ head _tmp/{A,B,C,D}
+}
+
+test-simple() {
+ alarm-status _tmp/status.txt 1 sleep 2
+}
+
+test-bad-command() {
+ alarm-status _tmp/status.txt 1 nonexistent_sleep 2
+}
+
+# BUG
+test-perl() {
+ set +o errexit
+ perl -e 'alarm shift; exec @ARGV or die "ERROR after exec @ARGV"' 1 _sleep 2
+ echo $?
+}
+
+if test $(basename $0) = 'alarm-lib.sh'; then
+ "$@"
+fi
diff --git a/pipeline/assoc.sh b/pipeline/assoc.sh
new file mode 100755
index 0000000..2c6a54d
--- /dev/null
+++ b/pipeline/assoc.sh
@@ -0,0 +1,152 @@
+#!/bin/bash
+#
+# Usage:
+# ./assoc.sh <function name>
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+readonly THIS_DIR=$(dirname $0)
+readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd)
+
+source $RAPPOR_SRC/util.sh # log, banner
+source $RAPPOR_SRC/pipeline/tools-lib.sh
+source $RAPPOR_SRC/pipeline/alarm-lib.sh
+
+# Change the default location of these tools by setting DEP_*
+readonly DECODE_ASSOC=${DEP_DECODE_ASSOC:-$RAPPOR_SRC/bin/decode-assoc}
+readonly FAST_EM=${DEP_FAST_EM:-$RAPPOR_SRC/analysis/cpp/_tmp/fast_em}
+
+# Run a single decode-assoc process, to analyze one variable pair for one
+# metric. The arguments to this function are one row of the task spec.
+decode-one() {
+ # Job constants, from decode-many
+ local rappor_src=$1
+ local timeout_secs=$2
+ local min_reports=$3
+ local job_dir=$4
+ local sample_size=$5
+
+ # Task spec variables, from task_spec.py
+ local num_reports=$6
+ local metric_name=$7
+ local date=$8 # for output naming only
+ local reports=$9 # file with reports
+ local var1=${10}
+ local var2=${11}
+ local map1=${12}
+ local output_dir=${13}
+
+ local log_file=$output_dir/assoc-log.txt
+ local status_file=$output_dir/assoc-status.txt
+ mkdir --verbose -p $output_dir
+
+  # Flags derived from job constants
+ local schema=$job_dir/config/rappor-vars.csv
+ local params_dir=$job_dir/config
+ local em_executable=$FAST_EM
+
+ # TODO:
+ # - Skip jobs with few reports, like ./backfill.sh analyze-one.
+
+ # Output the spec for combine_status.py.
+ echo "$@" > $output_dir/assoc-spec.txt
+
+ # NOTE: Not passing --num-cores since we're parallelizing already.
+
+ # NOTE: --tmp-dir is the output dir. Then we just delete all the .bin files
+ # afterward so we don't copy them to x20 (they are big).
+
+ { time \
+ alarm-status $status_file $timeout_secs \
+ $DECODE_ASSOC \
+ --create-bool-map \
+ --remove-bad-rows \
+ --em-executable $em_executable \
+ --schema $schema \
+ --params-dir $params_dir \
+ --metric-name $metric_name \
+ --reports $reports \
+ --var1 $var1 \
+ --var2 $var2 \
+ --map1 $map1 \
+ --reports-sample-size $sample_size \
+ --tmp-dir $output_dir \
+ --output-dir $output_dir
+ } >$log_file 2>&1
+}
+
+test-decode-one() {
+ decode-one $RAPPOR_SRC
+}
+
+readonly DEFAULT_MIN_REPORTS=5000
+
+#readonly DEFAULT_TIMEOUT_SECONDS=300 # 5 minutes as a quick test.
+readonly DEFAULT_TIMEOUT_SECONDS=3600 # 1 hour
+
+readonly DEFAULT_MAX_PROCS=6 # TODO: Share with backfill.sh
+
+# Limit to 1M for now. Raise it when we have a full run.
+readonly DEFAULT_SAMPLE_SIZE=1000000
+
+readonly NUM_ARGS=8 # number of tokens in the task spec, used for xargs
+
+# Run many decode-assoc processes in parallel.
+decode-many() {
+ local job_dir=$1
+ local spec_list=$2
+
+ # These 3 params affect speed
+ local timeout_secs=${3:-$DEFAULT_TIMEOUT_SECONDS}
+ local sample_size=${4:-$DEFAULT_SAMPLE_SIZE}
+ local max_procs=${5:-$DEFAULT_MAX_PROCS}
+
+ local rappor_src=${6:-$RAPPOR_SRC}
+ local min_reports=${7:-$DEFAULT_MIN_REPORTS}
+
+ time cat $spec_list \
+ | xargs --verbose -n $NUM_ARGS -P $max_procs --no-run-if-empty -- \
+ $0 decode-one $rappor_src $timeout_secs $min_reports $job_dir $sample_size
+}
+
+# Combine assoc results and render HTML.
+
+combine-and-render-html() {
+ local jobs_base_dir=$1
+ local job_dir=$2
+
+ banner "Combining assoc task status"
+ TOOLS-cook combine-assoc-task-status $jobs_base_dir $job_dir
+
+ banner "Combining assoc results"
+ TOOLS-cook combine-assoc-results $jobs_base_dir $job_dir
+
+ banner "Splitting out status per metric, and writing overview"
+ TOOLS-cook assoc-metric-status $job_dir
+
+ TOOLS-gen-ui symlink-static assoc $job_dir
+
+ banner "Building overview .part.html from CSV"
+ TOOLS-gen-ui assoc-overview-part-html $job_dir
+
+ banner "Building metric .part.html from CSV"
+ TOOLS-gen-ui assoc-metric-part-html $job_dir
+
+ banner "Building pair .part.html from CSV"
+ TOOLS-gen-ui assoc-pair-part-html $job_dir
+
+ banner "Building day .part.html from CSV"
+ TOOLS-gen-ui assoc-day-part-html $job_dir
+}
+
+# Temp files left over by the fast_em R <-> C++.
+list-and-remove-bin() {
+ local job_dir=$1
+ # If everything failed, we might not have anything to list/delete.
+ find $job_dir -name \*.bin | xargs --no-run-if-empty -- ls -l --si
+ find $job_dir -name \*.bin | xargs --no-run-if-empty -- rm -f --verbose
+}
+
+"$@"
diff --git a/pipeline/combine_results.py b/pipeline/combine_results.py
new file mode 100755
index 0000000..6cb0150
--- /dev/null
+++ b/pipeline/combine_results.py
@@ -0,0 +1,138 @@
+#!/usr/bin/python
+"""Combines results from multiple days of a single metric.
+
+Feed it the STATUS.txt files on stdin. It then finds the corresponding
+results.csv, and takes the top N items.
+
+Example:
+
+Date, "google.com,", yahoo.com
+2015-03-01, 0.0, 0.9
+2015-03-02, 0.1, 0.8
+
+Dygraphs can load this CSV file directly.
+
+TODO: Use different dygraph API?
+
+Also we need error bars.
+
+ new Dygraph(document.getElementById("graphdiv2"),
+ [
+ [1,10,100],
+ [2,20,80],
+ [3,50,60],
+ [4,70,80]
+ ],
+ {
+ labels: [ "Date", "failure", "timeout", "google.com" ]
+ });
+"""
+
+import collections
+import csv
+import json
+import os
+import sys
+
+import util
+
+
+def CombineDistResults(stdin, c_out, num_top):
+ dates = []
+ var_cols = collections.defaultdict(dict) # {name: {date: value}}
+
+ seen_dates = set()
+
+ for line in stdin:
+ status_path = line.strip()
+
+ # Assume it looks like .../2015-03-01/STATUS.txt
+ task_dir = os.path.dirname(status_path)
+ date = os.path.basename(task_dir)
+
+ # Get rid of duplicate dates. These could be caused by retries.
+ if date in seen_dates:
+ continue
+
+ seen_dates.add(date)
+
+ with open(status_path) as f:
+ status = f.readline().split()[0] # OK, FAIL, TIMEOUT, SKIPPED
+
+ dates.append(date)
+
+ if status != 'OK':
+ continue # won't have results.csv
+
+ results_path = os.path.join(task_dir, 'results.csv')
+ with open(results_path) as f:
+ c = csv.reader(f)
+ unused_header = c.next() # header row
+
+ # they are sorted by decreasing "estimate", which is what we want
+ for i in xrange(0, num_top):
+ try:
+ row = c.next()
+ except StopIteration:
+ # It's OK if it doesn't have enough
+ util.log('Stopping early. Fewer than %d results to render.', num_top)
+ break
+
+ string, _, _, proportion, _, prop_low, prop_high = row
+
+ # dygraphs has a weird format with semicolons:
+ # value;lower;upper,value;lower;upper.
+
+ # http://dygraphs.com/data.html#csv
+
+ # Arbitrarily use 4 digits after decimal point (for dygraphs, not
+ # directly displayed)
+ dygraph_triple = '%.4f;%.4f;%.4f' % (
+ float(prop_low), float(proportion), float(prop_high))
+
+ var_cols[string][date] = dygraph_triple
+
+ # Now print CSV on stdout.
+ cols = sorted(var_cols.keys()) # sort columns alphabetically
+ c_out.writerow(['date'] + cols)
+
+ dates.sort()
+
+ for date in dates:
+ row = [date]
+ for col in cols:
+      cell = var_cols[col].get(date) # None means there is no row
+ row.append(cell)
+ c_out.writerow(row)
+
+ #util.log("Number of dynamic cols: %d", len(var_cols))
+
+
+def CombineAssocResults(stdin, c_out, num_top):
+ header = ('dummy',)
+ c_out.writerow(header)
+
+
+def main(argv):
+ action = argv[1]
+
+ if action == 'dist':
+ num_top = int(argv[2]) # number of values to keep
+ c_out = csv.writer(sys.stdout)
+ CombineDistResults(sys.stdin, c_out, num_top)
+
+ elif action == 'assoc':
+ num_top = int(argv[2]) # number of values to keep
+ c_out = csv.writer(sys.stdout)
+ CombineAssocResults(sys.stdin, c_out, num_top)
+
+ else:
+ raise RuntimeError('Invalid action %r' % action)
+
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv)
+ except RuntimeError, e:
+ print >>sys.stderr, 'FATAL: %s' % e
+ sys.exit(1)
diff --git a/pipeline/combine_results_test.py b/pipeline/combine_results_test.py
new file mode 100755
index 0000000..84c4cb7
--- /dev/null
+++ b/pipeline/combine_results_test.py
@@ -0,0 +1,38 @@
+#!/usr/bin/python -S
+"""
+combine_results_test.py: Tests for combine_results.py
+"""
+
+import csv
+import cStringIO
+import unittest
+
+import combine_results # module under test
+
+
+# TODO: Make these test more the header row. They rely heavily on the file
+# system!
+
+class CombineResultsTest(unittest.TestCase):
+
+ def testCombineDistResults(self):
+ stdin = cStringIO.StringIO('')
+ out = cStringIO.StringIO()
+ c_out = csv.writer(out)
+
+ combine_results.CombineDistResults(stdin, c_out, 10)
+ actual = out.getvalue()
+ self.assert_(actual.startswith('date'), actual)
+
+ def testCombineAssocResults(self):
+ stdin = cStringIO.StringIO('')
+ out = cStringIO.StringIO()
+ c_out = csv.writer(out)
+
+ combine_results.CombineAssocResults(stdin, c_out, 10)
+ actual = out.getvalue()
+ self.assert_(actual.startswith('dummy'), actual)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pipeline/combine_status.py b/pipeline/combine_status.py
new file mode 100755
index 0000000..4fbb36a
--- /dev/null
+++ b/pipeline/combine_status.py
@@ -0,0 +1,298 @@
+#!/usr/bin/python
+"""Summarize the results of many RAPPOR analysis runs.
+
+Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
+and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
+"""
+
+import collections
+import csv
+import json
+import os
+import re
+import sys
+
+
+# Parse bash 'time' output:
+# real 0m11.578s
+
+# TODO: Parse the time from metrics.json instead.
+TIMING_RE = re.compile(
+ r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
+
+# TODO: Could have decode-dist and decode-assoc output the PID?
+PID_RE = re.compile(
+ r'write_pid.py: PID (\d+)') # not VERBOSE, spaces are literal
+
+
+def ParseMemCsv(f):
+ """Compute summary stats for memory.
+
+ vm5_peak_kib -> max(vm_peak_kib) # over 5 second intervals. Since it uses
+  the kernel, it's accurate except for tasks that spike in their last 4
+ seconds.
+
+ vm5_mean_kib -> mean(vm_size_kib) # over 5 second intervals
+ """
+ peak_by_pid = collections.defaultdict(list)
+ size_by_pid = collections.defaultdict(list)
+
+ # Parse columns we care about, by PID
+ c = csv.reader(f)
+ for i, row in enumerate(c):
+ if i == 0:
+ continue # skip header
+ # looks like timestamp, pid, then (rss, peak, size)
+ _, pid, _, peak, size = row
+ if peak != '':
+ peak_by_pid[pid].append(int(peak))
+ if size != '':
+ size_by_pid[pid].append(int(size))
+
+ mem_by_pid = {}
+
+ # Now compute summaries
+ pids = peak_by_pid.keys()
+ for pid in pids:
+ peaks = peak_by_pid[pid]
+ vm5_peak_kib = max(peaks)
+
+ sizes = size_by_pid[pid]
+ vm5_mean_kib = sum(sizes) / len(sizes)
+
+ mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)
+
+ return mem_by_pid
+
+
+def CheckJobId(job_id, parts):
+ """Sanity check for date or smoke test."""
+ if not job_id.startswith('201') and not job_id.startswith('smoke'):
+ raise RuntimeError(
+ "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
+ (job_id, parts))
+
+
+def ReadStatus(f):
+ status_line = f.readline().strip()
+ return status_line.split()[0] # OK, TIMEOUT, FAIL
+
+
+def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
+ """Read status task paths from stdin, write CSV summary to c_out'."""
+
+ #util.log('%s', mem_by_pid)
+
+ # Parses:
+ # - input path for metric name and date
+ # - spec.txt for task params
+ # - STATUS.txt for task success/failure
+ # - metrics.json for output metrics
+ # - log.txt for timing, if it ran to completion
+ # - and for structured data
+ # - join with mem by PID
+
+ header = (
+ 'job_id', 'params_file', 'map_file',
+ 'metric', 'date',
+ 'vm5_peak_kib', 'vm5_mean_kib', # set when not skipped
+ 'seconds', 'status',
+ # only set when OK
+ 'num_reports', 'num_rappor', 'allocated_mass',
+ # only set when failed
+ 'fail_reason')
+ c_out.writerow(header)
+
+ for line in stdin:
+ #
+ # Receive a STATUS.txt path on each line of stdin, and parse it.
+ #
+ status_path = line.strip()
+
+ with open(status_path) as f:
+ status = ReadStatus(f)
+
+ # Path should look like this:
+ # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
+ parts = status_path.split('/')
+ job_id = parts[-5]
+ CheckJobId(job_id, parts)
+
+ #
+ # Parse the job spec
+ #
+ result_dir = os.path.dirname(status_path)
+ spec_file = os.path.join(result_dir, 'spec.txt')
+ with open(spec_file) as f:
+ spec_line = f.readline()
+ # See backfill.sh analyze-one for the order of these 7 fields.
+ # There are 3 job constants on the front.
+ (num_reports, metric_name, date, counts_path, params_path,
+ map_path, _) = spec_line.split()
+
+ # NOTE: These are all constant per metric. Could have another CSV and
+ # join. But denormalizing is OK for now.
+ params_file = os.path.basename(params_path)
+ map_file = os.path.basename(map_path)
+
+ # remove extension
+ params_file, _ = os.path.splitext(params_file)
+ map_file, _ = os.path.splitext(map_file)
+
+ #
+ # Read the log
+ #
+ log_file = os.path.join(result_dir, 'log.txt')
+ with open(log_file) as f:
+ lines = f.readlines()
+
+ # Search lines in reverse order for total time. It could have output from
+ # multiple 'time' statements, and we want the last one.
+ seconds = None # for skipped
+ for i in xrange(len(lines) - 1, -1, -1):
+ # TODO: Parse the R timing too. Could use LOG_RECORD_RE.
+ m = TIMING_RE.search(lines[i])
+ if m:
+ min_part, sec_part = m.groups()
+ seconds = float(min_part) * 60 + float(sec_part)
+ break
+
+ # Extract stack trace
+ if status == 'FAIL':
+ # Stack trace looks like: "Calls: main -> RunOne ..."
+ fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
+ else:
+ fail_reason = None
+
+ # Extract PID and join with memory results
+ pid = None
+ vm5_peak_kib = None
+ vm5_mean_kib = None
+ if mem_by_pid:
+ for line in lines:
+ m = PID_RE.match(line)
+ if m:
+ pid = m.group(1)
+          # Could the PID not exist if the process was super short, i.e. it
+          # ran for less than 5 seconds?
+ try:
+ vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
+ except KeyError: # sometimes we don't add mem-track on the front
+ vm5_peak_kib, vm5_mean_kib = None, None
+ break
+ else:
+ pass # we weren't passed memory.csv
+
+ #
+ # Read the metrics
+ #
+ metrics = {}
+ metrics_file = os.path.join(result_dir, 'metrics.json')
+ if os.path.isfile(metrics_file):
+ with open(metrics_file) as f:
+ metrics = json.load(f)
+
+ num_rappor = metrics.get('num_detected')
+ allocated_mass = metrics.get('allocated_mass')
+
+ # Construct and write row
+ row = (
+ job_id, params_file, map_file,
+ metric_name, date,
+ vm5_peak_kib, vm5_mean_kib,
+ seconds, status,
+ num_reports, num_rappor, allocated_mass,
+ fail_reason)
+
+ c_out.writerow(row)
+
+
+def CombineAssocTaskStatus(stdin, c_out):
+ """Read status task paths from stdin, write CSV summary to c_out'."""
+
+ header = (
+ 'job_id', 'metric', 'date', 'status', 'num_reports',
+ 'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
+ 'd2')
+
+ c_out.writerow(header)
+
+ for line in stdin:
+ status_path = line.strip()
+
+ with open(status_path) as f:
+ status = ReadStatus(f)
+
+ parts = status_path.split('/')
+ job_id = parts[-6]
+ CheckJobId(job_id, parts)
+
+ #
+ # Parse the job spec
+ #
+ result_dir = os.path.dirname(status_path)
+ spec_file = os.path.join(result_dir, 'assoc-spec.txt')
+ with open(spec_file) as f:
+ spec_line = f.readline()
+ # See backfill.sh analyze-one for the order of these 7 fields.
+ # There are 3 job constants on the front.
+
+ # 5 job params
+ (_, _, _, _, _,
+ dummy_num_reports, metric_name, date, reports, var1, var2, map1,
+ output_dir) = spec_line.split()
+
+ #
+ # Parse decode-assoc metrics
+ #
+ metrics = {}
+ metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
+ if os.path.isfile(metrics_file):
+ with open(metrics_file) as f:
+ metrics = json.load(f)
+
+ # After we run it we have the actual number of reports
+ num_reports = metrics.get('num_reports')
+ total_elapsed_seconds = metrics.get('total_elapsed_time')
+ em_elapsed_seconds = metrics.get('em_elapsed_time')
+ estimate_dimensions = metrics.get('estimate_dimensions')
+ if estimate_dimensions:
+ d1, d2 = estimate_dimensions
+ else:
+ d1, d2 = (0, 0) # unknown
+
+ row = (
+ job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
+ em_elapsed_seconds, var1, var2, d1, d2)
+ c_out.writerow(row)
+
+
+def main(argv):
+ action = argv[1]
+
+ try:
+ mem_csv = argv[2]
+ except IndexError:
+ mem_by_pid = None
+ else:
+ with open(mem_csv) as f:
+ mem_by_pid = ParseMemCsv(f)
+
+ if action == 'dist':
+ c_out = csv.writer(sys.stdout)
+ CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)
+
+ elif action == 'assoc':
+ c_out = csv.writer(sys.stdout)
+ CombineAssocTaskStatus(sys.stdin, c_out)
+
+ else:
+ raise RuntimeError('Invalid action %r' % action)
+
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv)
+ except RuntimeError, e:
+ print >>sys.stderr, 'FATAL: %s' % e
+ sys.exit(1)
diff --git a/pipeline/combine_status_test.py b/pipeline/combine_status_test.py
new file mode 100755
index 0000000..4606587
--- /dev/null
+++ b/pipeline/combine_status_test.py
@@ -0,0 +1,38 @@
+#!/usr/bin/python -S
+"""
+combine_status_test.py: Tests for combine_status.py
+"""
+
+import csv
+import cStringIO
+import unittest
+
+import combine_status # module under test
+
+
+# TODO: Make these test more the header row. They rely heavily on the file
+# system!
+
+class CombineStatusTest(unittest.TestCase):
+
+ def testCombineDistTaskStatus(self):
+ stdin = cStringIO.StringIO('')
+ out = cStringIO.StringIO()
+ c_out = csv.writer(out)
+
+ combine_status.CombineDistTaskStatus(stdin, c_out, {})
+ actual = out.getvalue()
+ self.assert_(actual.startswith('job_id,params_file,'), actual)
+
+ def testCombineAssocTaskStatus(self):
+ stdin = cStringIO.StringIO('')
+ out = cStringIO.StringIO()
+ c_out = csv.writer(out)
+
+ combine_status.CombineAssocTaskStatus(stdin, c_out)
+ actual = out.getvalue()
+ self.assert_(actual.startswith('job_id,metric,'), actual)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pipeline/cook.sh b/pipeline/cook.sh
new file mode 100755
index 0000000..e820d44
--- /dev/null
+++ b/pipeline/cook.sh
@@ -0,0 +1,147 @@
+#!/bin/bash
+#
+# Take the raw data from the analysis and massage it into various formats
+# suitable for display.
+#
+# Usage:
+# ./cook.sh <function name>
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+readonly THIS_DIR=$(dirname $0)
+readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd)
+
+source $RAPPOR_SRC/pipeline/tools-lib.sh
+
+
+status-files() {
+ local dir=$1
+ find $dir -name STATUS.txt
+}
+
+results-files() {
+ local dir=$1
+ find $dir -name results.csv
+}
+
+count-results() {
+ # first field of each line is one of {OK, TIMEOUT, FAIL, SKIPPED}
+ status-files "$@" \
+ | xargs cat \
+ | cut -d ' ' -f 1 \
+ | sort | uniq -c | sort -n -r
+}
+
+#
+# For dist cron job
+#
+
+# Combine status of tasks over multiple jobs. Each row is a task (decode-dist
+# invocation). This has the number of reports.
+combine-dist-task-status() {
+ local base_dir=${1:-~/rappor/cron}
+ local job_dir=${2:-~/rappor/cron/2015-05-22__05-58-01}
+
+ local out=$job_dir/task-status.csv
+
+ # Ignore memory for now.
+ time status-files $base_dir | TOOLS-combine-status dist > $out
+ echo "Wrote $out"
+}
+
+# Create a single dist.csv time series for a GIVEN metric.
+combine-dist-results-one() {
+ local base_dir=$1
+ local job_dir=$2
+ local metric_name=$3
+ #echo FOO $base_dir $metric_name
+
+ local out_dir=$job_dir/cooked/$metric_name
+ mkdir -p $out_dir
+
+ # Glob to capture this specific metric name over ALL job IDs.
+ find $base_dir/*/raw/$metric_name -name STATUS.txt \
+ | TOOLS-combine-results dist 5 \
+ > $out_dir/dist.csv
+}
+
+# Creates a dist.csv file for EACH metric. TODO: Rename one/many
+combine-dist-results() {
+ local base_dir=${1:-~/rappor/cron}
+ local job_dir=${2:-~/rappor/cron/2015-05-22__05-58-01}
+
+ # Direct subdirs of 'raw' are metrics. Just print filename.
+ find $base_dir/*/raw -mindepth 1 -maxdepth 1 -type d -a -printf '%f\n' \
+ | sort | uniq \
+ | xargs --verbose -n1 -- \
+ $0 combine-dist-results-one $base_dir $job_dir
+}
+
+# Take the task-status.csv file, which has row key (metric, date). Writes
+# num_reports.csv and status.csv per metric, and a single overview.csv for all
+# metrics.
+dist-metric-status() {
+ local job_dir=${1:-_tmp/results-10}
+ local out_dir=$job_dir/cooked
+
+ TOOLS-metric-status dist $job_dir/task-status.csv $out_dir
+}
+
+#
+# For association analysis cron job
+#
+
+combine-assoc-task-status() {
+ local base_dir=${1:-~/rappor/chrome-assoc-smoke}
+ local job_dir=${2:-$base_dir/smoke1}
+
+ local out=$job_dir/assoc-task-status.csv
+
+ time find $base_dir -name assoc-status.txt \
+ | TOOLS-combine-status assoc \
+ > $out
+
+ echo "Wrote $out"
+}
+
+# Create a single assoc.csv time series for a GIVEN (var1, var2) pair.
+combine-assoc-results-one() {
+ local base_dir=$1
+ local job_dir=$2
+ local metric_pair_rel_path=$3
+
+ local out_dir=$job_dir/cooked/$metric_pair_rel_path
+ mkdir -p $out_dir
+
+ # Glob to capture this specific metric name over ALL job IDs.
+ find $base_dir/*/raw/$metric_pair_rel_path -name assoc-status.txt \
+ | TOOLS-combine-results assoc 5 \
+ > $out_dir/assoc-results-series.csv
+}
+
+# Creates a dist.csv file for EACH metric. TODO: Rename one/many
+combine-assoc-results() {
+ local base_dir=${1:-~/rappor/chrome-assoc-smoke}
+ local job_dir=${2:-$base_dir/smoke3}
+
+ # Direct subdirs of 'raw' are metrics, and subdirs of that are variable
+ # pairs. Print "$metric_name/$pair_name".
+ find $base_dir/*/raw -mindepth 2 -maxdepth 2 -type d -a -printf '%P\n' \
+ | sort | uniq \
+ | xargs --verbose -n1 -- \
+ $0 combine-assoc-results-one $base_dir $job_dir
+}
+
+# Take the assoc-task-status.csv file, which has row key (metric, date). Writes
+# num_reports.csv and status.csv per metric, and a single overview.csv for all
+# metrics.
+assoc-metric-status() {
+ local job_dir=${1:-~/rappor/chrome-assoc-smoke/smoke3}
+ local out_dir=$job_dir/cooked
+
+ TOOLS-metric-status assoc $job_dir/assoc-task-status.csv $out_dir
+}
+
+"$@"
diff --git a/pipeline/csv-to-html-test.sh b/pipeline/csv-to-html-test.sh
new file mode 100755
index 0000000..754d083
--- /dev/null
+++ b/pipeline/csv-to-html-test.sh
@@ -0,0 +1,63 @@
+#!/bin/bash
+#
+# Test for csv_to_html.py.
+#
+# Usage:
+# ./csv-to-html-test.sh <function name>
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+test-basic() {
+ ./csv_to_html.py <<EOF
+a_number,b
+1,2
+3,4
+NA,4
+EOF
+}
+
+test-col-format() {
+ ./csv_to_html.py \
+ --col-format 'b <a href="../{b}/metric.html">{b}</a>' <<EOF
+a,b
+1,2015-05-01
+3,2015-05-02
+EOF
+}
+
+test-var-def() {
+ ./csv_to_html.py \
+ --def 'v VALUE' \
+ --col-format 'b <a href="../{b}/metric.html">{v}</a>' <<EOF
+a,b
+1,2
+3,4
+EOF
+}
+
+test-as-percent() {
+ ./csv_to_html.py \
+ --as-percent b <<EOF
+a,b
+A,0.21
+B,0.001
+C,0.0009
+D,0.0001
+EOF
+}
+
+if test $# -eq 0; then
+ test-basic
+ echo '--'
+ test-col-format
+ echo '--'
+ test-var-def
+ echo '--'
+ test-as-percent
+ echo '--'
+ echo 'OK'
+else
+ "$@"
+fi
diff --git a/pipeline/csv_to_html.py b/pipeline/csv_to_html.py
new file mode 100755
index 0000000..e4d76ae
--- /dev/null
+++ b/pipeline/csv_to_html.py
@@ -0,0 +1,218 @@
+#!/usr/bin/python
+"""Reads a CSV file on stdin, and prints an an HTML table on stdout.
+
+The static HTML can then be made dynamic with JavaScript, e.g. jQuery
+DataTable.
+
+Use Cases:
+
+ - overview.csv -- each row is a metric
+ - links: to metric page
+
+ - status.csv -- each row is a day
+ - links: to log.txt, to results.html
+"""
+
+import cgi
+import csv
+import optparse
+import sys
+
+import util
+
+
+def CreateOptionsParser():
+ p = optparse.OptionParser()
+
+ # We are taking a path, and not using stdin, because we read it twice.
+ p.add_option(
+ '--col-format', dest='col_formats', metavar="'COLNAME FMT'", type='str',
+ default=[], action='append',
+ help='Add HTML links to the named column, using the given Python '
+ '.format() string')
+
+ p.add_option(
+ '--def', dest='defs', metavar="'NAME VALUE'", type='str',
+ default=[], action='append',
+      help='Define variables for use in format strings')
+
+ p.add_option(
+ '--as-percent', dest='percent_cols', metavar="COLNAME", type='str',
+ default=[], action='append',
+ help='Format this floating point column as a percentage string')
+
+ # TODO: We could include this by default, and then change all the HTML to
+ # have <div> placeholders instead of <table>.
+ p.add_option(
+ '--table', dest='table', default=False, action='store_true',
+ help='Add <table></table> tags (useful for testing)')
+
+ return p
+
+
+def ParseSpec(arg_list):
+ """Given an argument list, return a string -> string dictionary."""
+ # The format string is passed the cell value. Escaped as HTML?
+ d = {}
+ for s in arg_list:
+ try:
+ name, value = s.split(' ', 1)
+ except ValueError:
+ raise RuntimeError('Invalid column format %r' % s)
+ d[name] = value
+ return d
+
+
+def PrintRow(row, col_names, col_formats, defs, percent_cols):
+ """Print a CSV row as HTML, using the given formatting.
+
+ Returns:
+ An array of booleans indicating whether each cell is a number.
+ """
+ is_number_flags = [False] * len(col_names)
+
+ for i, cell in enumerate(row):
+ # The cell as a string. By default we leave it as is; it may be mutated
+ # below.
+ cell_str = cell
+ css_class = '' # CSS class for the cell.
+ col_name = col_names[i] # column that the cell is under
+
+ # Does the cell look like a float?
+ try:
+ cell_float = float(cell)
+ if col_name in percent_cols: # Floats can be formatted as percentages.
+ cell_str = '{:.1f}%'.format(cell_float * 100)
+ else:
+ # Arbitrarily use 3 digits of precision for display
+ cell_str = '{:.3f}'.format(cell_float)
+ css_class = 'num'
+ is_number_flags[i] = True
+ except ValueError:
+ pass
+
+    # Does it look like an int?
+ try:
+ cell_int = int(cell)
+ cell_str = '{:,}'.format(cell_int)
+ css_class = 'num'
+ is_number_flags[i] = True
+ except ValueError:
+ pass
+
+ # Special CSS class for R NA values.
+ if cell_str.strip() == 'NA':
+ css_class = 'num na' # num should right justify; na should make it red
+ is_number_flags[i] = True
+
+ if css_class:
+ print ' <td class="{}">'.format(css_class),
+ else:
+ print ' <td>',
+
+ cell_safe = cgi.escape(cell_str)
+
+ # If the cell has a format string, print it this way.
+
+ fmt = col_formats.get(col_name) # e.g. "../{date}.html"
+ if fmt:
+ # Copy variable bindings
+ bindings = dict(defs)
+
+ # Also let the format string use other column names. TODO: Is there a
+ # more efficient way?
+ bindings.update(zip(col_names, [cgi.escape(c) for c in row]))
+
+ bindings[col_name] = cell_safe
+
+ print fmt.format(**bindings), # no newline
+ else:
+ print cell_safe, # no newline
+
+ print '</td>'
+
+ return is_number_flags
+
+
+def ReadCsv(f):
+ """Read the CSV file, returning the column names and rows."""
+ c = csv.reader(f)
+
+ # The first row of the CSV is assumed to be a header. The rest are data.
+ col_names = []
+ rows = []
+ for i, row in enumerate(c):
+ if i == 0:
+ col_names = row
+ continue
+ rows.append(row)
+ return col_names, rows
+
+
+def PrintColGroup(col_names, col_is_numeric):
+ """Print HTML colgroup element, used for JavaScript sorting."""
+ print '<colgroup>'
+ for i, col in enumerate(col_names):
+ # CSS class is used for sorting
+ if col_is_numeric[i]:
+ css_class = 'number'
+ else:
+ css_class = 'case-insensitive'
+
+ # NOTE: id is a comment only; not used
+ print ' <col id="{}" type="{}" />'.format(col, css_class)
+ print '</colgroup>'
+
+
+def main(argv):
+ (opts, argv) = CreateOptionsParser().parse_args(argv)
+
+ col_formats = ParseSpec(opts.col_formats)
+ defs = ParseSpec(opts.defs)
+
+ col_names, rows = ReadCsv(sys.stdin)
+
+ for col in opts.percent_cols:
+ if col not in col_names:
+ raise RuntimeError('--percent-col %s is not a valid column' % col)
+
+ # By default, we don't print the <table> bit -- that's up to the host page
+ if opts.table:
+ print '<table>'
+
+ print '<thead>'
+ for col in col_names:
+ # change _ to space so long column names can wrap
+ print ' <td>%s</td>' % cgi.escape(col.replace('_', ' '))
+ print '</thead>'
+
+ # Assume all columns are numeric at first. Look at each row for non-numeric
+ # values.
+ col_is_numeric = [True] * len(col_names)
+
+ print '<tbody>'
+ for row in rows:
+ print ' <tr>'
+ is_number_flags = PrintRow(row, col_names, col_formats, defs,
+ opts.percent_cols)
+
+    # If one cell in a column is not a number, then the whole column isn't.
+ for (i, is_number) in enumerate(is_number_flags):
+ if not is_number:
+ col_is_numeric[i] = False
+
+ print ' </tr>'
+ print '</tbody>'
+
+ PrintColGroup(col_names, col_is_numeric)
+
+ if opts.table:
+ print '</table>'
+
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv)
+ except RuntimeError, e:
+ print >>sys.stderr, 'FATAL: %s' % e
+ sys.exit(1)
diff --git a/pipeline/csv_to_html_test.py b/pipeline/csv_to_html_test.py
new file mode 100755
index 0000000..5fd5822
--- /dev/null
+++ b/pipeline/csv_to_html_test.py
@@ -0,0 +1,24 @@
+#!/usr/bin/python -S
+"""
+csv_to_html_test.py: Tests for csv_to_html.py
+"""
+
+import unittest
+
+import csv_to_html  # module under test
+
+
+class CsvToHtmlTest(unittest.TestCase):
+
+  def testParseSpec(self):
+    # ParseSpec turns a list of 'name value' strings into a dict.
+    self.assertEqual(
+        {'foo': 'bar', 'spam': 'eggs'},
+        csv_to_html.ParseSpec(['foo bar', 'spam eggs']))
+
+    # Empty input yields an empty dict.
+    self.assertEqual(
+        {},
+        csv_to_html.ParseSpec([]))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/pipeline/dist.sh b/pipeline/dist.sh
new file mode 100755
index 0000000..ad33006
--- /dev/null
+++ b/pipeline/dist.sh
@@ -0,0 +1,135 @@
+#!/bin/bash
+#
+# Usage:
+# ./dist.sh <function name>
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+readonly THIS_DIR=$(dirname $0)
+readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd)
+
+source $RAPPOR_SRC/util.sh # log, banner
+source $RAPPOR_SRC/pipeline/tools-lib.sh
+source $RAPPOR_SRC/pipeline/alarm-lib.sh
+
+readonly DECODE_DIST=${DEP_DECODE_DIST:-$RAPPOR_SRC/bin/decode-dist}
+
+readonly NUM_ARGS=7 # used for xargs
+
+decode-dist-one() {
+  # Decode a single (metric, date) counts file with $DECODE_DIST.
+  # Invoked by xargs from decode-dist-many: 3 job constants are prepended,
+  # followed by the $NUM_ARGS (7) spec fields for one task.
+
+  # Job constants
+  local rappor_src=$1
+  local timeout_secs=$2
+  local min_reports=$3
+  shift 3  # job constants do not vary per task and are not part of the spec
+
+  # 7 spec variables
+  local num_reports=$1  # unused, only for filtering
+  local metric_name=$2
+  local date=$3
+  local counts=$4
+  local params=$5
+  local map=$6
+  local results_dir=$7
+
+  local task_dir=$results_dir/$metric_name/$date
+  mkdir --verbose -p $task_dir
+
+  local log_file=$task_dir/log.txt
+  local status_file=$task_dir/STATUS.txt
+
+  # Record the spec so we know params, counts, etc.
+  echo "$@" > $task_dir/spec.txt
+
+  # Skip tiny inputs; record why in both STATUS.txt and log.txt.
+  if test $num_reports -lt $min_reports; then
+    local msg="SKIPPED because $num_reports reports is less than $min_reports"
+    # Duplicate this message
+    echo "$msg" > $status_file
+    echo "$msg" > $log_file
+    return
+  fi
+
+  # Run it with a timeout, and record status in the task dir.
+  { time \
+      alarm-status $status_file $timeout_secs \
+        $DECODE_DIST \
+          --counts $counts \
+          --params $params \
+          --map $map \
+          --output-dir $task_dir \
+          --adjust-counts-hack
+  } >$log_file 2>&1
+
+  # TODO: Don't pass --adjust-counts-hack unless the user asks for it.
+}
+
+# Print the number of processes to use.
+# NOTE: This is copied from google/rappor regtest.sh.
+# It also doesn't take into account the fact that we are memory-bound.
+#
+# 128 GiB / 4GiB would also imply about 32 processes though.
+num-processes() {
+ local processors=$(grep -c ^processor /proc/cpuinfo || echo 4)
+ if test $processors -gt 1; then # leave one CPU for the OS
+ processors=$(expr $processors - 1)
+ fi
+ echo $processors
+}
+
+#readonly DEFAULT_MAX_PROCS=6 # for andychu2.hot, to avoid locking up UI
+#readonly DEFAULT_MAX_PROCS=16 # for rappor-ac.hot, to avoid thrashing
+readonly DEFAULT_MAX_PROCS=$(num-processes)
+
+#readonly DEFAULT_MAX_TASKS=12
+readonly DEFAULT_MAX_TASKS=10000 # more than the max
+
+# NOTE: Since we have 125 GB RAM, and processes can take up to 12 gigs of RAM,
+# only use parallelism of 10, even though we have 31 cores.
+
+readonly DEFAULT_MIN_REPORTS=5000
+
+
+decode-dist-many() {
+  # Run decode-dist-one over every task in a spec file, in parallel via xargs.
+  # Args: job_dir spec_list [timeout_secs] [max_procs] [rappor_src] [min_reports]
+  local job_dir=$1
+  local spec_list=$2
+  local timeout_secs=${3:-1200}  # default timeout
+  local max_procs=${4:-$DEFAULT_MAX_PROCS}
+  local rappor_src=${5:-$RAPPOR_SRC}
+  local min_reports=${6:-$DEFAULT_MIN_REPORTS}
+
+  # NOTE(review): interval_secs and sys_mem look like leftovers -- nothing
+  # below reads them, and $pid_dir is created but never populated.  Verify
+  # before removing.
+  local interval_secs=5
+  local pid_dir="$job_dir/pids"
+  local sys_mem="$job_dir/system-mem.csv"
+  mkdir --verbose -p $pid_dir
+
+  # Each spec line holds $NUM_ARGS tokens; -n groups them into one
+  # decode-dist-one invocation, running $max_procs tasks at a time.
+  time cat $spec_list \
+    | xargs --verbose -n $NUM_ARGS -P $max_procs --no-run-if-empty -- \
+      $0 decode-dist-one $rappor_src $timeout_secs $min_reports
+}
+
+# Combine/summarize results and task metadata from the parallel decode-dist
+# processes.  Render them as HTML.
+# Args: jobs_base_dir job_dir
+combine-and-render-html() {
+  local jobs_base_dir=$1
+  local job_dir=$2
+
+  banner "Combining dist task status"
+  TOOLS-cook combine-dist-task-status $jobs_base_dir $job_dir
+
+  banner "Combining dist results"
+  TOOLS-cook combine-dist-results $jobs_base_dir $job_dir
+
+  banner "Splitting out status per metric, and writing overview"
+  TOOLS-cook dist-metric-status $job_dir
+
+  # The task-status.csv file should have a JOB ID.
+  banner "Building overview.html and per-metric HTML"
+  TOOLS-gen-ui build-html1 $job_dir
+
+  banner "Building individual results.html (for ONE day)"
+  TOOLS-gen-ui results-html $job_dir
+}
+
+"$@"
diff --git a/pipeline/metric_status.R b/pipeline/metric_status.R
new file mode 100755
index 0000000..0774423
--- /dev/null
+++ b/pipeline/metric_status.R
@@ -0,0 +1,343 @@
+#!/usr/bin/Rscript
+#
+# Write an overview of task status, per-metric task status, task histograms.
+
+library(data.table)
+library(ggplot2)
+
+options(stringsAsFactors = FALSE) # get rid of annoying behavior
+
+# Print a printf-style message followed by a newline.
+Log <- function(fmt, ...) {
+  cat(sprintf(fmt, ...))
+  cat('\n')
+}
+
+# max of non-NA values; NA if there are none
+MaybeMax <- function(values) {
+  v <- values[!is.na(values)]
+  if (length(v) == 0) {
+    m <- NA
+  } else {
+    m <- max(v)
+  }
+  as.numeric(m)  # data.table requires this; otherwise we get type errors
+}
+
+# mean of non-NA values; NA if there are none
+MaybeMean <- function(values) {
+  v <- values[!is.na(values)]
+  if (length(v) == 0) {
+    m <- NA
+  } else {
+    m <- mean(v)
+  }
+  as.numeric(m)  # data.table requires this; otherwise we get type errors
+}
+
+# Aggregate the per-task summary into one row per metric and write
+# overview.csv for the dashboard.  Returns the aggregated data.table.
+WriteDistOverview <- function(summary, output_dir) {
+  s <- data.table(summary)  # data.table syntax is easier here
+
+  by_metric <- s[, list(
+      params_file = unique(params_file),
+      map_file = unique(map_file),
+      days = length(date),
+      max_num_reports = MaybeMax(num_reports),
+
+      # summarize status
+      ok = sum(status == 'OK'),
+      fail = sum(status == 'FAIL'),
+      timeout = sum(status == 'TIMEOUT'),
+      skipped = sum(status == 'SKIPPED'),
+
+      # TODO: Need to document the meaning of these metrics.
+      # All could be NA
+      # KiB -> MB
+      #max_vm5_peak_mb = MaybeMax(vm5_peak_kib * 1024 / 1e6),
+      #mean_vm5_mean_mb = MaybeMean(vm5_mean_kib * 1024 / 1e6),
+
+      mean_secs = MaybeMean(seconds),
+      mean_allocated_mass = MaybeMean(allocated_mass)
+
+      # unique failure reasons
+      # This can be used when there are different call stacks.
+      #fail_reasons = length(unique(fail_reason[fail_reason != ""]))
+      ), by=metric]
+
+  # Case insensitive sort by metric name
+  by_metric <- by_metric[order(tolower(by_metric$metric)), ]
+
+  overview_path <- file.path(output_dir, 'overview.csv')
+  write.csv(by_metric, file = overview_path, row.names = FALSE)
+  Log("Wrote %s", overview_path)
+
+  by_metric
+}
+
+# Write per-metric time-series CSVs (status, report counts, unallocated
+# mass) into <output_dir>/<metric>/.
+WriteDistMetricStatus <- function(summary, output_dir) {
+  # Write status.csv, num_reports.csv, and mass.csv for each metric.
+
+  s <- data.table(summary)
+
+  # loop over unique metrics, and write a CSV for each one
+  for (m in unique(s$metric)) {
+    # Select cols, and convert units.  Don't need params / map / metric.
+    subframe <- s[s$metric == m,
+                  list(job_id, date, status,
+                       #vm5_peak_mb = vm5_peak_kib * 1024 / 1e6,
+                       #vm5_mean_mb = vm5_mean_kib * 1024 / 1e6,
+                       num_reports,
+                       seconds,
+                       allocated_mass, num_rappor)]
+
+    # Sort by descending date.  Alphabetical sort works fine for YYYY-MM-DD.
+    subframe <- subframe[order(subframe$date, decreasing = TRUE), ]
+
+    out_path = file.path(output_dir, m, 'status.csv')
+    write.csv(subframe, file = out_path, row.names = FALSE)
+    Log("Wrote %s", out_path)
+  }
+
+  # This one is just for plotting with dygraphs.  TODO: can dygraphs do
+  # something smarter?  Maybe you need to select the column in JavaScript, and
+  # pass it an array, rather than CSV text.
+  for (m in unique(s$metric)) {
+    f1 <- s[s$metric == m, list(date, num_reports)]
+    path1 <- file.path(output_dir, m, 'num_reports.csv')
+    # NOTE: dygraphs (only in Firefox?) doesn't like the quotes around
+    # "2015-04-03".  In general, we can't turn off quotes, because strings with
+    # double quotes will be invalid CSV files.  But in this case, we only have
+    # date and number columns, so we can.  dygraphs is mistaken here.
+    write.csv(f1, file = path1, row.names = FALSE, quote = FALSE)
+    Log("Wrote %s", path1)
+
+    # Write unallocated mass.  TODO: Write the other 2 vars too?
+    f2 <- s[s$metric == m,
+            list(date,
+                 unallocated_mass = 1.0 - allocated_mass)]
+
+    path2 <- file.path(output_dir, m, 'mass.csv')
+    write.csv(f2, file = path2, row.names = FALSE, quote = FALSE)
+    Log("Wrote %s", path2)
+  }
+}
+
+# Render a ggplot object to a PNG under outdir.
+WritePlot <- function(p, outdir, filename, width = 800, height = 600) {
+  filename <- file.path(outdir, filename)
+  png(filename, width = width, height = height)
+  plot(p)
+  dev.off()
+  Log('Wrote %s', filename)
+}
+
+# Make sure the histogram has some valid input.  If we don't do this, ggplot
+# blows up with an unintuitive error message.  Exits the process on failure.
+CheckHistogramInput <- function(v) {
+  if (all(is.na(v))) {
+    arg_name <- deparse(substitute(v))  # R idiom to get name
+    Log('FATAL: All values in %s are NA (no successful runs?)', arg_name)
+    quit(status = 1)
+  }
+}
+
+# Write histogram PNGs summarizing all dist tasks (mass, detected strings,
+# report counts, duration, memory).  Exits via CheckHistogramInput if a
+# required column has no non-NA values.
+WriteDistHistograms <- function(s, output_dir) {
+  CheckHistogramInput(s$allocated_mass)
+
+  p <- qplot(s$allocated_mass, geom = "histogram")
+  t <- ggtitle("Allocated Mass by Task")
+  x <- xlab("allocated mass")
+  y <- ylab("number of tasks")
+  WritePlot(p + t + x + y, output_dir, 'allocated_mass.png')
+
+  CheckHistogramInput(s$num_rappor)
+
+  p <- qplot(s$num_rappor, geom = "histogram")
+  t <- ggtitle("Detected Strings by Task")
+  x <- xlab("detected strings")
+  y <- ylab("number of tasks")
+  WritePlot(p + t + x + y, output_dir, 'num_rappor.png')
+
+  CheckHistogramInput(s$num_reports)
+
+  p <- qplot(s$num_reports / 1e6, geom = "histogram")
+  t <- ggtitle("Raw Reports by Task")
+  x <- xlab("millions of reports")
+  y <- ylab("number of tasks")
+  WritePlot(p + t + x + y, output_dir, 'num_reports.png')
+
+  CheckHistogramInput(s$seconds)
+
+  p <- qplot(s$seconds, geom = "histogram")
+  t <- ggtitle("Analysis Duration by Task")
+  x <- xlab("seconds")
+  y <- ylab("number of tasks")
+  WritePlot(p + t + x + y, output_dir, 'seconds.png')
+
+  # NOTE: Skipping this for 'series' jobs.
+  if (sum(!is.na(s$vm5_peak_kib)) > 0) {
+    p <- qplot(s$vm5_peak_kib * 1024 / 1e6, geom = "histogram")
+    t <- ggtitle("Peak Memory Usage by Task")
+    x <- xlab("Peak megabytes (1e6 bytes) of memory")
+    y <- ylab("number of tasks")
+    WritePlot(p + t + x + y, output_dir, 'memory.png')
+  }
+}
+
+# Run all dist post-processing steps against the combined summary frame.
+ProcessAllDist <- function(s, output_dir) {
+  Log('dist: Writing per-metric status.csv')
+  WriteDistMetricStatus(s, output_dir)
+
+  Log('dist: Writing histograms')
+  WriteDistHistograms(s, output_dir)
+
+  Log('dist: Writing aggregated overview.csv')
+  WriteDistOverview(s, output_dir)
+}
+
+# Write the single CSV file loaded by assoc-overview.html.
+# Returns the aggregated data.table (one row per metric).
+WriteAssocOverview <- function(summary, output_dir) {
+  s <- data.table(summary)  # data.table syntax is easier here
+
+  by_metric <- s[, list(
+      #params_file = unique(params_file),
+      #map_file = unique(map_file),
+
+      days = length(date),
+      max_num_reports = MaybeMax(num_reports),
+
+      # summarize status
+      ok = sum(status == 'OK'),
+      fail = sum(status == 'FAIL'),
+      timeout = sum(status == 'TIMEOUT'),
+      skipped = sum(status == 'SKIPPED'),
+
+      mean_total_secs = MaybeMean(total_elapsed_seconds),
+      mean_em_secs = MaybeMean(em_elapsed_seconds)
+
+      ), by=list(metric)]
+
+  # Case insensitive sort by metric name
+  by_metric <- by_metric[order(tolower(by_metric$metric)), ]
+
+  overview_path <- file.path(output_dir, 'assoc-overview.csv')
+  write.csv(by_metric, file = overview_path, row.names = FALSE)
+  Log("Wrote %s", overview_path)
+
+  by_metric
+}
+
+# Write the CSV files loaded by assoc-metric.html -- that is, one
+# metric-status.csv for each metric name.
+WriteAssocMetricStatus <- function(summary, output_dir) {
+  s <- data.table(summary)
+  csv_list <- unique(s[, list(metric)])
+  # NOTE(review): 1:nrow(csv_list) misbehaves when there are zero rows
+  # (1:0 counts down); assumes at least one metric.  seq_len() would be safer.
+  for (i in 1:nrow(csv_list)) {
+    u <- csv_list[i, ]
+    # Select cols, and convert units.  Don't need params / map / metric.
+    by_pair <- s[s$metric == u$metric,
+                 list(days = length(date),
+                      max_num_reports = MaybeMax(num_reports),
+
+                      # summarize status
+                      ok = sum(status == 'OK'),
+                      fail = sum(status == 'FAIL'),
+                      timeout = sum(status == 'TIMEOUT'),
+                      skipped = sum(status == 'SKIPPED'),
+
+                      mean_total_secs = MaybeMean(total_elapsed_seconds),
+                      mean_em_secs = MaybeMean(em_elapsed_seconds)
+                      ),
+                 by=list(var1, var2)]
+
+    # Case insensitive sort by var1 name
+    by_pair <- by_pair[order(tolower(by_pair$var1)), ]
+
+    csv_path <- file.path(output_dir, u$metric, 'metric-status.csv')
+    write.csv(by_pair, file = csv_path, row.names = FALSE)
+    Log("Wrote %s", csv_path)
+  }
+}
+
+# This naming convention is in task_spec.py AssocTaskSpec.
+# e.g. ('m', 'domain', 'flag..HTTPS') -> 'm/domain_X_flag_HTTPS'
+FormatAssocRelPath <- function(metric, var1, var2) {
+  v2 <- gsub('..', '_', var2, fixed = TRUE)
+  var_dir <- sprintf('%s_X_%s', var1, v2)
+  file.path(metric, var_dir)
+}
+
+# Write the CSV files loaded by assoc-pair.html -- that is, one pair-status.csv
+# for each (metric, var1, var2) pair.
+WriteAssocPairStatus <- function(summary, output_dir) {
+
+  s <- data.table(summary)
+
+  csv_list <- unique(s[, list(metric, var1, var2)])
+  Log('CSV list:')
+  print(csv_list)
+
+  # loop over unique metrics, and write a CSV for each one
+  # NOTE(review): 1:nrow() assumes at least one row; seq_len() would be safer.
+  for (i in 1:nrow(csv_list)) {
+    u <- csv_list[i, ]
+
+    # Select cols, and convert units.  Don't need params / map / metric.
+    subframe <- s[s$metric == u$metric & s$var1 == u$var1 & s$var2 == u$var2,
+                  list(job_id, date, status,
+                       num_reports, d1, d2,
+                       total_elapsed_seconds,
+                       em_elapsed_seconds)]
+
+    # Sort by descending date.  Alphabetical sort works fine for YYYY-MM-DD.
+    subframe <- subframe[order(subframe$date, decreasing = TRUE), ]
+
+    pair_rel_path <- FormatAssocRelPath(u$metric, u$var1, u$var2)
+
+    csv_path <- file.path(output_dir, pair_rel_path, 'pair-status.csv')
+    write.csv(subframe, file = csv_path, row.names = FALSE)
+    Log("Wrote %s", csv_path)
+
+    # Write a file with the raw variable names.  Parsed by ui.sh, to pass to
+    # csv_to_html.py.
+    meta_path <- file.path(output_dir, pair_rel_path, 'pair-metadata.txt')
+
+    # NOTE: The conversion from data.table to character vector requires
+    # stringsAsFactors to work correctly!
+    lines <- as.character(u)
+    writeLines(lines, con = meta_path)
+    Log("Wrote %s", meta_path)
+  }
+}
+
+# Run all assoc post-processing steps against the combined summary frame.
+ProcessAllAssoc <- function(s, output_dir) {
+  Log('assoc: Writing pair-status.csv for each variable pair in each metric')
+  WriteAssocPairStatus(s, output_dir)
+
+  Log('assoc: Writing metric-status.csv for each metric')
+  WriteAssocMetricStatus(s, output_dir)
+
+  Log('assoc: Writing aggregated overview.csv')
+  WriteAssocOverview(s, output_dir)
+}
+
+# Entry point.  argv: action ('dist' or 'assoc'), input summary CSV path,
+# output directory.
+main <- function(argv) {
+  # increase ggplot font size globally
+  theme_set(theme_grey(base_size = 16))
+
+  action = argv[[1]]
+  input = argv[[2]]
+  output_dir = argv[[3]]
+
+  if (action == 'dist') {
+    summary = read.csv(input)
+    ProcessAllDist(summary, output_dir)
+  } else if (action == 'assoc') {
+    summary = read.csv(input)
+    ProcessAllAssoc(summary, output_dir)
+  } else {
+    stop(sprintf('Invalid action %s', action))
+  }
+
+  Log('Done')
+}
+
+# Run main() only when executed as a script, not when source()d.
+if (length(sys.frames()) == 0) {
+  main(commandArgs(TRUE))
+}
diff --git a/pipeline/regtest.sh b/pipeline/regtest.sh
new file mode 100755
index 0000000..a29a0f0
--- /dev/null
+++ b/pipeline/regtest.sh
@@ -0,0 +1,161 @@
+#!/bin/bash
+#
+# End-to-end tests for the dashboard.
+#
+# Usage:
+# ./regtest.sh <function name>
+#
+# NOTE: Must be run in this directory (rappor/pipeline).
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+# Create schema and params.
+# Writes regtest_params.csv, dist-analysis.csv and rappor-vars.csv under
+# _tmp/metadata, copying params from a previous $RAPPOR_SRC/regtest.sh run.
+create-metadata() {
+  mkdir -p _tmp/metadata
+  echo 'Hello from regtest.sh'
+
+  local params_path=_tmp/metadata/regtest_params.csv
+
+  # Relying on $RAPPOR_SRC/regtest.sh
+  cp --verbose ../_tmp/python/demo1/case_params.csv $params_path
+
+  # For now, use the same map everywhere.
+  cat >_tmp/metadata/dist-analysis.csv <<EOF
+var,map_filename
+unif,map.csv
+gauss,map.csv
+exp,map.csv
+m.domain,domain_map.csv
+EOF
+
+  # Both single dimensional and multi dimensional metrics.
+  cat >_tmp/metadata/rappor-vars.csv <<EOF
+metric,var,var_type,params
+m,domain,string,m_params
+m,flag..HTTPS,boolean,m_params
+unif,,string,regtest_params
+gauss,,string,regtest_params
+exp,,string,regtest_params
+EOF
+}
+
+# Create map files.
+create-maps() {
+  mkdir -p _tmp/maps
+  # Use the same map for everyone now?
+  local map_path=_tmp/maps/map.csv
+
+  # Relying on $RAPPOR_SRC/regtest.sh
+  cp --verbose ../_tmp/python/demo1/case_map.csv $map_path
+}
+
+# Simulate different metrics.
+# One counts file per (metric, day); the filenames drive task_spec.py.
+create-counts() {
+  mkdir -p _tmp/counts
+
+  for date in 2015-12-01 2015-12-02 2015-12-03; do
+    mkdir -p _tmp/counts/$date
+
+    # TODO: Change params for each day.
+    cp --verbose \
+      ../_tmp/python/demo1/1/case_counts.csv _tmp/counts/$date/unif_counts.csv
+    cp --verbose \
+      ../_tmp/python/demo2/1/case_counts.csv _tmp/counts/$date/gauss_counts.csv
+    cp --verbose \
+      ../_tmp/python/demo3/1/case_counts.csv _tmp/counts/$date/exp_counts.csv
+  done
+}
+
+dist-task-spec() {
+ local job_dir=$1
+ ./task_spec.py dist \
+ --map-dir _tmp/maps \
+ --config-dir _tmp/metadata \
+ --output-base-dir $job_dir/raw \
+ --bad-report-out _tmp/bad_counts.csv \
+ "$@"
+}
+
+# Run one end-to-end dist job over the counts dirs matching $pat.
+# Args: job_id pat (glob over date dirs, e.g. '2015-12-0[23]')
+dist-job() {
+  local job_id=$1
+  local pat=$2
+
+  local job_dir=_tmp/$job_id
+  mkdir -p $job_dir/raw
+
+  local spec_list=$job_dir/spec-list.txt
+
+  # Build the spec from the matching counts files; tee it for inspection.
+  find _tmp/counts/$pat -name \*_counts.csv \
+    | dist-task-spec $job_dir \
+    | tee $spec_list
+
+  ./dist.sh decode-dist-many $job_dir $spec_list
+  ./dist.sh combine-and-render-html _tmp $job_dir
+}
+
+# Top-level dist regtest: build fixtures, then run two smoke jobs.
+dist() {
+  create-metadata
+  create-maps
+  create-counts
+
+  dist-job smoke1 '2015-12-01'  # one day
+  dist-job smoke2 '2015-12-0[23]'  # two days
+}
+
+# Simulate different metrics.
+# One reports file per day, copied from a previous bin/test.sh run.
+create-reports() {
+  mkdir -p _tmp/reports
+
+  for date in 2015-12-01 2015-12-02 2015-12-03; do
+    mkdir -p _tmp/reports/$date
+
+    # TODO: Change params for each day.
+    cp --verbose \
+      ../bin/_tmp/reports.csv _tmp/reports/$date/m_reports.csv
+  done
+}
+
+assoc-task-spec() {
+ local job_dir=$1
+ ./task_spec.py assoc \
+ --map-dir _tmp/maps \
+ --config-dir _tmp/metadata \
+ --output-base-dir $job_dir/raw \
+ "$@"
+}
+
+# Run one end-to-end assoc job over the reports dirs matching $pat.
+# Args: job_id pat
+assoc-job() {
+  local job_id=$1
+  local pat=$2
+
+  local job_dir=_tmp/$job_id
+  mkdir -p $job_dir/raw $job_dir/config
+
+  local spec_list=$job_dir/spec-list.txt
+
+  find _tmp/reports/$pat -name \*_reports.csv \
+    | assoc-task-spec $job_dir \
+    | tee $spec_list
+
+  # decode-many calls decode_assoc.R, which expects this schema in the 'config'
+  # dir now.  TODO: adjust this.
+  cp --verbose _tmp/metadata/rappor-vars.csv $job_dir/config
+  cp --verbose ../bin/_tmp/m_params.csv $job_dir/config
+
+  ./assoc.sh decode-many $job_dir $spec_list
+  ./assoc.sh combine-and-render-html _tmp $job_dir
+}
+
+# Copy some from bin/test.sh?  The input _reports.csv files should be taken
+# from there.
+assoc() {
+  create-reports
+  cp --verbose ../bin/_tmp/domain_map.csv _tmp/maps
+
+  assoc-job smoke1-assoc '2015-12-01'  # one day
+  assoc-job smoke2-assoc '2015-12-0[23]'  # two days
+}
+
+"$@"
diff --git a/pipeline/task_spec.py b/pipeline/task_spec.py
new file mode 100755
index 0000000..c46bb35
--- /dev/null
+++ b/pipeline/task_spec.py
@@ -0,0 +1,364 @@
+#!/usr/bin/python
+"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.
+
+Each line represents a task, or R process invocation. The params on each line
+are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
+"""
+
+import collections
+import csv
+import errno
+import optparse
+import os
+import pprint
+import re
+import sys
+
+import util
+
+
+def _ReadDistMaps(f):
+  """Parse dist-analysis.csv into a dict of var name -> map filename.
+
+  Raises:
+    RuntimeError: if the header is not ['var', 'map_filename'].
+  """
+  dist_maps = {}
+  c = csv.reader(f)
+  for i, row in enumerate(c):
+    if i == 0:
+      expected = ['var', 'map_filename']
+      if row != expected:
+        raise RuntimeError('Expected CSV header %s' % expected)
+      continue  # skip header
+
+    var_name, map_filename = row
+    dist_maps[var_name] = map_filename
+  return dist_maps
+
+
+class DistMapLookup(object):
+  """Create a dictionary of var -> map to analyze against.
+
+  TODO: Support a LIST of maps.  Users should be able to specify more than one.
+  """
+  def __init__(self, f, map_dir):
+    # f: open dist-analysis.csv file; map_dir: directory holding map files
+    self.dist_maps = _ReadDistMaps(f)
+    self.map_dir = map_dir
+
+  def GetMapPath(self, var_name):
+    # Raises KeyError if var_name has no registered map.
+    filename = self.dist_maps[var_name]
+    return os.path.join(self.map_dir, filename)
+
+
+def CreateFieldIdLookup(f):
+  """Create a dictionary that specifies single variable analysis each var.
+
+  Args:
+    f: open file for the --field-ids CSV, with header
+        (metric, field, field_type, params, field_id)
+
+  Returns:
+    A dictionary from field ID -> full field name
+
+  NOTE: Right now we're only doing single variable analysis for strings, so we
+  don't have the "type".
+  """
+  field_id_lookup = {}
+  c = csv.reader(f)
+  for i, row in enumerate(c):
+    if i == 0:
+      expected = ['metric', 'field', 'field_type', 'params', 'field_id']
+      if row != expected:
+        raise RuntimeError('Expected CSV header %s' % expected)
+      continue
+
+    metric, field, field_type, _, field_id = row
+
+    # Only string fields participate in single-variable analysis.
+    if field_type != 'string':
+      continue
+
+    # Paper over the difference between plain metrics (single variable) and
+    # metrics with fields (multiple variables, for association analysis).
+    if field:
+      full_field_name = '%s.%s' % (metric, field)
+    else:
+      full_field_name = metric
+
+    field_id_lookup[field_id] = full_field_name
+  return field_id_lookup
+
+
+def _ReadVarSchema(f):
+  """Given the rappor-vars.csv file, return a list of metric/var/type.
+
+  Returns:
+    (assoc_metrics, params_lookup): dict of metric -> [(var, var_type)] for
+    multi-dimensional metrics, and dict of full var name -> params file name
+    (without the .csv extension).
+  """
+  # metric -> list of (variable name, type)
+  assoc_metrics = collections.defaultdict(list)
+  params_lookup = {}
+
+  c = csv.reader(f)
+  for i, row in enumerate(c):
+    if i == 0:
+      expected = ['metric', 'var', 'var_type', 'params']
+      if row != expected:
+        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
+      continue
+
+    metric, var, var_type, params = row
+    if var == '':
+      # No var: a single-dimensional metric; the full name is the metric name.
+      full_var_name = metric
+    else:
+      full_var_name = '%s.%s' % (metric, var)
+      # Also group multi-dimensional reports
+      assoc_metrics[metric].append((var, var_type))
+
+    params_lookup[full_var_name] = params
+
+  return assoc_metrics, params_lookup
+
+
+class VarSchema(object):
+  """Object representing rappor-vars.csv.
+
+  Right now we use it for slightly different purposes for dist and assoc
+  analysis.
+  """
+  def __init__(self, f, params_dir):
+    # f: open rappor-vars.csv file; params_dir: dir holding <params>.csv files
+    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
+    self.params_dir = params_dir
+
+  def GetParamsPath(self, var_name):
+    # Raises KeyError if var_name is not in the schema.
+    filename = self.params_lookup[var_name]
+    return os.path.join(self.params_dir, filename + '.csv')
+
+  def GetAssocMetrics(self):
+    return self.assoc_metrics
+
+
+def CountReports(f):
+  """Sum the first (count) column of a counts CSV; returns total reports."""
+  num_reports = 0
+  for line in f:
+    first_col = line.split(',')[0]
+    num_reports += int(first_col)
+  return num_reports
+
+
+# Matches .../YYYY-MM-DD/<field id>_counts.csv
+DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')
+
+
+def DistInputIter(stdin):
+  """Read lines from stdin and extract fields to construct analysis tasks.
+
+  Yields:
+    (counts_path, date, field_id) per line.
+
+  Raises:
+    RuntimeError: if a line doesn't match DIST_INPUT_PATH_RE.
+  """
+  for line in stdin:
+    m = DIST_INPUT_PATH_RE.match(line)
+    if not m:
+      raise RuntimeError('Invalid path %r' % line)
+
+    counts_path = line.strip()
+    date, field_id = m.groups()
+
+    yield counts_path, date, field_id
+
+
+def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
+ """Print task spec for single variable RAPPOR to stdout."""
+
+ num_bad = 0
+ unique_ids = set()
+
+ for counts_path, date, field_id in input_iter:
+ unique_ids.add(field_id)
+
+ # num_reports is used for filtering
+ with open(counts_path) as f:
+ num_reports = CountReports(f)
+
+ # Look up field name from field ID
+ if field_id_lookup:
+ field_name = field_id_lookup.get(field_id)
+ if field_name is None:
+ # The metric id is the md5 hash of the name. We can miss some, e.g. due
+ # to debug builds.
+ if bad_c:
+ bad_c.writerow((date, field_id, num_reports))
+ num_bad += 1
+ continue
+ else:
+ field_name = field_id
+
+ # NOTE: We could remove the params from the spec if decode_dist.R took the
+ # --schema flag. The var type is there too.
+ params_path = var_schema.GetParamsPath(field_name)
+ map_path= dist_maps.GetMapPath(field_name)
+
+ yield num_reports, field_name, date, counts_path, params_path, map_path
+
+ util.log('%d unique field IDs', len(unique_ids))
+ if num_bad:
+ util.log('Failed field ID -> field name lookup on %d files '
+ '(check --field-ids file)', num_bad)
+
+
+# Matches .../YYYY-MM-DD/<metric>_reports.csv
+ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')
+
+
+def AssocInputIter(stdin):
+  """Read lines from stdin and extract fields to construct analysis tasks.
+
+  Yields:
+    (reports_path, date, metric_name) per line.
+
+  Raises:
+    RuntimeError: if a line doesn't match ASSOC_INPUT_PATH_RE.
+  """
+  for line in stdin:
+    m = ASSOC_INPUT_PATH_RE.match(line)
+    if not m:
+      raise RuntimeError('Invalid path %r' % line)
+
+    reports_path = line.strip()
+    date, metric_name = m.groups()
+
+    yield reports_path, date, metric_name
+
+
+def CreateAssocVarPairs(rappor_metrics):
+ """Yield a list of pairs of variables that should be associated.
+
+ For now just do all (string x boolean) analysis.
+ """
+ var_pairs = collections.defaultdict(list)
+
+ for metric, var_list in rappor_metrics.iteritems():
+ string_vars = []
+ boolean_vars = []
+
+ # Separate variables into strings and booleans
+ for var_name, var_type in var_list:
+ if var_type == 'string':
+ string_vars.append(var_name)
+ elif var_type == 'boolean':
+ boolean_vars.append(var_name)
+ else:
+ util.log('Unknown type variable type %r', var_type)
+
+ for s in string_vars:
+ for b in boolean_vars:
+ var_pairs[metric].append((s, b))
+ return var_pairs
+
+
+# For debugging
+def PrintAssocVarPairs(var_pairs):
+  # NOTE(review): the values of var_pairs are (string var, boolean var) pairs
+  # from CreateAssocVarPairs, so 'var_type' below is really the boolean
+  # variable's name, not a type.
+  for metric, var_list in var_pairs.iteritems():
+    print metric
+    for var_name, var_type in var_list:
+      print '\t', var_name, var_type
+
+
+def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
+  """Generate task spec rows for multiple variable RAPPOR analysis.
+
+  Args:
+    input_iter: iterable of (reports_path, date, metric_name).
+    var_pairs: dict of metric -> [(string var, boolean var)].
+    dist_maps: DistMapLookup, locates the string variable's map file.
+    output_base_dir: root for per-task output directories.
+    bad_c: unused here (kept for symmetry with DistTaskSpec).
+
+  Yields:
+    (metric_name, date, reports_path, var1, var2, map1_path, output_dir)
+  """
+  # Flow:
+  #
+  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
+  #
+  # Short term: update_rappor.py should print every combination of string vs.
+  # bool?  Or I guess we have it in rappor-vars.csv
+
+  for reports_path, date, metric_name in input_iter:
+    pairs = var_pairs[metric_name]
+    for var1, var2 in pairs:
+      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
+      field1_name = '%s.%s' % (metric_name, var1)
+      map1_path = dist_maps.GetMapPath(field1_name)
+
+      # e.g. domain_X_flags__DID_PROCEED
+      # Don't use .. in filenames since it could be confusing.
+      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
+      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)
+
+      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir
+
+
+def CreateOptionsParser():
+  """Build the optparse parser shared by the 'dist' and 'assoc' actions."""
+  p = optparse.OptionParser()
+
+  p.add_option(
+      '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
+      default='',
+      help='Optionally write a report of input filenames with invalid field '
+           'IDs to this file.')
+  p.add_option(
+      '--config-dir', dest='config_dir', metavar='PATH', type='str',
+      default='',
+      help='Directory with metadata schema and params files to read.')
+  p.add_option(
+      '--map-dir', dest='map_dir', metavar='PATH', type='str',
+      default='',
+      help='Directory with map files to read.')
+  p.add_option(
+      '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
+      default='',
+      help='Root of the directory tree where analysis output will be placed.')
+  p.add_option(
+      '--field-ids', dest='field_ids', metavar='PATH', type='str',
+      default='',
+      help='Optional CSV file with field IDs (generally should not be used).')
+
+  return p
+
+
+def main(argv):
+  """Parse options, load config, and print the task spec to stdout."""
+  (opts, argv) = CreateOptionsParser().parse_args(argv)
+
+  if opts.bad_report:
+    # TODO(review): bad_f is never explicitly closed; we rely on process exit.
+    bad_f = open(opts.bad_report, 'w')
+    bad_c = csv.writer(bad_f)
+  else:
+    bad_c = None
+
+  # First positional arg after the program name selects the action.
+  action = argv[1]
+
+  if not opts.config_dir:
+    raise RuntimeError('--config-dir is required')
+  if not opts.map_dir:
+    raise RuntimeError('--map-dir is required')
+  if not opts.output_base_dir:
+    raise RuntimeError('--output-base-dir is required')
+
+  # This is shared between the two specs.
+  path = os.path.join(opts.config_dir, 'dist-analysis.csv')
+  with open(path) as f:
+    dist_maps = DistMapLookup(f, opts.map_dir)
+
+  path = os.path.join(opts.config_dir, 'rappor-vars.csv')
+  with open(path) as f:
+    var_schema = VarSchema(f, opts.config_dir)
+
+  if action == 'dist':
+    if opts.field_ids:
+      with open(opts.field_ids) as f:
+        field_id_lookup = CreateFieldIdLookup(f)
+    else:
+      field_id_lookup = {}
+
+    input_iter = DistInputIter(sys.stdin)
+    for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
+                            bad_c):
+      # The spec is a series of space-separated tokens.
+      tokens = row + (opts.output_base_dir,)
+      print ' '.join(str(t) for t in tokens)
+
+  elif action == 'assoc':
+    # Parse input
+    input_iter = AssocInputIter(sys.stdin)
+
+    # Create M x N association tasks
+    var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())
+
+    # Now add the other constant stuff
+    for row in AssocTaskSpec(
+        input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):
+
+      num_reports = 0  # placeholder, not filtering yet
+      tokens = (num_reports,) + row
+      print ' '.join(str(t) for t in tokens)
+
+  else:
+    raise RuntimeError('Invalid action %r' % action)
+
+
+if __name__ == '__main__':
+  try:
+    main(sys.argv)
+  except IOError, e:
+    if e.errno != errno.EPIPE:  # ignore broken pipe
+      raise
+  except RuntimeError, e:
+    print >>sys.stderr, 'FATAL: %s' % e
+    sys.exit(1)
diff --git a/pipeline/task_spec_test.py b/pipeline/task_spec_test.py
new file mode 100755
index 0000000..94cbac8
--- /dev/null
+++ b/pipeline/task_spec_test.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python -S
+"""
+task_spec_test.py: Tests for task_spec.py
+"""
+
+import cStringIO
+import unittest
+
+import task_spec  # module under test
+
+
+class TaskSpecTest(unittest.TestCase):
+
+  def testCountReports(self):
+    # CountReports sums the first column of each CSV line.
+    f = cStringIO.StringIO("""\
+1,2
+3,4
+5,6
+""")
+    c = task_spec.CountReports(f)
+    self.assertEqual(9, c)
+
+  def testDist(self):
+    # NOTE: These files are opened, in order to count the reports.  Maybe skip
+    # that step.
+    f = cStringIO.StringIO("""\
+_tmp/counts/2015-12-01/exp_counts.csv
+_tmp/counts/2015-12-01/gauss_counts.csv
+_tmp/counts/2015-12-02/exp_counts.csv
+_tmp/counts/2015-12-02/gauss_counts.csv
+""")
+    input_iter = task_spec.DistInputIter(f)
+    #for row in input_iter:
+    #  print row
+
+    field_id_lookup = {}
+
+    # var name -> map filename
+    f = cStringIO.StringIO("""\
+var,map_filename
+exp,map.csv
+unif,map.csv
+gauss,map.csv
+""")
+    dist_maps = task_spec.DistMapLookup(f, '_tmp/maps')
+
+    f2 = cStringIO.StringIO("""\
+metric,var,var_type,params
+exp,,string,params
+unif,,string,params
+gauss,,string,params
+""")
+    var_schema = task_spec.VarSchema(f2, '_tmp/config')
+
+    # Smoke test only: DistTaskSpec opens the counts paths above, so this
+    # requires the regtest fixtures to exist.
+    for row in task_spec.DistTaskSpec(
+        input_iter, field_id_lookup, var_schema, dist_maps, None):
+      print row
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/pipeline/tools-lib.sh b/pipeline/tools-lib.sh
new file mode 100755
index 0000000..c7b3b24
--- /dev/null
+++ b/pipeline/tools-lib.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Library used to refer to open source tools.
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+# NOTE: RAPPOR_SRC defined by the module that sources (cook.sh or ui.sh)
+
+# Caller can override shebang line by setting $DEP_PYTHON.
+readonly PYTHON=${DEP_PYTHON:-}
+
+readonly METRIC_STATUS=${DEP_METRIC_STATUS:-}
+
+
+# These 3 used by cook.sh.
+
+# Run combine_status.py, under $DEP_PYTHON if it was provided.
+TOOLS-combine-status() {
+  if test -n "$PYTHON"; then
+    $PYTHON $RAPPOR_SRC/pipeline/combine_status.py "$@"
+  else
+    $RAPPOR_SRC/pipeline/combine_status.py "$@"
+  fi
+}
+
+# Run combine_results.py, under $DEP_PYTHON if it was provided.
+TOOLS-combine-results() {
+  if test -n "$PYTHON"; then
+    $PYTHON $RAPPOR_SRC/pipeline/combine_results.py "$@"
+  else
+    $RAPPOR_SRC/pipeline/combine_results.py "$@"
+  fi
+}
+
+# Run metric_status.R, or the $DEP_METRIC_STATUS override if provided.
+TOOLS-metric-status() {
+  if test -n "$METRIC_STATUS"; then
+    $METRIC_STATUS "$@"
+  else
+    $RAPPOR_SRC/pipeline/metric_status.R "$@"
+  fi
+}
+
+# Used by ui.sh.
+
+# Run csv_to_html.py, under $DEP_PYTHON if it was provided.
+TOOLS-csv-to-html() {
+  if test -n "$PYTHON"; then
+    $PYTHON $RAPPOR_SRC/pipeline/csv_to_html.py "$@"
+  else
+    $RAPPOR_SRC/pipeline/csv_to_html.py "$@"
+  fi
+}
+
+#
+# Higher level scripts
+#
+
+TOOLS-cook() {
+  $RAPPOR_SRC/pipeline/cook.sh "$@"
+}
+
+# TODO: Rename gen-ui.sh.
+TOOLS-gen-ui() {
+  $RAPPOR_SRC/pipeline/ui.sh "$@"
+}
diff --git a/pipeline/ui.sh b/pipeline/ui.sh
new file mode 100755
index 0000000..8fbcde0
--- /dev/null
+++ b/pipeline/ui.sh
@@ -0,0 +1,322 @@
#!/bin/bash
#
# Build the user interface.
#
# Usage:
#   ./ui.sh <function name>

set -o nounset
set -o pipefail
set -o errexit

# Repo root, derived from this script's own location.
readonly THIS_DIR=$(dirname $0)
readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd)

source $RAPPOR_SRC/pipeline/tools-lib.sh

# Change the default location of this file by setting DEP_DYGRAPHS_JS
readonly DYGRAPHS_JS=${DEP_DYGRAPHS_JS:-$RAPPOR_SRC/third_party/dygraph-combined.js}
+
# Create symlinks, replacing any existing destination, logging each one.
_link() {
  ln --verbose --symbolic --force "$@"
}
+
# Copy files, clobbering any existing destination, logging each one.
_copy() {
  cp --verbose --force "$@"
}
+
# Fetch the dygraphs library into third_party/.
download-dygraphs() {
  local out=third_party
  mkdir -p $out  # wget fails if the prefix directory doesn't exist
  # FIX: spell out --directory-prefix; the old '--directory' relied on wget's
  # option-abbreviation matching, which breaks if a future wget version adds
  # another option starting with '--directory'.
  wget --directory-prefix $out \
    http://dygraphs.com/1.1.1/dygraph-combined.js
}
+
# Dev helper: copy the table-sort/url-hash JS+CSS from a scratch checkout into
# ui/, then concatenate the JS into table-lib.js (loaded by the dashboard).
# NOTE(review): depends on a hardcoded path under the developer's home dir.
import-table() {
  local src=~/git/scratch/ajax/
  cp --verbose $src/table-sort.{js,css} $src/url-hash.js ui
  pushd ui
  # TODO: Could minify it here
  cat table-sort.js url-hash.js > table-lib.js
  popd
}
+
# Use symlinks so we can edit and reload during development.
# Symlink the static HTML/JS/CSS assets for a job into $job_dir.
# $1: kind -- 'dist' or 'assoc', selects which HTML pages to link.
# $2: job output directory.
symlink-static() {
  local kind=$1
  local job_dir=$2

  local base=$RAPPOR_SRC/ui

  # HTML goes at the top level.
  if test "$kind" = dist; then
    _link \
      $base/overview.html $base/histograms.html $base/metric.html $base/day.html \
      $job_dir
  elif test "$kind" = assoc; then
    _link \
      $base/assoc-overview.html $base/assoc-metric.html $base/assoc-pair.html \
      $base/assoc-day.html \
      $job_dir
  else
    # FIX: 'log' is not defined anywhere in this script, so the original
    # error path died with 'command not found'.  Write to stderr directly.
    echo "Invalid kind $kind" >&2
    exit 1
  fi

  mkdir --verbose -p $job_dir/static

  # Static subdir.
  _link \
    $base/ui.css $base/ui.js \
    $base/table-sort.css $base/table-lib.js \
    $DYGRAPHS_JS \
    $job_dir/static
}
+
+
# Write cooked/overview.part.html (loaded by overview.html) from
# cooked/overview.csv.
overview-part-html() {
  local job_dir=${1:-_tmp/results-10}
  local csv_in=$job_dir/cooked/overview.csv
  local html_out=$job_dir/cooked/overview.part.html

  # Sort by descending date!
  TOOLS-csv-to-html \
    --col-format 'metric <a href="metric.html#metric={metric}">{metric}</a>' \
    < $csv_in \
    > $html_out
  echo "Wrote $html_out"
}
+
# Write status.part.html (a fragment) next to status.csv in every metric
# directory under $job_dir/cooked.
metric-part-html() {
  local job_dir=${1:-_tmp/results-10}
  # Testing it out. This should probably be a different dir.

  # Links constructed client-side: day.html reads the hash params; the status
  # link points at the per-day log under the job's raw/ tree.
  local fmt1='date <a href="day.html#jobId={job_id}&metric={metric}&date={date}">{date}</a>'
  local fmt2='status <a href="../{job_id}/raw/{metric}/{date}/log.txt">{status}</a>'

  local entry
  for entry in $job_dir/cooked/*; do
    test -d $entry || continue  # metrics are directories; skip plain files
    echo $entry

    local metric_name=$(basename $entry)

    # Convert status.csv to status.part.html (a fragment).
    TOOLS-csv-to-html \
      --def "metric $metric_name" \
      --col-format "$fmt1" \
      --col-format "$fmt2" \
      < $entry/status.csv \
      > $entry/status.part.html
  done
}
+
# Render one day's results.csv as an HTML fragment under cooked/.
results-html-one() {
  local csv_in=$1
  echo "$csv_in -> HTML"

  # .../raw/Settings.HomePage2/2015-03-01/results.csv ->
  # .../cooked/Settings.HomePage2/2015-03-01.part.html
  # (This saves some directories)
  local html_out
  html_out=$(sed -e 's|/raw/|/cooked/|; s|/results.csv|.part.html|' <<< "$csv_in")

  TOOLS-csv-to-html < $csv_in > $html_out
}
+
# Render every results.csv under $job_dir, one xargs process per file.
results-html() {
  local job_dir=${1:-_tmp/results-10}

  find $job_dir -name results.csv -print \
    | xargs --verbose --no-run-if-empty -n 1 -- $0 results-html-one
}
+
# Build the HTML parts for a dist job: static assets, the overview table, and
# per-metric status tables.
build-html1() {
  local job_dir=${1:-_tmp/results-10}

  symlink-static dist $job_dir

  overview-part-html $job_dir  # writes overview.part.html, loaded by overview.html
  metric-part-html $job_dir    # writes status.part.html for each metric
}
+
+#
+# Association Analysis
+#
+
+readonly ASSOC_TEST_JOB_DIR=~/rappor/chrome-assoc-smoke/smoke5-assoc
+
# Write cooked/assoc-overview.part.html from assoc-overview.csv.
assoc-overview-part-html() {
  local job_dir=${1:-$ASSOC_TEST_JOB_DIR}
  local csv_in=$job_dir/cooked/assoc-overview.csv
  local html_path=$job_dir/cooked/assoc-overview.part.html

  # Sort by descending date!
  TOOLS-csv-to-html \
    --col-format 'metric <a href="assoc-metric.html#metric={metric}">{metric}</a>' \
    < $csv_in \
    > $html_path
  echo "Wrote $html_path"
}
+
# Convert one metric-status.csv to an HTML fragment alongside it.
# $1: path like .../cooked/<metric>/metric-status.csv
assoc-metric-part-html-one() {
  local csv_path=$1
  # FIX: suffix stripping instead of sed 's/.csv$/…/' -- the unescaped dot
  # in that pattern matched ANY character before 'csv'.
  local html_path=${csv_path%.csv}.part.html

  local metric_dir=$(dirname $csv_path)
  local metric_name=$(basename $metric_dir)  # e.g. interstitial.harmful

  local fmt='days <a href="assoc-pair.html#metric={metric}&var1={var1}&var2={var2}">{days}</a>'

  TOOLS-csv-to-html \
    --def "metric $metric_name" \
    --col-format "$fmt" \
    < $csv_path \
    > $html_path

  echo "Wrote $html_path"
}
+
# Convert every metric-status.csv under cooked/ to an HTML fragment.
assoc-metric-part-html() {
  local job_dir=${1:-$ASSOC_TEST_JOB_DIR}
  # Testing it out. This should probably be a different dir.

  find $job_dir/cooked -name metric-status.csv -print \
    | xargs --verbose --no-run-if-empty -n 1 -- $0 assoc-metric-part-html-one
}
+
+# TODO:
+# - Construct link in JavaScript instead? It has more information. The
+# pair-metadata.txt file is a hack.
+
# Convert one pair-status.csv to an HTML fragment alongside it.
# $1: path like .../cooked/<metric>/<var1>_X_<var2>/pair-status.csv
assoc-pair-part-html-one() {
  local csv_path=$1
  # FIX: suffix stripping instead of sed 's/.csv$/…/' -- the unescaped dot
  # in that pattern matched ANY character before 'csv'.
  local html_path=${csv_path%.csv}.part.html

  local pair_dir_path=$(dirname $csv_path)
  local pair_dir_name=$(basename $pair_dir_path)  # e.g. domain_X_flags_IS_REPEAT_VISIT

  # This file is generated by metric_status.R for each pair of variables.
  local metadata="$pair_dir_path/pair-metadata.txt"
  # Read one variable per line.  FIX: declare the variables local (they
  # leaked into the global scope) and use read -r to keep backslashes literal.
  local metric_name var1 var2
  { read -r metric_name; read -r var1; read -r var2; } < "$metadata"

  local fmt1='date <a href="assoc-day.html#jobId={job_id}&metric={metric}&var1={var1}&var2={var2}&date={date}">{date}</a>'
  local fmt2="status <a href=\"../{job_id}/raw/{metric}/$pair_dir_name/{date}/assoc-log.txt\">{status}</a>"

  TOOLS-csv-to-html \
    --def "metric $metric_name" \
    --def "var1 $var1" \
    --def "var2 $var2" \
    --col-format "$fmt1" \
    --col-format "$fmt2" \
    < $csv_path \
    > $html_path
}
+
# Convert every pair-status.csv under cooked/ to an HTML fragment.
assoc-pair-part-html() {
  local job_dir=${1:-~/rappor/chrome-assoc-smoke/smoke3}
  # Testing it out. This should probably be a different dir.

  # FIX: --no-run-if-empty added (matching the sibling functions).  Without
  # it, an empty find result made xargs invoke "$0 assoc-pair-part-html-one"
  # with no file argument, which dies on $1 under 'set -o nounset'.
  find $job_dir/cooked -name pair-status.csv \
    | xargs -n 1 --verbose --no-run-if-empty -- $0 assoc-pair-part-html-one

  # FIX: removed ~37 lines of unreachable dead code ("OLD STUFF") that
  # followed an unconditional 'return' here; it duplicated metric-part-html.
}
+
# Render one day's assoc-results.csv as an HTML fragment under cooked/.
assoc-day-part-html-one() {
  local csv_in=$1
  echo "$csv_in -> HTML"

  # .../raw/interstitial.harmful/a_X_b/2015-03-01/assoc-results.csv ->
  # .../cooked/interstitial.harmful/a_X_b/2015-03-01.part.html
  # (This saves some directories)
  local html_out
  html_out=$(sed -e 's|/raw/|/cooked/|; s|/assoc-results.csv|.part.html|' <<< "$csv_in")

  TOOLS-csv-to-html --as-percent proportion < $csv_in > $html_out
}
+
# Render every assoc-results.csv under $job_dir, one xargs process per file.
assoc-day-part-html() {
  local job_dir=${1:-_tmp/results-10}

  find $job_dir -name assoc-results.csv -print \
    | xargs --verbose --no-run-if-empty -n 1 -- $0 assoc-day-part-html-one
}
+
# Check the static HTML pages for markup errors with 'tidy'.
# NOTE: flips shell options (xtrace on, errexit off) for the remainder of the
# process; acceptable because this runs as the final step via the "$@"
# dispatch at the bottom of the script.
lint-html() {
  set -o xtrace
  set +o errexit  # don't fail fast
  tidy -errors -quiet ui/metric.html
  tidy -errors -quiet ui/overview.html
  tidy -errors -quiet ui/histograms.html
}
+
# Directory we should serve from
readonly WWW_DIR=_tmp

# Serve the dashboard over HTTP for local viewing.
# $1: port (default 7999).
# NOTE(review): SimpleHTTPServer is Python 2 only -- confirm 'python' resolves
# to Python 2 on the machine running this.
serve() {
  local port=${1:-7999}
  cd $WWW_DIR && python -m SimpleHTTPServer $port
}

# Dispatch: run the function named by the first command-line argument,
# e.g. './ui.sh serve 8000'.
"$@"
diff --git a/pipeline/util.py b/pipeline/util.py
new file mode 100755
index 0000000..c517483
--- /dev/null
+++ b/pipeline/util.py
@@ -0,0 +1,9 @@
+"""Common functions."""
+
+import sys
+
+
def log(msg, *args):
  """Print a message to stderr.

  Args:
    msg: plain message, or a printf-style format string when args are given.
    *args: optional values interpolated into msg with the % operator.
  """
  if args:
    msg = msg % args
  print >>sys.stderr, msg