diff options
author | elelel <elel@3wh.net> | 2017-02-25 23:42:23 +0000 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-02-25 15:42:23 -0800 |
commit | c808bb5b4536e35f25611db3c38fd9cda4a62c36 (patch) | |
tree | 3e64d29ec4344c9c79907d7f5b4e55d7df6eb15a | |
parent | 0b098e335609126add1e12cdf78740dec6571b1a (diff) | |
download | RxCpp-c808bb5b4536e35f25611db3c38fd9cda4a62c36.tar.gz |
Factor out commonalities between repeat and retry into a separate file (#363)
This reverts commit ad430c5bae364cf267f2b4a5cab703eb5672afbe.
* Retry operator: remove old comment
* Complete the error-reporting templates rollback
* Rename retry/repeat common file
* Fix filename in doxygen comment block
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-repeat.hpp | 146 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp | 153 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-retry.hpp | 157 |
3 files changed, 195 insertions, 261 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index 3b11860..97ba197 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -25,6 +25,7 @@ #define RXCPP_OPERATORS_RX_REPEAT_HPP #include "../rx-includes.hpp" +#include "rx-retry-repeat-common.hpp" namespace rxcpp { @@ -44,147 +45,36 @@ using repeat_invalid_t = typename repeat_invalid<AN...>::type; // Contain repeat variations in a namespace namespace repeat { - // Structure to perform general repeat operations on state - template <class ValuesType, class Subscriber, class T> - struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>, - public ValuesType { - - typedef Subscriber output_type; - state_type(const ValuesType& i, const output_type& oarg) - : ValuesType(i), - source_lifetime(composite_subscription::empty()), - out(oarg) { + struct event_handlers { + template <typename State> + static inline void on_error(State& state, std::exception_ptr& e) { + state->out.on_error(e); } - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - state->update(); - // Use specialized predicate for finite/infinte case - if (state->completed_predicate()) { - state->out.on_completed(); - } else { - state->do_subscribe(); - } - } - ); - } - - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - }; - - // Finite repeat case (explicitely limited with the number of times) - template <class T, class Observable, class Count> - struct finite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - typedef rxu::decay_t<Count> count_type; - - struct values { - values(source_type s, count_type t) - : source(std::move(s)), - remaining_(std::move(t)) { - } - - inline bool completed_predicate() const { - // Return true if we are completed - return remaining_ <= 0; - } - - inline void update() { - // Decrement counter - --remaining_; - } - - source_type source; - - private: - // Counter to hold number of times remaining to complete - count_type remaining_; - }; - - finite(source_type s, count_type t) - : initial_(std::move(s), std::move(t)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - if (initial_.completed_predicate()) { - // return completed + template <typename State> + static inline void on_completed(State& state) { + // Functions update() and completed_predicate() vary between finite and infinte versions + state->update(); + if (state->completed_predicate()) { state->out.on_completed(); } else { - // start the first iteration state->do_subscribe(); } } - - private: - values initial_; }; + // Finite repeat case (explicitely limited with the number of times) + template <class T, class Observable, class Count> + using finite = ::rxcpp::operators::detail::retry_repeat_common::finite + <event_handlers, T, Observable, Count>; + // Infinite repeat case template <class T, class Observable> - struct infinite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - - struct values { - values(source_type s) - : source(std::move(s)) { - } - - static inline bool completed_predicate() { - // Infinite repeat never completes - return false; - } - - static inline void update() { - // Infinite repeat does not need to update state - } - - source_type source; - }; - - infinite(source_type s) : initial_(std::move(s)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - // start the first iteration - state->do_subscribe(); - } - - private: - values initial_; - }; -} + using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite + <event_handlers, T, Observable>; } +} // detail /*! @copydoc rx-repeat.hpp */ diff --git a/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp new file mode 100644 index 0000000..373d9b3 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp @@ -0,0 +1,153 @@ +#pragma once + +/*! \file rx-retry-repeat-common.hpp + + \brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp + +*/ + +#include "../rx-includes.hpp" + +namespace rxcpp { + namespace operators { + namespace detail { + + namespace retry_repeat_common { + // Structure to perform general retry/repeat operations on state + template <class Values, class Subscriber, class EventHandlers, class T> + struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>, + public Values { + + typedef Subscriber output_type; + state_type(const Values& i, const output_type& oarg) + : Values(i), + source_lifetime(composite_subscription::empty()), + out(oarg) { + } + + void do_subscribe() { + auto state = this->shared_from_this(); + + state->out.remove(state->lifetime_token); + state->source_lifetime.unsubscribe(); + + state->source_lifetime = composite_subscription(); + state->lifetime_token = state->out.add(state->source_lifetime); + + state->source.subscribe( + state->out, + state->source_lifetime, + // on_next + [state](T t) { + state->out.on_next(t); + }, + // on_error + [state](std::exception_ptr e) { + EventHandlers::on_error(state, e); + }, + // on_completed + [state]() { + EventHandlers::on_completed(state); + } + ); + } + + composite_subscription source_lifetime; + output_type out; + composite_subscription::weak_subscription lifetime_token; + }; + + // Finite case (explicitely limited with the number of times) + template <class EventHandlers, class T, class Observable, class Count> + struct finite : public operator_base<T> { + typedef rxu::decay_t<Observable> source_type; + typedef rxu::decay_t<Count> count_type; + + struct values { + values(source_type s, count_type t) + : source(std::move(s)), + remaining_(std::move(t)) { + } + + inline bool completed_predicate() const { + // Return true if we are completed + return remaining_ <= 0; + } + + inline void update() { + // Decrement counter + --remaining_; + } + + source_type source; + + private: + // Counter to hold number of times remaining to complete + count_type remaining_; + }; + + finite(source_type s, count_type t) + : initial_(std::move(s), std::move(t)) { + } + + template<class Subscriber> + void on_subscribe(const Subscriber& s) const { + typedef state_type<values, Subscriber, EventHandlers, T> state_t; + // take a copy of the values for each subscription + auto state = std::make_shared<state_t>(initial_, s); + if (initial_.completed_predicate()) { + // return completed + state->out.on_completed(); + } else { + // start the first iteration + state->do_subscribe(); + } + } + + private: + values initial_; + }; + + // Infinite case + template <class EventHandlers, class T, class Observable> + struct infinite : public operator_base<T> { + typedef rxu::decay_t<Observable> source_type; + + struct values { + values(source_type s) + : source(std::move(s)) { + } + + static inline bool completed_predicate() { + // Infinite never completes + return false; + } + + static inline void update() { + // Infinite does not need to update state + } + + source_type source; + }; + + infinite(source_type s) : initial_(std::move(s)) { + } + + template<class Subscriber> + void on_subscribe(const Subscriber& s) const { + typedef state_type<values, Subscriber, EventHandlers, T> state_t; + // take a copy of the values for each subscription + auto state = std::make_shared<state_t>(initial_, s); + // start the first iteration + state->do_subscribe(); + } + + private: + values initial_; + }; + + + } + } + } +} diff --git a/Rx/v2/src/rxcpp/operators/rx-retry.hpp b/Rx/v2/src/rxcpp/operators/rx-retry.hpp index 073aa6c..3cee7d3 100644 --- a/Rx/v2/src/rxcpp/operators/rx-retry.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-retry.hpp @@ -21,6 +21,7 @@ #define RXCPP_OPERATORS_RX_RETRY_HPP #include "../rx-includes.hpp" +#include "rx-retry-repeat-common.hpp" namespace rxcpp { @@ -33,153 +34,43 @@ struct retry_invalid_arguments {}; template<class... AN> struct retry_invalid : public rxo::operator_base<retry_invalid_arguments<AN...>> { - using type = observable<retry_invalid_arguments<AN...>, retry_invalid<AN...>>; + using type = observable<retry_invalid_arguments<AN...>, retry_invalid<AN...>>; }; template<class... AN> using retry_invalid_t = typename retry_invalid<AN...>::type; // Contain retry variations in a namespace namespace retry { - // Structure to perform general retry operations on state - template <class ValuesType, class Subscriber, class T> - struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>, - public ValuesType { - typedef Subscriber output_type; - state_type(const ValuesType& i, const output_type& oarg) - : ValuesType(i) - , source_lifetime(composite_subscription::empty()) - , out(oarg) { - } - - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->update(); - // Use specialized predicate for finite/infinte case - if (state->completed_predicate()) { - state->out.on_error(e); - } else { - state->do_subscribe(); - } - }, - // on_completed - [state]() { - // JEP: never appeears to be called? - state->out.on_completed(); - } - ); - } - - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - }; - - // Finite retry case (explicitly limited with the number of times) - template<class T, class Observable, class Count> - struct finite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - typedef rxu::decay_t<Count> count_type; - - struct values { - values(source_type s, count_type t) - : source(std::move(s)), remaining_(std::move(t)) { - } - - inline bool completed_predicate() const { - // Return true if we are completed - return remaining_ <= 0; - } - - inline void update() { - // Decrement counter - --remaining_; - } - - source_type source; - - private: - count_type remaining_; - }; - - finite(source_type s, count_type t) - : initial_(std::move(s), std::move(t)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - if (initial_.completed_predicate()) { - // Should we call it here in retry? - state->out.on_completed(); + struct event_handlers { + template <typename State> + static inline void on_error(State& state, std::exception_ptr& e) { + state->update(); + // Use specialized predicate for finite/infinte case + if (state->completed_predicate()) { + state->out.on_error(e); } else { - // start the first iteration state->do_subscribe(); } } - - private: - values initial_; - }; - - // Infinite retry case - template<class T, class Observable> - struct infinite : public operator_base<T> { - typedef rxu::decay_t<Observable> source_type; - - struct values { - values(source_type s) - : source(std::move(s)) { - } - - static inline bool completed_predicate() { - // Infinite retry never stops ignoring errors - return false; - } - - static inline void update() { - // Infinite retry does not need to update state - } - - source_type source; - }; - - infinite(source_type s) : initial_(std::move(s)) { - } - - template<class Subscriber> - void on_subscribe(const Subscriber& s) const { - typedef state_type<values, Subscriber, T> state_t; - // take a copy of the values for each subscription - auto state = std::make_shared<state_t>(initial_, s); - // start the first iteration - state->do_subscribe(); + + template <typename State> + static inline void on_completed(State& state) { + state->out.on_completed(); } - - private: - values initial_; }; + // Finite repeat case (explicitely limited with the number of times) + template <class T, class Observable, class Count> + using finite = ::rxcpp::operators::detail::retry_repeat_common::finite + <event_handlers, T, Observable, Count>; + + // Infinite repeat case + template <class T, class Observable> + using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite + <event_handlers, T, Observable>; + } -} +} // detail /*! @copydoc rx-retry.hpp */ |