aboutsummaryrefslogtreecommitdiff
path: root/tests/unit/test_bidi.py
diff options
context:
space:
mode:
authorPeter Lamut <plamut@users.noreply.github.com>2019-06-18 20:25:44 +0200
committerSolomon Duskis <sduskis@google.com>2019-06-18 14:25:44 -0400
commit7515c07f4e6b42d42cbc1c4606a20aeca278853a (patch)
treeedab5dd98f6146bb82ac8890dfeeefa2367f85b9 /tests/unit/test_bidi.py
parent2675514a55b6ac76bb385b84c44db02d6805189b (diff)
downloadpython-api-core-7515c07f4e6b42d42cbc1c4606a20aeca278853a.tar.gz
Core: Mitigate busy reopen loop in ResumableBidiRpc consuming 100% CPU (#8193)
* Add bidi._Throttle helper class * Add optional reopen throttling to ResumableBidiRpc * Enable Bidi reopen throttling in SPM * Change bidi._Throttle signature The commit renames the entry_cap parameter to access_limit, and changes the type of the time_window argument from float to timedelta.
Diffstat (limited to 'tests/unit/test_bidi.py')
-rw-r--r--tests/unit/test_bidi.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/tests/unit/test_bidi.py b/tests/unit/test_bidi.py
index 08d2021..8e9f262 100644
--- a/tests/unit/test_bidi.py
+++ b/tests/unit/test_bidi.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import datetime
import logging
import threading
@@ -116,6 +117,87 @@ class Test_RequestQueueGenerator(object):
assert items == []
+class Test_Throttle(object):
+ def test_repr(self):
+ delta = datetime.timedelta(seconds=4.5)
+ instance = bidi._Throttle(access_limit=42, time_window=delta)
+ assert repr(instance) == \
+ "_Throttle(access_limit=42, time_window={})".format(repr(delta))
+
+ def test_raises_error_on_invalid_init_arguments(self):
+ with pytest.raises(ValueError) as exc_info:
+ bidi._Throttle(
+ access_limit=10, time_window=datetime.timedelta(seconds=0.0)
+ )
+ assert "time_window" in str(exc_info.value)
+ assert "must be a positive timedelta" in str(exc_info.value)
+
+ with pytest.raises(ValueError) as exc_info:
+ bidi._Throttle(
+ access_limit=0, time_window=datetime.timedelta(seconds=10)
+ )
+ assert "access_limit" in str(exc_info.value)
+ assert "must be positive" in str(exc_info.value)
+
+ def test_does_not_delay_entry_attempts_under_threshold(self):
+ throttle = bidi._Throttle(
+ access_limit=3, time_window=datetime.timedelta(seconds=1)
+ )
+ entries = []
+
+ for _ in range(3):
+ with throttle as time_waited:
+ entry_info = {
+ "entered_at": datetime.datetime.now(),
+ "reported_wait": time_waited,
+ }
+ entries.append(entry_info)
+
+ # check the reported wait times ...
+ assert all(entry["reported_wait"] == 0.0 for entry in entries)
+
+ # .. and the actual wait times
+ delta = entries[1]["entered_at"] - entries[0]["entered_at"]
+ assert delta.total_seconds() < 0.1
+ delta = entries[2]["entered_at"] - entries[1]["entered_at"]
+ assert delta.total_seconds() < 0.1
+
+ def test_delays_entry_attempts_above_threshold(self):
+ throttle = bidi._Throttle(
+ access_limit=3, time_window=datetime.timedelta(seconds=1)
+ )
+ entries = []
+
+ for _ in range(6):
+ with throttle as time_waited:
+ entry_info = {
+ "entered_at": datetime.datetime.now(),
+ "reported_wait": time_waited,
+ }
+ entries.append(entry_info)
+
+ # For each group of 4 consecutive entries the time difference between
+ # the first and the last entry must have been greater than time_window,
+ # because a maximum of 3 are allowed in each time_window.
+ for i, entry in enumerate(entries[3:], start=3):
+ first_entry = entries[i - 3]
+ delta = entry["entered_at"] - first_entry["entered_at"]
+ assert delta.total_seconds() > 1.0
+
+ # check the reported wait times
+ # (NOTE: not using assert all(...), b/c the coverage check would complain)
+ for i, entry in enumerate(entries):
+ if i != 3:
+ assert entry["reported_wait"] == 0.0
+
+ # The delayed entry is expected to have been delayed for a significant
+ # chunk of the full second, and the actual and reported delay times
+ # should reflect that.
+ assert entries[3]["reported_wait"] > 0.7
+ delta = entries[3]["entered_at"] - entries[2]["entered_at"]
+ assert delta.total_seconds() > 0.7
+
+
class _CallAndFuture(grpc.Call, grpc.Future):
pass
@@ -442,6 +524,22 @@ class TestResumableBidiRpc(object):
assert bidi_rpc.is_active is False
callback.assert_called_once_with(error2)
+ def test_using_throttle_on_reopen_requests(self):
+ call = CallStub([])
+ start_rpc = mock.create_autospec(
+ grpc.StreamStreamMultiCallable, instance=True, return_value=call
+ )
+ should_recover = mock.Mock(spec=["__call__"], return_value=True)
+ bidi_rpc = bidi.ResumableBidiRpc(
+ start_rpc, should_recover, throttle_reopen=True
+ )
+
+ patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
+ with patcher as mock_enter:
+ bidi_rpc._reopen()
+
+ mock_enter.assert_called_once()
+
def test_send_not_open(self):
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)