summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-07-15 01:49:47 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-07-14 15:49:47 -0700
commit5aea424898994b3d409db268486202244cfe5053 (patch)
tree3d0f93b34d0bacbc48c77db8486624b9110b0208 /Rx
parent1d75538fa780e530bcd724433257c65b3fe36b29 (diff)
downloadRxCpp-5aea424898994b3d409db268486202244cfe5053.tar.gz
fix timeout when no items are emitted (#387)
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timeout.hpp14
-rw-r--r--Rx/v2/test/operators/timeout.cpp18
2 files changed, 25 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
index 6750e34..841df12 100644
--- a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
@@ -126,6 +126,20 @@ struct timeout
localState->cs.add([=](){
localState->worker.schedule(selectedDisposer.get());
});
+
+ auto work = [v, localState](const rxsc::schedulable&) {
+ auto new_id = ++localState->index;
+ auto produce_time = localState->worker.now() + localState->period;
+
+ localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
}
static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) {
diff --git a/Rx/v2/test/operators/timeout.cpp b/Rx/v2/test/operators/timeout.cpp
index 3005209..dbbea6c 100644
--- a/Rx/v2/test/operators/timeout.cpp
+++ b/Rx/v2/test/operators/timeout.cpp
@@ -3,13 +3,15 @@
using namespace std::chrono;
-SCENARIO("should not timeout if the source never emits any items", "[timeout][operators]"){
+SCENARIO("should timeout if the source never emits any items", "[timeout][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
auto so = rx::synchronize_in_one_worker(sc);
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
+ rxcpp::timeout_error ex("timeout has occurred");
+
auto xs = sc.make_hot_observable({
on.next(150, 1)
});
@@ -23,15 +25,17 @@ SCENARIO("should not timeout if the source never emits any items", "[timeout][op
}
);
- THEN("the output is empty"){
- auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ THEN("the error notification message is captured"){
+ auto required = rxu::to_vector({
+ on.error(211, ex)
+ });
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was 1 subscription/unsubscription to the source"){
auto required = rxu::to_vector({
- on.subscribe(200, 1001)
+ on.subscribe(200, 212)
});
auto actual = xs.subscriptions();
REQUIRE(required == actual);
@@ -40,7 +44,7 @@ SCENARIO("should not timeout if the source never emits any items", "[timeout][op
}
}
-SCENARIO("should not timeout if the source observable is empty", "[timeout][operators]"){
+SCENARIO("should not timeout if completed before the specified timeout duration", "[timeout][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
auto so = rx::synchronize_in_one_worker(sc);
@@ -56,7 +60,7 @@ SCENARIO("should not timeout if the source observable is empty", "[timeout][oper
auto res = w.start(
[so, xs]() {
- return xs.timeout(so, milliseconds(10));
+ return xs.timeout(so, milliseconds(100));
}
);
@@ -189,7 +193,7 @@ SCENARIO("should not timeout if there is an error", "[timeout][operators]"){
auto res = w.start(
[so, xs]() {
- return xs.timeout(milliseconds(40), so);
+ return xs.timeout(milliseconds(100), so);
}
);