diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-02-18 04:31:44 -0800 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-02-18 04:31:44 -0800 |
commit | ab9173b220f1b88a9a9eaa7ee0133e60b7bffff6 (patch) | |
tree | f01e2ea2dbafa176ca7dae708ad4205fda124ea2 | |
parent | 32de1342e656362dc75d3dd13a3a99a9496bebe6 (diff) | |
parent | d13fc7dbbd00ce411f734653fb711e48dff5ba6d (diff) | |
download | oauth2client-ab9173b220f1b88a9a9eaa7ee0133e60b7bffff6.tar.gz |
Merge pull request #413 from dhermes/fix-412
Implement ServiceAccountCredentials.from_p12_keyfile_buffer().
-rw-r--r-- | oauth2client/service_account.py | 65 | ||||
-rw-r--r-- | tests/test_service_account.py | 34 |
2 files changed, 89 insertions, 10 deletions
diff --git a/oauth2client/service_account.py b/oauth2client/service_account.py index f18f192..3c9bffe 100644 --- a/oauth2client/service_account.py +++ b/oauth2client/service_account.py @@ -216,14 +216,15 @@ class ServiceAccountCredentials(AssertionCredentials): return cls._from_parsed_json_keyfile(keyfile_dict, scopes) @classmethod - def from_p12_keyfile(cls, service_account_email, filename, - private_key_password=None, scopes=''): + def _from_p12_keyfile_contents(cls, service_account_email, + private_key_pkcs12, + private_key_password=None, scopes=''): """Factory constructor from JSON keyfile. Args: service_account_email: string, The email associated with the service account. - filename: string, The location of the PKCS#12 keyfile. + private_key_pkcs12: string, The contents of a PKCS#12 keyfile. private_key_password: string, (Optional) Password for PKCS#12 private key. Defaults to ``notasecret``. scopes: List or string, (Optional) Scopes to use when acquiring an @@ -237,8 +238,6 @@ class ServiceAccountCredentials(AssertionCredentials): NotImplementedError if pyOpenSSL is not installed / not the active crypto library. """ - with open(filename, 'rb') as file_obj: - private_key_pkcs12 = file_obj.read() if private_key_password is None: private_key_password = _PASSWORD_DEFAULT signer = crypt.Signer.from_string(private_key_pkcs12, @@ -248,6 +247,62 @@ class ServiceAccountCredentials(AssertionCredentials): credentials._private_key_password = private_key_password return credentials + @classmethod + def from_p12_keyfile(cls, service_account_email, filename, + private_key_password=None, scopes=''): + """Factory constructor from JSON keyfile. + + Args: + service_account_email: string, The email associated with the + service account. + filename: string, The location of the PKCS#12 keyfile. + private_key_password: string, (Optional) Password for PKCS#12 + private key. Defaults to ``notasecret``. + scopes: List or string, (Optional) Scopes to use when acquiring an + access token. + + Returns: + ServiceAccountCredentials, a credentials object created from + the keyfile. + + Raises: + NotImplementedError if pyOpenSSL is not installed / not the + active crypto library. + """ + with open(filename, 'rb') as file_obj: + private_key_pkcs12 = file_obj.read() + return cls._from_p12_keyfile_contents( + service_account_email, private_key_pkcs12, + private_key_password=private_key_password, scopes=scopes) + + @classmethod + def from_p12_keyfile_buffer(cls, service_account_email, file_buffer, + private_key_password=None, scopes=''): + """Factory constructor from JSON keyfile. + + Args: + service_account_email: string, The email associated with the + service account. + file_buffer: stream, A buffer that implements ``read()`` + and contains the PKCS#12 key contents. + private_key_password: string, (Optional) Password for PKCS#12 + private key. Defaults to ``notasecret``. + scopes: List or string, (Optional) Scopes to use when acquiring an + access token. + + Returns: + ServiceAccountCredentials, a credentials object created from + the keyfile. + + Raises: + NotImplementedError if pyOpenSSL is not installed / not the + active crypto library. + """ + private_key_pkcs12 = file_buffer.read() + return cls._from_p12_keyfile_contents( + service_account_email, private_key_pkcs12, + private_key_password=private_key_password, scopes=scopes) + def _generate_assertion(self): """Generate the assertion that will be used in the request.""" now = int(time.time()) diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 3c91d19..0b03319 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -156,10 +156,10 @@ class ServiceAccountCredentialsTests(unittest2.TestCase): private_key_password=private_key_password, scopes=scopes) self.assertIsInstance(creds, ServiceAccountCredentials) - self.assertEqual(creds.client_id, None) + self.assertIsNone(creds.client_id) self.assertEqual(creds._service_account_email, service_account_email) - self.assertEqual(creds._private_key_id, None) - self.assertEqual(creds._private_key_pkcs8_pem, None) + self.assertIsNone(creds._private_key_id) + self.assertIsNone(creds._private_key_pkcs8_pem) self.assertEqual(creds._private_key_pkcs12, key_contents) if private_key_password is not None: self.assertEqual(creds._private_key_password, private_key_password) @@ -173,6 +173,30 @@ class ServiceAccountCredentialsTests(unittest2.TestCase): self._from_p12_keyfile_helper(private_key_password=password, scopes=['foo', 'bar']) + def test_from_p12_keyfile_buffer(self): + service_account_email = 'name@email.com' + filename = data_filename('privatekey.p12') + private_key_password = 'notasecret' + scopes = ['foo', 'bar'] + with open(filename, 'rb') as file_obj: + key_contents = file_obj.read() + # Seek back to the beginning so the buffer can be + # passed to the constructor. + file_obj.seek(0) + creds = ServiceAccountCredentials.from_p12_keyfile_buffer( + service_account_email, file_obj, + private_key_password=private_key_password, + scopes=scopes) + # Check the created object. + self.assertIsInstance(creds, ServiceAccountCredentials) + self.assertIsNone(creds.client_id) + self.assertEqual(creds._service_account_email, service_account_email) + self.assertIsNone(creds._private_key_id) + self.assertIsNone(creds._private_key_pkcs8_pem) + self.assertEqual(creds._private_key_pkcs12, key_contents) + self.assertEqual(creds._private_key_password, private_key_password) + self.assertEqual(creds._scopes, ' '.join(scopes)) + def test_create_scoped_required_without_scopes(self): self.assertTrue(self.credentials.create_scoped_required()) @@ -236,9 +260,9 @@ class ServiceAccountCredentialsTests(unittest2.TestCase): ]) # Get Access Token, First attempt. - self.assertEqual(credentials.access_token, None) + self.assertIsNone(credentials.access_token) self.assertFalse(credentials.access_token_expired) - self.assertEqual(credentials.token_expiry, None) + self.assertIsNone(credentials.token_expiry) token = credentials.get_access_token(http=http) self.assertEqual(credentials.token_expiry, EXPIRY_TIME) self.assertEqual(token1, token.access_token) |