aboutsummaryrefslogtreecommitdiff
path: root/scripts/run_system_tests.py
blob: 3f7baba7d939902117e64a6f5ed091389c18ecd2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json
import os

import httplib2
from oauth2client import client
from oauth2client import service_account


JSON_KEY_PATH = os.getenv('OAUTH2CLIENT_TEST_JSON_KEY_PATH')
P12_KEY_PATH = os.getenv('OAUTH2CLIENT_TEST_P12_KEY_PATH')
P12_KEY_EMAIL = os.getenv('OAUTH2CLIENT_TEST_P12_KEY_EMAIL')
USER_KEY_PATH = os.getenv('OAUTH2CLIENT_TEST_USER_KEY_PATH')
USER_KEY_EMAIL = os.getenv('OAUTH2CLIENT_TEST_USER_KEY_EMAIL')

SCOPE = ('https://www.googleapis.com/auth/plus.login',
         'https://www.googleapis.com/auth/plus.me',
         'https://www.googleapis.com/auth/userinfo.email',
         'https://www.googleapis.com/auth/userinfo.profile')
USER_INFO = 'https://www.googleapis.com/oauth2/v2/userinfo'


def _require_environ():
    if (JSON_KEY_PATH is None or P12_KEY_PATH is None or
        P12_KEY_EMAIL is None or USER_KEY_PATH is None or
        USER_KEY_EMAIL is None):
        raise EnvironmentError('Expected environment variables to be set:',
                               'OAUTH2CLIENT_TEST_JSON_KEY_PATH',
                               'OAUTH2CLIENT_TEST_P12_KEY_PATH',
                               'OAUTH2CLIENT_TEST_P12_KEY_EMAIL',
                               'OAUTH2CLIENT_TEST_USER_KEY_PATH',
                               'OAUTH2CLIENT_TEST_USER_KEY_EMAIL')

    if not os.path.isfile(JSON_KEY_PATH):
        raise EnvironmentError(JSON_KEY_PATH, 'is not a file')
    if not os.path.isfile(P12_KEY_PATH):
        raise EnvironmentError(P12_KEY_PATH, 'is not a file')
    if not os.path.isfile(USER_KEY_PATH):
        raise EnvironmentError(USER_KEY_PATH, 'is not a file')


def _check_user_info(credentials, expected_email):
    http = credentials.authorize(httplib2.Http())
    response, content = http.request(USER_INFO)
    if response.status != 200:
        raise ValueError('Expected 200 response.')

    content = content.decode('utf-8')
    payload = json.loads(content)
    if payload['email'] != expected_email:
        raise ValueError('User info email does not match credentials.')


def run_json():
    with open(JSON_KEY_PATH, 'r') as file_object:
        client_credentials = json.load(file_object)

    credentials = service_account._ServiceAccountCredentials(
        service_account_id=client_credentials['client_id'],
        service_account_email=client_credentials['client_email'],
        private_key_id=client_credentials['private_key_id'],
        private_key_pkcs8_text=client_credentials['private_key'],
        scopes=SCOPE,
    )

    _check_user_info(credentials, client_credentials['client_email'])


def run_p12():
    with open(P12_KEY_PATH, 'rb') as file_object:
        private_key_contents = file_object.read()

    credentials = client.SignedJwtAssertionCredentials(
        service_account_name=P12_KEY_EMAIL,
        private_key=private_key_contents,
        scope=SCOPE,
    )

    _check_user_info(credentials, P12_KEY_EMAIL)


def run_user_json():
    with open(USER_KEY_PATH, 'r') as file_object:
        client_credentials = json.load(file_object)

    credentials = client.GoogleCredentials(
        access_token=None,
        client_id=client_credentials['client_id'],
        client_secret=client_credentials['client_secret'],
        refresh_token=client_credentials['refresh_token'],
        token_expiry=None,
        token_uri=client.GOOGLE_TOKEN_URI,
        user_agent='Python client library',
    )

    _check_user_info(credentials, USER_KEY_EMAIL)


def main():
    _require_environ()
    run_json()
    run_p12()
    run_user_json()


if __name__ == '__main__':
    main()