diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-repeat.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-repeat.hpp | 146 |
1 files changed, 18 insertions, 128 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index 3b11860..97ba197 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -25,6 +25,7 @@ #define RXCPP_OPERATORS_RX_REPEAT_HPP #include "../rx-includes.hpp" +#include "rx-retry-repeat-common.hpp" namespace rxcpp { @@ -44,147 +45,36 @@ using repeat_invalid_t = typename repeat_invalid<AN...>::type; // Contain repeat variations in a namespace namespace repeat { - // Structure to perform general repeat operations on state - template <class ValuesType, class Subscriber, class T> - struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>, - public ValuesType { - - typedef Subscriber output_type; - state_type(const ValuesType& i, const output_type& oarg) - : ValuesType(i), - source_lifetime(composite_subscription::empty()), - out(oarg) { + struct event_handlers { + template <typename State> + static inline void on_error(State& state, std::exception_ptr& e) { + state->out.on_error(e); } - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - state->update(); - // Use specialized predicate for finite/infinte case - if (state->completed_predicate()) { - state->out.on_completed(); - } else { - state->do_subscribe(); - } - } - ); - } - - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - }; - - // Finite repeat case (explicitely limited with the number of times) - template <class T, class Observable, class Count> - struct finite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - typedef rxu::decay_t<Count> count_type; - - struct values { - values(source_type s, count_type t) - : source(std::move(s)), - remaining_(std::move(t)) { - } - - inline bool completed_predicate() const { - // Return true if we are completed - return remaining_ <= 0; - } - - inline void update() { - // Decrement counter - --remaining_; - } - - source_type source; - - private: - // Counter to hold number of times remaining to complete - count_type remaining_; - }; - - finite(source_type s, count_type t) - : initial_(std::move(s), std::move(t)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - if (initial_.completed_predicate()) { - // return completed + template <typename State> + static inline void on_completed(State& state) { + // Functions update() and completed_predicate() vary between finite and infinte versions + state->update(); + if (state->completed_predicate()) { state->out.on_completed(); } else { - // start the first iteration state->do_subscribe(); } } - - private: - values initial_; }; + // Finite repeat case (explicitely limited with the number of times) + template <class T, class Observable, class Count> + using finite = ::rxcpp::operators::detail::retry_repeat_common::finite + <event_handlers, T, Observable, Count>; + // Infinite repeat case template <class T, class Observable> - struct infinite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - - struct values { - values(source_type s) - : source(std::move(s)) { - } - - static inline bool completed_predicate() { - // Infinite repeat never completes - return false; - } - - static inline void update() { - // Infinite repeat does not need to update state - } - - source_type source; - }; - - infinite(source_type s) : initial_(std::move(s)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - // start the first iteration - state->do_subscribe(); - } - - private: - values initial_; - }; -} + using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite + <event_handlers, T, Observable>; } +} // detail /*! @copydoc rx-repeat.hpp */ |