aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorLidi Zheng <lidiz@google.com>2020-06-02 15:06:04 -0700
committerGitHub <noreply@github.com>2020-06-02 22:06:04 +0000
commitdd9b2f38a70e85952cc05552ec8070cdf29ddbb4 (patch)
tree91de6df237390c1c3fb1588fa6fd06b8707d9113 /tests
parent7be1e59e9d75c112f346d2b76dce3dd60e3584a1 (diff)
downloadpython-api-core-dd9b2f38a70e85952cc05552ec8070cdf29ddbb4.tar.gz
feat: AsyncIO Integration [Part 2] (#28)
Children PR of https://github.com/googleapis/python-api-core/pull/26. This PR includes AsyncIO version of: * Polling future * Page iterator The AsyncIO version of polling future still uses the same mechanism as the sync version. The AsyncIO polling future tries to update its own state whenever the application want to access information or perform actions. For page iterator, it has similar interface design as sync version. But instead of fulfilling normal generator protocol, it is based on the async generator. Related #23
Diffstat (limited to 'tests')
-rw-r--r--tests/asyncio/future/__init__.py0
-rw-r--r--tests/asyncio/future/test_async_future.py229
-rw-r--r--tests/asyncio/test_operation_async.py193
-rw-r--r--tests/asyncio/test_page_iterator_async.py261
4 files changed, 683 insertions, 0 deletions
diff --git a/tests/asyncio/future/__init__.py b/tests/asyncio/future/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/asyncio/future/__init__.py
diff --git a/tests/asyncio/future/test_async_future.py b/tests/asyncio/future/test_async_future.py
new file mode 100644
index 0000000..3322cb0
--- /dev/null
+++ b/tests/asyncio/future/test_async_future.py
@@ -0,0 +1,229 @@
+# Copyright 2017, Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core.future import async_future
+
+
+class AsyncFuture(async_future.AsyncFuture):
+ async def done(self):
+ return False
+
+ async def cancel(self):
+ return True
+
+ async def cancelled(self):
+ return False
+
+ async def running(self):
+ return True
+
+
+@pytest.mark.asyncio
+async def test_polling_future_constructor():
+ future = AsyncFuture()
+ assert not await future.done()
+ assert not await future.cancelled()
+ assert await future.running()
+ assert await future.cancel()
+
+
+@pytest.mark.asyncio
+async def test_set_result():
+ future = AsyncFuture()
+ callback = mock.Mock()
+
+ future.set_result(1)
+
+ assert await future.result() == 1
+ callback_called = asyncio.Event()
+
+ def callback(unused_future):
+ callback_called.set()
+
+ future.add_done_callback(callback)
+ await callback_called.wait()
+
+
+@pytest.mark.asyncio
+async def test_set_exception():
+ future = AsyncFuture()
+ exception = ValueError("meep")
+
+ future.set_exception(exception)
+
+ assert await future.exception() == exception
+ with pytest.raises(ValueError):
+ await future.result()
+
+ callback_called = asyncio.Event()
+
+ def callback(unused_future):
+ callback_called.set()
+
+ future.add_done_callback(callback)
+ await callback_called.wait()
+
+
+@pytest.mark.asyncio
+async def test_invoke_callback_exception():
+ future = AsyncFuture()
+ future.set_result(42)
+
+ # This should not raise, despite the callback causing an exception.
+ callback_called = asyncio.Event()
+
+ def callback(unused_future):
+ callback_called.set()
+ raise ValueError()
+
+ future.add_done_callback(callback)
+ await callback_called.wait()
+
+
+class AsyncFutureWithPoll(AsyncFuture):
+ def __init__(self):
+ super().__init__()
+ self.poll_count = 0
+ self.event = asyncio.Event()
+
+ async def done(self):
+ self.poll_count += 1
+ await self.event.wait()
+ self.set_result(42)
+ return True
+
+
+@pytest.mark.asyncio
+async def test_result_with_polling():
+ future = AsyncFutureWithPoll()
+
+ future.event.set()
+ result = await future.result()
+
+ assert result == 42
+ assert future.poll_count == 1
+ # Repeated calls should not cause additional polling
+ assert await future.result() == result
+ assert future.poll_count == 1
+
+
+class AsyncFutureTimeout(AsyncFutureWithPoll):
+
+ async def done(self):
+ await asyncio.sleep(0.2)
+ return False
+
+
+@pytest.mark.asyncio
+async def test_result_timeout():
+ future = AsyncFutureTimeout()
+ with pytest.raises(asyncio.TimeoutError):
+ await future.result(timeout=0.2)
+
+
+@pytest.mark.asyncio
+async def test_exception_timeout():
+ future = AsyncFutureTimeout()
+ with pytest.raises(asyncio.TimeoutError):
+ await future.exception(timeout=0.2)
+
+
+@pytest.mark.asyncio
+async def test_result_timeout_with_retry():
+ future = AsyncFutureTimeout()
+ with pytest.raises(asyncio.TimeoutError):
+ await future.exception(timeout=0.4)
+
+
+class AsyncFutureTransient(AsyncFutureWithPoll):
+ def __init__(self, errors):
+ super().__init__()
+ self._errors = errors
+
+ async def done(self):
+ if self._errors:
+ error, self._errors = self._errors[0], self._errors[1:]
+ raise error("testing")
+ self.poll_count += 1
+ self.set_result(42)
+ return True
+
+
+@mock.patch("asyncio.sleep", autospec=True)
+@pytest.mark.asyncio
+async def test_result_transient_error(unused_sleep):
+ future = AsyncFutureTransient(
+ (
+ exceptions.TooManyRequests,
+ exceptions.InternalServerError,
+ exceptions.BadGateway,
+ )
+ )
+ result = await future.result()
+ assert result == 42
+ assert future.poll_count == 1
+ # Repeated calls should not cause additional polling
+ assert await future.result() == result
+ assert future.poll_count == 1
+
+
+@pytest.mark.asyncio
+async def test_callback_concurrency():
+ future = AsyncFutureWithPoll()
+
+ callback_called = asyncio.Event()
+
+ def callback(unused_future):
+ callback_called.set()
+
+ future.add_done_callback(callback)
+
+ # Give the thread a second to poll
+ await asyncio.sleep(1)
+ assert future.poll_count == 1
+
+ future.event.set()
+ await callback_called.wait()
+
+
+@pytest.mark.asyncio
+async def test_double_callback_concurrency():
+ future = AsyncFutureWithPoll()
+
+ callback_called = asyncio.Event()
+
+ def callback(unused_future):
+ callback_called.set()
+
+ callback_called2 = asyncio.Event()
+
+ def callback2(unused_future):
+ callback_called2.set()
+
+ future.add_done_callback(callback)
+ future.add_done_callback(callback2)
+
+ # Give the thread a second to poll
+ await asyncio.sleep(1)
+ future.event.set()
+
+ assert future.poll_count == 1
+ await callback_called.wait()
+ await callback_called2.wait()
diff --git a/tests/asyncio/test_operation_async.py b/tests/asyncio/test_operation_async.py
new file mode 100644
index 0000000..419749f
--- /dev/null
+++ b/tests/asyncio/test_operation_async.py
@@ -0,0 +1,193 @@
+# Copyright 2017, Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import operation_async
+from google.api_core import operations_v1
+from google.api_core import retry_async
+from google.longrunning import operations_pb2
+from google.protobuf import struct_pb2
+from google.rpc import code_pb2
+from google.rpc import status_pb2
+
+TEST_OPERATION_NAME = "test/operation"
+
+
+def make_operation_proto(
+ name=TEST_OPERATION_NAME, metadata=None, response=None, error=None, **kwargs
+):
+ operation_proto = operations_pb2.Operation(name=name, **kwargs)
+
+ if metadata is not None:
+ operation_proto.metadata.Pack(metadata)
+
+ if response is not None:
+ operation_proto.response.Pack(response)
+
+ if error is not None:
+ operation_proto.error.CopyFrom(error)
+
+ return operation_proto
+
+
+def make_operation_future(client_operations_responses=None):
+ if client_operations_responses is None:
+ client_operations_responses = [make_operation_proto()]
+
+ refresh = mock.AsyncMock(spec=["__call__"], side_effect=client_operations_responses)
+ refresh.responses = client_operations_responses
+ cancel = mock.AsyncMock(spec=["__call__"])
+ operation_future = operation_async.AsyncOperation(
+ client_operations_responses[0],
+ refresh,
+ cancel,
+ result_type=struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct,
+ )
+
+ return operation_future, refresh, cancel
+
+
+@pytest.mark.asyncio
+async def test_constructor():
+ future, refresh, _ = make_operation_future()
+
+ assert future.operation == refresh.responses[0]
+ assert future.operation.done is False
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.metadata is None
+ assert await future.running()
+
+
+def test_metadata():
+ expected_metadata = struct_pb2.Struct()
+ future, _, _ = make_operation_future(
+ [make_operation_proto(metadata=expected_metadata)]
+ )
+
+ assert future.metadata == expected_metadata
+
+
+@pytest.mark.asyncio
+async def test_cancellation():
+ responses = [
+ make_operation_proto(),
+ # Second response indicates that the operation was cancelled.
+ make_operation_proto(
+ done=True, error=status_pb2.Status(code=code_pb2.CANCELLED)
+ ),
+ ]
+ future, _, cancel = make_operation_future(responses)
+
+ assert await future.cancel()
+ assert await future.cancelled()
+ cancel.assert_called_once_with()
+
+ # Cancelling twice should have no effect.
+ assert not await future.cancel()
+ cancel.assert_called_once_with()
+
+
+@pytest.mark.asyncio
+async def test_result():
+ expected_result = struct_pb2.Struct()
+ responses = [
+ make_operation_proto(),
+ # Second operation response includes the result.
+ make_operation_proto(done=True, response=expected_result),
+ ]
+ future, _, _ = make_operation_future(responses)
+
+ result = await future.result()
+
+ assert result == expected_result
+ assert await future.done()
+
+
+@pytest.mark.asyncio
+async def test_done_w_retry():
+ RETRY_PREDICATE = retry_async.if_exception_type(exceptions.TooManyRequests)
+ test_retry = retry_async.AsyncRetry(predicate=RETRY_PREDICATE)
+
+ expected_result = struct_pb2.Struct()
+ responses = [
+ make_operation_proto(),
+ # Second operation response includes the result.
+ make_operation_proto(done=True, response=expected_result),
+ ]
+ future, refresh, _ = make_operation_future(responses)
+
+ await future.done(retry=test_retry)
+ refresh.assert_called_once_with(retry=test_retry)
+
+
+@pytest.mark.asyncio
+async def test_exception():
+ expected_exception = status_pb2.Status(message="meep")
+ responses = [
+ make_operation_proto(),
+ # Second operation response includes the error.
+ make_operation_proto(done=True, error=expected_exception),
+ ]
+ future, _, _ = make_operation_future(responses)
+
+ exception = await future.exception()
+
+ assert expected_exception.message in "{!r}".format(exception)
+
+
+@mock.patch("asyncio.sleep", autospec=True)
+@pytest.mark.asyncio
+async def test_unexpected_result(unused_sleep):
+ responses = [
+ make_operation_proto(),
+ # Second operation response is done, but has not error or response.
+ make_operation_proto(done=True),
+ ]
+ future, _, _ = make_operation_future(responses)
+
+ exception = await future.exception()
+
+ assert "Unexpected state" in "{!r}".format(exception)
+
+
+def test_from_gapic():
+ operation_proto = make_operation_proto(done=True)
+ operations_client = mock.create_autospec(
+ operations_v1.OperationsClient, instance=True
+ )
+
+ future = operation_async.from_gapic(
+ operation_proto,
+ operations_client,
+ struct_pb2.Struct,
+ metadata_type=struct_pb2.Struct,
+ )
+
+ assert future._result_type == struct_pb2.Struct
+ assert future._metadata_type == struct_pb2.Struct
+ assert future.operation.name == TEST_OPERATION_NAME
+ assert future.done
+
+
+def test_deserialize():
+ op = make_operation_proto(name="foobarbaz")
+ serialized = op.SerializeToString()
+ deserialized_op = operation_async.AsyncOperation.deserialize(serialized)
+ assert op.name == deserialized_op.name
+ assert type(op) is type(deserialized_op)
diff --git a/tests/asyncio/test_page_iterator_async.py b/tests/asyncio/test_page_iterator_async.py
new file mode 100644
index 0000000..42fac2a
--- /dev/null
+++ b/tests/asyncio/test_page_iterator_async.py
@@ -0,0 +1,261 @@
+# Copyright 2015 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import inspect
+
+import mock
+import pytest
+
+from google.api_core import page_iterator_async
+
+
+class PageAsyncIteratorImpl(page_iterator_async.AsyncIterator):
+
+ async def _next_page(self):
+ return mock.create_autospec(page_iterator_async.Page, instance=True)
+
+
+class TestAsyncIterator:
+
+ def test_constructor(self):
+ client = mock.sentinel.client
+ item_to_value = mock.sentinel.item_to_value
+ token = "ab13nceor03"
+ max_results = 1337
+
+ iterator = PageAsyncIteratorImpl(
+ client, item_to_value, page_token=token, max_results=max_results
+ )
+
+ assert not iterator._started
+ assert iterator.client is client
+ assert iterator.item_to_value == item_to_value
+ assert iterator.max_results == max_results
+ # Changing attributes.
+ assert iterator.page_number == 0
+ assert iterator.next_page_token == token
+ assert iterator.num_results == 0
+
+ def test_pages_property_starts(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+
+ assert not iterator._started
+
+ assert inspect.isasyncgen(iterator.pages)
+
+ assert iterator._started
+
+ def test_pages_property_restart(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+
+ assert iterator.pages
+
+ # Make sure we cannot restart.
+ with pytest.raises(ValueError):
+ assert iterator.pages
+
+ @pytest.mark.asyncio
+ async def test__page_aiter_increment(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+ page = page_iterator_async.Page(
+ iterator, ("item",), page_iterator_async._item_to_value_identity)
+ iterator._next_page = mock.AsyncMock(side_effect=[page, None])
+
+ assert iterator.num_results == 0
+
+ page_aiter = iterator._page_aiter(increment=True)
+ await page_aiter.__anext__()
+
+ assert iterator.num_results == 1
+
+ @pytest.mark.asyncio
+ async def test__page_aiter_no_increment(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+
+ assert iterator.num_results == 0
+
+ page_aiter = iterator._page_aiter(increment=False)
+ await page_aiter.__anext__()
+
+ # results should still be 0 after fetching a page.
+ assert iterator.num_results == 0
+
+ @pytest.mark.asyncio
+ async def test__items_aiter(self):
+ # Items to be returned.
+ item1 = 17
+ item2 = 100
+ item3 = 211
+
+ # Make pages from mock responses
+ parent = mock.sentinel.parent
+ page1 = page_iterator_async.Page(
+ parent, (item1, item2), page_iterator_async._item_to_value_identity)
+ page2 = page_iterator_async.Page(
+ parent, (item3,), page_iterator_async._item_to_value_identity)
+
+ iterator = PageAsyncIteratorImpl(None, None)
+ iterator._next_page = mock.AsyncMock(side_effect=[page1, page2, None])
+
+ items_aiter = iterator._items_aiter()
+
+ assert inspect.isasyncgen(items_aiter)
+
+ # Consume items and check the state of the iterator.
+ assert iterator.num_results == 0
+ assert await items_aiter.__anext__() == item1
+ assert iterator.num_results == 1
+
+ assert await items_aiter.__anext__() == item2
+ assert iterator.num_results == 2
+
+ assert await items_aiter.__anext__() == item3
+ assert iterator.num_results == 3
+
+ with pytest.raises(StopAsyncIteration):
+ await items_aiter.__anext__()
+
+ @pytest.mark.asyncio
+ async def test___aiter__(self):
+ async_iterator = PageAsyncIteratorImpl(None, None)
+ async_iterator._next_page = mock.AsyncMock(side_effect=[(1, 2), (3,), None])
+
+ assert not async_iterator._started
+
+ result = []
+ async for item in async_iterator:
+ result.append(item)
+
+ assert result == [1, 2, 3]
+ assert async_iterator._started
+
+ def test___aiter__restart(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+
+ iterator.__aiter__()
+
+ # Make sure we cannot restart.
+ with pytest.raises(ValueError):
+ iterator.__aiter__()
+
+ def test___aiter___restart_after_page(self):
+ iterator = PageAsyncIteratorImpl(None, None)
+
+ assert iterator.pages
+
+ # Make sure we cannot restart after starting the page iterator
+ with pytest.raises(ValueError):
+ iterator.__aiter__()
+
+
+class TestAsyncGRPCIterator(object):
+
+ def test_constructor(self):
+ client = mock.sentinel.client
+ items_field = "items"
+ iterator = page_iterator_async.AsyncGRPCIterator(
+ client, mock.sentinel.method, mock.sentinel.request, items_field
+ )
+
+ assert not iterator._started
+ assert iterator.client is client
+ assert iterator.max_results is None
+ assert iterator.item_to_value is page_iterator_async._item_to_value_identity
+ assert iterator._method == mock.sentinel.method
+ assert iterator._request == mock.sentinel.request
+ assert iterator._items_field == items_field
+ assert (
+ iterator._request_token_field
+ == page_iterator_async.AsyncGRPCIterator._DEFAULT_REQUEST_TOKEN_FIELD
+ )
+ assert (
+ iterator._response_token_field
+ == page_iterator_async.AsyncGRPCIterator._DEFAULT_RESPONSE_TOKEN_FIELD
+ )
+ # Changing attributes.
+ assert iterator.page_number == 0
+ assert iterator.next_page_token is None
+ assert iterator.num_results == 0
+
+ def test_constructor_options(self):
+ client = mock.sentinel.client
+ items_field = "items"
+ request_field = "request"
+ response_field = "response"
+ iterator = page_iterator_async.AsyncGRPCIterator(
+ client,
+ mock.sentinel.method,
+ mock.sentinel.request,
+ items_field,
+ item_to_value=mock.sentinel.item_to_value,
+ request_token_field=request_field,
+ response_token_field=response_field,
+ max_results=42,
+ )
+
+ assert iterator.client is client
+ assert iterator.max_results == 42
+ assert iterator.item_to_value is mock.sentinel.item_to_value
+ assert iterator._method == mock.sentinel.method
+ assert iterator._request == mock.sentinel.request
+ assert iterator._items_field == items_field
+ assert iterator._request_token_field == request_field
+ assert iterator._response_token_field == response_field
+
+ @pytest.mark.asyncio
+ async def test_iterate(self):
+ request = mock.Mock(spec=["page_token"], page_token=None)
+ response1 = mock.Mock(items=["a", "b"], next_page_token="1")
+ response2 = mock.Mock(items=["c"], next_page_token="2")
+ response3 = mock.Mock(items=["d"], next_page_token="")
+ method = mock.AsyncMock(side_effect=[response1, response2, response3])
+ iterator = page_iterator_async.AsyncGRPCIterator(
+ mock.sentinel.client, method, request, "items"
+ )
+
+ assert iterator.num_results == 0
+
+ items = []
+ async for item in iterator:
+ items.append(item)
+
+ assert items == ["a", "b", "c", "d"]
+
+ method.assert_called_with(request)
+ assert method.call_count == 3
+ assert request.page_token == "2"
+
+ @pytest.mark.asyncio
+ async def test_iterate_with_max_results(self):
+ request = mock.Mock(spec=["page_token"], page_token=None)
+ response1 = mock.Mock(items=["a", "b"], next_page_token="1")
+ response2 = mock.Mock(items=["c"], next_page_token="2")
+ response3 = mock.Mock(items=["d"], next_page_token="")
+ method = mock.AsyncMock(side_effect=[response1, response2, response3])
+ iterator = page_iterator_async.AsyncGRPCIterator(
+ mock.sentinel.client, method, request, "items", max_results=3
+ )
+
+ assert iterator.num_results == 0
+
+ items = []
+ async for item in iterator:
+ items.append(item)
+
+ assert items == ["a", "b", "c"]
+ assert iterator.num_results == 3
+
+ method.assert_called_with(request)
+ assert method.call_count == 2
+ assert request.page_token == "1"