aboutsummaryrefslogtreecommitdiff
path: root/tests/unit/test_bidi.py
diff options
context:
space:
mode:
authorchojoyce <chojoyce@google.com>2022-01-05 05:23:52 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-01-05 05:23:52 +0000
commite11dd659005b21df839d0acad276b52f449e4794 (patch)
tree412a59ade78bcacc2449642d7cfd7ccb6a92d8e4 /tests/unit/test_bidi.py
parentd4dcb747f8ac4df33c321b19ee1c511f6e47707e (diff)
parente82c234cd2a425bd77dc184ad3003fd9c37e57fe (diff)
downloadpython-api-core-e11dd659005b21df839d0acad276b52f449e4794.tar.gz
Merge platform/external/python/python-api-core v2.3.0 am: 4e81cd9cc2 am: 95950852f5 am: 01e15770ed am: e82c234cd2
Original change: https://android-review.googlesource.com/c/platform/external/python/python-api-core/+/1931601 Change-Id: Id40439a8a0e0e860ca38e36d776fdd36ce2bc163
Diffstat (limited to 'tests/unit/test_bidi.py')
-rw-r--r--tests/unit/test_bidi.py869
1 files changed, 869 insertions, 0 deletions
diff --git a/tests/unit/test_bidi.py b/tests/unit/test_bidi.py
new file mode 100644
index 0000000..7fb1620
--- /dev/null
+++ b/tests/unit/test_bidi.py
@@ -0,0 +1,869 @@
+# Copyright 2018, Google LLC All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import queue
+import threading
+
+import mock
+import pytest
+
+try:
+ import grpc
+except ImportError:
+ pytest.skip("No GRPC", allow_module_level=True)
+
+from google.api_core import bidi
+from google.api_core import exceptions
+
+
+class Test_RequestQueueGenerator(object):
+ def test_bounded_consume(self):
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = True
+
+ def queue_generator(rpc):
+ yield mock.sentinel.A
+ yield queue.Empty()
+ yield mock.sentinel.B
+ rpc.is_active.return_value = False
+ yield mock.sentinel.C
+
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = queue_generator(call)
+
+ generator = bidi._RequestQueueGenerator(q)
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == [mock.sentinel.A, mock.sentinel.B]
+
+ def test_yield_initial_and_exit(self):
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = queue.Empty()
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = False
+
+ generator = bidi._RequestQueueGenerator(q, initial_request=mock.sentinel.A)
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == [mock.sentinel.A]
+
+ def test_yield_initial_callable_and_exit(self):
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = queue.Empty()
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = False
+
+ generator = bidi._RequestQueueGenerator(
+ q, initial_request=lambda: mock.sentinel.A
+ )
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == [mock.sentinel.A]
+
+ def test_exit_when_inactive_with_item(self):
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = [mock.sentinel.A, queue.Empty()]
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = False
+
+ generator = bidi._RequestQueueGenerator(q)
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == []
+ # Make sure it put the item back.
+ q.put.assert_called_once_with(mock.sentinel.A)
+
+ def test_exit_when_inactive_empty(self):
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = queue.Empty()
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = False
+
+ generator = bidi._RequestQueueGenerator(q)
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == []
+
+ def test_exit_with_stop(self):
+ q = mock.create_autospec(queue.Queue, instance=True)
+ q.get.side_effect = [None, queue.Empty()]
+ call = mock.create_autospec(grpc.Call, instance=True)
+ call.is_active.return_value = True
+
+ generator = bidi._RequestQueueGenerator(q)
+ generator.call = call
+
+ items = list(generator)
+
+ assert items == []
+
+
+class Test_Throttle(object):
+ def test_repr(self):
+ delta = datetime.timedelta(seconds=4.5)
+ instance = bidi._Throttle(access_limit=42, time_window=delta)
+ assert repr(instance) == "_Throttle(access_limit=42, time_window={})".format(
+ repr(delta)
+ )
+
+ def test_raises_error_on_invalid_init_arguments(self):
+ with pytest.raises(ValueError) as exc_info:
+ bidi._Throttle(access_limit=10, time_window=datetime.timedelta(seconds=0.0))
+ assert "time_window" in str(exc_info.value)
+ assert "must be a positive timedelta" in str(exc_info.value)
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi._Throttle(access_limit=0, time_window=datetime.timedelta(seconds=10))
+ assert "access_limit" in str(exc_info.value)
+ assert "must be positive" in str(exc_info.value)
+
+ def test_does_not_delay_entry_attempts_under_threshold(self):
+ throttle = bidi._Throttle(
+ access_limit=3, time_window=datetime.timedelta(seconds=1)
+ )
+ entries = []
+
+ for _ in range(3):
+ with throttle as time_waited:
+ entry_info = {
+ "entered_at": datetime.datetime.now(),
+ "reported_wait": time_waited,
+ }
+ entries.append(entry_info)
+
+ # check the reported wait times ...
+ assert all(entry["reported_wait"] == 0.0 for entry in entries)
+
+ # .. and the actual wait times
+ delta = entries[1]["entered_at"] - entries[0]["entered_at"]
+ assert delta.total_seconds() < 0.1
+ delta = entries[2]["entered_at"] - entries[1]["entered_at"]
+ assert delta.total_seconds() < 0.1
+
+ def test_delays_entry_attempts_above_threshold(self):
+ throttle = bidi._Throttle(
+ access_limit=3, time_window=datetime.timedelta(seconds=1)
+ )
+ entries = []
+
+ for _ in range(6):
+ with throttle as time_waited:
+ entry_info = {
+ "entered_at": datetime.datetime.now(),
+ "reported_wait": time_waited,
+ }
+ entries.append(entry_info)
+
+ # For each group of 4 consecutive entries the time difference between
+ # the first and the last entry must have been greater than time_window,
+ # because a maximum of 3 are allowed in each time_window.
+ for i, entry in enumerate(entries[3:], start=3):
+ first_entry = entries[i - 3]
+ delta = entry["entered_at"] - first_entry["entered_at"]
+ assert delta.total_seconds() > 1.0
+
+ # check the reported wait times
+ # (NOTE: not using assert all(...), b/c the coverage check would complain)
+ for i, entry in enumerate(entries):
+ if i != 3:
+ assert entry["reported_wait"] == 0.0
+
+ # The delayed entry is expected to have been delayed for a significant
+ # chunk of the full second, and the actual and reported delay times
+ # should reflect that.
+ assert entries[3]["reported_wait"] > 0.7
+ delta = entries[3]["entered_at"] - entries[2]["entered_at"]
+ assert delta.total_seconds() > 0.7
+
+
+class _CallAndFuture(grpc.Call, grpc.Future):
+ pass
+
+
+def make_rpc():
+ """Makes a mock RPC used to test Bidi classes."""
+ call = mock.create_autospec(_CallAndFuture, instance=True)
+ rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
+
+ def rpc_side_effect(request, metadata=None):
+ call.is_active.return_value = True
+ call.request = request
+ call.metadata = metadata
+ return call
+
+ rpc.side_effect = rpc_side_effect
+
+ def cancel_side_effect():
+ call.is_active.return_value = False
+
+ call.cancel.side_effect = cancel_side_effect
+
+ return rpc, call
+
+
+class ClosedCall(object):
+ def __init__(self, exception):
+ self.exception = exception
+
+ def __next__(self):
+ raise self.exception
+
+ def is_active(self):
+ return False
+
+
+class TestBidiRpc(object):
+ def test_initial_state(self):
+ bidi_rpc = bidi.BidiRpc(None)
+
+ assert bidi_rpc.is_active is False
+
+ def test_done_callbacks(self):
+ bidi_rpc = bidi.BidiRpc(None)
+ callback = mock.Mock(spec=["__call__"])
+
+ bidi_rpc.add_done_callback(callback)
+ bidi_rpc._on_call_done(mock.sentinel.future)
+
+ callback.assert_called_once_with(mock.sentinel.future)
+
+ def test_metadata(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc, metadata=mock.sentinel.A)
+ assert bidi_rpc._rpc_metadata == mock.sentinel.A
+
+ bidi_rpc.open()
+ assert bidi_rpc.call == call
+ assert bidi_rpc.call.metadata == mock.sentinel.A
+
+ def test_open(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+
+ bidi_rpc.open()
+
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active
+ call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done)
+
+ def test_open_error_already_open(self):
+ rpc, _ = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+
+ bidi_rpc.open()
+
+ with pytest.raises(ValueError):
+ bidi_rpc.open()
+
+ def test_close(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+ bidi_rpc.open()
+
+ bidi_rpc.close()
+
+ call.cancel.assert_called_once()
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active is False
+ # ensure the request queue was signaled to stop.
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is None
+
+ def test_close_no_rpc(self):
+ bidi_rpc = bidi.BidiRpc(None)
+ bidi_rpc.close()
+
+ def test_send(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+ bidi_rpc.open()
+
+ bidi_rpc.send(mock.sentinel.request)
+
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is mock.sentinel.request
+
+ def test_send_not_open(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+
+ with pytest.raises(ValueError):
+ bidi_rpc.send(mock.sentinel.request)
+
+ def test_send_dead_rpc(self):
+ error = ValueError()
+ bidi_rpc = bidi.BidiRpc(None)
+ bidi_rpc.call = ClosedCall(error)
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi_rpc.send(mock.sentinel.request)
+
+ assert exc_info.value == error
+
+ def test_recv(self):
+ bidi_rpc = bidi.BidiRpc(None)
+ bidi_rpc.call = iter([mock.sentinel.response])
+
+ response = bidi_rpc.recv()
+
+ assert response == mock.sentinel.response
+
+ def test_recv_not_open(self):
+ rpc, call = make_rpc()
+ bidi_rpc = bidi.BidiRpc(rpc)
+
+ with pytest.raises(ValueError):
+ bidi_rpc.recv()
+
+
+class CallStub(object):
+ def __init__(self, values, active=True):
+ self.values = iter(values)
+ self._is_active = active
+ self.cancelled = False
+
+ def __next__(self):
+ item = next(self.values)
+ if isinstance(item, Exception):
+ self._is_active = False
+ raise item
+ return item
+
+ def is_active(self):
+ return self._is_active
+
+ def add_done_callback(self, callback):
+ pass
+
+ def cancel(self):
+ self.cancelled = True
+
+
+class TestResumableBidiRpc(object):
+ def test_ctor_defaults(self):
+ start_rpc = mock.Mock()
+ should_recover = mock.Mock()
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+
+ assert bidi_rpc.is_active is False
+ assert bidi_rpc._finalized is False
+ assert bidi_rpc._start_rpc is start_rpc
+ assert bidi_rpc._should_recover is should_recover
+ assert bidi_rpc._should_terminate is bidi._never_terminate
+ assert bidi_rpc._initial_request is None
+ assert bidi_rpc._rpc_metadata is None
+ assert bidi_rpc._reopen_throttle is None
+
+ def test_ctor_explicit(self):
+ start_rpc = mock.Mock()
+ should_recover = mock.Mock()
+ should_terminate = mock.Mock()
+ initial_request = mock.Mock()
+ metadata = {"x-foo": "bar"}
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc,
+ should_recover,
+ should_terminate=should_terminate,
+ initial_request=initial_request,
+ metadata=metadata,
+ throttle_reopen=True,
+ )
+
+ assert bidi_rpc.is_active is False
+ assert bidi_rpc._finalized is False
+ assert bidi_rpc._should_recover is should_recover
+ assert bidi_rpc._should_terminate is should_terminate
+ assert bidi_rpc._initial_request is initial_request
+ assert bidi_rpc._rpc_metadata == metadata
+ assert isinstance(bidi_rpc._reopen_throttle, bidi._Throttle)
+
+ def test_done_callbacks_terminate(self):
+ cancellation = mock.Mock()
+ start_rpc = mock.Mock()
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ should_terminate = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc, should_recover, should_terminate=should_terminate
+ )
+ callback = mock.Mock(spec=["__call__"])
+
+ bidi_rpc.add_done_callback(callback)
+ bidi_rpc._on_call_done(cancellation)
+
+ should_terminate.assert_called_once_with(cancellation)
+ should_recover.assert_not_called()
+ callback.assert_called_once_with(cancellation)
+ assert not bidi_rpc.is_active
+
+ def test_done_callbacks_recoverable(self):
+ start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+ callback = mock.Mock(spec=["__call__"])
+
+ bidi_rpc.add_done_callback(callback)
+ bidi_rpc._on_call_done(mock.sentinel.future)
+
+ callback.assert_not_called()
+ start_rpc.assert_called_once()
+ should_recover.assert_called_once_with(mock.sentinel.future)
+ assert bidi_rpc.is_active
+
+ def test_done_callbacks_non_recoverable(self):
+ start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+ callback = mock.Mock(spec=["__call__"])
+
+ bidi_rpc.add_done_callback(callback)
+ bidi_rpc._on_call_done(mock.sentinel.future)
+
+ callback.assert_called_once_with(mock.sentinel.future)
+ should_recover.assert_called_once_with(mock.sentinel.future)
+ assert not bidi_rpc.is_active
+
+ def test_send_terminate(self):
+ cancellation = ValueError()
+ call_1 = CallStub([cancellation], active=False)
+ call_2 = CallStub([])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ should_terminate = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc, should_recover, should_terminate=should_terminate
+ )
+
+ bidi_rpc.open()
+
+ bidi_rpc.send(mock.sentinel.request)
+
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is None
+
+ should_recover.assert_not_called()
+ should_terminate.assert_called_once_with(cancellation)
+ assert bidi_rpc.call == call_1
+ assert bidi_rpc.is_active is False
+ assert call_1.cancelled is True
+
+ def test_send_recover(self):
+ error = ValueError()
+ call_1 = CallStub([error], active=False)
+ call_2 = CallStub([])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+
+ bidi_rpc.open()
+
+ bidi_rpc.send(mock.sentinel.request)
+
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is mock.sentinel.request
+
+ should_recover.assert_called_once_with(error)
+ assert bidi_rpc.call == call_2
+ assert bidi_rpc.is_active is True
+
+ def test_send_failure(self):
+ error = ValueError()
+ call = CallStub([error], active=False)
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+
+ bidi_rpc.open()
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi_rpc.send(mock.sentinel.request)
+
+ assert exc_info.value == error
+ should_recover.assert_called_once_with(error)
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active is False
+ assert call.cancelled is True
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is None
+
+ def test_recv_terminate(self):
+ cancellation = ValueError()
+ call = CallStub([cancellation])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ should_terminate = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc, should_recover, should_terminate=should_terminate
+ )
+
+ bidi_rpc.open()
+
+ bidi_rpc.recv()
+
+ should_recover.assert_not_called()
+ should_terminate.assert_called_once_with(cancellation)
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active is False
+ assert call.cancelled is True
+
+ def test_recv_recover(self):
+ error = ValueError()
+ call_1 = CallStub([1, error])
+ call_2 = CallStub([2, 3])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+
+ bidi_rpc.open()
+
+ values = []
+ for n in range(3):
+ values.append(bidi_rpc.recv())
+
+ assert values == [1, 2, 3]
+ should_recover.assert_called_once_with(error)
+ assert bidi_rpc.call == call_2
+ assert bidi_rpc.is_active is True
+
+ def test_recv_recover_already_recovered(self):
+ call_1 = CallStub([])
+ call_2 = CallStub([])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
+ )
+ callback = mock.Mock()
+ callback.return_value = True
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, callback)
+
+ bidi_rpc.open()
+
+ bidi_rpc._reopen()
+
+ assert bidi_rpc.call is call_1
+ assert bidi_rpc.is_active is True
+
+ def test_recv_failure(self):
+ error = ValueError()
+ call = CallStub([error])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+
+ bidi_rpc.open()
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi_rpc.recv()
+
+ assert exc_info.value == error
+ should_recover.assert_called_once_with(error)
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active is False
+ assert call.cancelled is True
+
+ def test_close(self):
+ call = mock.create_autospec(_CallAndFuture, instance=True)
+
+ def cancel_side_effect():
+ call.is_active.return_value = False
+
+ call.cancel.side_effect = cancel_side_effect
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+ bidi_rpc.open()
+
+ bidi_rpc.close()
+
+ should_recover.assert_not_called()
+ call.cancel.assert_called_once()
+ assert bidi_rpc.call == call
+ assert bidi_rpc.is_active is False
+ # ensure the request queue was signaled to stop.
+ assert bidi_rpc.pending_requests == 1
+ assert bidi_rpc._request_queue.get() is None
+ assert bidi_rpc._finalized
+
+ def test_reopen_failure_on_rpc_restart(self):
+ error1 = ValueError("1")
+ error2 = ValueError("2")
+ call = CallStub([error1])
+ # Invoking start RPC a second time will trigger an error.
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, side_effect=[call, error2]
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ callback = mock.Mock(spec=["__call__"])
+
+ bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
+ bidi_rpc.add_done_callback(callback)
+
+ bidi_rpc.open()
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi_rpc.recv()
+
+ assert exc_info.value == error2
+ should_recover.assert_called_once_with(error1)
+ assert bidi_rpc.call is None
+ assert bidi_rpc.is_active is False
+ callback.assert_called_once_with(error2)
+
+ def test_using_throttle_on_reopen_requests(self):
+ call = CallStub([])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc, should_recover, throttle_reopen=True
+ )
+
+ patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
+ with patcher as mock_enter:
+ bidi_rpc._reopen()
+
+ mock_enter.assert_called_once()
+
+ def test_send_not_open(self):
+ bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
+
+ with pytest.raises(ValueError):
+ bidi_rpc.send(mock.sentinel.request)
+
+ def test_recv_not_open(self):
+ bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
+
+ with pytest.raises(ValueError):
+ bidi_rpc.recv()
+
+ def test_finalize_idempotent(self):
+ error1 = ValueError("1")
+ error2 = ValueError("2")
+ callback = mock.Mock(spec=["__call__"])
+ should_recover = mock.Mock(spec=["__call__"], return_value=False)
+
+ bidi_rpc = bidi.ResumableBidiRpc(mock.sentinel.start_rpc, should_recover)
+
+ bidi_rpc.add_done_callback(callback)
+
+ bidi_rpc._on_call_done(error1)
+ bidi_rpc._on_call_done(error2)
+
+ callback.assert_called_once_with(error1)
+
+
+class TestBackgroundConsumer(object):
+ def test_consume_once_then_exit(self):
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+ bidi_rpc.recv.side_effect = [mock.sentinel.response_1]
+ recved = threading.Event()
+
+ def on_response(response):
+ assert response == mock.sentinel.response_1
+ bidi_rpc.is_active = False
+ recved.set()
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
+
+ consumer.start()
+
+ recved.wait()
+
+ bidi_rpc.recv.assert_called_once()
+ assert bidi_rpc.is_active is False
+
+ consumer.stop()
+
+ bidi_rpc.close.assert_called_once()
+ assert consumer.is_active is False
+
+ def test_pause_resume_and_close(self):
+ # This test is relatively complex. It attempts to start the consumer,
+ # consume one item, pause the consumer, check the state of the world,
+ # then resume the consumer. Doing this in a deterministic fashion
+ # requires a bit more mocking and patching than usual.
+
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+
+ def close_side_effect():
+ bidi_rpc.is_active = False
+
+ bidi_rpc.close.side_effect = close_side_effect
+
+ # These are used to coordinate the two threads to ensure deterministic
+ # execution.
+ should_continue = threading.Event()
+ responses_and_events = {
+ mock.sentinel.response_1: threading.Event(),
+ mock.sentinel.response_2: threading.Event(),
+ }
+ bidi_rpc.recv.side_effect = [mock.sentinel.response_1, mock.sentinel.response_2]
+
+ recved_responses = []
+ consumer = None
+
+ def on_response(response):
+ if response == mock.sentinel.response_1:
+ consumer.pause()
+
+ recved_responses.append(response)
+ responses_and_events[response].set()
+ should_continue.wait()
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
+
+ consumer.start()
+
+ # Wait for the first response to be recved.
+ responses_and_events[mock.sentinel.response_1].wait()
+
+ # Ensure only one item has been recved and that the consumer is paused.
+ assert recved_responses == [mock.sentinel.response_1]
+ assert consumer.is_paused is True
+ assert consumer.is_active is True
+
+ # Unpause the consumer, wait for the second item, then close the
+ # consumer.
+ should_continue.set()
+ consumer.resume()
+
+ responses_and_events[mock.sentinel.response_2].wait()
+
+ assert recved_responses == [mock.sentinel.response_1, mock.sentinel.response_2]
+
+ consumer.stop()
+
+ assert consumer.is_active is False
+
+ def test_wake_on_error(self):
+ should_continue = threading.Event()
+
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+ bidi_rpc.add_done_callback.side_effect = lambda _: should_continue.set()
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, mock.sentinel.on_response)
+
+ # Start the consumer paused, which should immediately put it into wait
+ # state.
+ consumer.pause()
+ consumer.start()
+
+ # Wait for add_done_callback to be called
+ should_continue.wait()
+ bidi_rpc.add_done_callback.assert_called_once_with(consumer._on_call_done)
+
+ # The consumer should now be blocked on waiting to be unpaused.
+ assert consumer.is_active
+ assert consumer.is_paused
+
+ # Trigger the done callback, it should unpause the consumer and cause
+ # it to exit.
+ bidi_rpc.is_active = False
+ consumer._on_call_done(bidi_rpc)
+
+ # It may take a few cycles for the thread to exit.
+ while consumer.is_active:
+ pass
+
+ def test_consumer_expected_error(self, caplog):
+ caplog.set_level(logging.DEBUG)
+
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+ bidi_rpc.recv.side_effect = exceptions.ServiceUnavailable("Gone away")
+
+ on_response = mock.Mock(spec=["__call__"])
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
+
+ consumer.start()
+
+ # Wait for the consumer's thread to exit.
+ while consumer.is_active:
+ pass
+
+ on_response.assert_not_called()
+ bidi_rpc.recv.assert_called_once()
+ assert "caught error" in caplog.text
+
+ def test_consumer_unexpected_error(self, caplog):
+ caplog.set_level(logging.DEBUG)
+
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+ bidi_rpc.recv.side_effect = ValueError()
+
+ on_response = mock.Mock(spec=["__call__"])
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
+
+ consumer.start()
+
+ # Wait for the consumer's thread to exit.
+ while consumer.is_active:
+ pass # pragma: NO COVER (race condition)
+
+ on_response.assert_not_called()
+ bidi_rpc.recv.assert_called_once()
+ assert "caught unexpected exception" in caplog.text
+
+ def test_double_stop(self, caplog):
+ caplog.set_level(logging.DEBUG)
+ bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+ bidi_rpc.is_active = True
+ on_response = mock.Mock(spec=["__call__"])
+
+ def close_side_effect():
+ bidi_rpc.is_active = False
+
+ bidi_rpc.close.side_effect = close_side_effect
+
+ consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
+
+ consumer.start()
+ assert consumer.is_active is True
+
+ consumer.stop()
+ assert consumer.is_active is False
+
+ # calling stop twice should not result in an error.
+ consumer.stop()