summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-04-12 10:01:40 +0300
committerGrigoriy Chudnov <g.chudnov@gmail.com>2016-04-12 10:01:40 +0300
commit4e7e721138a6d41d48840e6d2aa0a949c40920c2 (patch)
treedec20085f8a00cf1feb10528be91e400f5a933da
parentc7b40011452bd35f00f1dd4cf5c5a0769b615a27 (diff)
downloadRxCpp-4e7e721138a6d41d48840e6d2aa0a949c40920c2.tar.gz
add timeout operator
-rw-r--r--Rx/v2/examples/doxygen/timeout.cpp26
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timeout.hpp218
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp44
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/timeout.cpp212
-rw-r--r--projects/CMake/CMakeLists.txt1
-rw-r--r--projects/doxygen/CMakeLists.txt1
8 files changed, 504 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/timeout.cpp b/Rx/v2/examples/doxygen/timeout.cpp
new file mode 100644
index 0000000..c61d519
--- /dev/null
+++ b/Rx/v2/examples/doxygen/timeout.cpp
@@ -0,0 +1,26 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("timeout sample"){
+ printf("//! [timeout sample]\n");
+
+ using namespace std::chrono;
+ auto values = rxcpp::observable<>::interval(milliseconds(100))
+ .take(3)
+ .concat(rxcpp::observable<>::interval(milliseconds(500)))
+ .timeout(milliseconds(200));
+ values.
+ subscribe(
+ [](long v) { printf("OnNext: %ld\n", v); },
+ [](std::exception_ptr ep) {
+ try {
+ std::rethrow_exception(ep);
+ } catch (const rxcpp::timeout_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [timeout sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
new file mode 100644
index 0000000..4989374
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
@@ -0,0 +1,218 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP)
+#define RXCPP_OPERATORS_RX_TIMEOUT_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+class timeout_error: public std::runtime_error
+{
+ public:
+ explicit timeout_error(const std::string& msg):
+ std::runtime_error(msg)
+ {}
+};
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Duration, class Coordination>
+struct timeout
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+ typedef rxu::decay_t<Duration> duration_type;
+
+ struct timeout_values
+ {
+ timeout_values(duration_type p, coordination_type c)
+ : period(p)
+ , coordination(c)
+ {
+ }
+
+ duration_type period;
+ coordination_type coordination;
+ };
+ timeout_values initial;
+
+ timeout(duration_type period, coordination_type coordination)
+ : initial(period, coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct timeout_observer
+ {
+ typedef timeout_observer<Subscriber> this_type;
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<T, this_type> observer_type;
+
+ struct timeout_subscriber_values : public timeout_values
+ {
+ timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
+ : timeout_values(v)
+ , cs(std::move(cs))
+ , dest(std::move(d))
+ , coordinator(std::move(c))
+ , worker(coordinator.get_worker())
+ , index(0)
+ {
+ }
+
+ composite_subscription cs;
+ dest_type dest;
+ coordinator_type coordinator;
+ rxsc::worker worker;
+ mutable std::size_t index;
+ };
+ typedef std::shared_ptr<timeout_subscriber_values> state_type;
+ state_type state;
+
+ timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
+ : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
+ {
+ auto localState = state;
+
+ auto disposer = [=](const rxsc::schedulable&){
+ localState->cs.unsubscribe();
+ localState->dest.unsubscribe();
+ localState->worker.unsubscribe();
+ };
+ auto selectedDisposer = on_exception(
+ [&](){ return localState->coordinator.act(disposer); },
+ localState->dest);
+ if (selectedDisposer.empty()) {
+ return;
+ }
+
+ localState->dest.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ localState->cs.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ }
+
+ static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) {
+ auto produce = [id, state](const rxsc::schedulable&) {
+ if(id != state->index)
+ return;
+
+ state->dest.on_error(std::make_exception_ptr(rxcpp::timeout_error("timeout has occurred")));
+ };
+
+ auto selectedProduce = on_exception(
+ [&](){ return state->coordinator.act(produce); },
+ state->dest);
+ if (selectedProduce.empty()) {
+ return std::function<void(const rxsc::schedulable&)>();
+ }
+
+ return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
+ }
+
+ void on_next(T v) const {
+ auto localState = state;
+ auto work = [v, localState](const rxsc::schedulable&) {
+ auto new_id = ++localState->index;
+ auto produce_time = localState->worker.now() + localState->period;
+
+ localState->dest.on_next(v);
+ localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_error(std::exception_ptr e) const {
+ auto localState = state;
+ auto work = [e, localState](const rxsc::schedulable&) {
+ localState->dest.on_error(e);
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_completed() const {
+ auto localState = state;
+ auto work = [localState](const rxsc::schedulable&) {
+ localState->dest.on_completed();
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, timeout_values v) {
+ auto cs = composite_subscription();
+ auto coordinator = v.coordination.create_coordinator();
+
+ return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
+ return timeout_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class Duration, class Coordination>
+class timeout_factory
+{
+ typedef rxu::decay_t<Duration> duration_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ duration_type period;
+ coordination_type coordination;
+public:
+ timeout_factory(duration_type p, coordination_type c) : period(p), coordination(c) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(timeout<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(timeout<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination));
+ }
+};
+
+}
+
+template<class Duration, class Coordination>
+inline auto timeout(Duration period, Coordination coordination)
+ -> detail::timeout_factory<Duration, Coordination> {
+ return detail::timeout_factory<Duration, Coordination>(period, coordination);
+}
+
+template<class Duration>
+inline auto timeout(Duration period)
+ -> detail::timeout_factory<Duration, identity_one_worker> {
+ return detail::timeout_factory<Duration, identity_one_worker>(period, identity_current_thread());
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index f197935..e1e3e5a 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -812,6 +812,50 @@ public:
return lift<T>(rxo::detail::tap<T, std::tuple<MakeObserverArgN...>>(std::make_tuple(std::forward<MakeObserverArgN>(an)...)));
}
+ /*! Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
+
+ \tparam Duration the type of time interval
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time wait for another item from the source observable.
+ \param coordination the scheduler to manage timeout for each event
+
+ \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
+
+ \sample
+ \snippet timeout.cpp timeout sample
+ \snippet output.txt timeout sample
+ */
+ template<class Duration, class Coordination>
+ auto timeout(Duration period, Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::timeout<T, Duration, Coordination>(period, coordination)))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::timeout<T, Duration, Coordination>(period, coordination));
+ }
+
+ /*! Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
+
+ \tparam Duration the type of time interval
+
+ \param period the period of time wait for another item from the source observable.
+
+ \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
+
+ \sample
+ \snippet timeout.cpp timeout sample
+ \snippet output.txt timeout sample
+ */
+ template<class Duration>
+ auto timeout(Duration period) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread())))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread()));
+ }
+
/*! Add a new action at the end of the new observable that is returned.
\tparam LastCall the type of the action function
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 1204a4b..d9aadf4 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -77,6 +77,7 @@ namespace rxo=operators;
#include "operators/rx-take.hpp"
#include "operators/rx-take_until.hpp"
#include "operators/rx-tap.hpp"
+#include "operators/rx-timeout.hpp"
#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
#include "operators/rx-window_time_count.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index f47ffd6..be8394d 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -64,6 +64,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/take.cpp
${TEST_DIR}/operators/take_until.cpp
${TEST_DIR}/operators/tap.cpp
+ ${TEST_DIR}/operators/timeout.cpp
${TEST_DIR}/operators/window.cpp
${TEST_DIR}/operators/zip.cpp
)
diff --git a/Rx/v2/test/operators/timeout.cpp b/Rx/v2/test/operators/timeout.cpp
new file mode 100644
index 0000000..61d67a6
--- /dev/null
+++ b/Rx/v2/test/operators/timeout.cpp
@@ -0,0 +1,212 @@
+#include "../test.h"
+
+using namespace std::chrono;
+
+SCENARIO("should not timeout if the source never emits any items", "[timeout][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("timeout is set"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timeout(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1001)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("should not timeout if the source observable is empty", "[timeout][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("timeout is set"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timeout(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on.completed(251)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("should not timeout if all items are emitted within the specified timeout duration", "[timeout][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(250)
+ });
+
+ WHEN("timeout is set"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timeout(milliseconds(40), so);
+ }
+ );
+
+ THEN("the output contains the emitted items while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(211, 2),
+ on.next(241, 3),
+ on.completed(251)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("should timeout if there are no emitted items within the timeout duration", "[timeout][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ rxcpp::timeout_error ex("timeout has occurred");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ // -- no emissions
+ on.completed(300)
+ });
+
+ WHEN("timeout is set"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timeout(milliseconds(40), so);
+ }
+ );
+
+ THEN("an error notification message is captured"){
+ auto required = rxu::to_vector({
+ on.next(211, 2),
+ on.next(241, 3),
+ on.error(281, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 282)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("should not timeout if there is an error", "[timeout][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("timeout is set"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timeout(milliseconds(40), so);
+ }
+ );
+
+ THEN("the output contains only an error message"){
+ auto required = rxu::to_vector({
+ on.error(251, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 8485740..5fe2cad 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -69,6 +69,7 @@ set(RX_SOURCES
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-take.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-tap.hpp
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 8db674b..7dd414e 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -95,6 +95,7 @@ if(DOXYGEN_FOUND)
${DOXY_EXAMPLES_SRC_DIR}/take.cpp
${DOXY_EXAMPLES_SRC_DIR}/take_until.cpp
${DOXY_EXAMPLES_SRC_DIR}/tap.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/timeout.cpp
${DOXY_EXAMPLES_SRC_DIR}/timer.cpp
${DOXY_EXAMPLES_SRC_DIR}/window.cpp
${DOXY_EXAMPLES_SRC_DIR}/zip.cpp