Diffstat (limited to 'pipeline')
-rw-r--r-- | pipeline/README.md | 52
-rwxr-xr-x | pipeline/alarm-lib.sh | 124
-rwxr-xr-x | pipeline/assoc.sh | 152
-rwxr-xr-x | pipeline/combine_results.py | 138
-rwxr-xr-x | pipeline/combine_results_test.py | 38
-rwxr-xr-x | pipeline/combine_status.py | 298
-rwxr-xr-x | pipeline/combine_status_test.py | 38
-rwxr-xr-x | pipeline/cook.sh | 147
-rwxr-xr-x | pipeline/csv-to-html-test.sh | 63
-rwxr-xr-x | pipeline/csv_to_html.py | 218
-rwxr-xr-x | pipeline/csv_to_html_test.py | 24
-rwxr-xr-x | pipeline/dist.sh | 135
-rwxr-xr-x | pipeline/metric_status.R | 343
-rwxr-xr-x | pipeline/regtest.sh | 161
-rwxr-xr-x | pipeline/task_spec.py | 364
-rwxr-xr-x | pipeline/task_spec_test.py | 61
-rwxr-xr-x | pipeline/tools-lib.sh | 64
-rwxr-xr-x | pipeline/ui.sh | 322
-rwxr-xr-x | pipeline/util.py | 9
19 files changed, 2751 insertions, 0 deletions
diff --git a/pipeline/README.md b/pipeline/README.md
new file mode 100644
index 0000000..052ea9d
--- /dev/null
+++ b/pipeline/README.md
@@ -0,0 +1,52 @@
+pipeline
+========
+
+This directory contains tools and scripts for running a cron job that does
+RAPPOR analysis and generates an HTML dashboard.
+
+It works like this:
+
+1. `task_spec.py` generates a text file where each line corresponds to a process
+   to be run (a "task"). The process is `bin/decode-dist` or
+   `bin/decode-assoc`. The line contains the task parameters.
+
+2. `xargs -P` is used to run processes in parallel. Our analysis is generally
+   single-threaded (i.e. because R is single-threaded), so this helps utilize
+   the machine fully. Each task places its output in a different subdirectory.
+
+3. `cook.sh` calls `combine_results.py` to combine analysis results into a time
+   series. It also calls `combine_status.py` to keep track of task data for
+   "meta-analysis". `metric_status.R` generates more summary CSV files.
+
+4. `ui.sh` calls `csv_to_html.py` to generate HTML fragments from the CSV
+   files.
+
+5. The JavaScript in `ui/ui.js` is loaded from static HTML, and makes AJAX calls
+   to retrieve the HTML fragments. The page is made interactive with
+   `ui/table-lib.js`.
+
+`dist.sh` and `assoc.sh` contain functions which coordinate this process.
+
+`alarm-lib.sh` is used to kill processes that have been running for too long.
+
+Testing
+-------
+
+`pipeline/regtest.sh` contains end-to-end demos of this process. Right now it
+depends on testdata from elsewhere in the tree:
+
+
+    rappor$ ./demo.sh run                 # prepare dist testdata
+    rappor$ cd bin
+
+    bin$ ./test.sh write-assoc-testdata   # prepare assoc testdata
+    bin$ cd ../pipeline
+
+    pipeline$ ./regtest.sh dist
+    pipeline$ ./regtest.sh assoc
+
+    pipeline$ python -m SimpleHTTPServer  # start a static web server
+
+    http://localhost:8000/_tmp/
+
+
diff --git a/pipeline/alarm-lib.sh b/pipeline/alarm-lib.sh
new file mode 100755
index 0000000..90495ce
--- /dev/null
+++ b/pipeline/alarm-lib.sh
@@ -0,0 +1,124 @@
+#!/bin/bash
+#
+# Alarm tool.
+#
+# Usage:
+#   ./alarm.sh <function name>
+
+# You can source this file and use the alarm-status function.
+
+set -o nounset
+set -o pipefail
+set -o errexit
+
+# Run a command with a timeout, and print its status to a directory.
+#
+# Usage:
+#   alarm-status job_dir/STATUS 10 \
+#     flaky_command ...
+
+alarm-status() {
+  set +o errexit
+  local status_file=$1
+  shift  # everything except the status file goes to perl
+
+  # NOTE: It would be nice to setpgrp() before exec? And then can the signal
+  # be delivered to the entire group, like kill -SIGALRM -PID?
+
+  # NOTE: If we did this in Python, the error message would also be clearer.
+  perl -e 'alarm shift; exec @ARGV or die "ERROR: after exec @ARGV"' "$@"
+  local exit_code=$?
+
+  set -o errexit
+
+  local result=''
+  case $exit_code in
+    0)
+      # Would be nice to show elapsed time?
+      result='OK'
+      ;;
+    9)
+      # decode_assoc.R will exit 9 if there are no reports AFTER
+      # --remove-bad-rows. A task can also be marked SKIPPED before running
+      # the child process (see backfill.sh).
+      result='SKIPPED by child process'
+      ;;
+    # exit code 142 means SIGALRM. 128 + 14 = 142. See 'kill -l'.
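+    # (The shell reports a child killed by a signal as 128 + the signal
+    # number, so a SIGKILL'd task would show up here as 128 + 9 = 137.)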
+ 142) + local seconds=$1 + result="TIMEOUT after $seconds seconds" + ;; + *) + result="FAIL with status $exit_code" + ;; + esac + echo "$result" + echo "$result" > $status_file +} + +_work() { + local n=10 # 2 seconds + for i in $(seq $n); do + echo $i - "$@" + sleep 0.2 + done +} + +_succeed() { + _work "$@" + exit 0 +} + +_fail() { + _work "$@" + exit 1 +} + +_skip() { + exit 9 +} + +# http://perldoc.perl.org/functions/alarm.html +# +# Delivers alarm. But how to get the process to have a distinct exit code? + +demo() { + mkdir -p _tmp + + # timeout + alarm-status _tmp/A 1 $0 _succeed foo + echo + + # ok + alarm-status _tmp/B 3 $0 _succeed bar + echo + + # fail + alarm-status _tmp/C 3 $0 _fail baz + echo + + # skip + alarm-status _tmp/D 3 $0 _skip baz + echo + + head _tmp/{A,B,C,D} +} + +test-simple() { + alarm-status _tmp/status.txt 1 sleep 2 +} + +test-bad-command() { + alarm-status _tmp/status.txt 1 nonexistent_sleep 2 +} + +# BUG +test-perl() { + set +o errexit + perl -e 'alarm shift; exec @ARGV or die "ERROR after exec @ARGV"' 1 _sleep 2 + echo $? +} + +if test $(basename $0) = 'alarm-lib.sh'; then + "$@" +fi diff --git a/pipeline/assoc.sh b/pipeline/assoc.sh new file mode 100755 index 0000000..2c6a54d --- /dev/null +++ b/pipeline/assoc.sh @@ -0,0 +1,152 @@ +#!/bin/bash +# +# Usage: +# ./assoc.sh <function name> + +set -o nounset +set -o pipefail +set -o errexit + +readonly THIS_DIR=$(dirname $0) +readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd) + +source $RAPPOR_SRC/util.sh # log, banner +source $RAPPOR_SRC/pipeline/tools-lib.sh +source $RAPPOR_SRC/pipeline/alarm-lib.sh + +# Change the default location of these tools by setting DEP_* +readonly DECODE_ASSOC=${DEP_DECODE_ASSOC:-$RAPPOR_SRC/bin/decode-assoc} +readonly FAST_EM=${DEP_FAST_EM:-$RAPPOR_SRC/analysis/cpp/_tmp/fast_em} + +# Run a single decode-assoc process, to analyze one variable pair for one +# metric. The arguments to this function are one row of the task spec. +decode-one() { + # Job constants, from decode-many + local rappor_src=$1 + local timeout_secs=$2 + local min_reports=$3 + local job_dir=$4 + local sample_size=$5 + + # Task spec variables, from task_spec.py + local num_reports=$6 + local metric_name=$7 + local date=$8 # for output naming only + local reports=$9 # file with reports + local var1=${10} + local var2=${11} + local map1=${12} + local output_dir=${13} + + local log_file=$output_dir/assoc-log.txt + local status_file=$output_dir/assoc-status.txt + mkdir --verbose -p $output_dir + + # Flags drived from job constants + local schema=$job_dir/config/rappor-vars.csv + local params_dir=$job_dir/config + local em_executable=$FAST_EM + + # TODO: + # - Skip jobs with few reports, like ./backfill.sh analyze-one. + + # Output the spec for combine_status.py. + echo "$@" > $output_dir/assoc-spec.txt + + # NOTE: Not passing --num-cores since we're parallelizing already. + + # NOTE: --tmp-dir is the output dir. Then we just delete all the .bin files + # afterward so we don't copy them to x20 (they are big). 
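+  # The brace group below is redirected as a unit, so both the 'time' summary
+  # and the command's own stdout/stderr end up in $log_file.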
+ + { time \ + alarm-status $status_file $timeout_secs \ + $DECODE_ASSOC \ + --create-bool-map \ + --remove-bad-rows \ + --em-executable $em_executable \ + --schema $schema \ + --params-dir $params_dir \ + --metric-name $metric_name \ + --reports $reports \ + --var1 $var1 \ + --var2 $var2 \ + --map1 $map1 \ + --reports-sample-size $sample_size \ + --tmp-dir $output_dir \ + --output-dir $output_dir + } >$log_file 2>&1 +} + +test-decode-one() { + decode-one $RAPPOR_SRC +} + +readonly DEFAULT_MIN_REPORTS=5000 + +#readonly DEFAULT_TIMEOUT_SECONDS=300 # 5 minutes as a quick test. +readonly DEFAULT_TIMEOUT_SECONDS=3600 # 1 hour + +readonly DEFAULT_MAX_PROCS=6 # TODO: Share with backfill.sh + +# Limit to 1M for now. Raise it when we have a full run. +readonly DEFAULT_SAMPLE_SIZE=1000000 + +readonly NUM_ARGS=8 # number of tokens in the task spec, used for xargs + +# Run many decode-assoc processes in parallel. +decode-many() { + local job_dir=$1 + local spec_list=$2 + + # These 3 params affect speed + local timeout_secs=${3:-$DEFAULT_TIMEOUT_SECONDS} + local sample_size=${4:-$DEFAULT_SAMPLE_SIZE} + local max_procs=${5:-$DEFAULT_MAX_PROCS} + + local rappor_src=${6:-$RAPPOR_SRC} + local min_reports=${7:-$DEFAULT_MIN_REPORTS} + + time cat $spec_list \ + | xargs --verbose -n $NUM_ARGS -P $max_procs --no-run-if-empty -- \ + $0 decode-one $rappor_src $timeout_secs $min_reports $job_dir $sample_size +} + +# Combine assoc results and render HTML. + +combine-and-render-html() { + local jobs_base_dir=$1 + local job_dir=$2 + + banner "Combining assoc task status" + TOOLS-cook combine-assoc-task-status $jobs_base_dir $job_dir + + banner "Combining assoc results" + TOOLS-cook combine-assoc-results $jobs_base_dir $job_dir + + banner "Splitting out status per metric, and writing overview" + TOOLS-cook assoc-metric-status $job_dir + + TOOLS-gen-ui symlink-static assoc $job_dir + + banner "Building overview .part.html from CSV" + TOOLS-gen-ui assoc-overview-part-html $job_dir + + banner "Building metric .part.html from CSV" + TOOLS-gen-ui assoc-metric-part-html $job_dir + + banner "Building pair .part.html from CSV" + TOOLS-gen-ui assoc-pair-part-html $job_dir + + banner "Building day .part.html from CSV" + TOOLS-gen-ui assoc-day-part-html $job_dir +} + +# Temp files left over by the fast_em R <-> C++. +list-and-remove-bin() { + local job_dir=$1 + # If everything failed, we might not have anything to list/delete. + find $job_dir -name \*.bin | xargs --no-run-if-empty -- ls -l --si + find $job_dir -name \*.bin | xargs --no-run-if-empty -- rm -f --verbose +} + +"$@" diff --git a/pipeline/combine_results.py b/pipeline/combine_results.py new file mode 100755 index 0000000..6cb0150 --- /dev/null +++ b/pipeline/combine_results.py @@ -0,0 +1,138 @@ +#!/usr/bin/python +"""Combines results from multiple days of a single metric. + +Feed it the STATUS.txt files on stdin. It then finds the corresponding +results.csv, and takes the top N items. + +Example: + +Date, "google.com,", yahoo.com +2015-03-01, 0.0, 0.9 +2015-03-02, 0.1, 0.8 + +Dygraphs can load this CSV file directly. + +TODO: Use different dygraph API? + +Also we need error bars. 
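+With error bars, each value cell is written as a semicolon-separated
+low;value;high triple (see CombineDistResults below), so the combined CSV
+looks roughly like this (illustrative numbers):
+
+  date,google.com,yahoo.com
+  2015-03-01,0.0000;0.0000;0.0000,0.8500;0.9000;0.9500
+  2015-03-02,0.0500;0.1000;0.1500,0.7500;0.8000;0.8500
+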
+ + new Dygraph(document.getElementById("graphdiv2"), + [ + [1,10,100], + [2,20,80], + [3,50,60], + [4,70,80] + ], + { + labels: [ "Date", "failure", "timeout", "google.com" ] + }); +""" + +import collections +import csv +import json +import os +import sys + +import util + + +def CombineDistResults(stdin, c_out, num_top): + dates = [] + var_cols = collections.defaultdict(dict) # {name: {date: value}} + + seen_dates = set() + + for line in stdin: + status_path = line.strip() + + # Assume it looks like .../2015-03-01/STATUS.txt + task_dir = os.path.dirname(status_path) + date = os.path.basename(task_dir) + + # Get rid of duplicate dates. These could be caused by retries. + if date in seen_dates: + continue + + seen_dates.add(date) + + with open(status_path) as f: + status = f.readline().split()[0] # OK, FAIL, TIMEOUT, SKIPPED + + dates.append(date) + + if status != 'OK': + continue # won't have results.csv + + results_path = os.path.join(task_dir, 'results.csv') + with open(results_path) as f: + c = csv.reader(f) + unused_header = c.next() # header row + + # they are sorted by decreasing "estimate", which is what we want + for i in xrange(0, num_top): + try: + row = c.next() + except StopIteration: + # It's OK if it doesn't have enough + util.log('Stopping early. Fewer than %d results to render.', num_top) + break + + string, _, _, proportion, _, prop_low, prop_high = row + + # dygraphs has a weird format with semicolons: + # value;lower;upper,value;lower;upper. + + # http://dygraphs.com/data.html#csv + + # Arbitrarily use 4 digits after decimal point (for dygraphs, not + # directly displayed) + dygraph_triple = '%.4f;%.4f;%.4f' % ( + float(prop_low), float(proportion), float(prop_high)) + + var_cols[string][date] = dygraph_triple + + # Now print CSV on stdout. + cols = sorted(var_cols.keys()) # sort columns alphabetically + c_out.writerow(['date'] + cols) + + dates.sort() + + for date in dates: + row = [date] + for col in cols: + cell = var_cols[col].get(date) # None mean sthere is no row + row.append(cell) + c_out.writerow(row) + + #util.log("Number of dynamic cols: %d", len(var_cols)) + + +def CombineAssocResults(stdin, c_out, num_top): + header = ('dummy',) + c_out.writerow(header) + + +def main(argv): + action = argv[1] + + if action == 'dist': + num_top = int(argv[2]) # number of values to keep + c_out = csv.writer(sys.stdout) + CombineDistResults(sys.stdin, c_out, num_top) + + elif action == 'assoc': + num_top = int(argv[2]) # number of values to keep + c_out = csv.writer(sys.stdout) + CombineAssocResults(sys.stdin, c_out, num_top) + + else: + raise RuntimeError('Invalid action %r' % action) + + +if __name__ == '__main__': + try: + main(sys.argv) + except RuntimeError, e: + print >>sys.stderr, 'FATAL: %s' % e + sys.exit(1) diff --git a/pipeline/combine_results_test.py b/pipeline/combine_results_test.py new file mode 100755 index 0000000..84c4cb7 --- /dev/null +++ b/pipeline/combine_results_test.py @@ -0,0 +1,38 @@ +#!/usr/bin/python -S +""" +combine_results_test.py: Tests for combine_results.py +""" + +import csv +import cStringIO +import unittest + +import combine_results # module under test + + +# TODO: Make these test more the header row. They rely heavily on the file +# system! 
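+# One way to cover more than the header row is a file-system-backed test like
+# the sketch below (not part of the original suite; the class and method names
+# are made up here). It builds a fake <date>/STATUS.txt and results.csv and
+# checks that the date and top value appear in the combined CSV. It needs a
+# few extra imports:
+import os
+import shutil
+import tempfile
+
+
+class CombineDistResultsFileTest(unittest.TestCase):
+
+  def testReadsResultsFromTaskDir(self):
+    tmp = tempfile.mkdtemp()
+    try:
+      # The task dir is named after the date, as CombineDistResults expects.
+      task_dir = os.path.join(tmp, '2015-03-01')
+      os.makedirs(task_dir)
+      with open(os.path.join(task_dir, 'STATUS.txt'), 'w') as f:
+        f.write('OK\n')
+      with open(os.path.join(task_dir, 'results.csv'), 'w') as f:
+        f.write('string,a,b,proportion,e,prop_low,prop_high\n')
+        f.write('google.com,0,0,0.5,0,0.4,0.6\n')
+
+      stdin = cStringIO.StringIO(os.path.join(task_dir, 'STATUS.txt') + '\n')
+      out = cStringIO.StringIO()
+      combine_results.CombineDistResults(stdin, csv.writer(out), 10)
+
+      actual = out.getvalue()
+      self.assert_('2015-03-01' in actual, actual)
+      self.assert_('google.com' in actual, actual)
+    finally:
+      shutil.rmtree(tmp)
+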
+
+class CombineResultsTest(unittest.TestCase):
+
+  def testCombineDistResults(self):
+    stdin = cStringIO.StringIO('')
+    out = cStringIO.StringIO()
+    c_out = csv.writer(out)
+
+    combine_results.CombineDistResults(stdin, c_out, 10)
+    actual = out.getvalue()
+    self.assert_(actual.startswith('date'), actual)
+
+  def testCombineAssocResults(self):
+    stdin = cStringIO.StringIO('')
+    out = cStringIO.StringIO()
+    c_out = csv.writer(out)
+
+    combine_results.CombineAssocResults(stdin, c_out, 10)
+    actual = out.getvalue()
+    self.assert_(actual.startswith('dummy'), actual)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/pipeline/combine_status.py b/pipeline/combine_status.py
new file mode 100755
index 0000000..4fbb36a
--- /dev/null
+++ b/pipeline/combine_status.py
@@ -0,0 +1,298 @@
+#!/usr/bin/python
+"""Summarize the results of many RAPPOR analysis runs.
+
+Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
+and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
+"""
+
+import collections
+import csv
+import json
+import os
+import re
+import sys
+
+
+# Parse bash 'time' output:
+# real    0m11.578s
+
+# TODO: Parse the time from metrics.json instead.
+TIMING_RE = re.compile(
+    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
+
+# TODO: Could have decode-dist and decode-assoc output the PID?
+PID_RE = re.compile(
+    r'write_pid.py: PID (\d+)')  # not VERBOSE, spaces are literal
+
+
+def ParseMemCsv(f):
+  """Compute summary stats for memory.
+
+  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals. Since it uses
+  the kernel, it's accurate except for tasks that spike in their last 4
+  seconds.
+
+  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
+  """
+  peak_by_pid = collections.defaultdict(list)
+  size_by_pid = collections.defaultdict(list)
+
+  # Parse columns we care about, by PID
+  c = csv.reader(f)
+  for i, row in enumerate(c):
+    if i == 0:
+      continue  # skip header
+    # looks like timestamp, pid, then (rss, peak, size)
+    _, pid, _, peak, size = row
+    if peak != '':
+      peak_by_pid[pid].append(int(peak))
+    if size != '':
+      size_by_pid[pid].append(int(size))
+
+  mem_by_pid = {}
+
+  # Now compute summaries
+  pids = peak_by_pid.keys()
+  for pid in pids:
+    peaks = peak_by_pid[pid]
+    vm5_peak_kib = max(peaks)
+
+    sizes = size_by_pid[pid]
+    vm5_mean_kib = sum(sizes) / len(sizes)
+
+    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)
+
+  return mem_by_pid
+
+
+def CheckJobId(job_id, parts):
+  """Sanity check for date or smoke test."""
+  if not job_id.startswith('201') and not job_id.startswith('smoke'):
+    raise RuntimeError(
+        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
+        (job_id, parts))
+
+
+def ReadStatus(f):
+  status_line = f.readline().strip()
+  return status_line.split()[0]  # OK, TIMEOUT, FAIL
+
+
+def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
+  """Read status task paths from stdin, write CSV summary to c_out."""
+
+  #util.log('%s', mem_by_pid)
+
+  # Parses:
+  # - input path for metric name and date
+  # - spec.txt for task params
+  # - STATUS.txt for task success/failure
+  # - metrics.json for output metrics
+  # - log.txt for timing, if it ran to completion
+  #   - and for structured data
+  # - join with mem by PID
+
+  header = (
+      'job_id', 'params_file', 'map_file',
+      'metric', 'date',
+      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
+      'seconds', 'status',
+      # only set when OK
+      'num_reports', 'num_rappor', 'allocated_mass',
+      # only set when failed
+      'fail_reason')
+  c_out.writerow(header)
+
+  for line
in stdin: + # + # Receive a STATUS.txt path on each line of stdin, and parse it. + # + status_path = line.strip() + + with open(status_path) as f: + status = ReadStatus(f) + + # Path should look like this: + # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt + parts = status_path.split('/') + job_id = parts[-5] + CheckJobId(job_id, parts) + + # + # Parse the job spec + # + result_dir = os.path.dirname(status_path) + spec_file = os.path.join(result_dir, 'spec.txt') + with open(spec_file) as f: + spec_line = f.readline() + # See backfill.sh analyze-one for the order of these 7 fields. + # There are 3 job constants on the front. + (num_reports, metric_name, date, counts_path, params_path, + map_path, _) = spec_line.split() + + # NOTE: These are all constant per metric. Could have another CSV and + # join. But denormalizing is OK for now. + params_file = os.path.basename(params_path) + map_file = os.path.basename(map_path) + + # remove extension + params_file, _ = os.path.splitext(params_file) + map_file, _ = os.path.splitext(map_file) + + # + # Read the log + # + log_file = os.path.join(result_dir, 'log.txt') + with open(log_file) as f: + lines = f.readlines() + + # Search lines in reverse order for total time. It could have output from + # multiple 'time' statements, and we want the last one. + seconds = None # for skipped + for i in xrange(len(lines) - 1, -1, -1): + # TODO: Parse the R timing too. Could use LOG_RECORD_RE. + m = TIMING_RE.search(lines[i]) + if m: + min_part, sec_part = m.groups() + seconds = float(min_part) * 60 + float(sec_part) + break + + # Extract stack trace + if status == 'FAIL': + # Stack trace looks like: "Calls: main -> RunOne ..." + fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line) + else: + fail_reason = None + + # Extract PID and join with memory results + pid = None + vm5_peak_kib = None + vm5_mean_kib = None + if mem_by_pid: + for line in lines: + m = PID_RE.match(line) + if m: + pid = m.group(1) + # Could the PID not exist if the process was super short was less + # than 5 seconds? 
+ try: + vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid] + except KeyError: # sometimes we don't add mem-track on the front + vm5_peak_kib, vm5_mean_kib = None, None + break + else: + pass # we weren't passed memory.csv + + # + # Read the metrics + # + metrics = {} + metrics_file = os.path.join(result_dir, 'metrics.json') + if os.path.isfile(metrics_file): + with open(metrics_file) as f: + metrics = json.load(f) + + num_rappor = metrics.get('num_detected') + allocated_mass = metrics.get('allocated_mass') + + # Construct and write row + row = ( + job_id, params_file, map_file, + metric_name, date, + vm5_peak_kib, vm5_mean_kib, + seconds, status, + num_reports, num_rappor, allocated_mass, + fail_reason) + + c_out.writerow(row) + + +def CombineAssocTaskStatus(stdin, c_out): + """Read status task paths from stdin, write CSV summary to c_out'.""" + + header = ( + 'job_id', 'metric', 'date', 'status', 'num_reports', + 'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1', + 'd2') + + c_out.writerow(header) + + for line in stdin: + status_path = line.strip() + + with open(status_path) as f: + status = ReadStatus(f) + + parts = status_path.split('/') + job_id = parts[-6] + CheckJobId(job_id, parts) + + # + # Parse the job spec + # + result_dir = os.path.dirname(status_path) + spec_file = os.path.join(result_dir, 'assoc-spec.txt') + with open(spec_file) as f: + spec_line = f.readline() + # See backfill.sh analyze-one for the order of these 7 fields. + # There are 3 job constants on the front. + + # 5 job params + (_, _, _, _, _, + dummy_num_reports, metric_name, date, reports, var1, var2, map1, + output_dir) = spec_line.split() + + # + # Parse decode-assoc metrics + # + metrics = {} + metrics_file = os.path.join(result_dir, 'assoc-metrics.json') + if os.path.isfile(metrics_file): + with open(metrics_file) as f: + metrics = json.load(f) + + # After we run it we have the actual number of reports + num_reports = metrics.get('num_reports') + total_elapsed_seconds = metrics.get('total_elapsed_time') + em_elapsed_seconds = metrics.get('em_elapsed_time') + estimate_dimensions = metrics.get('estimate_dimensions') + if estimate_dimensions: + d1, d2 = estimate_dimensions + else: + d1, d2 = (0, 0) # unknown + + row = ( + job_id, metric_name, date, status, num_reports, total_elapsed_seconds, + em_elapsed_seconds, var1, var2, d1, d2) + c_out.writerow(row) + + +def main(argv): + action = argv[1] + + try: + mem_csv = argv[2] + except IndexError: + mem_by_pid = None + else: + with open(mem_csv) as f: + mem_by_pid = ParseMemCsv(f) + + if action == 'dist': + c_out = csv.writer(sys.stdout) + CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid) + + elif action == 'assoc': + c_out = csv.writer(sys.stdout) + CombineAssocTaskStatus(sys.stdin, c_out) + + else: + raise RuntimeError('Invalid action %r' % action) + + +if __name__ == '__main__': + try: + main(sys.argv) + except RuntimeError, e: + print >>sys.stderr, 'FATAL: %s' % e + sys.exit(1) diff --git a/pipeline/combine_status_test.py b/pipeline/combine_status_test.py new file mode 100755 index 0000000..4606587 --- /dev/null +++ b/pipeline/combine_status_test.py @@ -0,0 +1,38 @@ +#!/usr/bin/python -S +""" +combine_status_test.py: Tests for combine_status.py +""" + +import csv +import cStringIO +import unittest + +import combine_status # module under test + + +# TODO: Make these test more the header row. They rely heavily on the file +# system! 
+ +class CombineStatusTest(unittest.TestCase): + + def testCombineDistTaskStatus(self): + stdin = cStringIO.StringIO('') + out = cStringIO.StringIO() + c_out = csv.writer(out) + + combine_status.CombineDistTaskStatus(stdin, c_out, {}) + actual = out.getvalue() + self.assert_(actual.startswith('job_id,params_file,'), actual) + + def testCombineAssocTaskStatus(self): + stdin = cStringIO.StringIO('') + out = cStringIO.StringIO() + c_out = csv.writer(out) + + combine_status.CombineAssocTaskStatus(stdin, c_out) + actual = out.getvalue() + self.assert_(actual.startswith('job_id,metric,'), actual) + + +if __name__ == '__main__': + unittest.main() diff --git a/pipeline/cook.sh b/pipeline/cook.sh new file mode 100755 index 0000000..e820d44 --- /dev/null +++ b/pipeline/cook.sh @@ -0,0 +1,147 @@ +#!/bin/bash +# +# Take the raw data from the analysis and massage it into various formats +# suitable for display. +# +# Usage: +# ./cook.sh <function name> + +set -o nounset +set -o pipefail +set -o errexit + +readonly THIS_DIR=$(dirname $0) +readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd) + +source $RAPPOR_SRC/pipeline/tools-lib.sh + + +status-files() { + local dir=$1 + find $dir -name STATUS.txt +} + +results-files() { + local dir=$1 + find $dir -name results.csv +} + +count-results() { + # first field of each line is one of {OK, TIMEOUT, FAIL, SKIPPED} + status-files "$@" \ + | xargs cat \ + | cut -d ' ' -f 1 \ + | sort | uniq -c | sort -n -r +} + +# +# For dist cron job +# + +# Combine status of tasks over multiple jobs. Each row is a task (decode-dist +# invocation). This has the number of reports. +combine-dist-task-status() { + local base_dir=${1:-~/rappor/cron} + local job_dir=${2:-~/rappor/cron/2015-05-22__05-58-01} + + local out=$job_dir/task-status.csv + + # Ignore memory for now. + time status-files $base_dir | TOOLS-combine-status dist > $out + echo "Wrote $out" +} + +# Create a single dist.csv time series for a GIVEN metric. +combine-dist-results-one() { + local base_dir=$1 + local job_dir=$2 + local metric_name=$3 + #echo FOO $base_dir $metric_name + + local out_dir=$job_dir/cooked/$metric_name + mkdir -p $out_dir + + # Glob to capture this specific metric name over ALL job IDs. + find $base_dir/*/raw/$metric_name -name STATUS.txt \ + | TOOLS-combine-results dist 5 \ + > $out_dir/dist.csv +} + +# Creates a dist.csv file for EACH metric. TODO: Rename one/many +combine-dist-results() { + local base_dir=${1:-~/rappor/cron} + local job_dir=${2:-~/rappor/cron/2015-05-22__05-58-01} + + # Direct subdirs of 'raw' are metrics. Just print filename. + find $base_dir/*/raw -mindepth 1 -maxdepth 1 -type d -a -printf '%f\n' \ + | sort | uniq \ + | xargs --verbose -n1 -- \ + $0 combine-dist-results-one $base_dir $job_dir +} + +# Take the task-status.csv file, which has row key (metric, date). Writes +# num_reports.csv and status.csv per metric, and a single overview.csv for all +# metrics. +dist-metric-status() { + local job_dir=${1:-_tmp/results-10} + local out_dir=$job_dir/cooked + + TOOLS-metric-status dist $job_dir/task-status.csv $out_dir +} + +# +# For association analysis cron job +# + +combine-assoc-task-status() { + local base_dir=${1:-~/rappor/chrome-assoc-smoke} + local job_dir=${2:-$base_dir/smoke1} + + local out=$job_dir/assoc-task-status.csv + + time find $base_dir -name assoc-status.txt \ + | TOOLS-combine-status assoc \ + > $out + + echo "Wrote $out" +} + +# Create a single assoc.csv time series for a GIVEN (var1, var2) pair. 
+combine-assoc-results-one() { + local base_dir=$1 + local job_dir=$2 + local metric_pair_rel_path=$3 + + local out_dir=$job_dir/cooked/$metric_pair_rel_path + mkdir -p $out_dir + + # Glob to capture this specific metric name over ALL job IDs. + find $base_dir/*/raw/$metric_pair_rel_path -name assoc-status.txt \ + | TOOLS-combine-results assoc 5 \ + > $out_dir/assoc-results-series.csv +} + +# Creates a dist.csv file for EACH metric. TODO: Rename one/many +combine-assoc-results() { + local base_dir=${1:-~/rappor/chrome-assoc-smoke} + local job_dir=${2:-$base_dir/smoke3} + + # Direct subdirs of 'raw' are metrics, and subdirs of that are variable + # pairs. Print "$metric_name/$pair_name". + find $base_dir/*/raw -mindepth 2 -maxdepth 2 -type d -a -printf '%P\n' \ + | sort | uniq \ + | xargs --verbose -n1 -- \ + $0 combine-assoc-results-one $base_dir $job_dir +} + +# Take the assoc-task-status.csv file, which has row key (metric, date). Writes +# num_reports.csv and status.csv per metric, and a single overview.csv for all +# metrics. +assoc-metric-status() { + local job_dir=${1:-~/rappor/chrome-assoc-smoke/smoke3} + local out_dir=$job_dir/cooked + + TOOLS-metric-status assoc $job_dir/assoc-task-status.csv $out_dir +} + +"$@" diff --git a/pipeline/csv-to-html-test.sh b/pipeline/csv-to-html-test.sh new file mode 100755 index 0000000..754d083 --- /dev/null +++ b/pipeline/csv-to-html-test.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# +# Test for csv_to_html.py. +# +# Usage: +# ./csv-to-html-test.sh <function name> + +set -o nounset +set -o pipefail +set -o errexit + +test-basic() { + ./csv_to_html.py <<EOF +a_number,b +1,2 +3,4 +NA,4 +EOF +} + +test-col-format() { + ./csv_to_html.py \ + --col-format 'b <a href="../{b}/metric.html">{b}</a>' <<EOF +a,b +1,2015-05-01 +3,2015-05-02 +EOF +} + +test-var-def() { + ./csv_to_html.py \ + --def 'v VALUE' \ + --col-format 'b <a href="../{b}/metric.html">{v}</a>' <<EOF +a,b +1,2 +3,4 +EOF +} + +test-as-percent() { + ./csv_to_html.py \ + --as-percent b <<EOF +a,b +A,0.21 +B,0.001 +C,0.0009 +D,0.0001 +EOF +} + +if test $# -eq 0; then + test-basic + echo '--' + test-col-format + echo '--' + test-var-def + echo '--' + test-as-percent + echo '--' + echo 'OK' +else + "$@" +fi diff --git a/pipeline/csv_to_html.py b/pipeline/csv_to_html.py new file mode 100755 index 0000000..e4d76ae --- /dev/null +++ b/pipeline/csv_to_html.py @@ -0,0 +1,218 @@ +#!/usr/bin/python +"""Reads a CSV file on stdin, and prints an an HTML table on stdout. + +The static HTML can then be made made dynamic with JavaScript, e.g. jQuery +DataTable. + +Use Cases: + + - overview.csv -- each row is a metric + - links: to metric page + + - status.csv -- each row is a day + - links: to log.txt, to results.html +""" + +import cgi +import csv +import optparse +import sys + +import util + + +def CreateOptionsParser(): + p = optparse.OptionParser() + + # We are taking a path, and not using stdin, because we read it twice. 
+ p.add_option( + '--col-format', dest='col_formats', metavar="'COLNAME FMT'", type='str', + default=[], action='append', + help='Add HTML links to the named column, using the given Python ' + '.format() string') + + p.add_option( + '--def', dest='defs', metavar="'NAME VALUE'", type='str', + default=[], action='append', + help='Define varaibles for use in format strings') + + p.add_option( + '--as-percent', dest='percent_cols', metavar="COLNAME", type='str', + default=[], action='append', + help='Format this floating point column as a percentage string') + + # TODO: We could include this by default, and then change all the HTML to + # have <div> placeholders instead of <table>. + p.add_option( + '--table', dest='table', default=False, action='store_true', + help='Add <table></table> tags (useful for testing)') + + return p + + +def ParseSpec(arg_list): + """Given an argument list, return a string -> string dictionary.""" + # The format string is passed the cell value. Escaped as HTML? + d = {} + for s in arg_list: + try: + name, value = s.split(' ', 1) + except ValueError: + raise RuntimeError('Invalid column format %r' % s) + d[name] = value + return d + + +def PrintRow(row, col_names, col_formats, defs, percent_cols): + """Print a CSV row as HTML, using the given formatting. + + Returns: + An array of booleans indicating whether each cell is a number. + """ + is_number_flags = [False] * len(col_names) + + for i, cell in enumerate(row): + # The cell as a string. By default we leave it as is; it may be mutated + # below. + cell_str = cell + css_class = '' # CSS class for the cell. + col_name = col_names[i] # column that the cell is under + + # Does the cell look like a float? + try: + cell_float = float(cell) + if col_name in percent_cols: # Floats can be formatted as percentages. + cell_str = '{:.1f}%'.format(cell_float * 100) + else: + # Arbitrarily use 3 digits of precision for display + cell_str = '{:.3f}'.format(cell_float) + css_class = 'num' + is_number_flags[i] = True + except ValueError: + pass + + # Does it look lik an int? + try: + cell_int = int(cell) + cell_str = '{:,}'.format(cell_int) + css_class = 'num' + is_number_flags[i] = True + except ValueError: + pass + + # Special CSS class for R NA values. + if cell_str.strip() == 'NA': + css_class = 'num na' # num should right justify; na should make it red + is_number_flags[i] = True + + if css_class: + print ' <td class="{}">'.format(css_class), + else: + print ' <td>', + + cell_safe = cgi.escape(cell_str) + + # If the cell has a format string, print it this way. + + fmt = col_formats.get(col_name) # e.g. "../{date}.html" + if fmt: + # Copy variable bindings + bindings = dict(defs) + + # Also let the format string use other column names. TODO: Is there a + # more efficient way? + bindings.update(zip(col_names, [cgi.escape(c) for c in row])) + + bindings[col_name] = cell_safe + + print fmt.format(**bindings), # no newline + else: + print cell_safe, # no newline + + print '</td>' + + return is_number_flags + + +def ReadCsv(f): + """Read the CSV file, returning the column names and rows.""" + c = csv.reader(f) + + # The first row of the CSV is assumed to be a header. The rest are data. 
+ col_names = [] + rows = [] + for i, row in enumerate(c): + if i == 0: + col_names = row + continue + rows.append(row) + return col_names, rows + + +def PrintColGroup(col_names, col_is_numeric): + """Print HTML colgroup element, used for JavaScript sorting.""" + print '<colgroup>' + for i, col in enumerate(col_names): + # CSS class is used for sorting + if col_is_numeric[i]: + css_class = 'number' + else: + css_class = 'case-insensitive' + + # NOTE: id is a comment only; not used + print ' <col id="{}" type="{}" />'.format(col, css_class) + print '</colgroup>' + + +def main(argv): + (opts, argv) = CreateOptionsParser().parse_args(argv) + + col_formats = ParseSpec(opts.col_formats) + defs = ParseSpec(opts.defs) + + col_names, rows = ReadCsv(sys.stdin) + + for col in opts.percent_cols: + if col not in col_names: + raise RuntimeError('--percent-col %s is not a valid column' % col) + + # By default, we don't print the <table> bit -- that's up to the host page + if opts.table: + print '<table>' + + print '<thead>' + for col in col_names: + # change _ to space so long column names can wrap + print ' <td>%s</td>' % cgi.escape(col.replace('_', ' ')) + print '</thead>' + + # Assume all columns are numeric at first. Look at each row for non-numeric + # values. + col_is_numeric = [True] * len(col_names) + + print '<tbody>' + for row in rows: + print ' <tr>' + is_number_flags = PrintRow(row, col_names, col_formats, defs, + opts.percent_cols) + + # If one cell in a column is not a number, then the whole cell isn't. + for (i, is_number) in enumerate(is_number_flags): + if not is_number: + col_is_numeric[i] = False + + print ' </tr>' + print '</tbody>' + + PrintColGroup(col_names, col_is_numeric) + + if opts.table: + print '</table>' + + +if __name__ == '__main__': + try: + main(sys.argv) + except RuntimeError, e: + print >>sys.stderr, 'FATAL: %s' % e + sys.exit(1) diff --git a/pipeline/csv_to_html_test.py b/pipeline/csv_to_html_test.py new file mode 100755 index 0000000..5fd5822 --- /dev/null +++ b/pipeline/csv_to_html_test.py @@ -0,0 +1,24 @@ +#!/usr/bin/python -S +""" +csv_to_html_test.py: Tests for csv_to_html.py +""" + +import unittest + +import csv_to_html # module under test + + +class CsvToHtmlTest(unittest.TestCase): + + def testParseSpec(self): + self.assertEqual( + {'foo': 'bar', 'spam': 'eggs'}, + csv_to_html.ParseSpec(['foo bar', 'spam eggs'])) + + self.assertEqual( + {}, + csv_to_html.ParseSpec([])) + + +if __name__ == '__main__': + unittest.main() diff --git a/pipeline/dist.sh b/pipeline/dist.sh new file mode 100755 index 0000000..ad33006 --- /dev/null +++ b/pipeline/dist.sh @@ -0,0 +1,135 @@ +#!/bin/bash +# +# Usage: +# ./dist.sh <function name> + +set -o nounset +set -o pipefail +set -o errexit + +readonly THIS_DIR=$(dirname $0) +readonly RAPPOR_SRC=$(cd $THIS_DIR/.. 
&& pwd) + +source $RAPPOR_SRC/util.sh # log, banner +source $RAPPOR_SRC/pipeline/tools-lib.sh +source $RAPPOR_SRC/pipeline/alarm-lib.sh + +readonly DECODE_DIST=${DEP_DECODE_DIST:-$RAPPOR_SRC/bin/decode-dist} + +readonly NUM_ARGS=7 # used for xargs + +decode-dist-one() { + # Job constants + local rappor_src=$1 + local timeout_secs=$2 + local min_reports=$3 + shift 3 # job constants do not vary per task and are not part of the spec + + # 7 spec variables + local num_reports=$1 # unused, only for filtering + local metric_name=$2 + local date=$3 + local counts=$4 + local params=$5 + local map=$6 + local results_dir=$7 + + local task_dir=$results_dir/$metric_name/$date + mkdir --verbose -p $task_dir + + local log_file=$task_dir/log.txt + local status_file=$task_dir/STATUS.txt + + # Record the spec so we know params, counts, etc. + echo "$@" > $task_dir/spec.txt + + if test $num_reports -lt $min_reports; then + local msg="SKIPPED because $num_reports reports is less than $min_reports" + # Duplicate this message + echo "$msg" > $status_file + echo "$msg" > $log_file + return + fi + + # Run it with a timeout, and record status in the task dir. + { time \ + alarm-status $status_file $timeout_secs \ + $DECODE_DIST \ + --counts $counts \ + --params $params \ + --map $map \ + --output-dir $task_dir \ + --adjust-counts-hack + } >$log_file 2>&1 + + # TODO: Don't pass --adjust-counts-hack unless the user asks for it. +} + +# Print the number of processes to use. +# NOTE: This is copied from google/rappor regtest.sh. +# It also doesn't take into account the fact that we are memory-bound. +# +# 128 GiB / 4GiB would also imply about 32 processes though. +num-processes() { + local processors=$(grep -c ^processor /proc/cpuinfo || echo 4) + if test $processors -gt 1; then # leave one CPU for the OS + processors=$(expr $processors - 1) + fi + echo $processors +} + +#readonly DEFAULT_MAX_PROCS=6 # for andychu2.hot, to avoid locking up UI +#readonly DEFAULT_MAX_PROCS=16 # for rappor-ac.hot, to avoid thrashing +readonly DEFAULT_MAX_PROCS=$(num-processes) + +#readonly DEFAULT_MAX_TASKS=12 +readonly DEFAULT_MAX_TASKS=10000 # more than the max + +# NOTE: Since we have 125 GB RAM, and processes can take up to 12 gigs of RAM, +# only use parallelism of 10, even though we have 31 cores. + +readonly DEFAULT_MIN_REPORTS=5000 + + +decode-dist-many() { + local job_dir=$1 + local spec_list=$2 + local timeout_secs=${3:-1200} # default timeout + local max_procs=${4:-$DEFAULT_MAX_PROCS} + local rappor_src=${5:-$RAPPOR_SRC} + local min_reports=${6:-$DEFAULT_MIN_REPORTS} + + local interval_secs=5 + local pid_dir="$job_dir/pids" + local sys_mem="$job_dir/system-mem.csv" + mkdir --verbose -p $pid_dir + + time cat $spec_list \ + | xargs --verbose -n $NUM_ARGS -P $max_procs --no-run-if-empty -- \ + $0 decode-dist-one $rappor_src $timeout_secs $min_reports +} + +# Combine/summarize results and task metadata from the parallel decode-dist +# processes. Render them as HTML. +combine-and-render-html() { + local jobs_base_dir=$1 + local job_dir=$2 + + banner "Combining dist task status" + TOOLS-cook combine-dist-task-status $jobs_base_dir $job_dir + + banner "Combining dist results" + TOOLS-cook combine-dist-results $jobs_base_dir $job_dir + + banner "Splitting out status per metric, and writing overview" + TOOLS-cook dist-metric-status $job_dir + + # The task-status.csv file should have the a JOB ID. 
+ banner "Building overview.html and per-metric HTML" + TOOLS-gen-ui build-html1 $job_dir + + banner "Building individual results.html (for ONE day)" + TOOLS-gen-ui results-html $job_dir +} + +"$@" diff --git a/pipeline/metric_status.R b/pipeline/metric_status.R new file mode 100755 index 0000000..0774423 --- /dev/null +++ b/pipeline/metric_status.R @@ -0,0 +1,343 @@ +#!/usr/bin/Rscript +# +# Write an overview of task status, per-metric task status, task histograms. + +library(data.table) +library(ggplot2) + +options(stringsAsFactors = FALSE) # get rid of annoying behavior + +Log <- function(fmt, ...) { + cat(sprintf(fmt, ...)) + cat('\n') +} + +# max of non-NA values; NA if there are none +MaybeMax <- function(values) { + v <- values[!is.na(values)] + if (length(v) == 0) { + m <- NA + } else { + m <- max(v) + } + as.numeric(m) # data.table requires this; otherwise we get type errors +} + +# mean of non-NA values; NA if there are none +MaybeMean <- function(values) { + v <- values[!is.na(values)] + if (length(v) == 0) { + m <- NA + } else { + m <- mean(v) + } + as.numeric(m) # data.table require this; otherwise we get type errors +} + +WriteDistOverview <- function(summary, output_dir) { + s <- data.table(summary) # data.table syntax is easier here + + by_metric <- s[ , list( + params_file = unique(params_file), + map_file = unique(map_file), + days = length(date), + max_num_reports = MaybeMax(num_reports), + + # summarize status + ok = sum(status == 'OK'), + fail = sum(status == 'FAIL'), + timeout = sum(status == 'TIMEOUT'), + skipped = sum(status == 'SKIPPED'), + + # TODO: Need to document the meaning of these metrics. + # All could be NA + # KiB -> MB + #max_vm5_peak_mb = MaybeMax(vm5_peak_kib * 1024 / 1e6), + #mean_vm5_mean_mb = MaybeMean(vm5_mean_kib * 1024 / 1e6), + + mean_secs = MaybeMean(seconds), + mean_allocated_mass = MaybeMean(allocated_mass) + + # unique failure reasons + # This can be used when there are different call stacks. + #fail_reasons = length(unique(fail_reason[fail_reason != ""])) + ), by=metric] + + # Case insensitive sort by metric name + by_metric <- by_metric[order(tolower(by_metric$metric)), ] + + overview_path <- file.path(output_dir, 'overview.csv') + write.csv(by_metric, file = overview_path, row.names = FALSE) + Log("Wrote %s", overview_path) + + by_metric +} + +WriteDistMetricStatus <- function(summary, output_dir) { + # Write status.csv, num_reports.csv, and mass.csv for each metric. + + s <- data.table(summary) + + # loop over unique metrics, and write a CSV for each one + for (m in unique(s$metric)) { + # Select cols, and convert units. Don't need params / map / metric. + subframe <- s[s$metric == m, + list(job_id, date, status, + #vm5_peak_mb = vm5_peak_kib * 1024 / 1e6, + #vm5_mean_mb = vm5_mean_kib * 1024 / 1e6, + num_reports, + seconds, + allocated_mass, num_rappor)] + + # Sort by descending date. Alphabetical sort works fine for YYYY-MM-DD. + subframe <- subframe[order(subframe$date, decreasing = TRUE), ] + + out_path = file.path(output_dir, m, 'status.csv') + write.csv(subframe, file = out_path, row.names = FALSE) + Log("Wrote %s", out_path) + } + + # This one is just for plotting with dygraphs. TODO: can dygraphs do + # something smarter? Maybe you need to select the column in JavaScript, and + # pass it an array, rather than CSV text. + for (m in unique(s$metric)) { + f1 <- s[s$metric == m, list(date, num_reports)] + path1 <- file.path(output_dir, m, 'num_reports.csv') + # NOTE: dygraphs (only in Firefox?) 
doesn't like the quotes around + # "2015-04-03". In general, we can't turn off quotes, because strings with + # double quotes will be invalid CSV files. But in this case, we only have + # date and number columns, so we can. dygraphs is mistaken here. + write.csv(f1, file = path1, row.names = FALSE, quote = FALSE) + Log("Wrote %s", path1) + + # Write unallocated mass. TODO: Write the other 2 vars too? + f2 <- s[s$metric == m, + list(date, + unallocated_mass = 1.0 - allocated_mass)] + + path2 <- file.path(output_dir, m, 'mass.csv') + write.csv(f2, file = path2, row.names = FALSE, quote = FALSE) + Log("Wrote %s", path2) + } +} + +WritePlot <- function(p, outdir, filename, width = 800, height = 600) { + filename <- file.path(outdir, filename) + png(filename, width = width, height = height) + plot(p) + dev.off() + Log('Wrote %s', filename) +} + +# Make sure the histogram has some valid input. If we don't do this, ggplot +# blows up with an unintuitive error message. +CheckHistogramInput <- function(v) { + if (all(is.na(v))) { + arg_name <- deparse(substitute(v)) # R idiom to get name + Log('FATAL: All values in %s are NA (no successful runs?)', arg_name) + quit(status = 1) + } +} + +WriteDistHistograms <- function(s, output_dir) { + CheckHistogramInput(s$allocated_mass) + + p <- qplot(s$allocated_mass, geom = "histogram") + t <- ggtitle("Allocated Mass by Task") + x <- xlab("allocated mass") + y <- ylab("number of tasks") + WritePlot(p + t + x + y, output_dir, 'allocated_mass.png') + + CheckHistogramInput(s$num_rappor) + + p <- qplot(s$num_rappor, geom = "histogram") + t <- ggtitle("Detected Strings by Task") + x <- xlab("detected strings") + y <- ylab("number of tasks") + WritePlot(p + t + x + y, output_dir, 'num_rappor.png') + + CheckHistogramInput(s$num_reports) + + p <- qplot(s$num_reports / 1e6, geom = "histogram") + t <- ggtitle("Raw Reports by Task") + x <- xlab("millions of reports") + y <- ylab("number of tasks") + WritePlot(p + t + x + y, output_dir, 'num_reports.png') + + CheckHistogramInput(s$seconds) + + p <- qplot(s$seconds, geom = "histogram") + t <- ggtitle("Analysis Duration by Task") + x <- xlab("seconds") + y <- ylab("number of tasks") + WritePlot(p + t + x + y, output_dir, 'seconds.png') + + # NOTE: Skipping this for 'series' jobs. + if (sum(!is.na(s$vm5_peak_kib)) > 0) { + p <- qplot(s$vm5_peak_kib * 1024 / 1e6, geom = "histogram") + t <- ggtitle("Peak Memory Usage by Task") + x <- xlab("Peak megabytes (1e6 bytes) of memory") + y <- ylab("number of tasks") + WritePlot(p + t + x + y, output_dir, 'memory.png') + } +} + +ProcessAllDist <- function(s, output_dir) { + Log('dist: Writing per-metric status.csv') + WriteDistMetricStatus(s, output_dir) + + Log('dist: Writing histograms') + WriteDistHistograms(s, output_dir) + + Log('dist: Writing aggregated overview.csv') + WriteDistOverview(s, output_dir) +} + +# Write the single CSV file loaded by assoc-overview.html. 
+WriteAssocOverview <- function(summary, output_dir) { + s <- data.table(summary) # data.table syntax is easier here + + by_metric <- s[ , list( + #params_file = unique(params_file), + #map_file = unique(map_file), + + days = length(date), + max_num_reports = MaybeMax(num_reports), + + # summarize status + ok = sum(status == 'OK'), + fail = sum(status == 'FAIL'), + timeout = sum(status == 'TIMEOUT'), + skipped = sum(status == 'SKIPPED'), + + mean_total_secs = MaybeMean(total_elapsed_seconds), + mean_em_secs = MaybeMean(em_elapsed_seconds) + + ), by=list(metric)] + + # Case insensitive sort by metric name + by_metric <- by_metric[order(tolower(by_metric$metric)), ] + + overview_path <- file.path(output_dir, 'assoc-overview.csv') + write.csv(by_metric, file = overview_path, row.names = FALSE) + Log("Wrote %s", overview_path) + + by_metric +} + +# Write the CSV files loaded by assoc-metric.html -- that is, one +# metric-status.csv for each metric name. +WriteAssocMetricStatus <- function(summary, output_dir) { + s <- data.table(summary) + csv_list <- unique(s[, list(metric)]) + for (i in 1:nrow(csv_list)) { + u <- csv_list[i, ] + # Select cols, and convert units. Don't need params / map / metric. + by_pair <- s[s$metric == u$metric, + list(days = length(date), + max_num_reports = MaybeMax(num_reports), + + # summarize status + ok = sum(status == 'OK'), + fail = sum(status == 'FAIL'), + timeout = sum(status == 'TIMEOUT'), + skipped = sum(status == 'SKIPPED'), + + mean_total_secs = MaybeMean(total_elapsed_seconds), + mean_em_secs = MaybeMean(em_elapsed_seconds) + ), + by=list(var1, var2)] + + # Case insensitive sort by var1 name + by_pair <- by_pair[order(tolower(by_pair$var1)), ] + + csv_path <- file.path(output_dir, u$metric, 'metric-status.csv') + write.csv(by_pair, file = csv_path, row.names = FALSE) + Log("Wrote %s", csv_path) + } +} + +# This naming convention is in task_spec.py AssocTaskSpec. +FormatAssocRelPath <- function(metric, var1, var2) { + v2 <- gsub('..', '_', var2, fixed = TRUE) + var_dir <- sprintf('%s_X_%s', var1, v2) + file.path(metric, var_dir) +} + +# Write the CSV files loaded by assoc-pair.html -- that is, one pair-status.csv +# for each (metric, var1, var2) pair. +WriteAssocPairStatus <- function(summary, output_dir) { + + s <- data.table(summary) + + csv_list <- unique(s[, list(metric, var1, var2)]) + Log('CSV list:') + print(csv_list) + + # loop over unique metrics, and write a CSV for each one + for (i in 1:nrow(csv_list)) { + u <- csv_list[i, ] + + # Select cols, and convert units. Don't need params / map / metric. + subframe <- s[s$metric == u$metric & s$var1 == u$var1 & s$var2 == u$var2, + list(job_id, date, status, + num_reports, d1, d2, + total_elapsed_seconds, + em_elapsed_seconds)] + + # Sort by descending date. Alphabetical sort works fine for YYYY-MM-DD. + subframe <- subframe[order(subframe$date, decreasing = TRUE), ] + + pair_rel_path <- FormatAssocRelPath(u$metric, u$var1, u$var2) + + csv_path <- file.path(output_dir, pair_rel_path, 'pair-status.csv') + write.csv(subframe, file = csv_path, row.names = FALSE) + Log("Wrote %s", csv_path) + + # Write a file with the raw variable names. Parsed by ui.sh, to pass to + # csv_to_html.py. + meta_path <- file.path(output_dir, pair_rel_path, 'pair-metadata.txt') + + # NOTE: The conversion from data.table to character vector requires + # stringsAsFactors to work correctly! 
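+    # e.g. for the regtest metric 'm' with vars 'domain' and 'flag..HTTPS',
+    # this file has three lines, one value per line: m, domain, flag..HTTPS.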
+ lines <- as.character(u) + writeLines(lines, con = meta_path) + Log("Wrote %s", meta_path) + } +} + +ProcessAllAssoc <- function(s, output_dir) { + Log('assoc: Writing pair-status.csv for each variable pair in each metric') + WriteAssocPairStatus(s, output_dir) + + Log('assoc: Writing metric-status.csv for each metric') + WriteAssocMetricStatus(s, output_dir) + + Log('assoc: Writing aggregated overview.csv') + WriteAssocOverview(s, output_dir) +} + +main <- function(argv) { + # increase ggplot font size globally + theme_set(theme_grey(base_size = 16)) + + action = argv[[1]] + input = argv[[2]] + output_dir = argv[[3]] + + if (action == 'dist') { + summary = read.csv(input) + ProcessAllDist(summary, output_dir) + } else if (action == 'assoc') { + summary = read.csv(input) + ProcessAllAssoc(summary, output_dir) + } else { + stop(sprintf('Invalid action %s', action)) + } + + Log('Done') +} + +if (length(sys.frames()) == 0) { + main(commandArgs(TRUE)) +} diff --git a/pipeline/regtest.sh b/pipeline/regtest.sh new file mode 100755 index 0000000..a29a0f0 --- /dev/null +++ b/pipeline/regtest.sh @@ -0,0 +1,161 @@ +#!/bin/bash +# +# End-to-end tests for the dashboard. +# +# Usage: +# ./regtest.sh <function name> +# +# NOTE: Must be run in this directory (rappor/pipeline). + +set -o nounset +set -o pipefail +set -o errexit + +# Create schema and params. +create-metadata() { + mkdir -p _tmp/metadata + echo 'Hello from regtest.sh' + + local params_path=_tmp/metadata/regtest_params.csv + + # Relying on $RAPPOR_SRC/regtest.sh + cp --verbose ../_tmp/python/demo1/case_params.csv $params_path + + # For now, use the same map everywhere. + cat >_tmp/metadata/dist-analysis.csv <<EOF +var,map_filename +unif,map.csv +gauss,map.csv +exp,map.csv +m.domain,domain_map.csv +EOF + + # Both single dimensional and multi dimensional metrics. + cat >_tmp/metadata/rappor-vars.csv <<EOF +metric,var,var_type,params +m,domain,string,m_params +m,flag..HTTPS,boolean,m_params +unif,,string,regtest_params +gauss,,string,regtest_params +exp,,string,regtest_params +EOF +} + +# Create map files. +create-maps() { + mkdir -p _tmp/maps + # Use the same map for everyone now? + local map_path=_tmp/maps/map.csv + + # Relying on $RAPPOR_SRC/regtest.sh + cp --verbose ../_tmp/python/demo1/case_map.csv $map_path +} + +# Simulate different metrics. +create-counts() { + mkdir -p _tmp/counts + + for date in 2015-12-01 2015-12-02 2015-12-03; do + mkdir -p _tmp/counts/$date + + # TODO: Change params for each day. + cp --verbose \ + ../_tmp/python/demo1/1/case_counts.csv _tmp/counts/$date/unif_counts.csv + cp --verbose \ + ../_tmp/python/demo2/1/case_counts.csv _tmp/counts/$date/gauss_counts.csv + cp --verbose \ + ../_tmp/python/demo3/1/case_counts.csv _tmp/counts/$date/exp_counts.csv + done +} + +dist-task-spec() { + local job_dir=$1 + ./task_spec.py dist \ + --map-dir _tmp/maps \ + --config-dir _tmp/metadata \ + --output-base-dir $job_dir/raw \ + --bad-report-out _tmp/bad_counts.csv \ + "$@" +} + +dist-job() { + local job_id=$1 + local pat=$2 + + local job_dir=_tmp/$job_id + mkdir -p $job_dir/raw + + local spec_list=$job_dir/spec-list.txt + + find _tmp/counts/$pat -name \*_counts.csv \ + | dist-task-spec $job_dir \ + | tee $spec_list + + ./dist.sh decode-dist-many $job_dir $spec_list + ./dist.sh combine-and-render-html _tmp $job_dir +} + +dist() { + create-metadata + create-maps + create-counts + + dist-job smoke1 '2015-12-01' # one day + dist-job smoke2 '2015-12-0[23]' # two days +} + +# Simulate different metrics. 
+create-reports() { + mkdir -p _tmp/reports + + for date in 2015-12-01 2015-12-02 2015-12-03; do + mkdir -p _tmp/reports/$date + + # TODO: Change params for each day. + cp --verbose \ + ../bin/_tmp/reports.csv _tmp/reports/$date/m_reports.csv + done +} + +assoc-task-spec() { + local job_dir=$1 + ./task_spec.py assoc \ + --map-dir _tmp/maps \ + --config-dir _tmp/metadata \ + --output-base-dir $job_dir/raw \ + "$@" +} + +assoc-job() { + local job_id=$1 + local pat=$2 + + local job_dir=_tmp/$job_id + mkdir -p $job_dir/raw $job_dir/config + + local spec_list=$job_dir/spec-list.txt + + find _tmp/reports/$pat -name \*_reports.csv \ + | assoc-task-spec $job_dir \ + | tee $spec_list + + # decode-many calls decode_assoc.R, which expects this schema in the 'config' + # dir now. TODO: adjust this. + cp --verbose _tmp/metadata/rappor-vars.csv $job_dir/config + cp --verbose ../bin/_tmp/m_params.csv $job_dir/config + + ./assoc.sh decode-many $job_dir $spec_list + ./assoc.sh combine-and-render-html _tmp $job_dir +} + +# Copy some from bin/test.sh? The input _reports.csv files should be taken +# from there. +assoc() { + create-reports + cp --verbose ../bin/_tmp/domain_map.csv _tmp/maps + + assoc-job smoke1-assoc '2015-12-01' # one day + assoc-job smoke2-assoc '2015-12-0[23]' # two days +} + +"$@" diff --git a/pipeline/task_spec.py b/pipeline/task_spec.py new file mode 100755 index 0000000..c46bb35 --- /dev/null +++ b/pipeline/task_spec.py @@ -0,0 +1,364 @@ +#!/usr/bin/python +"""Read a list of 'counts' paths on stdin, and write a task spec on stdout. + +Each line represents a task, or R process invocation. The params on each line +are passed to ./dist.sh decode-many or ./assoc.sh decode-many. +""" + +import collections +import csv +import errno +import optparse +import os +import pprint +import re +import sys + +import util + + +def _ReadDistMaps(f): + dist_maps = {} + c = csv.reader(f) + for i, row in enumerate(c): + if i == 0: + expected = ['var', 'map_filename'] + if row != expected: + raise RuntimeError('Expected CSV header %s' % expected) + continue # skip header + + var_name, map_filename = row + dist_maps[var_name] = map_filename + return dist_maps + + +class DistMapLookup(object): + """Create a dictionary of var -> map to analyze against. + + TODO: Support a LIST of maps. Users should be able to specify more than one. + """ + def __init__(self, f, map_dir): + self.dist_maps = _ReadDistMaps(f) + self.map_dir = map_dir + + def GetMapPath(self, var_name): + filename = self.dist_maps[var_name] + return os.path.join(self.map_dir, filename) + + +def CreateFieldIdLookup(f): + """Create a dictionary that specifies single variable analysis each var. + + Args: + config_dir: directory of metadata, output by update_rappor.par + + Returns: + A dictionary from field ID -> full field name + + NOTE: Right now we're only doing single variable analysis for strings, so we + don't have the "type". + """ + field_id_lookup = {} + c = csv.reader(f) + for i, row in enumerate(c): + if i == 0: + expected = ['metric', 'field', 'field_type', 'params', 'field_id'] + if row != expected: + raise RuntimeError('Expected CSV header %s' % expected) + continue + + metric, field, field_type, _, field_id = row + + if field_type != 'string': + continue + + # Paper over the difference between plain metrics (single variable) and + # metrics with fields (multiple variables, for association analysis). 
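+    # e.g. metric 'm' with field 'domain' yields 'm.domain', which is how
+    # rows like 'm.domain,domain_map.csv' are keyed in dist-analysis.csv.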
+ if field: + full_field_name = '%s.%s' % (metric, field) + else: + full_field_name = metric + + field_id_lookup[field_id] = full_field_name + return field_id_lookup + + +def _ReadVarSchema(f): + """Given the rappor-vars.csv file, return a list of metric/var/type.""" + # metric -> list of (variable name, type) + assoc_metrics = collections.defaultdict(list) + params_lookup = {} + + c = csv.reader(f) + for i, row in enumerate(c): + if i == 0: + expected = ['metric', 'var', 'var_type', 'params'] + if row != expected: + raise RuntimeError('Expected CSV header %s, got %s' % (expected, row)) + continue + + metric, var, var_type, params = row + if var == '': + full_var_name = metric + else: + full_var_name = '%s.%s' % (metric, var) + # Also group multi-dimensional reports + assoc_metrics[metric].append((var, var_type)) + + params_lookup[full_var_name] = params + + return assoc_metrics, params_lookup + + +class VarSchema(object): + """Object representing rappor-vars.csv. + + Right now we use it for slightly different purposes for dist and assoc + analysis. + """ + def __init__(self, f, params_dir): + self.assoc_metrics, self.params_lookup = _ReadVarSchema(f) + self.params_dir = params_dir + + def GetParamsPath(self, var_name): + filename = self.params_lookup[var_name] + return os.path.join(self.params_dir, filename + '.csv') + + def GetAssocMetrics(self): + return self.assoc_metrics + + +def CountReports(f): + num_reports = 0 + for line in f: + first_col = line.split(',')[0] + num_reports += int(first_col) + return num_reports + + +DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv') + + +def DistInputIter(stdin): + """Read lines from stdin and extract fields to construct analysis tasks.""" + for line in stdin: + m = DIST_INPUT_PATH_RE.match(line) + if not m: + raise RuntimeError('Invalid path %r' % line) + + counts_path = line.strip() + date, field_id = m.groups() + + yield counts_path, date, field_id + + +def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c): + """Print task spec for single variable RAPPOR to stdout.""" + + num_bad = 0 + unique_ids = set() + + for counts_path, date, field_id in input_iter: + unique_ids.add(field_id) + + # num_reports is used for filtering + with open(counts_path) as f: + num_reports = CountReports(f) + + # Look up field name from field ID + if field_id_lookup: + field_name = field_id_lookup.get(field_id) + if field_name is None: + # The metric id is the md5 hash of the name. We can miss some, e.g. due + # to debug builds. + if bad_c: + bad_c.writerow((date, field_id, num_reports)) + num_bad += 1 + continue + else: + field_name = field_id + + # NOTE: We could remove the params from the spec if decode_dist.R took the + # --schema flag. The var type is there too. 
+ params_path = var_schema.GetParamsPath(field_name) + map_path= dist_maps.GetMapPath(field_name) + + yield num_reports, field_name, date, counts_path, params_path, map_path + + util.log('%d unique field IDs', len(unique_ids)) + if num_bad: + util.log('Failed field ID -> field name lookup on %d files ' + '(check --field-ids file)', num_bad) + + +ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv') + + +def AssocInputIter(stdin): + """Read lines from stdin and extract fields to construct analysis tasks.""" + for line in stdin: + m = ASSOC_INPUT_PATH_RE.match(line) + if not m: + raise RuntimeError('Invalid path %r' % line) + + reports_path = line.strip() + date, metric_name = m.groups() + + yield reports_path, date, metric_name + + +def CreateAssocVarPairs(rappor_metrics): + """Yield a list of pairs of variables that should be associated. + + For now just do all (string x boolean) analysis. + """ + var_pairs = collections.defaultdict(list) + + for metric, var_list in rappor_metrics.iteritems(): + string_vars = [] + boolean_vars = [] + + # Separate variables into strings and booleans + for var_name, var_type in var_list: + if var_type == 'string': + string_vars.append(var_name) + elif var_type == 'boolean': + boolean_vars.append(var_name) + else: + util.log('Unknown type variable type %r', var_type) + + for s in string_vars: + for b in boolean_vars: + var_pairs[metric].append((s, b)) + return var_pairs + + +# For debugging +def PrintAssocVarPairs(var_pairs): + for metric, var_list in var_pairs.iteritems(): + print metric + for var_name, var_type in var_list: + print '\t', var_name, var_type + + +def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c): + """Print the task spec for multiple variable RAPPOR to stdout.""" + # Flow: + # + # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml? + # + # Short term: update_rappor.py should print every combination of string vs. + # bool? Or I guess we have it in rappor-vars.csv + + for reports_path, date, metric_name in input_iter: + pairs = var_pairs[metric_name] + for var1, var2 in pairs: + # Assuming var1 is a string. TODO: Use an assoc file, not dist_maps? + field1_name = '%s.%s' % (metric_name, var1) + map1_path = dist_maps.GetMapPath(field1_name) + + # e.g. domain_X_flags__DID_PROCEED + # Don't use .. in filenames since it could be confusing. 
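+      # Hypothetical example: var1='domain', var2='DID_PROCEED' for the metric
+      # 'interstitial.harmful' gives pair_name 'domain_X_DID_PROCEED' and
+      #   output_dir = <output_base_dir>/interstitial.harmful/domain_X_DID_PROCEED/<date>
+      # (any '..' in var2 is collapsed to '_' so the path stays unambiguous).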
+ pair_name = '%s_X_%s' % (var1, var2.replace('..', '_')) + output_dir = os.path.join(output_base_dir, metric_name, pair_name, date) + + yield metric_name, date, reports_path, var1, var2, map1_path, output_dir + + +def CreateOptionsParser(): + p = optparse.OptionParser() + + p.add_option( + '--bad-report-out', dest='bad_report', metavar='PATH', type='str', + default='', + help='Optionally write a report of input filenames with invalid field ' + 'IDs to this file.') + p.add_option( + '--config-dir', dest='config_dir', metavar='PATH', type='str', + default='', + help='Directory with metadata schema and params files to read.') + p.add_option( + '--map-dir', dest='map_dir', metavar='PATH', type='str', + default='', + help='Directory with map files to read.') + p.add_option( + '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str', + default='', + help='Root of the directory tree where analysis output will be placed.') + p.add_option( + '--field-ids', dest='field_ids', metavar='PATH', type='str', + default='', + help='Optional CSV file with field IDs (generally should not be used).') + + return p + + +def main(argv): + (opts, argv) = CreateOptionsParser().parse_args(argv) + + if opts.bad_report: + bad_f = open(opts.bad_report, 'w') + bad_c = csv.writer(bad_f) + else: + bad_c = None + + action = argv[1] + + if not opts.config_dir: + raise RuntimeError('--config-dir is required') + if not opts.map_dir: + raise RuntimeError('--map-dir is required') + if not opts.output_base_dir: + raise RuntimeError('--output-base-dir is required') + + # This is shared between the two specs. + path = os.path.join(opts.config_dir, 'dist-analysis.csv') + with open(path) as f: + dist_maps = DistMapLookup(f, opts.map_dir) + + path = os.path.join(opts.config_dir, 'rappor-vars.csv') + with open(path) as f: + var_schema = VarSchema(f, opts.config_dir) + + if action == 'dist': + if opts.field_ids: + with open(opts.field_ids) as f: + field_id_lookup = CreateFieldIdLookup(f) + else: + field_id_lookup = {} + + input_iter = DistInputIter(sys.stdin) + for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, + bad_c): + # The spec is a series of space-separated tokens. 
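+      # Column order for 'dist' (consumed by ./dist.sh decode-many):
+      #   num_reports field_name date counts_path params_path map_path output_base_dir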
+ tokens = row + (opts.output_base_dir,) + print ' '.join(str(t) for t in tokens) + + elif action == 'assoc': + # Parse input + input_iter = AssocInputIter(sys.stdin) + + # Create M x N association tasks + var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics()) + + # Now add the other constant stuff + for row in AssocTaskSpec( + input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c): + + num_reports = 0 # placeholder, not filtering yet + tokens = (num_reports,) + row + print ' '.join(str(t) for t in tokens) + + else: + raise RuntimeError('Invalid action %r' % action) + + +if __name__ == '__main__': + try: + main(sys.argv) + except IOError, e: + if e.errno != errno.EPIPE: # ignore broken pipe + raise + except RuntimeError, e: + print >>sys.stderr, 'FATAL: %s' % e + sys.exit(1) diff --git a/pipeline/task_spec_test.py b/pipeline/task_spec_test.py new file mode 100755 index 0000000..94cbac8 --- /dev/null +++ b/pipeline/task_spec_test.py @@ -0,0 +1,61 @@ +#!/usr/bin/python -S +""" +task_spec_test.py: Tests for task_spec.py +""" + +import cStringIO +import unittest + +import task_spec # module under test + + +class TaskSpecTest(unittest.TestCase): + + def testCountReports(self): + f = cStringIO.StringIO("""\ +1,2 +3,4 +5,6 +""") + c = task_spec.CountReports(f) + self.assertEqual(9, c) + + def testDist(self): + # NOTE: These files are opened, in order to count the reports. Maybe skip + # that step. + f = cStringIO.StringIO("""\ +_tmp/counts/2015-12-01/exp_counts.csv +_tmp/counts/2015-12-01/gauss_counts.csv +_tmp/counts/2015-12-02/exp_counts.csv +_tmp/counts/2015-12-02/gauss_counts.csv +""") + input_iter = task_spec.DistInputIter(f) + #for row in input_iter: + # print row + + field_id_lookup = {} + + # var name -> map filename + f = cStringIO.StringIO("""\ +var,map_filename +exp,map.csv +unif,map.csv +gauss,map.csv +""") + dist_maps = task_spec.DistMapLookup(f, '_tmp/maps') + + f2 = cStringIO.StringIO("""\ +metric,var,var_type,params +exp,,string,params +unif,,string,params +gauss,,string,params +""") + var_schema = task_spec.VarSchema(f2, '_tmp/config') + + for row in task_spec.DistTaskSpec( + input_iter, field_id_lookup, var_schema, dist_maps, None): + print row + + +if __name__ == '__main__': + unittest.main() diff --git a/pipeline/tools-lib.sh b/pipeline/tools-lib.sh new file mode 100755 index 0000000..c7b3b24 --- /dev/null +++ b/pipeline/tools-lib.sh @@ -0,0 +1,64 @@ +#!/bin/bash +# +# Library used to refer to open source tools. + +set -o nounset +set -o pipefail +set -o errexit + +# NOTE: RAPPOR_SRC defined by the module that sources (cook.sh or ui.sh) + +# Caller can override shebang line by setting $DEP_PYTHON. +readonly PYTHON=${DEP_PYTHON:-} + +readonly METRIC_STATUS=${DEP_METRIC_STATUS:-} + + +# These 3 used by cook.sh. + +TOOLS-combine-status() { + if test -n "$PYTHON"; then + $PYTHON $RAPPOR_SRC/pipeline/combine_status.py "$@" + else + $RAPPOR_SRC/pipeline/combine_status.py "$@" + fi +} + +TOOLS-combine-results() { + if test -n "$PYTHON"; then + $PYTHON $RAPPOR_SRC/pipeline/combine_results.py "$@" + else + $RAPPOR_SRC/pipeline/combine_results.py "$@" + fi +} + +TOOLS-metric-status() { + if test -n "$METRIC_STATUS"; then + $METRIC_STATUS "$@" + else + $RAPPOR_SRC/pipeline/metric_status.R "$@" + fi +} + +# Used by ui.sh. 
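+
+# Hypothetical usage sketch: the DEP_* overrides are read when this file is
+# sourced, so a caller can choose the Python interpreter, e.g.
+#   DEP_PYTHON=/usr/bin/python2 ./ui.sh build-html1 _tmp/results-10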
+ +TOOLS-csv-to-html() { + if test -n "$PYTHON"; then + $PYTHON $RAPPOR_SRC/pipeline/csv_to_html.py "$@" + else + $RAPPOR_SRC/pipeline/csv_to_html.py "$@" + fi +} + +# +# Higher level scripts +# + +TOOLS-cook() { + $RAPPOR_SRC/pipeline/cook.sh "$@" +} + +# TODO: Rename gen-ui.sh. +TOOLS-gen-ui() { + $RAPPOR_SRC/pipeline/ui.sh "$@" +} diff --git a/pipeline/ui.sh b/pipeline/ui.sh new file mode 100755 index 0000000..8fbcde0 --- /dev/null +++ b/pipeline/ui.sh @@ -0,0 +1,322 @@ +#!/bin/bash +# +# Build the user interface. +# +# Usage: +# ./ui.sh <function name> + +set -o nounset +set -o pipefail +set -o errexit + +readonly THIS_DIR=$(dirname $0) +readonly RAPPOR_SRC=$(cd $THIS_DIR/.. && pwd) + +source $RAPPOR_SRC/pipeline/tools-lib.sh + +# Change the default location of this file by setting DEP_DYGRAPHS_JS +readonly DYGRAPHS_JS=${DEP_DYGRAPHS_JS:-$RAPPOR_SRC/third_party/dygraph-combined.js} + +_link() { + ln --verbose -s -f "$@" +} + +_copy() { + cp --verbose -f "$@" +} + +download-dygraphs() { + local out=third_party + wget --directory $out \ + http://dygraphs.com/1.1.1/dygraph-combined.js +} + +import-table() { + local src=~/git/scratch/ajax/ + cp --verbose $src/table-sort.{js,css} $src/url-hash.js ui + pushd ui + # TODO: Could minify it here + cat table-sort.js url-hash.js > table-lib.js + popd +} + +# Use symlinks so we can edit and reload during development. +symlink-static() { + local kind=$1 + local job_dir=$2 + + local base=$RAPPOR_SRC/ui + + # HTML goes at the top level. + if test "$kind" = dist; then + _link \ + $base/overview.html $base/histograms.html $base/metric.html $base/day.html \ + $job_dir + elif test "$kind" = assoc; then + _link \ + $base/assoc-overview.html $base/assoc-metric.html $base/assoc-pair.html \ + $base/assoc-day.html \ + $job_dir + else + log "Invalid kind $kind" + exit 1 + fi + + mkdir --verbose -p $job_dir/static + + # Static subdir. + _link \ + $base/ui.css $base/ui.js \ + $base/table-sort.css $base/table-lib.js \ + $DYGRAPHS_JS \ + $job_dir/static +} + + +# Write HTML fragment based on overview.csv. +overview-part-html() { + local job_dir=${1:-_tmp/results-10} + local out=$job_dir/cooked/overview.part.html + # Sort by descending date! + TOOLS-csv-to-html \ + --col-format 'metric <a href="metric.html#metric={metric}">{metric}</a>' \ + < $job_dir/cooked/overview.csv \ + > $out + echo "Wrote $out" +} + +metric-part-html() { + local job_dir=${1:-_tmp/results-10} + # Testing it out. This should probably be a different dir. + + for entry in $job_dir/cooked/*; do + # Only do it for dirs + if ! test -d $entry; then + continue + fi + # Now it's a metric dir + echo $entry + + local metric_name=$(basename $entry) + + # Convert status.csv to status.part.html (a fragment) + + # NOTE: counts path could be useful. You need the input tree though. Hash + # it? Or point to the git link. + + # Link to raw CSV + #--col-format 'date <a href="../../raw/{metric}/{date}/results.csv">{date}</a>' \ + + # TODO: Link to ui/results_viewer.html#{metric}_{date} + # And that needs some JavaScript to load the correct fragment. + # I guess you could do the same with metric.html. Currently it uses a + # symlink. 
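+
+    # Illustrative layout (hypothetical metric name): this loop turns
+    #   $job_dir/cooked/Settings.HomePage/status.csv
+    # into the fragment $job_dir/cooked/Settings.HomePage/status.part.html.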
+ + # Before job ID: + # --col-format 'date <a href="{date}.html">{date}</a>' \ + # --col-format 'status <a href="../../raw/{metric}/{date}/log.txt">{status}</a>' \ + + local fmt1='date <a href="day.html#jobId={job_id}&metric={metric}&date={date}">{date}</a>' + local fmt2='status <a href="../{job_id}/raw/{metric}/{date}/log.txt">{status}</a>' + + TOOLS-csv-to-html \ + --def "metric $metric_name" \ + --col-format "$fmt1" \ + --col-format "$fmt2" \ + < $entry/status.csv \ + > $entry/status.part.html + done +} + +results-html-one() { + local csv_in=$1 + echo "$csv_in -> HTML" + + # .../raw/Settings.HomePage2/2015-03-01/results.csv -> + # .../cooked/Settings.HomePage2/2015-03-01.part.html + # (This saves some directories) + local html_out=$(echo $csv_in | sed -e 's|/raw/|/cooked/|; s|/results.csv|.part.html|') + + TOOLS-csv-to-html < $csv_in > $html_out +} + +results-html() { + local job_dir=${1:-_tmp/results-10} + + find $job_dir -name results.csv \ + | xargs -n 1 --verbose --no-run-if-empty -- $0 results-html-one +} + +# Build parts of the HTML +build-html1() { + local job_dir=${1:-_tmp/results-10} + + symlink-static dist $job_dir + + # writes overview.part.html, which is loaded by overview.html + overview-part-html $job_dir + + # Writes status.part.html for each metric + metric-part-html $job_dir +} + +# +# Association Analysis +# + +readonly ASSOC_TEST_JOB_DIR=~/rappor/chrome-assoc-smoke/smoke5-assoc + +# Write HTML fragment based on CSV. +assoc-overview-part-html() { + local job_dir=${1:-$ASSOC_TEST_JOB_DIR} + local html_path=$job_dir/cooked/assoc-overview.part.html + + # Sort by descending date! + + TOOLS-csv-to-html \ + --col-format 'metric <a href="assoc-metric.html#metric={metric}">{metric}</a>' \ + < $job_dir/cooked/assoc-overview.csv \ + > $html_path + echo "Wrote $html_path" +} + +assoc-metric-part-html-one() { + local csv_path=$1 + local html_path=$(echo $csv_path | sed 's/.csv$/.part.html/') + + local metric_dir=$(dirname $csv_path) + local metric_name=$(basename $metric_dir) # e.g. interstitial.harmful + + local fmt='days <a href="assoc-pair.html#metric={metric}&var1={var1}&var2={var2}">{days}</a>' + + TOOLS-csv-to-html \ + --def "metric $metric_name" \ + --col-format "$fmt" \ + < $csv_path \ + > $html_path + + echo "Wrote $html_path" +} + +assoc-metric-part-html() { + local job_dir=${1:-$ASSOC_TEST_JOB_DIR} + # Testing it out. This should probably be a different dir. + + find $job_dir/cooked -name metric-status.csv \ + | xargs -n 1 --verbose --no-run-if-empty -- $0 assoc-metric-part-html-one +} + +# TODO: +# - Construct link in JavaScript instead? It has more information. The +# pair-metadata.txt file is a hack. + +assoc-pair-part-html-one() { + local csv_path=$1 + local html_path=$(echo $csv_path | sed 's/.csv$/.part.html/') + + local pair_dir_path=$(dirname $csv_path) + local pair_dir_name=$(basename $pair_dir_path) # e.g. domain_X_flags_IS_REPEAT_VISIT + + # This file is generated by metric_status.R for each pair of variables. + local metadata="$pair_dir_path/pair-metadata.txt" + # Read one variable per line. 
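+  # Hypothetical contents of pair-metadata.txt (metric, then var1, then var2):
+  #   interstitial.harmful
+  #   domain
+  #   flags_IS_REPEAT_VISIT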
+ { read metric_name; read var1; read var2; } < $metadata + + local fmt1='date <a href="assoc-day.html#jobId={job_id}&metric={metric}&var1={var1}&var2={var2}&date={date}">{date}</a>' + local fmt2="status <a href=\"../{job_id}/raw/{metric}/$pair_dir_name/{date}/assoc-log.txt\">{status}</a>" + + TOOLS-csv-to-html \ + --def "metric $metric_name" \ + --def "var1 $var1" \ + --def "var2 $var2" \ + --col-format "$fmt1" \ + --col-format "$fmt2" \ + < $csv_path \ + > $html_path +} + +assoc-pair-part-html() { + local job_dir=${1:-~/rappor/chrome-assoc-smoke/smoke3} + # Testing it out. This should probably be a different dir. + + find $job_dir/cooked -name pair-status.csv \ + | xargs -n 1 --verbose -- $0 assoc-pair-part-html-one + + return + + # OLD STUFF + for entry in $job_dir/cooked/*; do + # Only do it for dirs + if ! test -d $entry; then + continue + fi + # Now it's a metric dir + echo $entry + + local metric_name=$(basename $entry) + + # Convert status.csv to status.part.html (a fragment) + + # NOTE: counts path could be useful. You need the input tree though. Hash + # it? Or point to the git link. + + # Link to raw CSV + #--col-format 'date <a href="../../raw/{metric}/{date}/results.csv">{date}</a>' \ + + # TODO: Link to ui/results_viewer.html#{metric}_{date} + # And that needs some JavaScript to load the correct fragment. + # I guess you could do the same with metric.html. Currently it uses a + # symlink. + + # Before job ID: + # --col-format 'date <a href="{date}.html">{date}</a>' \ + # --col-format 'status <a href="../../raw/{metric}/{date}/log.txt">{status}</a>' \ + + local fmt1='date <a href="day.html#jobId={job_id}&metric={metric}&date={date}">{date}</a>' + local fmt2='status <a href="../{job_id}/raw/{metric}/{date}/log.txt">{status}</a>' + + TOOLS-csv-to-html \ + --def "metric $metric_name" \ + --col-format "$fmt1" \ + --col-format "$fmt2" \ + < $entry/status.csv \ + > $entry/status.part.html + done +} + +assoc-day-part-html-one() { + local csv_in=$1 + echo "$csv_in -> HTML" + + # .../raw/interstitial.harmful/a_X_b/2015-03-01/assoc-results.csv -> + # .../cooked/interstitial.harmful/a_X_b/2015-03-01.part.html + # (This saves some directories) + local html_out=$(echo $csv_in | sed -e 's|/raw/|/cooked/|; s|/assoc-results.csv|.part.html|') + + TOOLS-csv-to-html --as-percent proportion < $csv_in > $html_out +} + +assoc-day-part-html() { + local job_dir=${1:-_tmp/results-10} + + find $job_dir -name assoc-results.csv \ + | xargs -n 1 --verbose --no-run-if-empty -- $0 assoc-day-part-html-one +} + +lint-html() { + set -o xtrace + set +o errexit # don't fail fast + tidy -errors -quiet ui/metric.html + tidy -errors -quiet ui/overview.html + tidy -errors -quiet ui/histograms.html +} + +# Directory we should serve from +readonly WWW_DIR=_tmp + +serve() { + local port=${1:-7999} + cd $WWW_DIR && python -m SimpleHTTPServer $port +} + +"$@" diff --git a/pipeline/util.py b/pipeline/util.py new file mode 100755 index 0000000..c517483 --- /dev/null +++ b/pipeline/util.py @@ -0,0 +1,9 @@ +"""Common functions.""" + +import sys + + +def log(msg, *args): + if args: + msg = msg % args + print >>sys.stderr, msg |
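+
+# Minimal usage sketch (hypothetical values):
+#   log('read %d rows', num_rows)  # printf-style formatting when args are given
+#   log('done')                    # no args, message is printed as-is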