summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/examples/doxygen/on_error_resume_next.cpp23
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp106
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp21
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt2
-rw-r--r--Rx/v2/test/operators/on_error_resume_next.cpp152
-rw-r--r--projects/CMake/CMakeLists.txt1
-rw-r--r--projects/doxygen/CMakeLists.txt1
8 files changed, 307 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/on_error_resume_next.cpp b/Rx/v2/examples/doxygen/on_error_resume_next.cpp
new file mode 100644
index 0000000..0c9873b
--- /dev/null
+++ b/Rx/v2/examples/doxygen/on_error_resume_next.cpp
@@ -0,0 +1,23 @@
+#include "rxcpp/rx.hpp"
+namespace rxu=rxcpp::util;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("on_error_resume_next sample"){
+ printf("//! [on_error_resume_next sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ on_error_resume_next([](std::exception_ptr ep){
+ printf("Resuming after: %s\n", rxu::what(ep).c_str());
+ return rxcpp::observable<>::just(-1);
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ printf("OnError: %s\n", rxu::what(ep).c_str());
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [on_error_resume_next sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
new file mode 100644
index 0000000..9356a4c
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
@@ -0,0 +1,106 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP)
+#define RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+
+template<class T, class Selector>
+struct on_error_resume_next
+{
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Selector> select_type;
+ typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type;
+ select_type selector;
+
+ on_error_resume_next(select_type s)
+ : selector(std::move(s))
+ {
+ }
+
+ template<class Subscriber>
+ struct on_error_resume_next_observer
+ {
+ typedef on_error_resume_next_observer<Subscriber> this_type;
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Selector> select_type;
+ typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<T, this_type> observer_type;
+ dest_type dest;
+ composite_subscription lifetime;
+ select_type selector;
+
+ on_error_resume_next_observer(dest_type d, composite_subscription cs, select_type s)
+ : dest(std::move(d))
+ , lifetime(std::move(cs))
+ , selector(std::move(s))
+ {
+ dest.add(lifetime);
+ }
+ void on_next(value_type v) const {
+ dest.on_next(std::move(v));
+ }
+ void on_error(std::exception_ptr e) const {
+ auto selected = on_exception(
+ [&](){
+ return this->selector(std::move(e));},
+ dest);
+ if (selected.empty()) {
+ return;
+ }
+ selected->subscribe(dest);
+ }
+ void on_completed() const {
+ dest.on_completed();
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, select_type s) {
+ auto cs = composite_subscription();
+ return make_subscriber<T>(cs, observer_type(this_type(std::move(d), cs, std::move(s))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector)) {
+ return on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector);
+ }
+};
+
+template<class Selector>
+class on_error_resume_next_factory
+{
+ typedef rxu::decay_t<Selector> select_type;
+ select_type selector;
+public:
+ on_error_resume_next_factory(select_type s) : selector(std::move(s)) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector))) {
+ return source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector));
+ }
+};
+
+}
+
+template<class Selector>
+auto on_error_resume_next(Selector&& p)
+ -> detail::on_error_resume_next_factory<Selector> {
+ return detail::on_error_resume_next_factory<Selector>(std::forward<Selector>(p));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 811e345..b0e4a5a 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -754,6 +754,27 @@ public:
return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)));
}
+ /*! If an error occurs, take the result from the Selector and subscribe to that instead.
+
+ \tparam Selector the actual type of a function of the form `observable<T>(std::exception_ptr)`
+
+ \param s the function of the form `observable<T>(std::exception_ptr)`
+
+ \return Observable that emits the items from the source observable and switches to a new observable on error.
+
+ \sample
+ \snippet on_error_resume_next.cpp on_error_resume_next sample
+ \snippet output.txt on_error_resume_next sample
+ */
+ template<class Selector>
+ auto on_error_resume_next(Selector s) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s))))
+ /// \endcond
+ {
+ return lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s)));
+ }
+
/*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned.
\tparam Selector the type of the transforming function
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 4886ace..78f1fb7 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -52,6 +52,7 @@ namespace rxo=operators;
#include "operators/rx-merge.hpp"
#include "operators/rx-multicast.hpp"
#include "operators/rx-observe_on.hpp"
+#include "operators/rx-on_error_resume_next.hpp"
#include "operators/rx-pairwise.hpp"
#include "operators/rx-publish.hpp"
#include "operators/rx-reduce.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 5f42853..70f4cab 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -40,6 +40,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/map.cpp
${TEST_DIR}/operators/merge.cpp
${TEST_DIR}/operators/observe_on.cpp
+ ${TEST_DIR}/operators/on_error_resume_next.cpp
${TEST_DIR}/operators/pairwise.cpp
${TEST_DIR}/operators/publish.cpp
${TEST_DIR}/operators/reduce.cpp
@@ -71,6 +72,7 @@ target_link_libraries(rxcppv2_test ${CMAKE_THREAD_LIBS_INIT})
set(ONE_SOURCES
${TEST_DIR}/test.cpp
#${TEST_DIR}/operators/tap.cpp
+ #${TEST_DIR}/operators/on_error_resume_next.cpp
)
add_executable(one_test ${ONE_SOURCES})
add_executable(rxcpp::one_test ALIAS one_test)
diff --git a/Rx/v2/test/operators/on_error_resume_next.cpp b/Rx/v2/test/operators/on_error_resume_next.cpp
new file mode 100644
index 0000000..aec1774
--- /dev/null
+++ b/Rx/v2/test/operators/on_error_resume_next.cpp
@@ -0,0 +1,152 @@
+#include "rxcpp/rx.hpp"
+namespace rxu=rxcpp::util;
+namespace rxsc=rxcpp::schedulers;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("on_error_resume_next stops on completion", "[on_error_resume_next][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(180, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.next(350, 5),
+ on.completed(400),
+ on.next(410, -1),
+ on.completed(420),
+ on.error(430, std::runtime_error("error on unsubscribed stream"))
+ });
+
+ auto ys = sc.make_cold_observable({
+ on.next(10, -1),
+ on.completed(20),
+ });
+
+ WHEN("passed through unchanged"){
+
+ auto res = w.start(
+ [xs, ys, &invoked]() {
+ return xs
+ .on_error_resume_next([ys, &invoked](std::exception_ptr) {
+ invoked++;
+ return ys;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output stops on completion"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.next(350, 5),
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one xs subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was no ys subscription"){
+ auto required = std::vector<rxcpp::notifications::subscription>();
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("on_error_resume_next selector was not called"){
+ REQUIRE(0 == invoked);
+ }
+ }
+ }
+}
+
+SCENARIO("on_error_resume_next stops on error", "[on_error_resume_next][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ std::runtime_error ex("on_error_resume_next on_error from source");
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(180, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.error(300, ex),
+ on.next(350, 5),
+ on.completed(400),
+ on.next(410, -1),
+ on.completed(420),
+ on.error(430, std::runtime_error("error on unsubscribed stream"))
+ });
+
+ auto ys = sc.make_cold_observable({
+ on.next(10, -1),
+ on.completed(20),
+ });
+
+ WHEN("are resumed after an error"){
+
+ auto res = w.start(
+ [xs, ys, &invoked]() {
+ return xs
+ .on_error_resume_next([ys, &invoked](std::exception_ptr) {
+ invoked++;
+ return ys;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output stops on completion"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.next(310, -1),
+ on.completed(320)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one xs subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one ys subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 320)
+ });
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("on_error_resume_next selector was called once"){
+ REQUIRE(1 == invoked);
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 6c4b564..8d1fe07 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -44,6 +44,7 @@ set(RX_SOURCES
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-merge.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-publish.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 8c0b8e6..46f469e 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -67,6 +67,7 @@ if(DOXYGEN_FOUND)
${DOXY_EXAMPLES_SRC_DIR}/merge.cpp
${DOXY_EXAMPLES_SRC_DIR}/never.cpp
${DOXY_EXAMPLES_SRC_DIR}/observe_on.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/on_error_resume_next.cpp
${DOXY_EXAMPLES_SRC_DIR}/pairwise.cpp
${DOXY_EXAMPLES_SRC_DIR}/publish.cpp
${DOXY_EXAMPLES_SRC_DIR}/range.cpp