diff options
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp | 106 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 21 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 |
3 files changed, 128 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp new file mode 100644 index 0000000..9356a4c --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP) +#define RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + + +template<class T, class Selector> +struct on_error_resume_next +{ + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Selector> select_type; + typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type; + select_type selector; + + on_error_resume_next(select_type s) + : selector(std::move(s)) + { + } + + template<class Subscriber> + struct on_error_resume_next_observer + { + typedef on_error_resume_next_observer<Subscriber> this_type; + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Selector> select_type; + typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<T, this_type> observer_type; + dest_type dest; + composite_subscription lifetime; + select_type selector; + + on_error_resume_next_observer(dest_type d, composite_subscription cs, select_type s) + : dest(std::move(d)) + , lifetime(std::move(cs)) + , selector(std::move(s)) + { + dest.add(lifetime); + } + void on_next(value_type v) const { + dest.on_next(std::move(v)); + } + void on_error(std::exception_ptr e) const { + auto selected = on_exception( + [&](){ + return this->selector(std::move(e));}, + dest); + if (selected.empty()) { + return; + } + selected->subscribe(dest); + } + void on_completed() const { + dest.on_completed(); + } + + static subscriber<T, observer_type> make(dest_type d, select_type s) { + auto cs = composite_subscription(); + return make_subscriber<T>(cs, observer_type(this_type(std::move(d), cs, std::move(s)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector)) { + return on_error_resume_next_observer<Subscriber>::make(std::move(dest), selector); + } +}; + +template<class Selector> +class on_error_resume_next_factory +{ + typedef rxu::decay_t<Selector> select_type; + select_type selector; +public: + on_error_resume_next_factory(select_type s) : selector(std::move(s)) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector))) { + return source.template lift<rxu::value_type_t<on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(on_error_resume_next<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector)); + } +}; + +} + +template<class Selector> +auto on_error_resume_next(Selector&& p) + -> detail::on_error_resume_next_factory<Selector> { + return detail::on_error_resume_next_factory<Selector>(std::forward<Selector>(p)); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 811e345..b0e4a5a 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -754,6 +754,27 @@ public: return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))); } + /*! If an error occurs, take the result from the Selector and subscribe to that instead. + + \tparam Selector the actual type of a function of the form `observable<T>(std::exception_ptr)` + + \param s the function of the form `observable<T>(std::exception_ptr)` + + \return Observable that emits the items from the source observable and switches to a new observable on error. + + \sample + \snippet on_error_resume_next.cpp on_error_resume_next sample + \snippet output.txt on_error_resume_next sample + */ + template<class Selector> + auto on_error_resume_next(Selector s) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s)))) + /// \endcond + { + return lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s))); + } + /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned. \tparam Selector the type of the transforming function diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4886ace..78f1fb7 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -52,6 +52,7 @@ namespace rxo=operators; #include "operators/rx-merge.hpp" #include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" +#include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-reduce.hpp" |