diff options
-rw-r--r-- | Rx/v2/examples/doxygen/on_error_resume_next.cpp | 23 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp | 106 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 21 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 2 | ||||
-rw-r--r-- | Rx/v2/test/operators/on_error_resume_next.cpp | 152 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 1 | ||||
-rw-r--r-- | projects/doxygen/CMakeLists.txt | 1 |
8 files changed, 307 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/on_error_resume_next.cpp b/Rx/v2/examples/doxygen/on_error_resume_next.cpp new file mode 100644 index 0000000..0c9873b --- /dev/null +++ b/Rx/v2/examples/doxygen/on_error_resume_next.cpp @@ -0,0 +1,23 @@ +#include "rxcpp/rx.hpp" +namespace rxu=rxcpp::util; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("on_error_resume_next sample"){ + printf("//! [on_error_resume_next sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + on_error_resume_next([](std::exception_ptr ep){ + printf("Resuming after: %s\n", rxu::what(ep).c_str()); + return rxcpp::observable<>::just(-1); + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + printf("OnError: %s\n", rxu::what(ep).c_str()); + }, + [](){printf("OnCompleted\n");}); + printf("//! [on_error_resume_next sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp new file mode 100644 index 0000000..9356a4c --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP) +#define RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + + +template<class T, class Selector> +struct on_error_resume_next +{ + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Selector> select_type; + typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type; + select_type selector; + + on_error_resume_next(select_type s) + : selector(std::move(s)) + { + } + + template<class Subscriber> + struct on_error_resume_next_observer + { + typedef on_error_resume_next_observer<Subscriber> this_type; + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Selector> select_type; + typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<T, this_type> observer_type; + dest_type dest; + composite_subscription lifetime; + select_type selector; + + on_error_resume_next_observer(dest_type d, composite_subscription cs, select_type s) + : dest(std::move(d)) + , lifetime(std::move(cs)) + , selector(std::move(s)) + { + dest.add(lifetime); + } + void on_next(value_type v) const { + dest.on_next(std::move(v)); + } + void on_error(std::exception_ptr e) const { + auto selected = on_exception( + [&](){ + return this->selector(std::move(e));}, + dest); + if (selected.empty()) { + return; + } + selected->subscribe(dest); + } + void on_completed() const { + dest.on_completed(); + } + + static subscriber<T, observer_type> make(dest_type d, select_type s) { + auto cs = composite_subscription(); + return make_subscriber<T>(cs, observer_type(this_type(std::move(d), cs, std::move(s)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector)) { + return on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector); + } +}; + +template<class Selector> +class on_error_resume_next_factory +{ + typedef rxu::decay_t<Selector> select_type; + select_type selector; +public: + on_error_resume_next_factory(select_type s) : selector(std::move(s)) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector))) { + return source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector)); + } +}; + +} + +template<class Selector> +auto on_error_resume_next(Selector&& p) + -> detail::on_error_resume_next_factory<Selector> { + return detail::on_error_resume_next_factory<Selector>(std::forward<Selector>(p)); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 811e345..b0e4a5a 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -754,6 +754,27 @@ public: return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))); } + /*! If an error occurs, take the result from the Selector and subscribe to that instead. + + \tparam Selector the actual type of a function of the form `observable<T>(std::exception_ptr)` + + \param s the function of the form `observable<T>(std::exception_ptr)` + + \return Observable that emits the items from the source observable and switches to a new observable on error. + + \sample + \snippet on_error_resume_next.cpp on_error_resume_next sample + \snippet output.txt on_error_resume_next sample + */ + template<class Selector> + auto on_error_resume_next(Selector s) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s)))) + /// \endcond + { + return lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s))); + } + /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned. \tparam Selector the type of the transforming function diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4886ace..78f1fb7 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -52,6 +52,7 @@ namespace rxo=operators; #include "operators/rx-merge.hpp" #include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" +#include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-reduce.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 5f42853..70f4cab 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -40,6 +40,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/map.cpp ${TEST_DIR}/operators/merge.cpp ${TEST_DIR}/operators/observe_on.cpp + ${TEST_DIR}/operators/on_error_resume_next.cpp ${TEST_DIR}/operators/pairwise.cpp ${TEST_DIR}/operators/publish.cpp ${TEST_DIR}/operators/reduce.cpp @@ -71,6 +72,7 @@ target_link_libraries(rxcppv2_test ${CMAKE_THREAD_LIBS_INIT}) set(ONE_SOURCES ${TEST_DIR}/test.cpp #${TEST_DIR}/operators/tap.cpp + #${TEST_DIR}/operators/on_error_resume_next.cpp ) add_executable(one_test ${ONE_SOURCES}) add_executable(rxcpp::one_test ALIAS one_test) diff --git a/Rx/v2/test/operators/on_error_resume_next.cpp b/Rx/v2/test/operators/on_error_resume_next.cpp new file mode 100644 index 0000000..aec1774 --- /dev/null +++ b/Rx/v2/test/operators/on_error_resume_next.cpp @@ -0,0 +1,152 @@ +#include "rxcpp/rx.hpp" +namespace rxu=rxcpp::util; +namespace rxsc=rxcpp::schedulers; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("on_error_resume_next stops on completion", "[on_error_resume_next][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + long invoked = 0; + + auto xs = sc.make_hot_observable({ + on.next(180, 1), + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.next(350, 5), + on.completed(400), + on.next(410, -1), + on.completed(420), + on.error(430, std::runtime_error("error on unsubscribed stream")) + }); + + auto ys = sc.make_cold_observable({ + on.next(10, -1), + on.completed(20), + }); + + WHEN("passed through unchanged"){ + + auto res = w.start( + [xs, ys, &invoked]() { + return xs + .on_error_resume_next([ys, &invoked](std::exception_ptr) { + invoked++; + return ys; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output stops on completion"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.next(350, 5), + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one xs subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was no ys subscription"){ + auto required = std::vector<rxcpp::notifications::subscription>(); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + + THEN("on_error_resume_next selector was not called"){ + REQUIRE(0 == invoked); + } + } + } +} + +SCENARIO("on_error_resume_next stops on error", "[on_error_resume_next][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + std::runtime_error ex("on_error_resume_next on_error from source"); + long invoked = 0; + + auto xs = sc.make_hot_observable({ + on.next(180, 1), + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.error(300, ex), + on.next(350, 5), + on.completed(400), + on.next(410, -1), + on.completed(420), + on.error(430, std::runtime_error("error on unsubscribed stream")) + }); + + auto ys = sc.make_cold_observable({ + on.next(10, -1), + on.completed(20), + }); + + WHEN("are resumed after an error"){ + + auto res = w.start( + [xs, ys, &invoked]() { + return xs + .on_error_resume_next([ys, &invoked](std::exception_ptr) { + invoked++; + return ys; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output stops on completion"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.next(310, -1), + on.completed(320) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one xs subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one ys subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(300, 320) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + + THEN("on_error_resume_next selector was called once"){ + REQUIRE(1 == invoked); + } + } + } +}
\ No newline at end of file diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 6c4b564..8d1fe07 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -44,6 +44,7 @@ set(RX_SOURCES ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-merge.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-multicast.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp + ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-publish.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-reduce.hpp diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index 8c0b8e6..46f469e 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -67,6 +67,7 @@ if(DOXYGEN_FOUND) ${DOXY_EXAMPLES_SRC_DIR}/merge.cpp ${DOXY_EXAMPLES_SRC_DIR}/never.cpp ${DOXY_EXAMPLES_SRC_DIR}/observe_on.cpp + ${DOXY_EXAMPLES_SRC_DIR}/on_error_resume_next.cpp ${DOXY_EXAMPLES_SRC_DIR}/pairwise.cpp ${DOXY_EXAMPLES_SRC_DIR}/publish.cpp ${DOXY_EXAMPLES_SRC_DIR}/range.cpp |