diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-07-15 01:49:47 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-07-14 15:49:47 -0700 |
commit | 5aea424898994b3d409db268486202244cfe5053 (patch) | |
tree | 3d0f93b34d0bacbc48c77db8486624b9110b0208 | |
parent | 1d75538fa780e530bcd724433257c65b3fe36b29 (diff) | |
download | RxCpp-5aea424898994b3d409db268486202244cfe5053.tar.gz |
fix timeout when no items are emitted (#387)
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-timeout.hpp | 14 | ||||
-rw-r--r-- | Rx/v2/test/operators/timeout.cpp | 18 |
2 files changed, 25 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp index 6750e34..841df12 100644 --- a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp @@ -126,6 +126,20 @@ struct timeout localState->cs.add([=](){ localState->worker.schedule(selectedDisposer.get()); }); + + auto work = [v, localState](const rxsc::schedulable&) { + auto new_id = ++localState->index; + auto produce_time = localState->worker.now() + localState->period; + + localState->worker.schedule(produce_time, produce_timeout(new_id, localState)); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); } static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) { diff --git a/Rx/v2/test/operators/timeout.cpp b/Rx/v2/test/operators/timeout.cpp index 3005209..dbbea6c 100644 --- a/Rx/v2/test/operators/timeout.cpp +++ b/Rx/v2/test/operators/timeout.cpp @@ -3,13 +3,15 @@ using namespace std::chrono; -SCENARIO("should not timeout if the source never emits any items", "[timeout][operators]"){ +SCENARIO("should timeout if the source never emits any items", "[timeout][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); auto so = rx::synchronize_in_one_worker(sc); auto w = sc.create_worker(); const rxsc::test::messages<int> on; + rxcpp::timeout_error ex("timeout has occurred"); + auto xs = sc.make_hot_observable({ on.next(150, 1) }); @@ -23,15 +25,17 @@ SCENARIO("should not timeout if the source never emits any items", "[timeout][op } ); - THEN("the output is empty"){ - auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + THEN("the error notification message is captured"){ + auto required = rxu::to_vector({ + on.error(211, ex) + }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was 1 subscription/unsubscription to the source"){ auto required = rxu::to_vector({ - on.subscribe(200, 1001) + on.subscribe(200, 212) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); @@ -40,7 +44,7 @@ SCENARIO("should not timeout if the source never emits any items", "[timeout][op } } -SCENARIO("should not timeout if the source observable is empty", "[timeout][operators]"){ +SCENARIO("should not timeout if completed before the specified timeout duration", "[timeout][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); auto so = rx::synchronize_in_one_worker(sc); @@ -56,7 +60,7 @@ SCENARIO("should not timeout if the source observable is empty", "[timeout][oper auto res = w.start( [so, xs]() { - return xs.timeout(so, milliseconds(10)); + return xs.timeout(so, milliseconds(100)); } ); @@ -189,7 +193,7 @@ SCENARIO("should not timeout if there is an error", "[timeout][operators]"){ auto res = w.start( [so, xs]() { - return xs.timeout(milliseconds(40), so); + return xs.timeout(milliseconds(100), so); } ); |