From c808bb5b4536e35f25611db3c38fd9cda4a62c36 Mon Sep 17 00:00:00 2001 From: elelel Date: Sat, 25 Feb 2017 23:42:23 +0000 Subject: Factor out commonalities between repeat and retry into a separate file (#363) This reverts commit ad430c5bae364cf267f2b4a5cab703eb5672afbe. * Retry operator: remove old comment * Complete the error-reporting templates rollback * Rename retry/repeat common file * Fix filename in doxygen comment block --- Rx/v2/src/rxcpp/operators/rx-repeat.hpp | 146 +++---------------- .../src/rxcpp/operators/rx-retry-repeat-common.hpp | 153 ++++++++++++++++++++ Rx/v2/src/rxcpp/operators/rx-retry.hpp | 157 ++++----------------- 3 files changed, 195 insertions(+), 261 deletions(-) create mode 100644 Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp (limited to 'Rx') diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index 3b11860..97ba197 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -25,6 +25,7 @@ #define RXCPP_OPERATORS_RX_REPEAT_HPP #include "../rx-includes.hpp" +#include "rx-retry-repeat-common.hpp" namespace rxcpp { @@ -44,147 +45,36 @@ using repeat_invalid_t = typename repeat_invalid::type; // Contain repeat variations in a namespace namespace repeat { - // Structure to perform general repeat operations on state - template - struct state_type : public std::enable_shared_from_this>, - public ValuesType { - - typedef Subscriber output_type; - state_type(const ValuesType& i, const output_type& oarg) - : ValuesType(i), - source_lifetime(composite_subscription::empty()), - out(oarg) { + struct event_handlers { + template + static inline void on_error(State& state, std::exception_ptr& e) { + state->out.on_error(e); } - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - state->update(); - // Use specialized predicate for finite/infinte case - if (state->completed_predicate()) { - state->out.on_completed(); - } else { - state->do_subscribe(); - } - } - ); - } - - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - }; - - // Finite repeat case (explicitely limited with the number of times) - template - struct finite : public operator_base { - typedef rxu::decay_t source_type; - typedef rxu::decay_t count_type; - - struct values { - values(source_type s, count_type t) - : source(std::move(s)), - remaining_(std::move(t)) { - } - - inline bool completed_predicate() const { - // Return true if we are completed - return remaining_ <= 0; - } - - inline void update() { - // Decrement counter - --remaining_; - } - - source_type source; - - private: - // Counter to hold number of times remaining to complete - count_type remaining_; - }; - - finite(source_type s, count_type t) - : initial_(std::move(s), std::move(t)) { - } - - template - void on_subscribe(const Subscriber& s) const { - typedef state_type state_t; - // take a copy of the values for each subscription - auto state = std::make_shared(initial_, s); - if (initial_.completed_predicate()) { - // return completed + template + static inline void on_completed(State& state) { + // Functions update() and completed_predicate() vary between finite and infinte versions + state->update(); + if (state->completed_predicate()) { state->out.on_completed(); } else { - // start the first iteration state->do_subscribe(); } } - - private: - values initial_; }; + // Finite repeat case (explicitely limited with the number of times) + template + using finite = ::rxcpp::operators::detail::retry_repeat_common::finite + ; + // Infinite repeat case template - struct infinite : public operator_base { - typedef rxu::decay_t source_type; - - struct values { - values(source_type s) - : source(std::move(s)) { - } - - static inline bool completed_predicate() { - // Infinite repeat never completes - return false; - } - - static inline void update() { - // Infinite repeat does not need to update state - } - - source_type source; - }; - - infinite(source_type s) : initial_(std::move(s)) { - } - - template - void on_subscribe(const Subscriber& s) const { - typedef state_type state_t; - // take a copy of the values for each subscription - auto state = std::make_shared(initial_, s); - // start the first iteration - state->do_subscribe(); - } - - private: - values initial_; - }; -} + using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite + ; } +} // detail /*! @copydoc rx-repeat.hpp */ diff --git a/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp new file mode 100644 index 0000000..373d9b3 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp @@ -0,0 +1,153 @@ +#pragma once + +/*! \file rx-retry-repeat-common.hpp + + \brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp + +*/ + +#include "../rx-includes.hpp" + +namespace rxcpp { + namespace operators { + namespace detail { + + namespace retry_repeat_common { + // Structure to perform general retry/repeat operations on state + template + struct state_type : public std::enable_shared_from_this>, + public Values { + + typedef Subscriber output_type; + state_type(const Values& i, const output_type& oarg) + : Values(i), + source_lifetime(composite_subscription::empty()), + out(oarg) { + } + + void do_subscribe() { + auto state = this->shared_from_this(); + + state->out.remove(state->lifetime_token); + state->source_lifetime.unsubscribe(); + + state->source_lifetime = composite_subscription(); + state->lifetime_token = state->out.add(state->source_lifetime); + + state->source.subscribe( + state->out, + state->source_lifetime, + // on_next + [state](T t) { + state->out.on_next(t); + }, + // on_error + [state](std::exception_ptr e) { + EventHandlers::on_error(state, e); + }, + // on_completed + [state]() { + EventHandlers::on_completed(state); + } + ); + } + + composite_subscription source_lifetime; + output_type out; + composite_subscription::weak_subscription lifetime_token; + }; + + // Finite case (explicitely limited with the number of times) + template + struct finite : public operator_base { + typedef rxu::decay_t source_type; + typedef rxu::decay_t count_type; + + struct values { + values(source_type s, count_type t) + : source(std::move(s)), + remaining_(std::move(t)) { + } + + inline bool completed_predicate() const { + // Return true if we are completed + return remaining_ <= 0; + } + + inline void update() { + // Decrement counter + --remaining_; + } + + source_type source; + + private: + // Counter to hold number of times remaining to complete + count_type remaining_; + }; + + finite(source_type s, count_type t) + : initial_(std::move(s), std::move(t)) { + } + + template + void on_subscribe(const Subscriber& s) const { + typedef state_type state_t; + // take a copy of the values for each subscription + auto state = std::make_shared(initial_, s); + if (initial_.completed_predicate()) { + // return completed + state->out.on_completed(); + } else { + // start the first iteration + state->do_subscribe(); + } + } + + private: + values initial_; + }; + + // Infinite case + template + struct infinite : public operator_base { + typedef rxu::decay_t source_type; + + struct values { + values(source_type s) + : source(std::move(s)) { + } + + static inline bool completed_predicate() { + // Infinite never completes + return false; + } + + static inline void update() { + // Infinite does not need to update state + } + + source_type source; + }; + + infinite(source_type s) : initial_(std::move(s)) { + } + + template + void on_subscribe(const Subscriber& s) const { + typedef state_type state_t; + // take a copy of the values for each subscription + auto state = std::make_shared(initial_, s); + // start the first iteration + state->do_subscribe(); + } + + private: + values initial_; + }; + + + } + } + } +} diff --git a/Rx/v2/src/rxcpp/operators/rx-retry.hpp b/Rx/v2/src/rxcpp/operators/rx-retry.hpp index 073aa6c..3cee7d3 100644 --- a/Rx/v2/src/rxcpp/operators/rx-retry.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-retry.hpp @@ -21,6 +21,7 @@ #define RXCPP_OPERATORS_RX_RETRY_HPP #include "../rx-includes.hpp" +#include "rx-retry-repeat-common.hpp" namespace rxcpp { @@ -33,153 +34,43 @@ struct retry_invalid_arguments {}; template struct retry_invalid : public rxo::operator_base> { - using type = observable, retry_invalid>; + using type = observable, retry_invalid>; }; template using retry_invalid_t = typename retry_invalid::type; // Contain retry variations in a namespace namespace retry { - // Structure to perform general retry operations on state - template - struct state_type : public std::enable_shared_from_this>, - public ValuesType { - typedef Subscriber output_type; - state_type(const ValuesType& i, const output_type& oarg) - : ValuesType(i) - , source_lifetime(composite_subscription::empty()) - , out(oarg) { - } - - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->update(); - // Use specialized predicate for finite/infinte case - if (state->completed_predicate()) { - state->out.on_error(e); - } else { - state->do_subscribe(); - } - }, - // on_completed - [state]() { - // JEP: never appeears to be called? - state->out.on_completed(); - } - ); - } - - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - }; - - // Finite retry case (explicitly limited with the number of times) - template - struct finite : public operator_base { - typedef rxu::decay_t source_type; - typedef rxu::decay_t count_type; - - struct values { - values(source_type s, count_type t) - : source(std::move(s)), remaining_(std::move(t)) { - } - - inline bool completed_predicate() const { - // Return true if we are completed - return remaining_ <= 0; - } - - inline void update() { - // Decrement counter - --remaining_; - } - - source_type source; - - private: - count_type remaining_; - }; - - finite(source_type s, count_type t) - : initial_(std::move(s), std::move(t)) { - } - - template - void on_subscribe(const Subscriber& s) const { - typedef state_type state_t; - // take a copy of the values for each subscription - auto state = std::make_shared(initial_, s); - if (initial_.completed_predicate()) { - // Should we call it here in retry? - state->out.on_completed(); + struct event_handlers { + template + static inline void on_error(State& state, std::exception_ptr& e) { + state->update(); + // Use specialized predicate for finite/infinte case + if (state->completed_predicate()) { + state->out.on_error(e); } else { - // start the first iteration state->do_subscribe(); } } - - private: - values initial_; - }; - - // Infinite retry case - template - struct infinite : public operator_base { - typedef rxu::decay_t source_type; - - struct values { - values(source_type s) - : source(std::move(s)) { - } - - static inline bool completed_predicate() { - // Infinite retry never stops ignoring errors - return false; - } - - static inline void update() { - // Infinite retry does not need to update state - } - - source_type source; - }; - - infinite(source_type s) : initial_(std::move(s)) { - } - - template - void on_subscribe(const Subscriber& s) const { - typedef state_type state_t; - // take a copy of the values for each subscription - auto state = std::make_shared(initial_, s); - // start the first iteration - state->do_subscribe(); + + template + static inline void on_completed(State& state) { + state->out.on_completed(); } - - private: - values initial_; }; + // Finite repeat case (explicitely limited with the number of times) + template + using finite = ::rxcpp::operators::detail::retry_repeat_common::finite + ; + + // Infinite repeat case + template + using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite + ; + } -} +} // detail /*! @copydoc rx-retry.hpp */ -- cgit v1.2.3