diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-04-12 10:01:40 +0300 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-04-12 10:01:40 +0300 |
commit | 4e7e721138a6d41d48840e6d2aa0a949c40920c2 (patch) | |
tree | dec20085f8a00cf1feb10528be91e400f5a933da | |
parent | c7b40011452bd35f00f1dd4cf5c5a0769b615a27 (diff) | |
download | RxCpp-4e7e721138a6d41d48840e6d2aa0a949c40920c2.tar.gz |
add timeout operator
-rw-r--r-- | Rx/v2/examples/doxygen/timeout.cpp | 26 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-timeout.hpp | 218 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 44 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/timeout.cpp | 212 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 1 | ||||
-rw-r--r-- | projects/doxygen/CMakeLists.txt | 1 |
8 files changed, 504 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/timeout.cpp b/Rx/v2/examples/doxygen/timeout.cpp new file mode 100644 index 0000000..c61d519 --- /dev/null +++ b/Rx/v2/examples/doxygen/timeout.cpp @@ -0,0 +1,26 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("timeout sample"){ + printf("//! [timeout sample]\n"); + + using namespace std::chrono; + auto values = rxcpp::observable<>::interval(milliseconds(100)) + .take(3) + .concat(rxcpp::observable<>::interval(milliseconds(500))) + .timeout(milliseconds(200)); + values. + subscribe( + [](long v) { printf("OnNext: %ld\n", v); }, + [](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (const rxcpp::timeout_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + []() { printf("OnCompleted\n"); }); + printf("//! [timeout sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp new file mode 100644 index 0000000..4989374 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp @@ -0,0 +1,218 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP) +#define RXCPP_OPERATORS_RX_TIMEOUT_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +class timeout_error: public std::runtime_error +{ + public: + explicit timeout_error(const std::string& msg): + std::runtime_error(msg) + {} +}; + +namespace operators { + +namespace detail { + +template<class T, class Duration, class Coordination> +struct timeout +{ + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Coordination> coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + typedef rxu::decay_t<Duration> duration_type; + + struct timeout_values + { + timeout_values(duration_type p, coordination_type c) + : period(p) + , coordination(c) + { + } + + duration_type period; + coordination_type coordination; + }; + timeout_values initial; + + timeout(duration_type period, coordination_type coordination) + : initial(period, coordination) + { + } + + template<class Subscriber> + struct timeout_observer + { + typedef timeout_observer<Subscriber> this_type; + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<T, this_type> observer_type; + + struct timeout_subscriber_values : public timeout_values + { + timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c) + : timeout_values(v) + , cs(std::move(cs)) + , dest(std::move(d)) + , coordinator(std::move(c)) + , worker(coordinator.get_worker()) + , index(0) + { + } + + composite_subscription cs; + dest_type dest; + coordinator_type coordinator; + rxsc::worker worker; + mutable std::size_t index; + }; + typedef std::shared_ptr<timeout_subscriber_values> state_type; + state_type state; + + timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c) + : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) + { + auto localState = state; + + auto disposer = [=](const rxsc::schedulable&){ + localState->cs.unsubscribe(); + localState->dest.unsubscribe(); + localState->worker.unsubscribe(); + }; + auto selectedDisposer = on_exception( + [&](){ return localState->coordinator.act(disposer); }, + localState->dest); + if (selectedDisposer.empty()) { + return; + } + + localState->dest.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + localState->cs.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + } + + static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) { + auto produce = [id, state](const rxsc::schedulable&) { + if(id != state->index) + return; + + state->dest.on_error(std::make_exception_ptr(rxcpp::timeout_error("timeout has occurred"))); + }; + + auto selectedProduce = on_exception( + [&](){ return state->coordinator.act(produce); }, + state->dest); + if (selectedProduce.empty()) { + return std::function<void(const rxsc::schedulable&)>(); + } + + return std::function<void(const rxsc::schedulable&)>(selectedProduce.get()); + } + + void on_next(T v) const { + auto localState = state; + auto work = [v, localState](const rxsc::schedulable&) { + auto new_id = ++localState->index; + auto produce_time = localState->worker.now() + localState->period; + + localState->dest.on_next(v); + localState->worker.schedule(produce_time, produce_timeout(new_id, localState)); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_error(std::exception_ptr e) const { + auto localState = state; + auto work = [e, localState](const rxsc::schedulable&) { + localState->dest.on_error(e); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_completed() const { + auto localState = state; + auto work = [localState](const rxsc::schedulable&) { + localState->dest.on_completed(); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + static subscriber<T, observer_type> make(dest_type d, timeout_values v) { + auto cs = composite_subscription(); + auto coordinator = v.coordination.create_coordinator(); + + return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) { + return timeout_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template<class Duration, class Coordination> +class timeout_factory +{ + typedef rxu::decay_t<Duration> duration_type; + typedef rxu::decay_t<Coordination> coordination_type; + + duration_type period; + coordination_type coordination; +public: + timeout_factory(duration_type p, coordination_type c) : period(p), coordination(c) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(timeout<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(timeout<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination)); + } +}; + +} + +template<class Duration, class Coordination> +inline auto timeout(Duration period, Coordination coordination) + -> detail::timeout_factory<Duration, Coordination> { + return detail::timeout_factory<Duration, Coordination>(period, coordination); +} + +template<class Duration> +inline auto timeout(Duration period) + -> detail::timeout_factory<Duration, identity_one_worker> { + return detail::timeout_factory<Duration, identity_one_worker>(period, identity_current_thread()); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index f197935..e1e3e5a 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -812,6 +812,50 @@ public: return lift<T>(rxo::detail::tap<T, std::tuple<MakeObserverArgN...>>(std::make_tuple(std::forward<MakeObserverArgN>(an)...))); } + /*! Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable. + + \tparam Duration the type of time interval + \tparam Coordination the type of the scheduler + + \param period the period of time wait for another item from the source observable. + \param coordination the scheduler to manage timeout for each event + + \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable. + + \sample + \snippet timeout.cpp timeout sample + \snippet output.txt timeout sample + */ + template<class Duration, class Coordination> + auto timeout(Duration period, Coordination coordination) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::timeout<T, Duration, Coordination>(period, coordination))) + /// \endcond + { + return lift<T>(rxo::detail::timeout<T, Duration, Coordination>(period, coordination)); + } + + /*! Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable. + + \tparam Duration the type of time interval + + \param period the period of time wait for another item from the source observable. + + \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable. + + \sample + \snippet timeout.cpp timeout sample + \snippet output.txt timeout sample + */ + template<class Duration> + auto timeout(Duration period) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread()))) + /// \endcond + { + return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread())); + } + /*! Add a new action at the end of the new observable that is returned. \tparam LastCall the type of the action function diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 1204a4b..d9aadf4 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -77,6 +77,7 @@ namespace rxo=operators; #include "operators/rx-take.hpp" #include "operators/rx-take_until.hpp" #include "operators/rx-tap.hpp" +#include "operators/rx-timeout.hpp" #include "operators/rx-window.hpp" #include "operators/rx-window_time.hpp" #include "operators/rx-window_time_count.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index f47ffd6..be8394d 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -64,6 +64,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/take.cpp ${TEST_DIR}/operators/take_until.cpp ${TEST_DIR}/operators/tap.cpp + ${TEST_DIR}/operators/timeout.cpp ${TEST_DIR}/operators/window.cpp ${TEST_DIR}/operators/zip.cpp ) diff --git a/Rx/v2/test/operators/timeout.cpp b/Rx/v2/test/operators/timeout.cpp new file mode 100644 index 0000000..61d67a6 --- /dev/null +++ b/Rx/v2/test/operators/timeout.cpp @@ -0,0 +1,212 @@ +#include "../test.h" + +using namespace std::chrono; + +SCENARIO("should not timeout if the source never emits any items", "[timeout][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("timeout is set"){ + + auto res = w.start( + [so, xs]() { + return xs.timeout(milliseconds(10), so); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1001) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("should not timeout if the source observable is empty", "[timeout][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("timeout is set"){ + + auto res = w.start( + [so, xs]() { + return xs.timeout(milliseconds(10), so); + } + ); + + THEN("the output only contains complete message"){ + auto required = rxu::to_vector({ + on.completed(251) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("should not timeout if all items are emitted within the specified timeout duration", "[timeout][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(250) + }); + + WHEN("timeout is set"){ + + auto res = w.start( + [so, xs]() { + return xs.timeout(milliseconds(40), so); + } + ); + + THEN("the output contains the emitted items while subscribed"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.completed(251) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("should timeout if there are no emitted items within the timeout duration", "[timeout][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + rxcpp::timeout_error ex("timeout has occurred"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + // -- no emissions + on.completed(300) + }); + + WHEN("timeout is set"){ + + auto res = w.start( + [so, xs]() { + return xs.timeout(milliseconds(40), so); + } + ); + + THEN("an error notification message is captured"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.error(281, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 282) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("should not timeout if there is an error", "[timeout][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("timeout is set"){ + + auto res = w.start( + [so, xs]() { + return xs.timeout(milliseconds(40), so); + } + ); + + THEN("the output contains only an error message"){ + auto required = rxu::to_vector({ + on.error(251, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 8485740..5fe2cad 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -69,6 +69,7 @@ set(RX_SOURCES ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-take.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-take_until.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-tap.hpp + ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-timeout.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index 8db674b..7dd414e 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -95,6 +95,7 @@ if(DOXYGEN_FOUND) ${DOXY_EXAMPLES_SRC_DIR}/take.cpp ${DOXY_EXAMPLES_SRC_DIR}/take_until.cpp ${DOXY_EXAMPLES_SRC_DIR}/tap.cpp + ${DOXY_EXAMPLES_SRC_DIR}/timeout.cpp ${DOXY_EXAMPLES_SRC_DIR}/timer.cpp ${DOXY_EXAMPLES_SRC_DIR}/window.cpp ${DOXY_EXAMPLES_SRC_DIR}/zip.cpp |