diff options
author | Peter Lamut <plamut@users.noreply.github.com> | 2019-06-18 20:25:44 +0200 |
---|---|---|
committer | Solomon Duskis <sduskis@google.com> | 2019-06-18 14:25:44 -0400 |
commit | 7515c07f4e6b42d42cbc1c4606a20aeca278853a (patch) | |
tree | edab5dd98f6146bb82ac8890dfeeefa2367f85b9 /tests/unit/test_bidi.py | |
parent | 2675514a55b6ac76bb385b84c44db02d6805189b (diff) | |
download | python-api-core-7515c07f4e6b42d42cbc1c4606a20aeca278853a.tar.gz |
Core: Mitigate busy reopen loop in ResumableBidiRpc consuming 100% CPU (#8193)
* Add bidi._Throttle helper class
* Add optional reopen throttling to ResumableBidiRpc
* Enable Bidi reopen throttling in SPM
* Change bidi._Throttle signature
The commit renames the entry_cap parameter to access_limit, and
changes the type of the time_window argument from float to timedelta.
Diffstat (limited to 'tests/unit/test_bidi.py')
-rw-r--r-- | tests/unit/test_bidi.py | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/tests/unit/test_bidi.py b/tests/unit/test_bidi.py index 08d2021..8e9f262 100644 --- a/tests/unit/test_bidi.py +++ b/tests/unit/test_bidi.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import logging import threading @@ -116,6 +117,87 @@ class Test_RequestQueueGenerator(object): assert items == [] +class Test_Throttle(object): + def test_repr(self): + delta = datetime.timedelta(seconds=4.5) + instance = bidi._Throttle(access_limit=42, time_window=delta) + assert repr(instance) == \ + "_Throttle(access_limit=42, time_window={})".format(repr(delta)) + + def test_raises_error_on_invalid_init_arguments(self): + with pytest.raises(ValueError) as exc_info: + bidi._Throttle( + access_limit=10, time_window=datetime.timedelta(seconds=0.0) + ) + assert "time_window" in str(exc_info.value) + assert "must be a positive timedelta" in str(exc_info.value) + + with pytest.raises(ValueError) as exc_info: + bidi._Throttle( + access_limit=0, time_window=datetime.timedelta(seconds=10) + ) + assert "access_limit" in str(exc_info.value) + assert "must be positive" in str(exc_info.value) + + def test_does_not_delay_entry_attempts_under_threshold(self): + throttle = bidi._Throttle( + access_limit=3, time_window=datetime.timedelta(seconds=1) + ) + entries = [] + + for _ in range(3): + with throttle as time_waited: + entry_info = { + "entered_at": datetime.datetime.now(), + "reported_wait": time_waited, + } + entries.append(entry_info) + + # check the reported wait times ... + assert all(entry["reported_wait"] == 0.0 for entry in entries) + + # .. and the actual wait times + delta = entries[1]["entered_at"] - entries[0]["entered_at"] + assert delta.total_seconds() < 0.1 + delta = entries[2]["entered_at"] - entries[1]["entered_at"] + assert delta.total_seconds() < 0.1 + + def test_delays_entry_attempts_above_threshold(self): + throttle = bidi._Throttle( + access_limit=3, time_window=datetime.timedelta(seconds=1) + ) + entries = [] + + for _ in range(6): + with throttle as time_waited: + entry_info = { + "entered_at": datetime.datetime.now(), + "reported_wait": time_waited, + } + entries.append(entry_info) + + # For each group of 4 consecutive entries the time difference between + # the first and the last entry must have been greater than time_window, + # because a maximum of 3 are allowed in each time_window. + for i, entry in enumerate(entries[3:], start=3): + first_entry = entries[i - 3] + delta = entry["entered_at"] - first_entry["entered_at"] + assert delta.total_seconds() > 1.0 + + # check the reported wait times + # (NOTE: not using assert all(...), b/c the coverage check would complain) + for i, entry in enumerate(entries): + if i != 3: + assert entry["reported_wait"] == 0.0 + + # The delayed entry is expected to have been delayed for a significant + # chunk of the full second, and the actual and reported delay times + # should reflect that. + assert entries[3]["reported_wait"] > 0.7 + delta = entries[3]["entered_at"] - entries[2]["entered_at"] + assert delta.total_seconds() > 0.7 + + class _CallAndFuture(grpc.Call, grpc.Future): pass @@ -442,6 +524,22 @@ class TestResumableBidiRpc(object): assert bidi_rpc.is_active is False callback.assert_called_once_with(error2) + def test_using_throttle_on_reopen_requests(self): + call = CallStub([]) + start_rpc = mock.create_autospec( + grpc.StreamStreamMultiCallable, instance=True, return_value=call + ) + should_recover = mock.Mock(spec=["__call__"], return_value=True) + bidi_rpc = bidi.ResumableBidiRpc( + start_rpc, should_recover, throttle_reopen=True + ) + + patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__") + with patcher as mock_enter: + bidi_rpc._reopen() + + mock_enter.assert_called_once() + def test_send_not_open(self): bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False) |