aboutsummaryrefslogtreecommitdiff
path: root/catapult/trace_processor/third_party/cloudstorage/rest_api.py
blob: 437c09d78845c7f3efcf41f03483a35795a7588f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

"""Base and helper classes for Google RESTful APIs."""





__all__ = ['add_sync_methods']

import random
import time

from . import api_utils

try:
  from google.appengine.api import app_identity
  from google.appengine.ext import ndb
except ImportError:
  from google.appengine.api import app_identity
  from google.appengine.ext import ndb



def _make_sync_method(name):
  """Helper to synthesize a synchronous method from an async method name.

  Used by the @add_sync_methods class decorator below.

  Args:
    name: The name of the synchronous method.

  Returns:
    A method (with first argument 'self') that retrieves and calls
    self.<name>, passing its own arguments, expects it to return a
    Future, and then waits for and returns that Future's result.
  """

  def sync_wrapper(self, *args, **kwds):
    method = getattr(self, name)
    future = method(*args, **kwds)
    return future.get_result()

  return sync_wrapper


def add_sync_methods(cls):
  """Class decorator to add synchronous methods corresponding to async methods.

  For every attribute defined directly on the class (inherited attributes
  are not scanned) whose name ends in '_async', a synchronous method with
  the suffix stripped is attached. This modifies the class in place. If an
  attribute with the synchronous name already exists, on the class or on
  any of its bases, it is not replaced.

  Args:
    cls: A class.

  Returns:
    The same class, modified in place.
  """
  suffix = '_async'
  # Iterate over a snapshot of the names: setattr() below inserts new keys
  # into cls.__dict__, and mutating a dict while iterating a live view of
  # it raises RuntimeError on Python 3.
  for name in list(cls.__dict__):
    if name.endswith(suffix):
      sync_name = name[:-len(suffix)]
      if not hasattr(cls, sync_name):
        setattr(cls, sync_name, _make_sync_method(name))
  return cls


class _AE_TokenStorage_(ndb.Model):
  """Entity holding a cached app_identity access token and its expiry.

  Entities are read and written by _RestApi.get_token_async, which keys
  them by service account id plus scopes. That method always passes
  use_cache=True and use_memcache=True, and additionally enables the
  datastore when retry_params.save_access_token is set, so storage is not
  necessarily limited to memcache.
  """

  # The access token string.
  token = ndb.StringProperty()
  # Expiry time of the token, in seconds since the epoch (it is compared
  # against time.time() by get_token_async).
  expires = ndb.FloatProperty()


@ndb.tasklet
def _make_token_async(scopes, service_account_id):
  """Request a brand-new access token through app_identity.

  This is the default token maker used by _RestApi when the caller does
  not supply one.

  Args:
    scopes: A list of scopes.
    service_account_id: Internal-use only.

  Raises:
    An ndb.Return carrying the tuple (token, expiration_time), where
    expiration_time is seconds since the epoch. Raising ndb.Return is how
    this tasklet delivers its result.
  """
  token_rpc = app_identity.create_rpc()
  app_identity.make_get_access_token_call(
      token_rpc, scopes, service_account_id)
  # Yielding the RPC suspends the tasklet until the call completes.
  access_token, expiration_time = yield token_rpc
  raise ndb.Return((access_token, expiration_time))


class _RestApi(object):
  """Common base for wrappers around REST-style Google APIs.

  Takes care of access-token acquisition and caching and of request
  retries. Every operation is written as an `*_async` tasklet; the
  blocking counterparts are synthesized from those by the
  add_sync_methods() function in this module.

  WARNING: Do NOT directly use this api. It's an implementation detail
  and is subject to change at any release.
  """

  def __init__(self, scopes, service_account_id=None, token_maker=None,
               retry_params=None):
    """Constructor.

    Args:
      scopes: A scope or a list of scopes. A single string is wrapped in a
        one-element list.
      service_account_id: Internal use only.
      token_maker: An asynchronous function of the form
        (scopes, service_account_id) -> (token, expires). Defaults to
        _make_token_async.
      retry_params: An instance of api_utils.RetryParams. If falsy, the
        value of api_utils._get_default_retry_params() is used.
    """
    self.scopes = [scopes] if isinstance(scopes, basestring) else scopes
    self.service_account_id = service_account_id
    self.make_token_async = token_maker or _make_token_async
    params = retry_params or api_utils._get_default_retry_params()
    self.retry_params = params
    self.user_agent = {'User-Agent': params._user_agent}
    # Random per-instance headroom (in seconds) so that instances do not
    # all refresh their token at the same moment.
    self.expiration_headroom = random.randint(60, 240)

  def __getstate__(self):
    """Store state as part of serialization/pickling."""
    # The default token maker is stored as None; only a custom maker is
    # placed in the pickled state.
    token_maker = self.make_token_async
    if token_maker == _make_token_async:
      token_maker = None
    return {
        'scopes': self.scopes,
        'id': self.service_account_id,
        'a_maker': token_maker,
        'retry_params': self.retry_params,
        'expiration_headroom': self.expiration_headroom,
    }

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling."""
    self.__init__(
        state['scopes'],
        service_account_id=state['id'],
        token_maker=state['a_maker'],
        retry_params=state['retry_params'])
    # __init__ drew a fresh random headroom; put the pickled value back.
    self.expiration_headroom = state['expiration_headroom']

  @ndb.tasklet
  def do_request_async(self, url, method='GET', headers=None, payload=None,
                       deadline=None, callback=None):
    """Issue one HTTP request.

    The request goes through urlfetch_async, run under an
    api_utils._RetryWrapper configured with this instance's retry_params.
    Redirects are never followed.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call.
      callback: the call to make once completed.

    Raises:
      An ndb.Return carrying the tuple (status_code, headers, content) of
      the response; this is how the tasklet delivers its result.
    """
    wrapper = api_utils._RetryWrapper(
        self.retry_params,
        retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS,
        should_retry=api_utils._should_retry)
    fetch_kwargs = dict(
        url=url,
        method=method,
        headers=headers,
        payload=payload,
        deadline=deadline,
        callback=callback,
        follow_redirects=False)
    response = yield wrapper.run(self.urlfetch_async, **fetch_kwargs)
    raise ndb.Return(
        (response.status_code, response.headers, response.content))

  @ndb.tasklet
  def get_token_async(self, refresh=False):
    """Get an authentication token.

    Tokens are cached in _AE_TokenStorage_ entities keyed by the service
    account id and the scopes. A cached token is reused only while it has
    more than expiration_headroom seconds left; the headroom is the random
    value drawn in the constructor, which spreads out GET_ACCESS_TOKEN API
    requests instead of producing a burst.

    Args:
      refresh: If True, ignore a cached token; default False.

    Raises:
      An ndb.Return carrying the authentication token; this is how the
      tasklet delivers its result.
    """
    cache_key = '%s,%s' % (self.service_account_id, ','.join(self.scopes))
    stored = yield _AE_TokenStorage_.get_by_id_async(
        cache_key, use_cache=True, use_memcache=True,
        use_datastore=self.retry_params.save_access_token)

    needs_new_token = (
        refresh or stored is None or
        stored.expires < (time.time() + self.expiration_headroom))
    if not needs_new_token:
      raise ndb.Return(stored.token)

    token, expires_at = yield self.make_token_async(
        self.scopes, self.service_account_id)
    seconds_left = int(expires_at - time.time())
    stored = _AE_TokenStorage_(id=cache_key, token=token, expires=expires_at)
    # A token with no remaining lifetime is handed back but not cached.
    if seconds_left > 0:
      yield stored.put_async(
          memcache_timeout=seconds_left,
          use_datastore=self.retry_params.save_access_token,
          use_cache=True, use_memcache=True)
    raise ndb.Return(stored.token)

  @ndb.tasklet
  def urlfetch_async(self, url, method='GET', headers=None,
                     payload=None, deadline=None, callback=None,
                     follow_redirects=False):
    """Make an async urlfetch() call.

    This is an async wrapper around the ndb context's urlfetch(). It works
    on a copy of the caller's headers, adds this instance's User-Agent
    header and, when a token is available, an authorization header. The
    token obtained is also stored on self.token.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call. A falsy value
        falls back to retry_params.urlfetch_timeout.
      callback: the call to make once completed.
      follow_redirects: whether or not to follow redirects.

    Raises:
      An ndb.Return carrying the urlfetch response; this is how the
      tasklet delivers its result.
    """
    request_headers = dict(headers) if headers is not None else {}
    request_headers.update(self.user_agent)
    self.token = yield self.get_token_async()
    if self.token:
      request_headers['authorization'] = 'OAuth ' + self.token

    timeout = deadline or self.retry_params.urlfetch_timeout

    response = yield ndb.get_context().urlfetch(
        url, payload=payload, method=method,
        headers=request_headers, follow_redirects=follow_redirects,
        deadline=timeout, callback=callback)
    raise ndb.Return(response)


# Attach a synchronous counterpart for each *_async method defined directly
# on _RestApi, unless an attribute with the synchronous name already exists.
_RestApi = add_sync_methods(_RestApi)