aboutsummaryrefslogtreecommitdiff
path: root/tests/asyncio/test_operation_async.py
blob: 26ad7cef212628f6ed0c365867fc5a5c6bf5e751 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import mock
import pytest

try:
    import grpc  # noqa: F401
except ImportError:
    pytest.skip("No GRPC", allow_module_level=True)

from google.api_core import exceptions
from google.api_core import operation_async
from google.api_core import operations_v1
from google.api_core import retry_async
from google.longrunning import operations_pb2
from google.protobuf import struct_pb2
from google.rpc import code_pb2
from google.rpc import status_pb2

TEST_OPERATION_NAME = "test/operation"


def make_operation_proto(
    name=TEST_OPERATION_NAME, metadata=None, response=None, error=None, **kwargs
):
    operation_proto = operations_pb2.Operation(name=name, **kwargs)

    if metadata is not None:
        operation_proto.metadata.Pack(metadata)

    if response is not None:
        operation_proto.response.Pack(response)

    if error is not None:
        operation_proto.error.CopyFrom(error)

    return operation_proto


def make_operation_future(client_operations_responses=None):
    if client_operations_responses is None:
        client_operations_responses = [make_operation_proto()]

    refresh = mock.AsyncMock(spec=["__call__"], side_effect=client_operations_responses)
    refresh.responses = client_operations_responses
    cancel = mock.AsyncMock(spec=["__call__"])
    operation_future = operation_async.AsyncOperation(
        client_operations_responses[0],
        refresh,
        cancel,
        result_type=struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
    )

    return operation_future, refresh, cancel


@pytest.mark.asyncio
async def test_constructor():
    future, refresh, _ = make_operation_future()

    assert future.operation == refresh.responses[0]
    assert future.operation.done is False
    assert future.operation.name == TEST_OPERATION_NAME
    assert future.metadata is None
    assert await future.running()


def test_metadata():
    expected_metadata = struct_pb2.Struct()
    future, _, _ = make_operation_future(
        [make_operation_proto(metadata=expected_metadata)]
    )

    assert future.metadata == expected_metadata


@pytest.mark.asyncio
async def test_cancellation():
    responses = [
        make_operation_proto(),
        # Second response indicates that the operation was cancelled.
        make_operation_proto(
            done=True, error=status_pb2.Status(code=code_pb2.CANCELLED)
        ),
    ]
    future, _, cancel = make_operation_future(responses)

    assert await future.cancel()
    assert await future.cancelled()
    cancel.assert_called_once_with()

    # Cancelling twice should have no effect.
    assert not await future.cancel()
    cancel.assert_called_once_with()


@pytest.mark.asyncio
async def test_result():
    expected_result = struct_pb2.Struct()
    responses = [
        make_operation_proto(),
        # Second operation response includes the result.
        make_operation_proto(done=True, response=expected_result),
    ]
    future, _, _ = make_operation_future(responses)

    result = await future.result()

    assert result == expected_result
    assert await future.done()


@pytest.mark.asyncio
async def test_done_w_retry():
    RETRY_PREDICATE = retry_async.if_exception_type(exceptions.TooManyRequests)
    test_retry = retry_async.AsyncRetry(predicate=RETRY_PREDICATE)

    expected_result = struct_pb2.Struct()
    responses = [
        make_operation_proto(),
        # Second operation response includes the result.
        make_operation_proto(done=True, response=expected_result),
    ]
    future, refresh, _ = make_operation_future(responses)

    await future.done(retry=test_retry)
    refresh.assert_called_once_with(retry=test_retry)


@pytest.mark.asyncio
async def test_exception():
    expected_exception = status_pb2.Status(message="meep")
    responses = [
        make_operation_proto(),
        # Second operation response includes the error.
        make_operation_proto(done=True, error=expected_exception),
    ]
    future, _, _ = make_operation_future(responses)

    exception = await future.exception()

    assert expected_exception.message in "{!r}".format(exception)


@mock.patch("asyncio.sleep", autospec=True)
@pytest.mark.asyncio
async def test_unexpected_result(unused_sleep):
    responses = [
        make_operation_proto(),
        # Second operation response is done, but has not error or response.
        make_operation_proto(done=True),
    ]
    future, _, _ = make_operation_future(responses)

    exception = await future.exception()

    assert "Unexpected state" in "{!r}".format(exception)


def test_from_gapic():
    operation_proto = make_operation_proto(done=True)
    operations_client = mock.create_autospec(
        operations_v1.OperationsClient, instance=True
    )

    future = operation_async.from_gapic(
        operation_proto,
        operations_client,
        struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
        grpc_metadata=[("x-goog-request-params", "foo")],
    )

    assert future._result_type == struct_pb2.Struct
    assert future._metadata_type == struct_pb2.Struct
    assert future.operation.name == TEST_OPERATION_NAME
    assert future.done
    assert future._refresh.keywords["metadata"] == [("x-goog-request-params", "foo")]
    assert future._cancel.keywords["metadata"] == [("x-goog-request-params", "foo")]


def test_deserialize():
    op = make_operation_proto(name="foobarbaz")
    serialized = op.SerializeToString()
    deserialized_op = operation_async.AsyncOperation.deserialize(serialized)
    assert op.name == deserialized_op.name
    assert type(op) is type(deserialized_op)