aboutsummaryrefslogtreecommitdiff
path: root/oauth2client/contrib/multiprocess_file_storage.py
blob: e9e8c8cd1d15be02d42a6c596eb8140a53b61f23 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Multiprocess file credential storage.

This module provides file-based storage that supports multiple credentials and
cross-thread and process access.

This module supersedes the functionality previously found in `multistore_file`.

This module provides :class:`MultiprocessFileStorage` which:
    * Is tied to a single credential via a user-specified key. This key can be
      used to distinguish between multiple users, client ids, and/or scopes.
    * Can be safely accessed and refreshed across threads and processes.

Process & thread safety guarantees the following behavior:
    * If one thread or process refreshes a credential, subsequent refreshes
      from other processes will re-fetch the credentials from the file instead
      of performing an http request.
    * If two processes or threads attempt to refresh concurrently, only one
      will be able to acquire the lock and refresh, with the deadlock caveat
      below.
    * The interprocess lock will not deadlock, instead, the if a process can
      not acquire the interprocess lock within ``INTERPROCESS_LOCK_DEADLINE``
      it will allow refreshing the credential but will not write the updated
      credential to disk, This logic happens during every lock cycle - if the
      credentials are refreshed again it will retry locking and writing as
      normal.

Usage
=====

Before using the storage, you need to decide how you want to key the
credentials. A few common strategies include:

    * If you're storing credentials for multiple users in a single file, use
      a unique identifier for each user as the key.
    * If you're storing credentials for multiple client IDs in a single file,
      use the client ID as the key.
    * If you're storing multiple credentials for one user, use the scopes as
      the key.
    * If you have a complicated setup, use a compound key. For example, you
      can use a combination of the client ID and scopes as the key.

Create an instance of :class:`MultiprocessFileStorage` for each credential you
want to store, for example::

    filename = 'credentials'
    key = '{}-{}'.format(client_id, user_id)
    storage = MultiprocessFileStorage(filename, key)

To store the credentials::

    storage.put(credentials)

If you're going to continue to use the credentials after storing them, be sure
to call :func:`set_store`::

    credentials.set_store(storage)

To retrieve the credentials::

    storage.get(credentials)

"""

import base64
import json
import logging
import os
import threading

import fasteners
from six import iteritems

from oauth2client import _helpers
from oauth2client import client


#: The maximum amount of time, in seconds, to wait when acquire the
#: interprocess lock before falling back to read-only mode.
INTERPROCESS_LOCK_DEADLINE = 1

logger = logging.getLogger(__name__)
_backends = {}
_backends_lock = threading.Lock()


def _create_file_if_needed(filename):
    """Creates the an empty file if it does not already exist.

    Returns:
        True if the file was created, False otherwise.
    """
    if os.path.exists(filename):
        return False
    else:
        # Equivalent to "touch".
        open(filename, 'a+b').close()
        logger.info('Credential file {0} created'.format(filename))
        return True


def _load_credentials_file(credentials_file):
    """Load credentials from the given file handle.

    The file is expected to be in this format:

        {
            "file_version": 2,
            "credentials": {
                "key": "base64 encoded json representation of credentials."
            }
        }

    This function will warn and return empty credentials instead of raising
    exceptions.

    Args:
        credentials_file: An open file handle.

    Returns:
        A dictionary mapping user-defined keys to an instance of
        :class:`oauth2client.client.Credentials`.
    """
    try:
        credentials_file.seek(0)
        data = json.load(credentials_file)
    except Exception:
        logger.warning(
            'Credentials file could not be loaded, will ignore and '
            'overwrite.')
        return {}

    if data.get('file_version') != 2:
        logger.warning(
            'Credentials file is not version 2, will ignore and '
            'overwrite.')
        return {}

    credentials = {}

    for key, encoded_credential in iteritems(data.get('credentials', {})):
        try:
            credential_json = base64.b64decode(encoded_credential)
            credential = client.Credentials.new_from_json(credential_json)
            credentials[key] = credential
        except:
            logger.warning(
                'Invalid credential {0} in file, ignoring.'.format(key))

    return credentials


def _write_credentials_file(credentials_file, credentials):
    """Writes credentials to a file.

    Refer to :func:`_load_credentials_file` for the format.

    Args:
        credentials_file: An open file handle, must be read/write.
        credentials: A dictionary mapping user-defined keys to an instance of
            :class:`oauth2client.client.Credentials`.
    """
    data = {'file_version': 2, 'credentials': {}}

    for key, credential in iteritems(credentials):
        credential_json = credential.to_json()
        encoded_credential = _helpers._from_bytes(base64.b64encode(
            _helpers._to_bytes(credential_json)))
        data['credentials'][key] = encoded_credential

    credentials_file.seek(0)
    json.dump(data, credentials_file)
    credentials_file.truncate()


class _MultiprocessStorageBackend(object):
    """Thread-local backend for multiprocess storage.

    Each process has only one instance of this backend per file. All threads
    share a single instance of this backend. This ensures that all threads
    use the same thread lock and process lock when accessing the file.
    """

    def __init__(self, filename):
        self._file = None
        self._filename = filename
        self._process_lock = fasteners.InterProcessLock(
            '{0}.lock'.format(filename))
        self._thread_lock = threading.Lock()
        self._read_only = False
        self._credentials = {}

    def _load_credentials(self):
        """(Re-)loads the credentials from the file."""
        if not self._file:
            return

        loaded_credentials = _load_credentials_file(self._file)
        self._credentials.update(loaded_credentials)

        logger.debug('Read credential file')

    def _write_credentials(self):
        if self._read_only:
            logger.debug('In read-only mode, not writing credentials.')
            return

        _write_credentials_file(self._file, self._credentials)
        logger.debug('Wrote credential file {0}.'.format(self._filename))

    def acquire_lock(self):
        self._thread_lock.acquire()
        locked = self._process_lock.acquire(timeout=INTERPROCESS_LOCK_DEADLINE)

        if locked:
            _create_file_if_needed(self._filename)
            self._file = open(self._filename, 'r+')
            self._read_only = False

        else:
            logger.warn(
                'Failed to obtain interprocess lock for credentials. '
                'If a credential is being refreshed, other processes may '
                'not see the updated access token and refresh as well.')
            if os.path.exists(self._filename):
                self._file = open(self._filename, 'r')
            else:
                self._file = None
            self._read_only = True

        self._load_credentials()

    def release_lock(self):
        if self._file is not None:
            self._file.close()
            self._file = None

        if not self._read_only:
            self._process_lock.release()

        self._thread_lock.release()

    def _refresh_predicate(self, credentials):
        if credentials is None:
            return True
        elif credentials.invalid:
            return True
        elif credentials.access_token_expired:
            return True
        else:
            return False

    def locked_get(self, key):
        # Check if the credential is already in memory.
        credentials = self._credentials.get(key, None)

        # Use the refresh predicate to determine if the entire store should be
        # reloaded. This basically checks if the credentials are invalid
        # or expired. This covers the situation where another process has
        # refreshed the credentials and this process doesn't know about it yet.
        # In that case, this process won't needlessly refresh the credentials.
        if self._refresh_predicate(credentials):
            self._load_credentials()
            credentials = self._credentials.get(key, None)

        return credentials

    def locked_put(self, key, credentials):
        self._load_credentials()
        self._credentials[key] = credentials
        self._write_credentials()

    def locked_delete(self, key):
        self._load_credentials()
        self._credentials.pop(key, None)
        self._write_credentials()


def _get_backend(filename):
    """A helper method to get or create a backend with thread locking.

    This ensures that only one backend is used per-file per-process, so that
    thread and process locks are appropriately shared.

    Args:
        filename: The full path to the credential storage file.

    Returns:
        An instance of :class:`_MultiprocessStorageBackend`.
    """
    filename = os.path.abspath(filename)

    with _backends_lock:
        if filename not in _backends:
            _backends[filename] = _MultiprocessStorageBackend(filename)
        return _backends[filename]


class MultiprocessFileStorage(client.Storage):
    """Multiprocess file credential storage.

    Args:
      filename: The path to the file where credentials will be stored.
      key: An arbitrary string used to uniquely identify this set of
          credentials. For example, you may use the user's ID as the key or
          a combination of the client ID and user ID.
    """
    def __init__(self, filename, key):
        self._key = key
        self._backend = _get_backend(filename)

    def acquire_lock(self):
        self._backend.acquire_lock()

    def release_lock(self):
        self._backend.release_lock()

    def locked_get(self):
        """Retrieves the current credentials from the store.

        Returns:
            An instance of :class:`oauth2client.client.Credentials` or `None`.
        """
        credential = self._backend.locked_get(self._key)

        if credential is not None:
            credential.set_store(self)

        return credential

    def locked_put(self, credentials):
        """Writes the given credentials to the store.

        Args:
            credentials: an instance of
                :class:`oauth2client.client.Credentials`.
        """
        return self._backend.locked_put(self._key, credentials)

    def locked_delete(self):
        """Deletes the current credentials from the store."""
        return self._backend.locked_delete(self._key)