diff options
Diffstat (limited to 'catapult/trace_processor/third_party/cloudstorage/rest_api.py')
-rw-r--r-- | catapult/trace_processor/third_party/cloudstorage/rest_api.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/catapult/trace_processor/third_party/cloudstorage/rest_api.py b/catapult/trace_processor/third_party/cloudstorage/rest_api.py new file mode 100644 index 00000000..437c09d7 --- /dev/null +++ b/catapult/trace_processor/third_party/cloudstorage/rest_api.py @@ -0,0 +1,258 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Base and helper classes for Google RESTful APIs.""" + + + + + +__all__ = ['add_sync_methods'] + +import random +import time + +from . import api_utils + +try: + from google.appengine.api import app_identity + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import app_identity + from google.appengine.ext import ndb + + + +def _make_sync_method(name): + """Helper to synthesize a synchronous method from an async method name. + + Used by the @add_sync_methods class decorator below. + + Args: + name: The name of the synchronous method. + + Returns: + A method (with first argument 'self') that retrieves and calls + self.<name>, passing its own arguments, expects it to return a + Future, and then waits for and returns that Future's result. + """ + + def sync_wrapper(self, *args, **kwds): + method = getattr(self, name) + future = method(*args, **kwds) + return future.get_result() + + return sync_wrapper + + +def add_sync_methods(cls): + """Class decorator to add synchronous methods corresponding to async methods. + + This modifies the class in place, adding additional methods to it. + If a synchronous method of a given name already exists it is not + replaced. + + Args: + cls: A class. + + Returns: + The same class, modified in place. + """ + for name in cls.__dict__.keys(): + if name.endswith('_async'): + sync_name = name[:-6] + if not hasattr(cls, sync_name): + setattr(cls, sync_name, _make_sync_method(name)) + return cls + + +class _AE_TokenStorage_(ndb.Model): + """Entity to store app_identity tokens in memcache.""" + + token = ndb.StringProperty() + expires = ndb.FloatProperty() + + +@ndb.tasklet +def _make_token_async(scopes, service_account_id): + """Get a fresh authentication token. + + Args: + scopes: A list of scopes. + service_account_id: Internal-use only. + + Raises: + An ndb.Return with a tuple (token, expiration_time) where expiration_time is + seconds since the epoch. + """ + rpc = app_identity.create_rpc() + app_identity.make_get_access_token_call(rpc, scopes, service_account_id) + token, expires_at = yield rpc + raise ndb.Return((token, expires_at)) + + +class _RestApi(object): + """Base class for REST-based API wrapper classes. + + This class manages authentication tokens and request retries. All + APIs are available as synchronous and async methods; synchronous + methods are synthesized from async ones by the add_sync_methods() + function in this module. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + """ + + def __init__(self, scopes, service_account_id=None, token_maker=None, + retry_params=None): + """Constructor. + + Args: + scopes: A scope or a list of scopes. + service_account_id: Internal use only. + token_maker: An asynchronous function of the form + (scopes, service_account_id) -> (token, expires). + retry_params: An instance of api_utils.RetryParams. If None, the + default for current thread will be used. + """ + + if isinstance(scopes, basestring): + scopes = [scopes] + self.scopes = scopes + self.service_account_id = service_account_id + self.make_token_async = token_maker or _make_token_async + if not retry_params: + retry_params = api_utils._get_default_retry_params() + self.retry_params = retry_params + self.user_agent = {'User-Agent': retry_params._user_agent} + self.expiration_headroom = random.randint(60, 240) + + def __getstate__(self): + """Store state as part of serialization/pickling.""" + return {'scopes': self.scopes, + 'id': self.service_account_id, + 'a_maker': (None if self.make_token_async == _make_token_async + else self.make_token_async), + 'retry_params': self.retry_params, + 'expiration_headroom': self.expiration_headroom} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling.""" + self.__init__(state['scopes'], + service_account_id=state['id'], + token_maker=state['a_maker'], + retry_params=state['retry_params']) + self.expiration_headroom = state['expiration_headroom'] + + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Issue one HTTP request. + + It performs async retries using tasklets. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. + deadline: the deadline in which to make the call. + callback: the call to make once completed. + + Yields: + The async fetch of the url. + """ + retry_wrapper = api_utils._RetryWrapper( + self.retry_params, + retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS, + should_retry=api_utils._should_retry) + resp = yield retry_wrapper.run( + self.urlfetch_async, + url=url, + method=method, + headers=headers, + payload=payload, + deadline=deadline, + callback=callback, + follow_redirects=False) + raise ndb.Return((resp.status_code, resp.headers, resp.content)) + + @ndb.tasklet + def get_token_async(self, refresh=False): + """Get an authentication token. + + The token is cached in memcache, keyed by the scopes argument. + Uses a random token expiration headroom value generated in the constructor + to eliminate a burst of GET_ACCESS_TOKEN API requests. + + Args: + refresh: If True, ignore a cached token; default False. + + Yields: + An authentication token. This token is guaranteed to be non-expired. + """ + key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) + ts = yield _AE_TokenStorage_.get_by_id_async( + key, use_cache=True, use_memcache=True, + use_datastore=self.retry_params.save_access_token) + if refresh or ts is None or ts.expires < ( + time.time() + self.expiration_headroom): + token, expires_at = yield self.make_token_async( + self.scopes, self.service_account_id) + timeout = int(expires_at - time.time()) + ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) + if timeout > 0: + yield ts.put_async(memcache_timeout=timeout, + use_datastore=self.retry_params.save_access_token, + use_cache=True, use_memcache=True) + raise ndb.Return(ts.token) + + @ndb.tasklet + def urlfetch_async(self, url, method='GET', headers=None, + payload=None, deadline=None, callback=None, + follow_redirects=False): + """Make an async urlfetch() call. + + This is an async wrapper around urlfetch(). It adds an authentication + header. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. + deadline: the deadline in which to make the call. + callback: the call to make once completed. + follow_redirects: whether or not to follow redirects. + + Yields: + This returns a Future despite not being decorated with @ndb.tasklet! + """ + headers = {} if headers is None else dict(headers) + headers.update(self.user_agent) + self.token = yield self.get_token_async() + if self.token: + headers['authorization'] = 'OAuth ' + self.token + + deadline = deadline or self.retry_params.urlfetch_timeout + + ctx = ndb.get_context() + resp = yield ctx.urlfetch( + url, payload=payload, method=method, + headers=headers, follow_redirects=follow_redirects, + deadline=deadline, callback=callback) + raise ndb.Return(resp) + + +_RestApi = add_sync_methods(_RestApi) |