Diffstat (limited to 'catapult/trace_processor')
35 files changed, 3485 insertions, 41 deletions
diff --git a/catapult/trace_processor/bin/deploy_trace_uploader b/catapult/trace_processor/bin/deploy_trace_uploader new file mode 100755 index 00000000..a04bd3e8 --- /dev/null +++ b/catapult/trace_processor/bin/deploy_trace_uploader @@ -0,0 +1,49 @@ +#!/usr/bin/python +# Copyright 2015 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +import argparse +import os +import subprocess +import sys + + +def _AddToPathIfNeeded(path): + if path not in sys.path: + sys.path.insert(0, path) + + +def Main(): + catapult_path = os.path.abspath(os.path.join( + os.path.dirname(__file__), os.path.pardir, os.path.pardir)) + parser = argparse.ArgumentParser() + parser.add_argument('--appid', default='performance-insights') + parser.add_argument('--version', default='pi-test') + args = parser.parse_args() + + _AddToPathIfNeeded( + os.path.join(catapult_path, 'trace_processor')) + from trace_uploader.endpoints import cloud_mapper + paths = cloud_mapper.PathsForDeployment() + + _AddToPathIfNeeded(catapult_path) + from catapult_build import temp_deployment_dir + + file_sets = [ + ['app.yaml', 'cron.yaml', 'dispatch.yaml', 'index.yaml'] + ] + for cur_set in file_sets: + with temp_deployment_dir.TempDeploymentDir( + paths, use_symlinks=False) as temp_dir: + cmd = ['gcloud', 'preview', 'app', 'deploy'] + cmd += cur_set + cmd += [ + '--project=%s' % args.appid, + '--version=%s' % args.version, + '--force', '--quiet', '--no-promote', '--docker-build', 'local'] + subprocess.call(cmd, cwd=temp_dir) + +if __name__ == '__main__': + Main() + diff --git a/catapult/trace_processor/bin/process_traces b/catapult/trace_processor/bin/process_traces index 48266966..700ec6f3 100755 --- a/catapult/trace_processor/bin/process_traces +++ b/catapult/trace_processor/bin/process_traces @@ -9,11 +9,9 @@ import sys if __name__ == '__main__': path_to_catapult = os.path.join(os.path.dirname(__file__), '..', '..') path_to_tracing = os.path.join(path_to_catapult, 'tracing') - path_to_perf_insights = os.path.join(path_to_catapult, 'perf_insights') path_to_trace_processor = os.path.join(path_to_catapult, 'trace_processor') sys.path.append(path_to_tracing) - sys.path.append(path_to_perf_insights) sys.path.append(path_to_trace_processor) from trace_processor import process_traces diff --git a/catapult/trace_processor/experimental/bin/map_v8runtimecallstats b/catapult/trace_processor/experimental/bin/map_v8runtimecallstats new file mode 100755 index 00000000..ab55d22e --- /dev/null +++ b/catapult/trace_processor/experimental/bin/map_v8runtimecallstats @@ -0,0 +1,66 @@ +#!/usr/bin/env python +# Copyright 2016 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. 
+import os +import sys +import json +import ntpath + + +OUT_DIR = 'v8_callstats_dump' + +URL_MAP = dict() + +URL_MAP['http___edition_cnn_com10.html'] = 'cnn.com' +URL_MAP['http___hi_wikipedia_org_wiki__E0_A4_AE_E0_A5_81_E0_A4_96_E0_A4_AA_E0_A5_83_E0_A4_B7_E0_A5_8D_E0_A4_A06.html'] = 'wikipedia.org' +URL_MAP['http___inbox_google_com23.html'] = 'inbox.google.com' +URL_MAP['http___maps_google_co_jp_maps_search_restaurant_tokyo24.html'] = 'maps.google.co.jp' +URL_MAP['http___meta_discourse_org21.html'] = 'discourse.org' +URL_MAP['http___reddit_musicplayer_io22.html'] = 'reddit.musicplayer.io' +URL_MAP['https___www_facebook_com_shakira2.html'] = 'facebook.com' +URL_MAP['https___www_google_de_search_q_v80.html'] = 'google.de' +URL_MAP['https___www_linkedin_com_m_13.html'] = 'linkedin.com' +URL_MAP['https___www_youtube_com1.html'] = 'youtube.com' +URL_MAP['http___weibo_com18.html'] = 'weibo.com' +URL_MAP['http___world_taobao_com11.html'] = 'taobao.com' +URL_MAP['http___www_amazon_com_s__field_keywords_v85.html'] = 'amazon.com' +URL_MAP['http___www_baidu_com_s_wd_v83.html'] = 'baidu.com' +URL_MAP['http___www_bing_com_search_q_v8_engine15.html'] = 'bing.com' +URL_MAP['http___www_ebay_fr_sch_i_html__nkw_v89.html'] = 'ebay.fr' +URL_MAP['http___www_instagram_com_archdigest12.html'] = 'instagram.com' +URL_MAP['http___www_msn_com_ar_ae14.html'] = 'msn.com' +URL_MAP['http___www_pinterest_com_categories_popular16.html'] = 'pinterest.com' +URL_MAP['http___www_qq_com7.html'] = 'qq.com' +URL_MAP['http___www_reddit_com8.html'] = 'reddit.com' +URL_MAP['http___www_sina_com_cn17.html'] = 'sina.com.cn' +URL_MAP['http___www_wikiwand_com_en_hill20.html'] = 'wikiwand.com' +URL_MAP['http___www_yahoo_co_jp4.html'] = 'yahoo.co.jp' +URL_MAP['http___yandex_ru_search__text_v819.html'] = 'yandex.ru' + + +def extractFilename(path): + head, tail = ntpath.split(path) + name = tail or ntpath.basename(head) + if name in URL_MAP: + return URL_MAP[name] + return name + + +def writeDump(name, value): + dump_file = open(OUT_DIR + '/' + extractFilename(name) + '.txt', 'w+') + runtime_call = value['pairs'] + for name in runtime_call: + dump_file.write(name + '\t' + str(runtime_call[name]['time']) + '\tX\t' + str(runtime_call[name]['count']) + '\n') + dump_file.close() + +if __name__ == '__main__': + with open(sys.argv[1]) as data_file: + data = json.load(data_file) + + if not os.path.exists(OUT_DIR): + os.makedirs(OUT_DIR) + + for entry in data: + writeDump(entry, data[entry]) + sys.exit(0) diff --git a/catapult/trace_processor/experimental/mappers/long_running_tasks_mapper.html b/catapult/trace_processor/experimental/mappers/long_running_tasks_mapper.html new file mode 100644 index 00000000..45844090 --- /dev/null +++ b/catapult/trace_processor/experimental/mappers/long_running_tasks_mapper.html @@ -0,0 +1,161 @@ +<!DOCTYPE html> +<!-- +Copyright 2016 The Chromium Authors. All rights reserved. +Use of this source code is governed by a BSD-style license that can be +found in the LICENSE file.
+--> + +<script> +'use strict'; + +tr.exportTo('pi.m', function() { +var LONG_TASK_MS = 100; + +var UNINTERESTING_TASKS = [ + 'MessageLoop::RunTask', + 'TaskQueueManager::DoWork', + 'TaskQueueManager::ProcessTaskFromWorkQueue', + 'TaskQueueManager::RunTask', + 'TimerBase::run', + 'ThreadProxy::BeginMainFrame', + 'ParseHTML', + 'TimerFire', + 'EventDispatch', + 'WebViewImpl::updateAllLifecyclePhases', + 'ScheduledAction::execute', + 'HTMLDocumentParser::processParsedChunkFromBackgroundParser', + 'HTMLScriptRunner::executeScriptsWaitingForLoad', + 'ResourceDispatcher::OnRequestComplete', + 'HTMLScriptRunner::execute', + 'ChannelProxy::Context::OnDispatchMessage', + 'XHRReadyStateChange', + 'RenderFrameImpl::didFinishDocumentLoad', + 'ResourceDispatcher::OnReceivedData', + 'WebViewImpl::beginFrame', + 'RenderFrameImpl::OnBeforeUnload', + 'WindowProxy::initialize', + 'Sampler::InstallJitCodeEventHandler', + 'ResourceMsg_DataReceived', + 'ResourceReceivedData', + 'RenderWidgetInputHandler::OnHandleInputEvent', + 'ProxyMain::BeginMainFrame::commit', + 'ResourceMsg_RequestComplete', + 'ExtensionMsg_Response', + 'ExtensionMsg_MessageInvoke', + 'SyncChannel::Send', + 'SingleThreadIdleTaskRunner::RunTask', + 'ResourceDispatcher::OnReceivedResponse', + 'FireAnimationFrame', + 'InputMsg_HandleInputEvent', + 'WebURLLoaderImpl::Context::OnCompletedRequest', + 'WebURLLoaderImpl::Context::OnReceivedData', + 'v8.callFunction', + 'ExtensionMsg_Loaded', + 'ProxyMain::BeginMainFrame::commit', + 'XHRLoad', + 'RenderFrameImpl::OnNavigate', + 'CommitLoad', + 'FrameMsg_Navigate', + 'ResourceFinish', + 'RenderViewImpl::OnResize', + 'HostDispatcher::OnMessageReceived', + 'HTMLScriptRunner::executeScriptsWaitingForParsing', + 'HTMLScriptRunner::executeScriptsWaitingForResources', + 'FrameMsg_JavaScriptExecuteRequest', + 'v8.callModuleMethod' +]; + + function longRunningScriptsMapper(result, model) { + var resultArray = []; + + iterateRendererMainThreads(model, function(thread) { + thread.sliceGroup.topLevelSlices.forEach(function(slice) { + + var interestingSlice = findLongestInterestingSlice(slice); + if (interestingSlice.duration < LONG_TASK_MS) + return; + + var scriptURL = getScriptURL(interestingSlice); + if (scriptURL !== undefined && scriptURL != '') { + var topLevelDomain = extractDomain(scriptURL); + resultArray.push({key: topLevelDomain, + float_value: interestingSlice.duration}); + } + }); + }); + + result.addPair('values', resultArray); + } + + + tr.mre.FunctionRegistry.register(longRunningScriptsMapper); + + return { + longRunningScriptsMapper: longRunningScriptsMapper + }; + + function iterateRendererMainThreads(model, cb, opt_this) { + var modelHelper = model.getOrCreateHelper( + tr.model.helpers.ChromeModelHelper); + tr.b.dictionaryValues(modelHelper.rendererHelpers).forEach( + function(rendererHelper) { + if (!rendererHelper.mainThread) + return; + cb.call(opt_this, rendererHelper.mainThread); + }); + } + + function getScriptURL(slice) { + var url = undefined; + + if (slice.title === 'v8.run') { + url = slice.args['fileName']; + } else if (slice.title === 'v8.compile') { + url = slice.args['fileName']; + } else if (slice.title === 'FunctionCall') { + url = slice.args['data']['scriptName']; + } else if (slice.title === 'EvaluateScript') { + url = slice.args['data']['url']; + } else if (slice.title === 'HTMLScriptRunner ExecuteScript') { + url = slice.args['data']['url']; + } + + return url; + } + + function findLongestInterestingSlice(slice) { + if (UNINTERESTING_TASKS.indexOf(slice.title) >= 0) { + 
var longestSlice = undefined; + var subSlices = slice.subSlices; + for (var i = 0; i < subSlices.length; ++i) { + if (longestSlice === undefined || + longestSlice.duration < subSlices[i].duration) { + longestSlice = subSlices[i]; + } + } + + if (longestSlice !== undefined) + return findLongestInterestingSlice(longestSlice); + } + + return slice; + } + + function extractDomain(url) { + var domain; + //find & remove protocol (http, ftp, etc.) and get domain + if (url.indexOf('://') > -1) { + domain = url.split('/')[2]; + } + else { + domain = url.split('/')[0]; + } + + //find & remove port number + domain = domain.split(':')[0]; + + return domain; + } +}); + +</script> diff --git a/catapult/trace_processor/experimental/mappers/slice_cost.html b/catapult/trace_processor/experimental/mappers/slice_cost.html index 8860f87c..d1f1ff8f 100644 --- a/catapult/trace_processor/experimental/mappers/slice_cost.html +++ b/catapult/trace_processor/experimental/mappers/slice_cost.html @@ -4,7 +4,7 @@ Copyright (c) 2015 The Chromium Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. --> -<link rel="import" href="/perf_insights/mappers/reduce.html"> +<link rel="import" href="/experimental/mappers/reduce.html"> <link rel="import" href="/tracing/extras/ads/domain_category.html"> <link rel="import" href="/tracing/extras/chrome/slice_title_fixer.html"> <link rel="import" href="/tracing/model/source_info/js_source_info.html"> @@ -175,6 +175,8 @@ tr.exportTo('pi.m', function() { return sliceCostInfos; } + tr.mre.FunctionRegistry.register(getSliceCostReport); + return { SliceCostInfo: SliceCostInfo, diff --git a/catapult/trace_processor/experimental/mappers/task_info_map_function.html b/catapult/trace_processor/experimental/mappers/task_info_map_function.html index 41246d4c..2d37ed06 100644 --- a/catapult/trace_processor/experimental/mappers/task_info_map_function.html +++ b/catapult/trace_processor/experimental/mappers/task_info_map_function.html @@ -7,18 +7,17 @@ found in the LICENSE file. 
<link rel="import" href="/perf_insights/mappers/thread_grouping.html"> <link rel="import" href="/perf_insights/mre/function_handle.html"> +<link rel="import" href="/tracing/base/unit.html"> <link rel="import" href="/tracing/model/flow_event.html"> <link rel="import" href="/tracing/model/slice.html"> -<link rel="import" href="/tracing/value/numeric.html"> -<link rel="import" href="/tracing/value/unit.html"> +<link rel="import" href="/tracing/value/histogram.html"> <script> 'use strict'; tr.exportTo('pi.m', function() { - var DURATION_NUMERIC_BUILDER = tr.v.NumericBuilder.createLinear( - tr.v.Unit.byName.timeDurationInMs, - tr.b.Range.fromExplicitRange(0, 250), 50); + var DURATION_BOUNDARIES = tr.v.HistogramBinBoundaries.createLinear( + 0, 250, 50); function taskInfoMapFunction(result, model) { var canonicalUrl = model.canonicalUrl; @@ -98,7 +97,8 @@ tr.exportTo('pi.m', function() { function addToHistogram(dict, processName, threadName, value, url) { dict[processName] = dict[processName] || {}; dict[processName][threadName] = dict[processName][threadName] || - DURATION_NUMERIC_BUILDER.build(); + new tr.v.Histogram(tr.b.Unit.byName.timeDurationInMs, + DURATION_BOUNDARIES); dict[processName][threadName].add(value, url); } diff --git a/catapult/trace_processor/experimental/mappers/task_info_map_function_test.html b/catapult/trace_processor/experimental/mappers/task_info_map_function_test.html index b559491e..f6793143 100644 --- a/catapult/trace_processor/experimental/mappers/task_info_map_function_test.html +++ b/catapult/trace_processor/experimental/mappers/task_info_map_function_test.html @@ -9,7 +9,7 @@ found in the LICENSE file. <link rel="import" href="/perf_insights/mre/mre_result.html"> <link rel="import" href="/tracing/base/iteration_helpers.html"> <link rel="import" href="/tracing/core/test_utils.html"> -<link rel="import" href="/tracing/value/numeric.html"> +<link rel="import" href="/tracing/value/histogram.html"> <script> 'use strict'; @@ -55,7 +55,7 @@ tr.b.unittest.testSuite(function() { assert.equal(tr.b.dictionaryLength(result.pairs), 3); var time_spent_in_queue = result.pairs.time_spent_in_queue; assert.equal(tr.b.dictionaryLength(time_spent_in_queue.Browser), 1); - var histogram = tr.v.Numeric.fromDict( + var histogram = tr.v.Histogram.fromDict( time_spent_in_queue['Browser']['CrBrowserMain']); assert.equal(histogram.getBinForValue(7.2).count, 1); assert.equal(histogram.getBinForValue(18.1).count, 1); @@ -63,14 +63,14 @@ tr.b.unittest.testSuite(function() { result.pairs.time_spent_in_top_level_task); assert.equal(tr.b.dictionaryLength( time_spent_in_top_level_task['Browser']), 1); - histogram = tr.v.Numeric.fromDict( + histogram = tr.v.Histogram.fromDict( time_spent_in_top_level_task['Browser']['CrBrowserMain']); assert.equal(histogram.getBinForValue(10.4).count, 1); var cpu_time_spent_in_top_level_task = ( result.pairs.cpu_time_spent_in_top_level_task); assert.equal(tr.b.dictionaryLength( cpu_time_spent_in_top_level_task['Browser']), 1); - histogram = tr.v.Numeric.fromDict( + histogram = tr.v.Histogram.fromDict( cpu_time_spent_in_top_level_task['Browser']['CrBrowserMain']); assert.equal(histogram.getBinForValue(3.0).count, 1); }); diff --git a/catapult/trace_processor/experimental/mappers/test_mapper.html b/catapult/trace_processor/experimental/mappers/test_mapper.html index bcb30082..4e9b070a 100644 --- a/catapult/trace_processor/experimental/mappers/test_mapper.html +++ b/catapult/trace_processor/experimental/mappers/test_mapper.html @@ -4,8 +4,7 @@ Copyright (c) 2015 The 
Chromium Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. --> -<link rel="import" href="/perf_insights/mre/function_handle.html"> -<link rel="import" href="/tracing/value/value.html"> +<link rel="import" href="/tracing/mre/function_handle.html"> <script> 'use strict'; @@ -16,7 +15,7 @@ tr.exportTo('pi.m', function() { var someValue = 4; // Chosen by fair roll of the dice. result.addPair('simon', {value: someValue}); } - pi.FunctionRegistry.register(testMapFunction); + tr.mre.FunctionRegistry.register(testMapFunction); return { testMapFunction: testMapFunction diff --git a/catapult/trace_processor/experimental/mappers/thread_grouping.html b/catapult/trace_processor/experimental/mappers/thread_grouping.html index 3e5c6164..eab6d79a 100644 --- a/catapult/trace_processor/experimental/mappers/thread_grouping.html +++ b/catapult/trace_processor/experimental/mappers/thread_grouping.html @@ -7,7 +7,6 @@ found in the LICENSE file. <link rel="import" href="/tracing/base/iteration_helpers.html"> <link rel="import" href="/tracing/model/helpers/chrome_model_helper.html"> -<link rel="import" href="/tracing/value/value.html"> <script> 'use strict'; diff --git a/catapult/trace_processor/experimental/mappers/trace_stats.html b/catapult/trace_processor/experimental/mappers/trace_stats.html index cb1c18dd..a8e48a0c 100644 --- a/catapult/trace_processor/experimental/mappers/trace_stats.html +++ b/catapult/trace_processor/experimental/mappers/trace_stats.html @@ -5,18 +5,16 @@ Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. --> -<link rel="import" href="/perf_insights/mre/function_handle.html"> <link rel="import" href="/tracing/base/range.html"> -<link rel="import" href="/tracing/value/numeric.html"> -<link rel="import" href="/tracing/value/unit.html"> +<link rel="import" href="/tracing/base/unit.html"> +<link rel="import" href="/tracing/mre/function_handle.html"> +<link rel="import" href="/tracing/value/histogram.html"> <script> 'use strict'; tr.exportTo('pi.m', function() { - var COUNT_NUMERIC_BUILDER = tr.v.NumericBuilder.createLinear( - tr.v.Unit.byName.unitlessNumber, tr.b.Range.fromExplicitRange(0, 50000), - 20); + var COUNT_BOUNDARIES = tr.v.HistogramBinBoundaries.createLinear(0, 5e4, 20); function traceStatsFunction(result, model) { var canonicalUrl = model.canonicalUrl; @@ -47,7 +45,8 @@ tr.exportTo('pi.m', function() { seconds_counts[second]++; } - var histogram = COUNT_NUMERIC_BUILDER.build(); + var histogram = new tr.v.Histogram( + tr.b.Unit.byName.count, COUNT_BOUNDARIES); for (var second in seconds_counts) histogram.add(seconds_counts[second]); @@ -65,7 +64,7 @@ tr.exportTo('pi.m', function() { result.addPair('stats', stats); } - pi.FunctionRegistry.register(traceStatsFunction); + tr.mre.FunctionRegistry.register(traceStatsFunction); //Exporting for tests. return { diff --git a/catapult/trace_processor/experimental/mappers/trace_stats_test.html b/catapult/trace_processor/experimental/mappers/trace_stats_test.html index 002078f4..33380cb6 100644 --- a/catapult/trace_processor/experimental/mappers/trace_stats_test.html +++ b/catapult/trace_processor/experimental/mappers/trace_stats_test.html @@ -5,10 +5,9 @@ Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. 
--> -<link rel="import" href="/perf_insights/mappers/trace_stats.html"> -<link rel="import" href="/perf_insights/mre/mre_result.html"> <link rel="import" href="/tracing/base/iteration_helpers.html"> <link rel="import" href="/tracing/core/test_utils.html"> +<link rel="import" href="/tracing/mre/mre_result.html"> <script> 'use strict'; diff --git a/catapult/trace_processor/experimental/mappers/v8_callstats_dump.html b/catapult/trace_processor/experimental/mappers/v8_callstats_dump.html index 154a6a6e..4756869e 100644 --- a/catapult/trace_processor/experimental/mappers/v8_callstats_dump.html +++ b/catapult/trace_processor/experimental/mappers/v8_callstats_dump.html @@ -4,16 +4,16 @@ Copyright 2015 The Chromium Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. --> -<link rel="import" href="/perf_insights/mre/function_handle.html"> <link rel="import" href="/tracing/base/iteration_helpers.html"> <link rel="import" href="/tracing/metrics/system_health/loading_metric.html"> +<link rel="import" href="/tracing/mre/function_handle.html"> <link rel="import" href="/tracing/value/value_set.html"> <script> 'use strict'; -tr.exportTo('pi.m', function() { +tr.exportTo('tr.mre', function() { function v8CallStatsDump(result, model) { var v8_runtime_map = {}; var totalCount = 0; @@ -28,12 +28,23 @@ tr.exportTo('pi.m', function() { throw 'Unable to work with a trace that has more than one navigation'; } - var tti = numeric.running.mean; + var binsWithSampleDiagnosticMaps = numeric.centralBins.filter( + (bin) => bin.diagnosticMaps.length > 0); + var diagnostic = binsWithSampleDiagnosticMaps[0] + .diagnosticMaps[0]['breakdown']; + + var tti = diagnostic.value.interactive; for (var event of model.getDescendantEvents()) { if (!(event instanceof tr.model.ThreadSlice) || event.start > tti) continue; - var v8_runtime = event.args['runtime-call-stat']; + var v8_runtime = event.args['runtime-call-stats']; + + // For older traces, check if we had an arg called 'runtime-call-stat' + // instead. + if (v8_runtime === undefined) + v8_runtime = event.args['runtime-call-stat']; + if (v8_runtime !== undefined) { try { var v8_runtime_object = JSON.parse(v8_runtime); @@ -43,10 +54,11 @@ tr.exportTo('pi.m', function() { if (v8_runtime_map[runtime_call] === undefined) { v8_runtime_map[runtime_call] = {count: 0, time: 0}; } - v8_runtime_map[runtime_call].count++; + var runtime_count = v8_runtime_object[runtime_call][0]; + v8_runtime_map[runtime_call].count += runtime_count; var runtime_time = v8_runtime_object[runtime_call][1] / 1000; v8_runtime_map[runtime_call].time += runtime_time; - totalCount++; + totalCount += runtime_count; totalTime += runtime_time; } } @@ -63,7 +75,7 @@ tr.exportTo('pi.m', function() { count: totalCount}); } - pi.FunctionRegistry.register(v8CallStatsDump); + tr.mre.FunctionRegistry.register(v8CallStatsDump); // Exporting for tests. return { diff --git a/catapult/trace_processor/experimental/mappers/v8_map_function.html b/catapult/trace_processor/experimental/mappers/v8_map_function.html index 2942e2fa..ea18e87b 100644 --- a/catapult/trace_processor/experimental/mappers/v8_map_function.html +++ b/catapult/trace_processor/experimental/mappers/v8_map_function.html @@ -4,14 +4,15 @@ Copyright 2015 The Chromium Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. 
--> + <link rel="import" href="/perf_insights/mappers/slice_cost.html"> <link rel="import" href="/perf_insights/mappers/thread_grouping.html"> <link rel="import" href="/perf_insights/mre/function_handle.html"> <link rel="import" href="/tracing/base/iteration_helpers.html"> +<link rel="import" href="/tracing/base/unit.html"> <link rel="import" href="/tracing/model/helpers/chrome_model_helper.html"> <link rel="import" href="/tracing/model/ir_coverage.html"> <link rel="import" href="/tracing/model/user_model/user_expectation.html"> -<link rel="import" href="/tracing/value/unit.html"> <script> 'use strict'; diff --git a/catapult/trace_processor/third_party/cloudstorage/COPYING b/catapult/trace_processor/third_party/cloudstorage/COPYING new file mode 100644 index 00000000..b09cd785 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/COPYING @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/catapult/trace_processor/third_party/cloudstorage/README.chromium b/catapult/trace_processor/third_party/cloudstorage/README.chromium new file mode 100644 index 00000000..fe387323 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/README.chromium @@ -0,0 +1,12 @@ +Name: Google Cloud Storage Client Library +URL: https://github.com/GoogleCloudPlatform/appengine-gcs-client +License: Apache 2.0 + +Description: +The Google Cloud Storage client library is a client-side library that is not +dependent on any specific version of App Engine for production use. + +Modifications: + + +Full license is in the COPYING file. diff --git a/catapult/trace_processor/third_party/cloudstorage/__init__.py b/catapult/trace_processor/third_party/cloudstorage/__init__.py new file mode 100644 index 00000000..349a021a --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/__init__.py @@ -0,0 +1,29 @@ +# Copyright 2014 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Client Library for Google Cloud Storage.""" + + + + +from .api_utils import RetryParams +from .api_utils import set_default_retry_params +from cloudstorage_api import * +from .common import CSFileStat +from .common import GCSFileStat +from .common import validate_bucket_name +from .common import validate_bucket_path +from .common import validate_file_path +from errors import * +from storage_api import * diff --git a/catapult/trace_processor/third_party/cloudstorage/api_utils.py b/catapult/trace_processor/third_party/cloudstorage/api_utils.py new file mode 100644 index 00000000..680ac6bc --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/api_utils.py @@ -0,0 +1,353 @@ +# Copyright 2013 Google Inc. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Util functions and classes for cloudstorage_api.""" + + + +__all__ = ['set_default_retry_params', + 'RetryParams', + ] + +import copy +import httplib +import logging +import math +import os +import threading +import time +import urllib + + +try: + from google.appengine.api import app_identity + from google.appengine.api import urlfetch + from google.appengine.datastore import datastore_rpc + from google.appengine.ext import ndb + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import tasklets + from google.appengine.ext.ndb import utils + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors +except ImportError: + from google.appengine.api import app_identity + from google.appengine.api import urlfetch + from google.appengine.datastore import datastore_rpc + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors + from google.appengine.ext import ndb + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import tasklets + from google.appengine.ext.ndb import utils + + +_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError, + apiproxy_errors.Error, + app_identity.InternalError, + app_identity.BackendDeadlineExceeded) + +_thread_local_settings = threading.local() +_thread_local_settings.default_retry_params = None + + +def set_default_retry_params(retry_params): + """Set a default RetryParams for current thread current request.""" + _thread_local_settings.default_retry_params = copy.copy(retry_params) + + +def _get_default_retry_params(): + """Get default RetryParams for current request and current thread. + + Returns: + A new instance of the default RetryParams. + """ + default = getattr(_thread_local_settings, 'default_retry_params', None) + if default is None or not default.belong_to_current_request(): + return RetryParams() + else: + return copy.copy(default) + + +def _quote_filename(filename): + """Quotes filename to use as a valid URI path. + + Args: + filename: user provided filename. /bucket/filename. + + Returns: + The filename properly quoted to use as URI's path component. + """ + return urllib.quote(filename) + + +def _unquote_filename(filename): + """Unquotes a valid URI path back to its filename. + + This is the opposite of _quote_filename. + + Args: + filename: a quoted filename. /bucket/some%20filename. + + Returns: + The filename unquoted. + """ + return urllib.unquote(filename) + + +def _should_retry(resp): + """Given a urlfetch response, decide whether to retry that request.""" + return (resp.status_code == httplib.REQUEST_TIMEOUT or + (resp.status_code >= 500 and + resp.status_code < 600)) + + +class _RetryWrapper(object): + """A wrapper that wraps retry logic around any tasklet.""" + + def __init__(self, + retry_params, + retriable_exceptions=_RETRIABLE_EXCEPTIONS, + should_retry=lambda r: False): + """Init. + + Args: + retry_params: an RetryParams instance. 
+ retriable_exceptions: a list of exception classes that are retriable. + should_retry: a function that takes a result from the tasklet and returns + a boolean. True if the result should be retried. + """ + self.retry_params = retry_params + self.retriable_exceptions = retriable_exceptions + self.should_retry = should_retry + + @ndb.tasklet + def run(self, tasklet, **kwds): + """Run a tasklet with retry. + + The retry should be transparent to the caller: if no results + are successful, the exception or result from the last retry is returned + to the caller. + + Args: + tasklet: the tasklet to run. + **kwds: keywords arguments to run the tasklet. + + Raises: + The exception from running the tasklet. + + Returns: + The result from running the tasklet. + """ + start_time = time.time() + n = 1 + + while True: + e = None + result = None + got_result = False + + try: + result = yield tasklet(**kwds) + got_result = True + if not self.should_retry(result): + raise ndb.Return(result) + except runtime.DeadlineExceededError: + logging.debug( + 'Tasklet has exceeded request deadline after %s seconds total', + time.time() - start_time) + raise + except self.retriable_exceptions, e: + pass + + if n == 1: + logging.debug('Tasklet is %r', tasklet) + + delay = self.retry_params.delay(n, start_time) + + if delay <= 0: + logging.debug( + 'Tasklet failed after %s attempts and %s seconds in total', + n, time.time() - start_time) + if got_result: + raise ndb.Return(result) + elif e is not None: + raise e + else: + assert False, 'Should never reach here.' + + if got_result: + logging.debug( + 'Got result %r from tasklet.', result) + else: + logging.debug( + 'Got exception "%r" from tasklet.', e) + logging.debug('Retry in %s seconds.', delay) + n += 1 + yield tasklets.sleep(delay) + + +class RetryParams(object): + """Retry configuration parameters.""" + + _DEFAULT_USER_AGENT = 'App Engine Python GCS Client' + + @datastore_rpc._positional(1) + def __init__(self, + backoff_factor=2.0, + initial_delay=0.1, + max_delay=10.0, + min_retries=3, + max_retries=6, + max_retry_period=30.0, + urlfetch_timeout=None, + save_access_token=False, + _user_agent=None): + """Init. + + This object is unique per request per thread. + + Library will retry according to this setting when App Engine Server + can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or + 500-600 response. + + Args: + backoff_factor: exponential backoff multiplier. + initial_delay: seconds to delay for the first retry. + max_delay: max seconds to delay for every retry. + min_retries: min number of times to retry. This value is automatically + capped by max_retries. + max_retries: max number of times to retry. Set this to 0 for no retry. + max_retry_period: max total seconds spent on retry. Retry stops when + this period passed AND min_retries has been attempted. + urlfetch_timeout: timeout for urlfetch in seconds. Could be None, + in which case the value will be chosen by urlfetch module. + save_access_token: persist access token to datastore to avoid + excessive usage of GetAccessToken API. Usually the token is cached + in process and in memcache. In some cases, memcache isn't very + reliable. + _user_agent: The user agent string that you want to use in your requests. 
+ """ + self.backoff_factor = self._check('backoff_factor', backoff_factor) + self.initial_delay = self._check('initial_delay', initial_delay) + self.max_delay = self._check('max_delay', max_delay) + self.max_retry_period = self._check('max_retry_period', max_retry_period) + self.max_retries = self._check('max_retries', max_retries, True, int) + self.min_retries = self._check('min_retries', min_retries, True, int) + if self.min_retries > self.max_retries: + self.min_retries = self.max_retries + + self.urlfetch_timeout = None + if urlfetch_timeout is not None: + self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout) + self.save_access_token = self._check('save_access_token', save_access_token, + True, bool) + self._user_agent = _user_agent or self._DEFAULT_USER_AGENT + + self._request_id = os.getenv('REQUEST_LOG_ID') + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + @classmethod + def _check(cls, name, val, can_be_zero=False, val_type=float): + """Check init arguments. + + Args: + name: name of the argument. For logging purpose. + val: value. Value has to be non negative number. + can_be_zero: whether value can be zero. + val_type: Python type of the value. + + Returns: + The value. + + Raises: + ValueError: when invalid value is passed in. + TypeError: when invalid value type is passed in. + """ + valid_types = [val_type] + if val_type is float: + valid_types.append(int) + + if type(val) not in valid_types: + raise TypeError( + 'Expect type %s for parameter %s' % (val_type.__name__, name)) + if val < 0: + raise ValueError( + 'Value for parameter %s has to be greater than 0' % name) + if not can_be_zero and val == 0: + raise ValueError( + 'Value for parameter %s can not be 0' % name) + return val + + def belong_to_current_request(self): + return os.getenv('REQUEST_LOG_ID') == self._request_id + + def delay(self, n, start_time): + """Calculate delay before the next retry. + + Args: + n: the number of current attempt. The first attempt should be 1. + start_time: the time when retry started in unix time. + + Returns: + Number of seconds to wait before next retry. -1 if retry should give up. + """ + if (n > self.max_retries or + (n > self.min_retries and + time.time() - start_time > self.max_retry_period)): + return -1 + return min( + math.pow(self.backoff_factor, n-1) * self.initial_delay, + self.max_delay) + + +def _run_until_rpc(): + """Eagerly evaluate tasklets until it is blocking on some RPC. + + Usually ndb eventloop el isn't run until some code calls future.get_result(). + + When an async tasklet is called, the tasklet wrapper evaluates the tasklet + code into a generator, enqueues a callback _help_tasklet_along onto + the el.current queue, and returns a future. + + _help_tasklet_along, when called by the el, will + get one yielded value from the generator. If the value if another future, + set up a callback _on_future_complete to invoke _help_tasklet_along + when the dependent future fulfills. If the value if a RPC, set up a + callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills. + Thus _help_tasklet_along drills down + the chain of futures until some future is blocked by RPC. El runs + all callbacks and constantly check pending RPC status. 
+ """ + el = eventloop.get_event_loop() + while el.current: + el.run0() + + +def _eager_tasklet(tasklet): + """Decorator to turn tasklet to run eagerly.""" + + @utils.wrapping(tasklet) + def eager_wrapper(*args, **kwds): + fut = tasklet(*args, **kwds) + _run_until_rpc() + return fut + + return eager_wrapper diff --git a/catapult/trace_processor/third_party/cloudstorage/cloudstorage_api.py b/catapult/trace_processor/third_party/cloudstorage/cloudstorage_api.py new file mode 100644 index 00000000..ba8be862 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/cloudstorage_api.py @@ -0,0 +1,451 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""File Interface for Google Cloud Storage.""" + + + +from __future__ import with_statement + + + +__all__ = ['delete', + 'listbucket', + 'open', + 'stat', + ] + +import logging +import StringIO +import urllib +import xml.etree.cElementTree as ET +from . import api_utils +from . import common +from . import errors +from . import storage_api + + + +def open(filename, + mode='r', + content_type=None, + options=None, + read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, + retry_params=None, + _account_id=None): + """Opens a Google Cloud Storage file and returns it as a File-like object. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + mode: 'r' for reading mode. 'w' for writing mode. + In reading mode, the file must exist. In writing mode, a file will + be created or be overrode. + content_type: The MIME type of the file. str. Only valid in writing mode. + options: A str->basestring dict to specify additional headers to pass to + GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + Supported options are x-goog-acl, x-goog-meta-, cache-control, + content-disposition, and content-encoding. + Only valid in writing mode. + See https://developers.google.com/storage/docs/reference-headers + for details. + read_buffer_size: The buffer size for read. Read keeps a buffer + and prefetches another one. To minimize blocking for large files, + always read by buffer size. To minimize number of RPC requests for + small files, set a large buffer size. Max is 30MB. + retry_params: An instance of api_utils.RetryParams for subsequent calls + to GCS from this file handle. If None, the default one is used. + _account_id: Internal-use only. + + Returns: + A reading or writing buffer that supports File-like interface. Buffer + must be closed after operations are done. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + ValueError: invalid open mode or if content_type or options are specified + in reading mode. 
+ """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + filename = api_utils._quote_filename(filename) + + if mode == 'w': + common.validate_options(options) + return storage_api.StreamingBuffer(api, filename, content_type, options) + elif mode == 'r': + if content_type or options: + raise ValueError('Options and content_type can only be specified ' + 'for writing mode.') + return storage_api.ReadBuffer(api, + filename, + buffer_size=read_buffer_size) + else: + raise ValueError('Invalid mode %s.' % mode) + + +def delete(filename, retry_params=None, _account_id=None): + """Delete a Google Cloud Storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Raises: + errors.NotFoundError: if the file doesn't exist prior to deletion. + """ + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + common.validate_file_path(filename) + filename = api_utils._quote_filename(filename) + status, resp_headers, content = api.delete_object(filename) + errors.check_status(status, [204], filename, resp_headers=resp_headers, + body=content) + + +def stat(filename, retry_params=None, _account_id=None): + """Get GCSFileStat of a Google Cloud storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + a GCSFileStat object containing info about this file. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + status, headers, content = api.head_object( + api_utils._quote_filename(filename)) + errors.check_status(status, [200], filename, resp_headers=headers, + body=content) + file_stat = common.GCSFileStat( + filename=filename, + st_size=common.get_stored_content_length(headers), + st_ctime=common.http_time_to_posix(headers.get('last-modified')), + etag=headers.get('etag'), + content_type=headers.get('content-type'), + metadata=common.get_metadata(headers)) + + return file_stat + + +def _copy2(src, dst, metadata=None, retry_params=None): + """Copy the file content from src to dst. + + Internal use only! + + Args: + src: /bucket/filename + dst: /bucket/filename + metadata: a dict of metadata for this copy. If None, old metadata is copied. + For example, {'x-goog-meta-foo': 'bar'}. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. 
+ """ + common.validate_file_path(src) + common.validate_file_path(dst) + + if metadata is None: + metadata = {} + copy_meta = 'COPY' + else: + copy_meta = 'REPLACE' + metadata.update({'x-goog-copy-source': src, + 'x-goog-metadata-directive': copy_meta}) + + api = storage_api._get_storage_api(retry_params=retry_params) + status, resp_headers, content = api.put_object( + api_utils._quote_filename(dst), headers=metadata) + errors.check_status(status, [200], src, metadata, resp_headers, body=content) + + +def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, + delimiter=None, retry_params=None, _account_id=None): + """Returns a GCSFileStat iterator over a bucket. + + Optional arguments can limit the result to a subset of files under bucket. + + This function has two modes: + 1. List bucket mode: Lists all files in the bucket without any concept of + hierarchy. GCS doesn't have real directory hierarchies. + 2. Directory emulation mode: If you specify the 'delimiter' argument, + it is used as a path separator to emulate a hierarchy of directories. + In this mode, the "path_prefix" argument should end in the delimiter + specified (thus designates a logical directory). The logical directory's + contents, both files and subdirectories, are listed. The names of + subdirectories returned will end with the delimiter. So listbucket + can be called with the subdirectory name to list the subdirectory's + contents. + + Args: + path_prefix: A Google Cloud Storage path of format "/bucket" or + "/bucket/prefix". Only objects whose fullpath starts with the + path_prefix will be returned. + marker: Another path prefix. Only objects whose fullpath starts + lexicographically after marker will be returned (exclusive). + prefix: Deprecated. Use path_prefix. + max_keys: The limit on the number of objects to return. int. + For best performance, specify max_keys only if you know how many objects + you want. Otherwise, this method requests large batches and handles + pagination for you. + delimiter: Use to turn on directory mode. str of one or multiple chars + that your bucket uses as its directory separator. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Examples: + For files "/bucket/a", + "/bucket/bar/1" + "/bucket/foo", + "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", + + Regular mode: + listbucket("/bucket/f", marker="/bucket/foo/1") + will match "/bucket/foo/2/1", "/bucket/foo/3/1". + + Directory mode: + listbucket("/bucket/", delimiter="/") + will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". + listbucket("/bucket/foo/", delimiter="/") + will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" + + Returns: + Regular mode: + A GCSFileStat iterator over matched files ordered by filename. + The iterator returns GCSFileStat objects. filename, etag, st_size, + st_ctime, and is_dir are set. + + Directory emulation mode: + A GCSFileStat iterator over matched files and directories ordered by + name. The iterator returns GCSFileStat objects. For directories, + only the filename and is_dir fields are set. + + The last name yielded can be used as next call's marker. 
+ """ + if prefix: + common.validate_bucket_path(path_prefix) + bucket = path_prefix + else: + bucket, prefix = common._process_path_prefix(path_prefix) + + if marker and marker.startswith(bucket): + marker = marker[len(bucket) + 1:] + + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + options = {} + if marker: + options['marker'] = marker + if max_keys: + options['max-keys'] = max_keys + if prefix: + options['prefix'] = prefix + if delimiter: + options['delimiter'] = delimiter + + return _Bucket(api, bucket, options) + + +class _Bucket(object): + """A wrapper for a GCS bucket as the return value of listbucket.""" + + def __init__(self, api, path, options): + """Initialize. + + Args: + api: storage_api instance. + path: bucket path of form '/bucket'. + options: a dict of listbucket options. Please see listbucket doc. + """ + self._init(api, path, options) + + def _init(self, api, path, options): + self._api = api + self._path = path + self._options = options.copy() + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + self._last_yield = None + self._new_max_keys = self._options.get('max-keys') + + def __getstate__(self): + options = self._options + if self._last_yield: + options['marker'] = self._last_yield.filename[len(self._path) + 1:] + if self._new_max_keys is not None: + options['max-keys'] = self._new_max_keys + return {'api': self._api, + 'path': self._path, + 'options': options} + + def __setstate__(self, state): + self._init(state['api'], state['path'], state['options']) + + def __iter__(self): + """Iter over the bucket. + + Yields: + GCSFileStat: a GCSFileStat for an object in the bucket. + They are ordered by GCSFileStat.filename. + """ + total = 0 + max_keys = self._options.get('max-keys') + + while self._get_bucket_fut: + status, resp_headers, content = self._get_bucket_fut.get_result() + errors.check_status(status, [200], self._path, resp_headers=resp_headers, + body=content, extras=self._options) + + if self._should_get_another_batch(content): + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + else: + self._get_bucket_fut = None + + root = ET.fromstring(content) + dirs = self._next_dir_gen(root) + files = self._next_file_gen(root) + next_file = files.next() + next_dir = dirs.next() + + while ((max_keys is None or total < max_keys) and + not (next_file is None and next_dir is None)): + total += 1 + if next_file is None: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_dir is None: + self._last_yield = next_file + next_file = files.next() + elif next_dir < next_file: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_file < next_dir: + self._last_yield = next_file + next_file = files.next() + else: + logging.error( + 'Should never reach. next file is %r. next dir is %r.', + next_file, next_dir) + if self._new_max_keys: + self._new_max_keys -= 1 + yield self._last_yield + + def _next_file_gen(self, root): + """Generator for next file element in the document. + + Args: + root: root element of the XML tree. + + Yields: + GCSFileStat for the next file. 
+ """ + for e in root.getiterator(common._T_CONTENTS): + st_ctime, size, etag, key = None, None, None, None + for child in e.getiterator('*'): + if child.tag == common._T_LAST_MODIFIED: + st_ctime = common.dt_str_to_posix(child.text) + elif child.tag == common._T_ETAG: + etag = child.text + elif child.tag == common._T_SIZE: + size = child.text + elif child.tag == common._T_KEY: + key = child.text + yield common.GCSFileStat(self._path + '/' + key, + size, etag, st_ctime) + e.clear() + yield None + + def _next_dir_gen(self, root): + """Generator for next directory element in the document. + + Args: + root: root element in the XML tree. + + Yields: + GCSFileStat for the next directory. + """ + for e in root.getiterator(common._T_COMMON_PREFIXES): + yield common.GCSFileStat( + self._path + '/' + e.find(common._T_PREFIX).text, + st_size=None, etag=None, st_ctime=None, is_dir=True) + e.clear() + yield None + + def _should_get_another_batch(self, content): + """Whether to issue another GET bucket call. + + Args: + content: response XML. + + Returns: + True if should, also update self._options for the next request. + False otherwise. + """ + if ('max-keys' in self._options and + self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): + return False + + elements = self._find_elements( + content, set([common._T_IS_TRUNCATED, + common._T_NEXT_MARKER])) + if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': + return False + + next_marker = elements.get(common._T_NEXT_MARKER) + if next_marker is None: + self._options.pop('marker', None) + return False + self._options['marker'] = next_marker + return True + + def _find_elements(self, result, elements): + """Find interesting elements from XML. + + This function tries to only look for specified elements + without parsing the entire XML. The specified elements is better + located near the beginning. + + Args: + result: response XML. + elements: a set of interesting element tags. + + Returns: + A dict from element tag to element value. + """ + element_mapping = {} + result = StringIO.StringIO(result) + for _, e in ET.iterparse(result, events=('end',)): + if not elements: + break + if e.tag in elements: + element_mapping[e.tag] = e.text + elements.remove(e.tag) + return element_mapping diff --git a/catapult/trace_processor/third_party/cloudstorage/common.py b/catapult/trace_processor/third_party/cloudstorage/common.py new file mode 100644 index 00000000..ab9c8df3 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/common.py @@ -0,0 +1,429 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
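The listbucket() iterator shown earlier hides GET-bucket pagination from callers. A minimal usage sketch, assuming the package is importable as cloudstorage (the uploader endpoints in this change import it that way) and a hypothetical /my-bucket path:

import cloudstorage as gcs

# Directory emulation mode: the path_prefix ends with the delimiter, so the
# iterator yields both files and pseudo-directories (is_dir=True entries).
for stat in gcs.listbucket('/my-bucket/traces/', delimiter='/'):
  if stat.is_dir:
    print 'dir:  %s' % stat.filename
  else:
    print 'file: %s (%s bytes)' % (stat.filename, stat.st_size)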
+ +"""Helpers shared by cloudstorage_stub and cloudstorage_api.""" + + + + + +__all__ = ['CS_XML_NS', + 'CSFileStat', + 'dt_str_to_posix', + 'local_api_url', + 'LOCAL_GCS_ENDPOINT', + 'local_run', + 'get_access_token', + 'get_stored_content_length', + 'get_metadata', + 'GCSFileStat', + 'http_time_to_posix', + 'memory_usage', + 'posix_time_to_http', + 'posix_to_dt_str', + 'set_access_token', + 'validate_options', + 'validate_bucket_name', + 'validate_bucket_path', + 'validate_file_path', + ] + + +import calendar +import datetime +from email import utils as email_utils +import logging +import os +import re + +try: + from google.appengine.api import runtime +except ImportError: + from google.appengine.api import runtime + + +_GCS_BUCKET_REGEX_BASE = r'[a-z0-9\.\-_]{3,63}' +_GCS_BUCKET_REGEX = re.compile(_GCS_BUCKET_REGEX_BASE + r'$') +_GCS_BUCKET_PATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'$') +_GCS_PATH_PREFIX_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'.*') +_GCS_FULLPATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'/.*') +_GCS_METADATA = ['x-goog-meta-', + 'content-disposition', + 'cache-control', + 'content-encoding'] +_GCS_OPTIONS = _GCS_METADATA + ['x-goog-acl'] +CS_XML_NS = 'http://doc.s3.amazonaws.com/2006-03-01' +LOCAL_GCS_ENDPOINT = '/_ah/gcs' +_access_token = '' + + +_MAX_GET_BUCKET_RESULT = 1000 + + +def set_access_token(access_token): + """Set the shared access token to authenticate with Google Cloud Storage. + + When set, the library will always attempt to communicate with the + real Google Cloud Storage with this token even when running on dev appserver. + Note the token could expire so it's up to you to renew it. + + When absent, the library will automatically request and refresh a token + on appserver, or when on dev appserver, talk to a Google Cloud Storage + stub. + + Args: + access_token: you can get one by run 'gsutil -d ls' and copy the + str after 'Bearer'. + """ + global _access_token + _access_token = access_token + + +def get_access_token(): + """Returns the shared access token.""" + return _access_token + + +class GCSFileStat(object): + """Container for GCS file stat.""" + + def __init__(self, + filename, + st_size, + etag, + st_ctime, + content_type=None, + metadata=None, + is_dir=False): + """Initialize. + + For files, the non optional arguments are always set. + For directories, only filename and is_dir is set. + + Args: + filename: a Google Cloud Storage filename of form '/bucket/filename'. + st_size: file size in bytes. long compatible. + etag: hex digest of the md5 hash of the file's content. str. + st_ctime: posix file creation time. float compatible. + content_type: content type. str. + metadata: a str->str dict of user specified options when creating + the file. Possible keys are x-goog-meta-, content-disposition, + content-encoding, and cache-control. + is_dir: True if this represents a directory. False if this is a real file. 
+    """
+    self.filename = filename
+    self.is_dir = is_dir
+    self.st_size = None
+    self.st_ctime = None
+    self.etag = None
+    self.content_type = content_type
+    self.metadata = metadata
+
+    if not is_dir:
+      self.st_size = long(st_size)
+      self.st_ctime = float(st_ctime)
+      if etag[0] == '"' and etag[-1] == '"':
+        etag = etag[1:-1]
+      self.etag = etag
+
+  def __repr__(self):
+    if self.is_dir:
+      return '(directory: %s)' % self.filename
+
+    return (
+        '(filename: %(filename)s, st_size: %(st_size)s, '
+        'st_ctime: %(st_ctime)s, etag: %(etag)s, '
+        'content_type: %(content_type)s, '
+        'metadata: %(metadata)s)' %
+        dict(filename=self.filename,
+             st_size=self.st_size,
+             st_ctime=self.st_ctime,
+             etag=self.etag,
+             content_type=self.content_type,
+             metadata=self.metadata))
+
+  def __cmp__(self, other):
+    if not isinstance(other, self.__class__):
+      raise ValueError('Argument to cmp must have the same type. '
+                       'Expect %s, got %s.' %
+                       (self.__class__.__name__, other.__class__.__name__))
+    if self.filename > other.filename:
+      return 1
+    elif self.filename < other.filename:
+      return -1
+    return 0
+
+  def __hash__(self):
+    if self.etag:
+      return hash(self.etag)
+    return hash(self.filename)
+
+
+CSFileStat = GCSFileStat
+
+
+def get_stored_content_length(headers):
+  """Return the content length (in bytes) of the object as stored in GCS.
+
+  x-goog-stored-content-length should always be present except when called via
+  the local dev_appserver. Therefore if it is not present we default to the
+  standard content-length header.
+
+  Args:
+    headers: a dict of headers from the http response.
+
+  Returns:
+    the stored content length.
+  """
+  length = headers.get('x-goog-stored-content-length')
+  if length is None:
+    length = headers.get('content-length')
+  return length
+
+
+def get_metadata(headers):
+  """Get user defined options from HTTP response headers."""
+  return dict((k, v) for k, v in headers.iteritems()
+              if any(k.lower().startswith(valid) for valid in _GCS_METADATA))
+
+
+def validate_bucket_name(name):
+  """Validate a Google Storage bucket name.
+
+  Args:
+    name: a Google Storage bucket name with no prefix or suffix.
+
+  Raises:
+    ValueError: if name is invalid.
+  """
+  _validate_path(name)
+  if not _GCS_BUCKET_REGEX.match(name):
+    raise ValueError('Bucket should be 3-63 characters long using only a-z, '
+                     '0-9, underscore, dash or dot but got %s' % name)
+
+
+def validate_bucket_path(path):
+  """Validate a Google Cloud Storage bucket path.
+
+  Args:
+    path: a Google Storage bucket path. It should have form '/bucket'.
+
+  Raises:
+    ValueError: if path is invalid.
+  """
+  _validate_path(path)
+  if not _GCS_BUCKET_PATH_REGEX.match(path):
+    raise ValueError('Bucket should have format /bucket '
+                     'but got %s' % path)
+
+
+def validate_file_path(path):
+  """Validate a Google Cloud Storage file path.
+
+  Args:
+    path: a Google Storage file path. It should have form '/bucket/filename'.
+
+  Raises:
+    ValueError: if path is invalid.
+  """
+  _validate_path(path)
+  if not _GCS_FULLPATH_REGEX.match(path):
+    raise ValueError('Path should have format /bucket/filename '
+                     'but got %s' % path)
+
+
+def _process_path_prefix(path_prefix):
+  """Validate and process a Google Cloud Storage path prefix.
+
+  Args:
+    path_prefix: a Google Cloud Storage path prefix of format '/bucket/prefix'
+      or '/bucket/' or '/bucket'.
+
+  Raises:
+    ValueError: if path is invalid.
+
+  Returns:
+    a tuple of /bucket and prefix. prefix can be None.
+  """
+  _validate_path(path_prefix)
+  if not _GCS_PATH_PREFIX_REGEX.match(path_prefix):
+    raise ValueError('Path prefix should have format /bucket, /bucket/, '
+                     'or /bucket/prefix but got %s.' % path_prefix)
+  bucket_name_end = path_prefix.find('/', 1)
+  bucket = path_prefix
+  prefix = None
+  if bucket_name_end != -1:
+    bucket = path_prefix[:bucket_name_end]
+    prefix = path_prefix[bucket_name_end + 1:] or None
+  return bucket, prefix
+
+
+def _validate_path(path):
+  """Basic validation of Google Storage paths.
+
+  Args:
+    path: a Google Storage path. It should have form '/bucket/filename'
+      or '/bucket'.
+
+  Raises:
+    ValueError: if path is invalid.
+    TypeError: if path is not of type basestring.
+  """
+  if not path:
+    raise ValueError('Path is empty')
+  if not isinstance(path, basestring):
+    raise TypeError('Path should be a string but is %s (%s).' %
+                    (path.__class__, path))
+
+
+def validate_options(options):
+  """Validate Google Cloud Storage options.
+
+  Args:
+    options: a str->basestring dict of options to pass to Google Cloud Storage.
+
+  Raises:
+    ValueError: if option is not supported.
+    TypeError: if option is not of type str or value of an option
+      is not of type basestring.
+  """
+  if not options:
+    return
+
+  for k, v in options.iteritems():
+    if not isinstance(k, str):
+      raise TypeError('option %r should be a str.' % k)
+    if not any(k.lower().startswith(valid) for valid in _GCS_OPTIONS):
+      raise ValueError('option %s is not supported.' % k)
+    if not isinstance(v, basestring):
+      raise TypeError('value %r for option %s should be of type basestring.' %
+                      (v, k))
+
+
+def http_time_to_posix(http_time):
+  """Convert HTTP time format to posix time.
+
+  See http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1
+  for http time format.
+
+  Args:
+    http_time: time in RFC 2616 format. e.g.
+      "Mon, 20 Nov 1995 19:12:08 GMT".
+
+  Returns:
+    A float of secs from unix epoch.
+  """
+  if http_time is not None:
+    return email_utils.mktime_tz(email_utils.parsedate_tz(http_time))
+
+
+def posix_time_to_http(posix_time):
+  """Convert posix time to HTTP header time format.
+
+  Args:
+    posix_time: unix time.
+
+  Returns:
+    A datetime str in RFC 2616 format.
+  """
+  if posix_time:
+    return email_utils.formatdate(posix_time, usegmt=True)
+
+
+_DT_FORMAT = '%Y-%m-%dT%H:%M:%S'
+
+
+def dt_str_to_posix(dt_str):
+  """Convert a datetime str to posix time.
+
+  datetime str is of format %Y-%m-%dT%H:%M:%S.%fZ,
+  e.g. 2013-04-12T00:22:27.978Z. According to ISO 8601, T is a separator
+  between date and time when they are on the same line.
+  Z indicates UTC (zero meridian).
+
+  A pointer: http://www.cl.cam.ac.uk/~mgk25/iso-time.html
+
+  This is used to parse LastModified node from GCS's GET bucket XML response.
+
+  Args:
+    dt_str: A datetime str.
+
+  Returns:
+    A float of secs from unix epoch. By posix definition, epoch is midnight
+    1970/1/1 UTC.
+  """
+  parsable, _ = dt_str.split('.')
+  dt = datetime.datetime.strptime(parsable, _DT_FORMAT)
+  return calendar.timegm(dt.utctimetuple())
+
+
+def posix_to_dt_str(posix):
+  """Reverse of dt_str_to_posix.
+
+  This is used by GCS stub to generate GET bucket XML response.
+
+  Args:
+    posix: A float of secs from unix epoch.
+
+  Returns:
+    A datetime str.
+ """ + dt = datetime.datetime.utcfromtimestamp(posix) + dt_str = dt.strftime(_DT_FORMAT) + return dt_str + '.000Z' + + +def local_run(): + """Whether we should hit GCS dev appserver stub.""" + server_software = os.environ.get('SERVER_SOFTWARE') + if server_software is None: + return True + if 'remote_api' in server_software: + return False + if server_software.startswith(('Development', 'testutil')): + return True + return False + + +def local_api_url(): + """Return URL for GCS emulation on dev appserver.""" + return 'http://%s%s' % (os.environ.get('HTTP_HOST'), LOCAL_GCS_ENDPOINT) + + +def memory_usage(method): + """Log memory usage before and after a method.""" + def wrapper(*args, **kwargs): + logging.info('Memory before method %s is %s.', + method.__name__, runtime.memory_usage().current()) + result = method(*args, **kwargs) + logging.info('Memory after method %s is %s', + method.__name__, runtime.memory_usage().current()) + return result + return wrapper + + +def _add_ns(tagname): + return '{%(ns)s}%(tag)s' % {'ns': CS_XML_NS, + 'tag': tagname} + + +_T_CONTENTS = _add_ns('Contents') +_T_LAST_MODIFIED = _add_ns('LastModified') +_T_ETAG = _add_ns('ETag') +_T_KEY = _add_ns('Key') +_T_SIZE = _add_ns('Size') +_T_PREFIX = _add_ns('Prefix') +_T_COMMON_PREFIXES = _add_ns('CommonPrefixes') +_T_NEXT_MARKER = _add_ns('NextMarker') +_T_IS_TRUNCATED = _add_ns('IsTruncated') diff --git a/catapult/trace_processor/third_party/cloudstorage/errors.py b/catapult/trace_processor/third_party/cloudstorage/errors.py new file mode 100644 index 00000000..21743806 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/errors.py @@ -0,0 +1,143 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Google Cloud Storage specific Files API calls.""" + + + + + +__all__ = ['AuthorizationError', + 'check_status', + 'Error', + 'FatalError', + 'FileClosedError', + 'ForbiddenError', + 'InvalidRange', + 'NotFoundError', + 'ServerError', + 'TimeoutError', + 'TransientError', + ] + +import httplib + + +class Error(Exception): + """Base error for all gcs operations. + + Error can happen on GAE side or GCS server side. + For details on a particular GCS HTTP response code, see + https://developers.google.com/storage/docs/reference-status#standardcodes + """ + + +class TransientError(Error): + """TransientError could be retried.""" + + +class TimeoutError(TransientError): + """HTTP 408 timeout.""" + + +class FatalError(Error): + """FatalError shouldn't be retried.""" + + +class FileClosedError(FatalError): + """File is already closed. + + This can happen when the upload has finished but 'write' is called on + a stale upload handle. + """ + + +class NotFoundError(FatalError): + """HTTP 404 resource not found.""" + + +class ForbiddenError(FatalError): + """HTTP 403 Forbidden. + + While GCS replies with a 403 error for many reasons, the most common one + is due to bucket permission not correctly setup for your app to access. 
+  """
+
+
+class AuthorizationError(FatalError):
+  """HTTP 401 authentication required.
+
+  Unauthorized request has been received by GCS.
+
+  This error is mostly handled by GCS client. GCS client will request
+  a new access token and retry the request.
+  """
+
+
+class InvalidRange(FatalError):
+  """HTTP 416 Requested Range Not Satisfiable."""
+
+
+class ServerError(TransientError):
+  """HTTP >= 500 server side error."""
+
+
+def check_status(status, expected, path, headers=None,
+                 resp_headers=None, body=None, extras=None):
+  """Check HTTP response status is expected.
+
+  Args:
+    status: HTTP response status. int.
+    expected: a list of expected statuses. A list of ints.
+    path: filename or a path prefix.
+    headers: HTTP request headers.
+    resp_headers: HTTP response headers.
+    body: HTTP response body.
+    extras: extra info to be logged verbatim if error occurs.
+
+  Raises:
+    AuthorizationError: if authorization failed.
+    NotFoundError: if an object that's expected to exist doesn't.
+    TimeoutError: if HTTP request timed out.
+    ServerError: if server experienced some errors.
+    FatalError: if any other unexpected errors occurred.
+  """
+  if status in expected:
+    return
+
+  msg = ('Expect status %r from Google Storage. But got status %d.\n'
+         'Path: %r.\n'
+         'Request headers: %r.\n'
+         'Response headers: %r.\n'
+         'Body: %r.\n'
+         'Extra info: %r.\n' %
+         (expected, status, path, headers, resp_headers, body, extras))
+
+  if status == httplib.UNAUTHORIZED:
+    raise AuthorizationError(msg)
+  elif status == httplib.FORBIDDEN:
+    raise ForbiddenError(msg)
+  elif status == httplib.NOT_FOUND:
+    raise NotFoundError(msg)
+  elif status == httplib.REQUEST_TIMEOUT:
+    raise TimeoutError(msg)
+  elif status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE:
+    raise InvalidRange(msg)
+  elif (status == httplib.OK and 308 in expected and
+        httplib.OK not in expected):
+    raise FileClosedError(msg)
+  elif status >= 500:
+    raise ServerError(msg)
+  else:
+    raise FatalError(msg)
diff --git a/catapult/trace_processor/third_party/cloudstorage/rest_api.py b/catapult/trace_processor/third_party/cloudstorage/rest_api.py
new file mode 100644
index 00000000..437c09d7
--- /dev/null
+++ b/catapult/trace_processor/third_party/cloudstorage/rest_api.py
@@ -0,0 +1,258 @@
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+# either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+"""Base and helper classes for Google RESTful APIs."""
+
+
+__all__ = ['add_sync_methods']
+
+import random
+import time
+
+from . import api_utils
+
+try:
+  from google.appengine.api import app_identity
+  from google.appengine.ext import ndb
+except ImportError:
+  from google.appengine.api import app_identity
+  from google.appengine.ext import ndb
+
+
+def _make_sync_method(name):
+  """Helper to synthesize a synchronous method from an async method name.
+
+  Used by the @add_sync_methods class decorator below.
+
+  Args:
+    name: The name of the corresponding asynchronous method to wrap.
+ + Returns: + A method (with first argument 'self') that retrieves and calls + self.<name>, passing its own arguments, expects it to return a + Future, and then waits for and returns that Future's result. + """ + + def sync_wrapper(self, *args, **kwds): + method = getattr(self, name) + future = method(*args, **kwds) + return future.get_result() + + return sync_wrapper + + +def add_sync_methods(cls): + """Class decorator to add synchronous methods corresponding to async methods. + + This modifies the class in place, adding additional methods to it. + If a synchronous method of a given name already exists it is not + replaced. + + Args: + cls: A class. + + Returns: + The same class, modified in place. + """ + for name in cls.__dict__.keys(): + if name.endswith('_async'): + sync_name = name[:-6] + if not hasattr(cls, sync_name): + setattr(cls, sync_name, _make_sync_method(name)) + return cls + + +class _AE_TokenStorage_(ndb.Model): + """Entity to store app_identity tokens in memcache.""" + + token = ndb.StringProperty() + expires = ndb.FloatProperty() + + +@ndb.tasklet +def _make_token_async(scopes, service_account_id): + """Get a fresh authentication token. + + Args: + scopes: A list of scopes. + service_account_id: Internal-use only. + + Raises: + An ndb.Return with a tuple (token, expiration_time) where expiration_time is + seconds since the epoch. + """ + rpc = app_identity.create_rpc() + app_identity.make_get_access_token_call(rpc, scopes, service_account_id) + token, expires_at = yield rpc + raise ndb.Return((token, expires_at)) + + +class _RestApi(object): + """Base class for REST-based API wrapper classes. + + This class manages authentication tokens and request retries. All + APIs are available as synchronous and async methods; synchronous + methods are synthesized from async ones by the add_sync_methods() + function in this module. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + """ + + def __init__(self, scopes, service_account_id=None, token_maker=None, + retry_params=None): + """Constructor. + + Args: + scopes: A scope or a list of scopes. + service_account_id: Internal use only. + token_maker: An asynchronous function of the form + (scopes, service_account_id) -> (token, expires). + retry_params: An instance of api_utils.RetryParams. If None, the + default for current thread will be used. 
+ """ + + if isinstance(scopes, basestring): + scopes = [scopes] + self.scopes = scopes + self.service_account_id = service_account_id + self.make_token_async = token_maker or _make_token_async + if not retry_params: + retry_params = api_utils._get_default_retry_params() + self.retry_params = retry_params + self.user_agent = {'User-Agent': retry_params._user_agent} + self.expiration_headroom = random.randint(60, 240) + + def __getstate__(self): + """Store state as part of serialization/pickling.""" + return {'scopes': self.scopes, + 'id': self.service_account_id, + 'a_maker': (None if self.make_token_async == _make_token_async + else self.make_token_async), + 'retry_params': self.retry_params, + 'expiration_headroom': self.expiration_headroom} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling.""" + self.__init__(state['scopes'], + service_account_id=state['id'], + token_maker=state['a_maker'], + retry_params=state['retry_params']) + self.expiration_headroom = state['expiration_headroom'] + + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Issue one HTTP request. + + It performs async retries using tasklets. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. + deadline: the deadline in which to make the call. + callback: the call to make once completed. + + Yields: + The async fetch of the url. + """ + retry_wrapper = api_utils._RetryWrapper( + self.retry_params, + retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS, + should_retry=api_utils._should_retry) + resp = yield retry_wrapper.run( + self.urlfetch_async, + url=url, + method=method, + headers=headers, + payload=payload, + deadline=deadline, + callback=callback, + follow_redirects=False) + raise ndb.Return((resp.status_code, resp.headers, resp.content)) + + @ndb.tasklet + def get_token_async(self, refresh=False): + """Get an authentication token. + + The token is cached in memcache, keyed by the scopes argument. + Uses a random token expiration headroom value generated in the constructor + to eliminate a burst of GET_ACCESS_TOKEN API requests. + + Args: + refresh: If True, ignore a cached token; default False. + + Yields: + An authentication token. This token is guaranteed to be non-expired. + """ + key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) + ts = yield _AE_TokenStorage_.get_by_id_async( + key, use_cache=True, use_memcache=True, + use_datastore=self.retry_params.save_access_token) + if refresh or ts is None or ts.expires < ( + time.time() + self.expiration_headroom): + token, expires_at = yield self.make_token_async( + self.scopes, self.service_account_id) + timeout = int(expires_at - time.time()) + ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) + if timeout > 0: + yield ts.put_async(memcache_timeout=timeout, + use_datastore=self.retry_params.save_access_token, + use_cache=True, use_memcache=True) + raise ndb.Return(ts.token) + + @ndb.tasklet + def urlfetch_async(self, url, method='GET', headers=None, + payload=None, deadline=None, callback=None, + follow_redirects=False): + """Make an async urlfetch() call. + + This is an async wrapper around urlfetch(). It adds an authentication + header. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. 
+ deadline: the deadline in which to make the call. + callback: the call to make once completed. + follow_redirects: whether or not to follow redirects. + + Yields: + This returns a Future despite not being decorated with @ndb.tasklet! + """ + headers = {} if headers is None else dict(headers) + headers.update(self.user_agent) + self.token = yield self.get_token_async() + if self.token: + headers['authorization'] = 'OAuth ' + self.token + + deadline = deadline or self.retry_params.urlfetch_timeout + + ctx = ndb.get_context() + resp = yield ctx.urlfetch( + url, payload=payload, method=method, + headers=headers, follow_redirects=follow_redirects, + deadline=deadline, callback=callback) + raise ndb.Return(resp) + + +_RestApi = add_sync_methods(_RestApi) diff --git a/catapult/trace_processor/third_party/cloudstorage/storage_api.py b/catapult/trace_processor/third_party/cloudstorage/storage_api.py new file mode 100644 index 00000000..910c365b --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/storage_api.py @@ -0,0 +1,887 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Python wrappers for the Google Storage RESTful API.""" + + + + + +__all__ = ['ReadBuffer', + 'StreamingBuffer', + ] + +import collections +import os +import urlparse + +from . import api_utils +from . import common +from . import errors +from . import rest_api + +try: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb + + + +def _get_storage_api(retry_params, account_id=None): + """Returns storage_api instance for API methods. + + Args: + retry_params: An instance of api_utils.RetryParams. If none, + thread's default will be used. + account_id: Internal-use only. + + Returns: + A storage_api instance to handle urlfetch work to GCS. + On dev appserver, this instance by default will talk to a local stub + unless common.ACCESS_TOKEN is set. That token will be used to talk + to the real GCS. + """ + + + api = _StorageApi(_StorageApi.full_control_scope, + service_account_id=account_id, + retry_params=retry_params) + if common.local_run() and not common.get_access_token(): + api.api_url = common.local_api_url() + if common.get_access_token(): + api.token = common.get_access_token() + return api + + +class _StorageApi(rest_api._RestApi): + """A simple wrapper for the Google Storage RESTful API. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + + All async methods have similar args and returns. + + Args: + path: The path to the Google Storage object or bucket, e.g. + '/mybucket/myfile' or '/mybucket'. + **kwd: Options for urlfetch. e.g. + headers={'content-type': 'text/plain'}, payload='blah'. + + Returns: + A ndb Future. 
When fulfilled, future.get_result() should return + a tuple of (status, headers, content) that represents a HTTP response + of Google Cloud Storage XML API. + """ + + api_url = 'https://storage.googleapis.com' + read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only' + read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write' + full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control' + + def __getstate__(self): + """Store state as part of serialization/pickling. + + Returns: + A tuple (of dictionaries) with the state of this object + """ + return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url}) + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the tuple from a __getstate__ call + """ + superstate, localstate = state + super(_StorageApi, self).__setstate__(superstate) + self.api_url = localstate['api_url'] + + @api_utils._eager_tasklet + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Inherit docs. + + This method translates urlfetch exceptions to more service specific ones. + """ + if headers is None: + headers = {} + if 'x-goog-api-version' not in headers: + headers['x-goog-api-version'] = '2' + headers['accept-encoding'] = 'gzip, *' + try: + resp_tuple = yield super(_StorageApi, self).do_request_async( + url, method=method, headers=headers, payload=payload, + deadline=deadline, callback=callback) + except urlfetch.DownloadError, e: + raise errors.TimeoutError( + 'Request to Google Cloud Storage timed out.', e) + + raise ndb.Return(resp_tuple) + + + def post_object_async(self, path, **kwds): + """POST to an object.""" + return self.do_request_async(self.api_url + path, 'POST', **kwds) + + def put_object_async(self, path, **kwds): + """PUT an object.""" + return self.do_request_async(self.api_url + path, 'PUT', **kwds) + + def get_object_async(self, path, **kwds): + """GET an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + def delete_object_async(self, path, **kwds): + """DELETE an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'DELETE', **kwds) + + def head_object_async(self, path, **kwds): + """HEAD an object. + + Depending on request headers, HEAD returns various object properties, + e.g. Content-Length, Last-Modified, and ETag. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'HEAD', **kwds) + + def get_bucket_async(self, path, **kwds): + """GET a bucket.""" + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + +_StorageApi = rest_api.add_sync_methods(_StorageApi) + + +class ReadBuffer(object): + """A class for reading Google storage files.""" + + DEFAULT_BUFFER_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE + + def __init__(self, + api, + path, + buffer_size=DEFAULT_BUFFER_SIZE, + max_request_size=MAX_REQUEST_SIZE): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + buffer_size: buffer size. The ReadBuffer keeps + one buffer. But there may be a pending future that contains + a second buffer. This size must be less than max_request_size. + max_request_size: Max bytes to request in one urlfetch. 
+ """ + self._api = api + self._path = path + self.name = api_utils._unquote_filename(path) + self.closed = False + + assert buffer_size <= max_request_size + self._buffer_size = buffer_size + self._max_request_size = max_request_size + self._offset = 0 + self._buffer = _Buffer() + self._etag = None + + get_future = self._get_segment(0, self._buffer_size, check_response=False) + + status, headers, content = self._api.head_object(path) + errors.check_status(status, [200], path, resp_headers=headers, body=content) + self._file_size = long(common.get_stored_content_length(headers)) + self._check_etag(headers.get('etag')) + + self._buffer_future = None + + if self._file_size != 0: + content, check_response_closure = get_future.get_result() + check_response_closure() + self._buffer.reset(content) + self._request_next_buffer() + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the read buffer are not stored, only the current offset for + data read by the client. A new read buffer is established at unpickling. + The head information for the object (file size and etag) are stored to + reduce startup and ensure the file has not changed. + + Returns: + A dictionary with the state of this object + """ + return {'api': self._api, + 'path': self._path, + 'buffer_size': self._buffer_size, + 'request_size': self._max_request_size, + 'etag': self._etag, + 'size': self._file_size, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the dictionary from a __getstate__ call + + Along with restoring the state, pre-fetch the next read buffer. + """ + self._api = state['api'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + self._buffer_size = state['buffer_size'] + self._max_request_size = state['request_size'] + self._etag = state['etag'] + self._file_size = state['size'] + self._offset = state['offset'] + self._buffer = _Buffer() + self.closed = state['closed'] + self._buffer_future = None + if self._remaining() and not self.closed: + self._request_next_buffer() + + def __iter__(self): + """Iterator interface. + + Note the ReadBuffer container itself is the iterator. It's + (quote PEP0234) + 'destructive: they consumes all the values and a second iterator + cannot easily be created that iterates independently over the same values. + You could open the file for the second time, or seek() to the beginning.' + + Returns: + Self. + """ + return self + + def next(self): + line = self.readline() + if not line: + raise StopIteration() + return line + + def readline(self, size=-1): + """Read one line delimited by '\n' from the file. + + A trailing newline character is kept in the string. It may be absent when a + file ends with an incomplete line. If the size argument is non-negative, + it specifies the maximum string size (counting the newline) to return. + A negative size is the same as unspecified. Empty string is returned + only when EOF is encountered immediately. + + Args: + size: Maximum number of bytes to read. If not specified, readline stops + only on '\n' or EOF. + + Returns: + The data read as a string. + + Raises: + IOError: When this buffer is closed. 
+ """ + self._check_open() + if size == 0 or not self._remaining(): + return '' + + data_list = [] + newline_offset = self._buffer.find_newline(size) + while newline_offset < 0: + data = self._buffer.read(size) + size -= len(data) + self._offset += len(data) + data_list.append(data) + if size == 0 or not self._remaining(): + return ''.join(data_list) + self._buffer.reset(self._buffer_future.get_result()) + self._request_next_buffer() + newline_offset = self._buffer.find_newline(size) + + data = self._buffer.read_to_offset(newline_offset + 1) + self._offset += len(data) + data_list.append(data) + + return ''.join(data_list) + + def read(self, size=-1): + """Read data from RAW file. + + Args: + size: Number of bytes to read as integer. Actual number of bytes + read is always equal to size unless EOF is reached. If size is + negative or unspecified, read the entire file. + + Returns: + data read as str. + + Raises: + IOError: When this buffer is closed. + """ + self._check_open() + if not self._remaining(): + return '' + + data_list = [] + while True: + remaining = self._buffer.remaining() + if size >= 0 and size < remaining: + data_list.append(self._buffer.read(size)) + self._offset += size + break + else: + size -= remaining + self._offset += remaining + data_list.append(self._buffer.read()) + + if self._buffer_future is None: + if size < 0 or size >= self._remaining(): + needs = self._remaining() + else: + needs = size + data_list.extend(self._get_segments(self._offset, needs)) + self._offset += needs + break + + if self._buffer_future: + self._buffer.reset(self._buffer_future.get_result()) + self._buffer_future = None + + if self._buffer_future is None: + self._request_next_buffer() + return ''.join(data_list) + + def _remaining(self): + return self._file_size - self._offset + + def _request_next_buffer(self): + """Request next buffer. + + Requires self._offset and self._buffer are in consistent state. + """ + self._buffer_future = None + next_offset = self._offset + self._buffer.remaining() + if next_offset != self._file_size: + self._buffer_future = self._get_segment(next_offset, + self._buffer_size) + + def _get_segments(self, start, request_size): + """Get segments of the file from Google Storage as a list. + + A large request is broken into segments to avoid hitting urlfetch + response size limit. Each segment is returned from a separate urlfetch. + + Args: + start: start offset to request. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. + + Returns: + A list of file segments in order + """ + if not request_size: + return [] + + end = start + request_size + futures = [] + + while request_size > self._max_request_size: + futures.append(self._get_segment(start, self._max_request_size)) + request_size -= self._max_request_size + start += self._max_request_size + if start < end: + futures.append(self._get_segment(start, end-start)) + return [fut.get_result() for fut in futures] + + @ndb.tasklet + def _get_segment(self, start, request_size, check_response=True): + """Get a segment of the file from Google Storage. + + Args: + start: start offset of the segment. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. Have to be small enough + for a single urlfetch request. May go over the logical range of the + file. + check_response: True to check the validity of GCS response automatically + before the future returns. False otherwise. See Yields section. 
+
+    Yields:
+      If check_response is True, the segment [start, start + request_size)
+      of the file.
+      Otherwise, a tuple. The first element is the unverified file segment.
+      The second element is a closure that checks response. Caller should
+      first invoke the closure before consuming the file segment.
+
+    Raises:
+      ValueError: if the file has changed while reading.
+    """
+    end = start + request_size - 1
+    content_range = '%d-%d' % (start, end)
+    headers = {'Range': 'bytes=' + content_range}
+    status, resp_headers, content = yield self._api.get_object_async(
+        self._path, headers=headers)
+    def _checker():
+      errors.check_status(status, [200, 206], self._path, headers,
+                          resp_headers, body=content)
+      self._check_etag(resp_headers.get('etag'))
+    if check_response:
+      _checker()
+      raise ndb.Return(content)
+    raise ndb.Return(content, _checker)
+
+  def _check_etag(self, etag):
+    """Check if etag is the same across requests to GCS.
+
+    If self._etag is None, set it. If etag is set, check that the new
+    etag equals the old one.
+
+    In the __init__ method, we fire one HEAD and one GET request using
+    ndb tasklet. One of them would return first and set the first value.
+
+    Args:
+      etag: etag from a GCS HTTP response. None if etag is not part of the
+        response header. It could be None for example in the case of GCS
+        composite file.
+
+    Raises:
+      ValueError: if two etags are not equal.
+    """
+    if etag is None:
+      return
+    elif self._etag is None:
+      self._etag = etag
+    elif self._etag != etag:
+      raise ValueError('File on GCS has changed while reading.')
+
+  def close(self):
+    self.closed = True
+    self._buffer = None
+    self._buffer_future = None
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, atype, value, traceback):
+    self.close()
+    return False
+
+  def seek(self, offset, whence=os.SEEK_SET):
+    """Set the file's current offset.
+
+    Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.
+
+    Args:
+      offset: seek offset as number.
+      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
+        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
+        (seek relative to the end, offset should be negative).
+
+    Raises:
+      IOError: When this buffer is closed.
+      ValueError: When whence is invalid.
+    """
+    self._check_open()
+
+    self._buffer.reset()
+    self._buffer_future = None
+
+    if whence == os.SEEK_SET:
+      self._offset = offset
+    elif whence == os.SEEK_CUR:
+      self._offset += offset
+    elif whence == os.SEEK_END:
+      self._offset = self._file_size + offset
+    else:
+      raise ValueError('Whence mode %s is invalid.' % str(whence))
+
+    self._offset = min(self._offset, self._file_size)
+    self._offset = max(self._offset, 0)
+    if self._remaining():
+      self._request_next_buffer()
+
+  def tell(self):
+    """Tell the file's current offset.
+
+    Returns:
+      current offset in reading this file.
+
+    Raises:
+      IOError: When this buffer is closed.
+    """
+    self._check_open()
+    return self._offset
+
+  def _check_open(self):
+    if self.closed:
+      raise IOError('Buffer is closed.')
+
+  def seekable(self):
+    return True
+
+  def readable(self):
+    return True
+
+  def writable(self):
+    return False
+
+
+class _Buffer(object):
+  """In memory buffer."""
+
+  def __init__(self):
+    self.reset()
+
+  def reset(self, content='', offset=0):
+    self._buffer = content
+    self._offset = offset
+
+  def read(self, size=-1):
+    """Returns bytes from self._buffer and update related offsets.
+
+    Args:
+      size: number of bytes to read starting from current offset.
+        Read the entire buffer if negative.
+ + Returns: + Requested bytes from buffer. + """ + if size < 0: + offset = len(self._buffer) + else: + offset = self._offset + size + return self.read_to_offset(offset) + + def read_to_offset(self, offset): + """Returns bytes from self._buffer and update related offsets. + + Args: + offset: read from current offset to this offset, exclusive. + + Returns: + Requested bytes from buffer. + """ + assert offset >= self._offset + result = self._buffer[self._offset: offset] + self._offset += len(result) + return result + + def remaining(self): + return len(self._buffer) - self._offset + + def find_newline(self, size=-1): + """Search for newline char in buffer starting from current offset. + + Args: + size: number of bytes to search. -1 means all. + + Returns: + offset of newline char in buffer. -1 if doesn't exist. + """ + if size < 0: + return self._buffer.find('\n', self._offset) + return self._buffer.find('\n', self._offset, self._offset + size) + + +class StreamingBuffer(object): + """A class for creating large objects using the 'resumable' API. + + The API is a subset of the Python writable stream API sufficient to + support writing zip files using the zipfile module. + + The exact sequence of calls and use of headers is documented at + https://developers.google.com/storage/docs/developer-guide#unknownresumables + """ + + _blocksize = 256 * 1024 + + _flushsize = 8 * _blocksize + + _maxrequestsize = 9 * 4 * _blocksize + + def __init__(self, + api, + path, + content_type=None, + gcs_headers=None): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + content_type: Optional content-type; Default value is + delegate to Google Cloud Storage. + gcs_headers: additional gs headers as a str->str dict, e.g + {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + Raises: + IOError: When this location can not be found. + """ + assert self._maxrequestsize > self._blocksize + assert self._maxrequestsize % self._blocksize == 0 + assert self._maxrequestsize >= self._flushsize + + self._api = api + self._path = path + + self.name = api_utils._unquote_filename(path) + self.closed = False + + self._buffer = collections.deque() + self._buffered = 0 + self._written = 0 + self._offset = 0 + + headers = {'x-goog-resumable': 'start'} + if content_type: + headers['content-type'] = content_type + if gcs_headers: + headers.update(gcs_headers) + status, resp_headers, content = self._api.post_object(path, headers=headers) + errors.check_status(status, [201], path, headers, resp_headers, + body=content) + loc = resp_headers.get('location') + if not loc: + raise IOError('No location header found in 201 response') + parsed = urlparse.urlparse(loc) + self._path_with_token = '%s?%s' % (self._path, parsed.query) + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the write buffer are stored. Writes to the underlying + storage are required to be on block boundaries (_blocksize) except for the + last write. In the worst case the pickled version of this object may be + slightly larger than the blocksize. + + Returns: + A dictionary with the state of this object + + """ + return {'api': self._api, + 'path': self._path, + 'path_token': self._path_with_token, + 'buffer': self._buffer, + 'buffered': self._buffered, + 'written': self._written, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. 
+ + Args: + state: the dictionary from a __getstate__ call + """ + self._api = state['api'] + self._path_with_token = state['path_token'] + self._buffer = state['buffer'] + self._buffered = state['buffered'] + self._written = state['written'] + self._offset = state['offset'] + self.closed = state['closed'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + + def write(self, data): + """Write some bytes. + + Args: + data: data to write. str. + + Raises: + TypeError: if data is not of type str. + """ + self._check_open() + if not isinstance(data, str): + raise TypeError('Expected str but got %s.' % type(data)) + if not data: + return + self._buffer.append(data) + self._buffered += len(data) + self._offset += len(data) + if self._buffered >= self._flushsize: + self._flush() + + def flush(self): + """Flush as much as possible to GCS. + + GCS *requires* that all writes except for the final one align on + 256KB boundaries. So the internal buffer may still have < 256KB bytes left + after flush. + """ + self._check_open() + self._flush(finish=False) + + def tell(self): + """Return the total number of bytes passed to write() so far. + + (There is no seek() method.) + """ + return self._offset + + def close(self): + """Flush the buffer and finalize the file. + + When this returns the new file is available for reading. + """ + if not self.closed: + self.closed = True + self._flush(finish=True) + self._buffer = None + + def __enter__(self): + return self + + def __exit__(self, atype, value, traceback): + self.close() + return False + + def _flush(self, finish=False): + """Internal API to flush. + + Buffer is flushed to GCS only when the total amount of buffered data is at + least self._blocksize, or to flush the final (incomplete) block of + the file with finish=True. + """ + while ((finish and self._buffered >= 0) or + (not finish and self._buffered >= self._blocksize)): + tmp_buffer = [] + tmp_buffer_len = 0 + + excess = 0 + while self._buffer: + buf = self._buffer.popleft() + size = len(buf) + self._buffered -= size + tmp_buffer.append(buf) + tmp_buffer_len += size + if tmp_buffer_len >= self._maxrequestsize: + excess = tmp_buffer_len - self._maxrequestsize + break + if not finish and ( + tmp_buffer_len % self._blocksize + self._buffered < + self._blocksize): + excess = tmp_buffer_len % self._blocksize + break + + if excess: + over = tmp_buffer.pop() + size = len(over) + assert size >= excess + tmp_buffer_len -= size + head, tail = over[:-excess], over[-excess:] + self._buffer.appendleft(tail) + self._buffered += len(tail) + if head: + tmp_buffer.append(head) + tmp_buffer_len += len(head) + + data = ''.join(tmp_buffer) + file_len = '*' + if finish and not self._buffered: + file_len = self._written + len(data) + self._send_data(data, self._written, file_len) + self._written += len(data) + if file_len != '*': + break + + def _send_data(self, data, start_offset, file_len): + """Send the block to the storage service. + + This is a utility method that does not modify self. + + Args: + data: data to send in str. + start_offset: start offset of the data in relation to the file. + file_len: an int if this is the last data to append to the file. + Otherwise '*'. 
+ """ + headers = {} + end_offset = start_offset + len(data) - 1 + + if data: + headers['content-range'] = ('bytes %d-%d/%s' % + (start_offset, end_offset, file_len)) + else: + headers['content-range'] = ('bytes */%s' % file_len) + + status, response_headers, content = self._api.put_object( + self._path_with_token, payload=data, headers=headers) + if file_len == '*': + expected = 308 + else: + expected = 200 + errors.check_status(status, [expected], self._path, headers, + response_headers, content, + {'upload_path': self._path_with_token}) + + def _get_offset_from_gcs(self): + """Get the last offset that has been written to GCS. + + This is a utility method that does not modify self. + + Returns: + an int of the last offset written to GCS by this upload, inclusive. + -1 means nothing has been written. + """ + headers = {'content-range': 'bytes */*'} + status, response_headers, content = self._api.put_object( + self._path_with_token, headers=headers) + errors.check_status(status, [308], self._path, headers, + response_headers, content, + {'upload_path': self._path_with_token}) + val = response_headers.get('range') + if val is None: + return -1 + _, offset = val.rsplit('-', 1) + return int(offset) + + def _force_close(self, file_length=None): + """Close this buffer on file_length. + + Finalize this upload immediately on file_length. + Contents that are still in memory will not be uploaded. + + This is a utility method that does not modify self. + + Args: + file_length: file length. Must match what has been uploaded. If None, + it will be queried from GCS. + """ + if file_length is None: + file_length = self._get_offset_from_gcs() + 1 + self._send_data('', 0, file_length) + + def _check_open(self): + if self.closed: + raise IOError('Buffer is closed.') + + def seekable(self): + return False + + def readable(self): + return False + + def writable(self): + return True diff --git a/catapult/trace_processor/third_party/cloudstorage/test_utils.py b/catapult/trace_processor/third_party/cloudstorage/test_utils.py new file mode 100644 index 00000000..e4d82477 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/test_utils.py @@ -0,0 +1,25 @@ +# Copyright 2013 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
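ReadBuffer and StreamingBuffer above implement the file-like objects behind the package's open() call (the upload endpoint later in this change uses gcs.open() exactly this way). A minimal round-trip sketch, assuming a hypothetical /my-bucket:

import cloudstorage as gcs

# Writes go through StreamingBuffer: data is buffered and sent to the
# resumable-upload URL in 256KB-aligned blocks; close() finalizes the object.
with gcs.open('/my-bucket/greeting.txt', 'w', content_type='text/plain') as f:
  f.write('hello\n')

# Reads go through ReadBuffer, which prefetches the next buffer
# asynchronously and supports read(), readline(), seek()/tell(), iteration.
with gcs.open('/my-bucket/greeting.txt') as f:
  for line in f:
    print line.rstrip()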
+ +"""Utils for testing.""" + + +class MockUrlFetchResult(object): + + def __init__(self, status, headers, body): + self.status_code = status + self.headers = headers + self.content = body + self.content_was_truncated = False + self.final_url = None diff --git a/catapult/trace_processor/trace_processor/process_traces.py b/catapult/trace_processor/trace_processor/process_traces.py index 70970f36..017dfd7e 100644 --- a/catapult/trace_processor/trace_processor/process_traces.py +++ b/catapult/trace_processor/trace_processor/process_traces.py @@ -7,11 +7,11 @@ import os import subprocess import sys -from perf_insights.mre import function_handle -from perf_insights.mre import map_runner -from perf_insights.mre import progress_reporter -from perf_insights.mre import file_handle -from perf_insights.mre import job as job_module +from tracing.mre import function_handle +from tracing.mre import map_runner +from tracing.mre import progress_reporter +from tracing.mre import file_handle +from tracing.mre import job as job_module from tracing.metrics import discover from tracing.metrics import metric_runner @@ -41,7 +41,7 @@ def _GetExitCodeForResults(results): def _ProcessTracesWithMetric(metric_name, traces, output_file): - results = metric_runner.RunMetricOnTraces(traces, metric_name) + results = metric_runner.RunMetricOnTraces(traces, [metric_name]) results_dict = {k: v.AsDict() for k, v in results.iteritems()} _DumpToOutputJson(results_dict, output_file) diff --git a/catapult/trace_processor/trace_uploader/__init__.py b/catapult/trace_processor/trace_uploader/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/catapult/trace_processor/trace_uploader/__init__.py diff --git a/catapult/trace_processor/trace_uploader/app.yaml b/catapult/trace_processor/trace_uploader/app.yaml new file mode 100644 index 00000000..6ba58178 --- /dev/null +++ b/catapult/trace_processor/trace_uploader/app.yaml @@ -0,0 +1,20 @@ +# Copyright (c) 2015 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +api_version: 1 +runtime: python27 +threadsafe: true + +builtins: +- remote_api: on + +handlers: +- url: /upload + script: trace_uploader.endpoints.upload.app + secure: always + +- url: /corpus_cleanup + script: trace_uploader.endpoints.corpus_cleanup.app + secure: always + login: admin diff --git a/catapult/trace_processor/trace_uploader/appengine_config.py b/catapult/trace_processor/trace_uploader/appengine_config.py new file mode 100644 index 00000000..5f5c04a4 --- /dev/null +++ b/catapult/trace_processor/trace_uploader/appengine_config.py @@ -0,0 +1,52 @@ +# Copyright 2015 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""App Engine config. + +This module is loaded before others and can be used to set up the +App Engine environment. See: + https://cloud.google.com/appengine/docs/python/tools/appengineconfig +""" + +import os + +from google.appengine.ext import vendor + +appstats_SHELL_OK = True + +# Directories in catapult/third_party required by uploader/corpus cleanup. +THIRD_PARTY_LIBRARIES = [ + 'apiclient', + 'uritemplate', +] +# Directories in trace_processor/third_party required by uploader/corpus +# cleanup. +THIRD_PARTY_LIBRARIES_IN_TRACE_PROCESSOR = [ + 'cloudstorage', +] +# Libraries bundled with the App Engine SDK. 
+THIRD_PARTY_LIBRARIES_IN_SDK = [
+    'httplib2',
+    'oauth2client',
+    'six',
+]
+
+
+def _AddThirdPartyLibraries():
+  """Registers the third party libraries with App Engine.
+
+  In order for third-party libraries to be available in the App Engine
+  runtime environment, they must be added with vendor.add. The directories
+  added this way must be inside the App Engine project directory.
+  """
+  # The deploy script is expected to add links to third party libraries
+  # before deploying. If the directories aren't there (e.g. when running tests)
+  # then just ignore it.
+  for library_dir in (THIRD_PARTY_LIBRARIES +
+                      THIRD_PARTY_LIBRARIES_IN_TRACE_PROCESSOR +
+                      THIRD_PARTY_LIBRARIES_IN_SDK):
+    library_path = os.path.join(os.path.dirname(__file__), library_dir)
+    if os.path.exists(library_path):
+      vendor.add(library_path)
+
+
+_AddThirdPartyLibraries()
diff --git a/catapult/trace_processor/trace_uploader/cloud_config.py b/catapult/trace_processor/trace_uploader/cloud_config.py
new file mode 100644
index 00000000..ae1fa6ac
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/cloud_config.py
@@ -0,0 +1,60 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+import os
+import logging
+
+from google.appengine.api import app_identity
+from google.appengine.ext import ndb
+
+
+def _is_devserver():
+  server_software = os.environ.get('SERVER_SOFTWARE', '')
+  return server_software and server_software.startswith('Development')
+
+_DEFAULT_CATAPULT_PATH = '/catapult'
+_DEFAULT_TARGET = 'prod'
+if _is_devserver():
+  _DEFAULT_TARGET = 'test'
+
+_CONFIG_KEY_NAME = 'pi_cloud_mapper_config_%s' % _DEFAULT_TARGET
+
+_DEFAULT_CONTROL_BUCKET_PATH = 'gs://%s/%s' % (
+    app_identity.get_default_gcs_bucket_name(), _DEFAULT_TARGET)
+
+_DEFAULT_SOURCE_DISK_IMAGE = ('https://www.googleapis.com/compute/v1/projects/'
+                              'debian-cloud/global/images/debian-8-jessie-v20151104')
+
+_GCE_DEFAULT_ZONE = 'us-central1-f'
+_GCE_DEFAULT_MACHINE_TYPE = 'n1-standard-1'
+
+
+class CloudConfig(ndb.Model):
+  control_bucket_path = ndb.StringProperty(default=_DEFAULT_CONTROL_BUCKET_PATH)
+  setup_scheme = 'http' if _is_devserver() else 'https'
+  default_corpus = ndb.StringProperty(
+      default='%s://%s' % (
+          setup_scheme, app_identity.get_default_version_hostname()))
+  urlfetch_service_id = ndb.StringProperty(default='')
+  gce_project_name = ndb.StringProperty(
+      default=app_identity.get_application_id())
+  gce_source_disk_image = ndb.StringProperty(default=_DEFAULT_SOURCE_DISK_IMAGE)
+  gce_zone = ndb.StringProperty(default=_GCE_DEFAULT_ZONE)
+  gce_machine_type = ndb.StringProperty(default=_GCE_DEFAULT_MACHINE_TYPE)
+  trace_upload_bucket = ndb.StringProperty(
+      default='%s/traces' % app_identity.get_default_gcs_bucket_name())
+  catapult_path = ndb.StringProperty(default=_DEFAULT_CATAPULT_PATH)
+
+
+def Get():
+  config = CloudConfig.get_by_id(_CONFIG_KEY_NAME)
+  if not config:
+    logging.warning('CloudConfig not found, creating a default one.')
+    config = CloudConfig(id=_CONFIG_KEY_NAME)
+
+  if 'GCS_BUCKET_NAME' in os.environ:
+    config.trace_upload_bucket = os.environ['GCS_BUCKET_NAME']
+
+  config.put()
+
+  return config
diff --git a/catapult/trace_processor/trace_uploader/cron.yaml b/catapult/trace_processor/trace_uploader/cron.yaml
new file mode 100644
index 00000000..1c4dcb2a
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/cron.yaml
@@ -0,0 +1,8 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+cron:
+- description: Hourly cleanup job
+  url: /corpus_cleanup
+  schedule: every 1 hours
diff --git a/catapult/trace_processor/trace_uploader/dispatch.yaml b/catapult/trace_processor/trace_uploader/dispatch.yaml
new file mode 100644
index 00000000..f33e5193
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/dispatch.yaml
@@ -0,0 +1,7 @@
+# Copyright (c) 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+dispatch:
+- url: "*/*"
+  module: default
diff --git a/catapult/trace_processor/trace_uploader/endpoints/__init__.py b/catapult/trace_processor/trace_uploader/endpoints/__init__.py
new file mode 100644
index 00000000..047b03ca
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/endpoints/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
diff --git a/catapult/trace_processor/trace_uploader/endpoints/corpus_cleanup.py b/catapult/trace_processor/trace_uploader/endpoints/corpus_cleanup.py
new file mode 100644
index 00000000..5fd3cb56
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/endpoints/corpus_cleanup.py
@@ -0,0 +1,57 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import datetime
+import logging
+import webapp2
+
+from .. import cloud_config
+from ..trace_info import TraceInfo
+import cloudstorage as gcs
+
+BATCH_SIZE = 100
+MAX_DAYS = 30
+DEFAULT_RETRY_PARAMS = gcs.RetryParams(initial_delay=0.2,
+                                       max_delay=5.0,
+                                       backoff_factor=2,
+                                       max_retry_period=15)
+
+class CorpusCleanupPage(webapp2.RequestHandler):
+
+  def _delete_traces(self):
+    trace_bucket = cloud_config.Get().trace_upload_bucket
+    deleted_traces = 0
+
+    oldest_time = datetime.datetime.now() - datetime.timedelta(days=MAX_DAYS)
+    q = TraceInfo.query(TraceInfo.date < oldest_time)
+
+    for key in q.fetch(BATCH_SIZE, keys_only=True):
+      gcs_path = '/%s/%s.gz' % (trace_bucket, key.id())
+      try:
+        gcs.delete(gcs_path, retry_params=DEFAULT_RETRY_PARAMS)
+      except gcs.NotFoundError:
+        pass
+
+      key.delete()
+      deleted_traces += 1
+
+    return deleted_traces
+
+  def get(self):
+    self.response.out.write('<html><body>')
+
+    while True:
+      deleted_traces = self._delete_traces()
+      self.response.out.write('<br><div><b>Traces cleaned:</b> %s</div>'
+                              % deleted_traces)
+
+      logging.info('Cleanup pass deleted %s traces.', deleted_traces)
+
+      if deleted_traces < BATCH_SIZE:
+        break
+
+    self.response.out.write('</body></html>')
+
+
+app = webapp2.WSGIApplication([('/corpus_cleanup', CorpusCleanupPage)])
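The cleanup handler drains expired TraceInfo entities in fixed-size batches, deleting the backing GCS object before the datastore key, and its get() loops until a pass deletes fewer than BATCH_SIZE entities, which signals the query is exhausted. The same batching pattern, reduced to its essentials; the helper below is an illustrative sketch, not code from this patch.

import datetime

from google.appengine.ext import ndb


def DeleteExpired(model_class, date_prop, max_days, batch_size=100):
  """Deletes entities older than max_days; returns how many were deleted."""
  cutoff = datetime.datetime.now() - datetime.timedelta(days=max_days)
  total = 0
  while True:
    # keys_only avoids fetching full entities just to delete them.
    keys = model_class.query(date_prop < cutoff).fetch(
        batch_size, keys_only=True)
    ndb.delete_multi(keys)  # One batched RPC instead of per-key delete().
    total += len(keys)
    if len(keys) < batch_size:  # A short batch means the query is drained.
      return total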
diff --git a/catapult/trace_processor/trace_uploader/endpoints/upload.py b/catapult/trace_processor/trace_uploader/endpoints/upload.py
new file mode 100644
index 00000000..642fcb62
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/endpoints/upload.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import json
+import logging
+import os
+import re
+import webapp2
+import uuid
+
+from .. import trace_info
+from .. import cloud_config
+
+import cloudstorage as gcs
+
+from google.appengine.api import datastore_errors
+
+default_retry_params = gcs.RetryParams(initial_delay=0.2,
+                                       max_delay=5.0,
+                                       backoff_factor=2,
+                                       max_retry_period=15)
+gcs.set_default_retry_params(default_retry_params)
+
+
+class UploadPage(webapp2.RequestHandler):
+
+  def get(self):
+    self.response.out.write("""
+      <html><head><title>Performance Insights - Trace Uploader</title></head>
+      <body>
+      <form action="/upload" enctype="multipart/form-data" method="post">
+      <div><input type="file" name="trace"/></div>
+      <div><input type="submit" value="Upload"></div>
+      </form><hr>
+      </body></html>""")
+
+  def post(self):
+    trace_uuid = str(uuid.uuid4())
+
+    gcs_path = '/%s/%s.gz' % (
+        cloud_config.Get().trace_upload_bucket, trace_uuid)
+    gcs_file = gcs.open(gcs_path,
+                        'w',
+                        content_type='application/octet-stream',
+                        options={},
+                        retry_params=default_retry_params)
+    gcs_file.write(self.request.get('trace'))
+    gcs_file.close()
+
+    trace_object = trace_info.TraceInfo(id=trace_uuid)
+    trace_object.remote_addr = os.environ["REMOTE_ADDR"]
+
+    for arg in self.request.arguments():
+      arg_key = arg.replace('-', '_').lower()
+      if arg_key in trace_object._properties:
+        try:
+          setattr(trace_object, arg_key, self.request.get(arg))
+        except datastore_errors.BadValueError:
+          pass
+
+    scenario_config = self.request.get('config')
+    if scenario_config:
+      config_json = json.loads(scenario_config)
+      if 'scenario_name' in config_json:
+        trace_object.scenario_name = config_json['scenario_name']
+
+    tags_string = self.request.get('tags')
+    if tags_string:
+      # Tags are comma-separated and may only include alphanumerics and '-'.
+      if re.match('^[a-zA-Z0-9-,]+$', tags_string):
+        trace_object.tags = tags_string.split(',')
+      else:
+        logging.warning('The provided tags string includes one or more '
+                        'invalid characters and will be ignored.')
+
+    trace_object.ver = self.request.get('product-version')
+    trace_object.put()
+
+    self.response.write(trace_uuid)
+
+app = webapp2.WSGIApplication([('/upload', UploadPage)])
diff --git a/catapult/trace_processor/trace_uploader/index.yaml b/catapult/trace_processor/trace_uploader/index.yaml
new file mode 100644
index 00000000..5886087a
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/index.yaml
@@ -0,0 +1,49 @@
+# Copyright (c) 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+indexes:
+
+- kind: TraceInfo
+  properties:
+  - name: prod
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: ver
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: prod
+  - name: ver
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: tags
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: network_type
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: tags
+  - name: network_type
+  - name: date
+    direction: desc
+
+- kind: TraceInfo
+  properties:
+  - name: user_agent
+  - name: date
+    direction: desc
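Together with app.yaml's /upload route, the upload.py handler above defines a small HTTP API: a multipart POST with a 'trace' file field plus optional metadata fields such as 'tags' and 'product-version', answered with the UUID under which the trace was stored. A client-side sketch, assuming the third-party requests library; the host and field values are placeholders, not part of this patch.

import requests

# Placeholder host; substitute wherever the app is actually deployed.
UPLOAD_URL = 'https://your-app-id.appspot.com/upload'

with open('trace.json.gz', 'rb') as f:
  response = requests.post(
      UPLOAD_URL,
      files={'trace': f},
      # Tags are comma-separated, alphanumerics and '-' only.
      data={'tags': 'nightly,linux', 'product-version': '1.0'})

# The response body is the UUID assigned to the stored trace.
print response.text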
diff --git a/catapult/trace_processor/trace_uploader/trace_info.py b/catapult/trace_processor/trace_uploader/trace_info.py
new file mode 100644
index 00000000..e7909402
--- /dev/null
+++ b/catapult/trace_processor/trace_uploader/trace_info.py
@@ -0,0 +1,34 @@
+# Copyright (c) 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from google.appengine.ext import ndb
+
+
+class TraceInfo(ndb.Model):
+  config = ndb.StringProperty(indexed=True)
+  date = ndb.DateTimeProperty(auto_now_add=True, indexed=True)
+  network_type = ndb.StringProperty(indexed=True, default=None)
+  prod = ndb.StringProperty(indexed=True)
+  remote_addr = ndb.StringProperty(indexed=True)
+  tags = ndb.StringProperty(indexed=True, repeated=True)
+  user_agent = ndb.StringProperty(indexed=True, default=None)
+  ver = ndb.StringProperty(indexed=True)
+  scenario_name = ndb.StringProperty(indexed=True, default=None)
+  os_name = ndb.StringProperty(indexed=True, default=None)
+  os_arch = ndb.StringProperty(indexed=True, default=None)
+  os_version = ndb.StringProperty(indexed=True, default=None)
+  field_trials = ndb.StringProperty(indexed=True, default=None)
+  product_version = ndb.StringProperty(indexed=True, default=None)
+  physical_memory = ndb.StringProperty(indexed=True, default=None)
+  cpu_brand = ndb.StringProperty(indexed=True, default=None)
+  cpu_family = ndb.StringProperty(indexed=True, default=None)
+  cpu_stepping = ndb.StringProperty(indexed=True, default=None)
+  cpu_model = ndb.StringProperty(indexed=True, default=None)
+  num_cpus = ndb.StringProperty(indexed=True, default=None)
+  gpu_devid = ndb.StringProperty(indexed=True, default=None)
+  gpu_driver = ndb.StringProperty(indexed=True, default=None)
+  gpu_psver = ndb.StringProperty(indexed=True, default=None)
+  gpu_vsver = ndb.StringProperty(indexed=True, default=None)
+  gpu_venid = ndb.StringProperty(indexed=True, default=None)
+  highres_ticks = ndb.BooleanProperty(indexed=True, default=True)
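Each composite index in index.yaml pairs one or more filter properties with a descending date sort, so the corpus can be browsed newest-first by product, version, tag, network type, or user agent. A query sketch of the shape served by the (tags, date desc) index; the tag value is illustrative.

from trace_uploader.trace_info import TraceInfo

# Equality filter on the repeated 'tags' property plus a descending date
# sort; this query shape requires the (tags, date desc) composite index.
recent = TraceInfo.query(TraceInfo.tags == 'nightly').order(
    -TraceInfo.date).fetch(20)
for trace in recent:
  print trace.key.id(), trace.prod, trace.ver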