summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelelel <elel@3wh.net>2017-02-25 23:42:23 +0000
committerKirk Shoop <kirk.shoop@microsoft.com>2017-02-25 15:42:23 -0800
commitc808bb5b4536e35f25611db3c38fd9cda4a62c36 (patch)
tree3e64d29ec4344c9c79907d7f5b4e55d7df6eb15a
parent0b098e335609126add1e12cdf78740dec6571b1a (diff)
downloadRxCpp-c808bb5b4536e35f25611db3c38fd9cda4a62c36.tar.gz
Factor out commonalities between repeat and retry into a separate file (#363)
This reverts commit ad430c5bae364cf267f2b4a5cab703eb5672afbe. * Retry operator: remove old comment * Complete the error-reporting templates rollback * Rename retry/repeat common file * Fix filename in doxygen comment block
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-repeat.hpp146
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp153
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-retry.hpp157
3 files changed, 195 insertions, 261 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index 3b11860..97ba197 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -25,6 +25,7 @@
#define RXCPP_OPERATORS_RX_REPEAT_HPP
#include "../rx-includes.hpp"
+#include "rx-retry-repeat-common.hpp"
namespace rxcpp {
@@ -44,147 +45,36 @@ using repeat_invalid_t = typename repeat_invalid<AN...>::type;
// Contain repeat variations in a namespace
namespace repeat {
- // Structure to perform general repeat operations on state
- template <class ValuesType, class Subscriber, class T>
- struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>,
- public ValuesType {
-
- typedef Subscriber output_type;
- state_type(const ValuesType& i, const output_type& oarg)
- : ValuesType(i),
- source_lifetime(composite_subscription::empty()),
- out(oarg) {
+ struct event_handlers {
+ template <typename State>
+ static inline void on_error(State& state, std::exception_ptr& e) {
+ state->out.on_error(e);
}
- void do_subscribe() {
- auto state = this->shared_from_this();
-
- state->out.remove(state->lifetime_token);
- state->source_lifetime.unsubscribe();
-
- state->source_lifetime = composite_subscription();
- state->lifetime_token = state->out.add(state->source_lifetime);
-
- state->source.subscribe(
- state->out,
- state->source_lifetime,
- // on_next
- [state](T t) {
- state->out.on_next(t);
- },
- // on_error
- [state](std::exception_ptr e) {
- state->out.on_error(e);
- },
- // on_completed
- [state]() {
- state->update();
- // Use specialized predicate for finite/infinte case
- if (state->completed_predicate()) {
- state->out.on_completed();
- } else {
- state->do_subscribe();
- }
- }
- );
- }
-
- composite_subscription source_lifetime;
- output_type out;
- composite_subscription::weak_subscription lifetime_token;
- };
-
- // Finite repeat case (explicitely limited with the number of times)
- template <class T, class Observable, class Count>
- struct finite : public operator_base<T> {
- typedef rxu::decay_t<Observable> source_type;
- typedef rxu::decay_t<Count> count_type;
-
- struct values {
- values(source_type s, count_type t)
- : source(std::move(s)),
- remaining_(std::move(t)) {
- }
-
- inline bool completed_predicate() const {
- // Return true if we are completed
- return remaining_ <= 0;
- }
-
- inline void update() {
- // Decrement counter
- --remaining_;
- }
-
- source_type source;
-
- private:
- // Counter to hold number of times remaining to complete
- count_type remaining_;
- };
-
- finite(source_type s, count_type t)
- : initial_(std::move(s), std::move(t)) {
- }
-
- template<class Subscriber>
- void on_subscribe(const Subscriber& s) const {
- typedef state_type<values, Subscriber, T> state_t;
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_t>(initial_, s);
- if (initial_.completed_predicate()) {
- // return completed
+ template <typename State>
+ static inline void on_completed(State& state) {
+ // Functions update() and completed_predicate() vary between finite and infinte versions
+ state->update();
+ if (state->completed_predicate()) {
state->out.on_completed();
} else {
- // start the first iteration
state->do_subscribe();
}
}
-
- private:
- values initial_;
};
+ // Finite repeat case (explicitely limited with the number of times)
+ template <class T, class Observable, class Count>
+ using finite = ::rxcpp::operators::detail::retry_repeat_common::finite
+ <event_handlers, T, Observable, Count>;
+
// Infinite repeat case
template <class T, class Observable>
- struct infinite : public operator_base<T> {
- typedef rxu::decay_t<Observable> source_type;
-
- struct values {
- values(source_type s)
- : source(std::move(s)) {
- }
-
- static inline bool completed_predicate() {
- // Infinite repeat never completes
- return false;
- }
-
- static inline void update() {
- // Infinite repeat does not need to update state
- }
-
- source_type source;
- };
-
- infinite(source_type s) : initial_(std::move(s)) {
- }
-
- template<class Subscriber>
- void on_subscribe(const Subscriber& s) const {
- typedef state_type<values, Subscriber, T> state_t;
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_t>(initial_, s);
- // start the first iteration
- state->do_subscribe();
- }
-
- private:
- values initial_;
- };
-}
+ using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite
+ <event_handlers, T, Observable>;
}
+} // detail
/*! @copydoc rx-repeat.hpp
*/
diff --git a/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
new file mode 100644
index 0000000..373d9b3
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
@@ -0,0 +1,153 @@
+#pragma once
+
+/*! \file rx-retry-repeat-common.hpp
+
+ \brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp
+
+*/
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+ namespace operators {
+ namespace detail {
+
+ namespace retry_repeat_common {
+ // Structure to perform general retry/repeat operations on state
+ template <class Values, class Subscriber, class EventHandlers, class T>
+ struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
+ public Values {
+
+ typedef Subscriber output_type;
+ state_type(const Values& i, const output_type& oarg)
+ : Values(i),
+ source_lifetime(composite_subscription::empty()),
+ out(oarg) {
+ }
+
+ void do_subscribe() {
+ auto state = this->shared_from_this();
+
+ state->out.remove(state->lifetime_token);
+ state->source_lifetime.unsubscribe();
+
+ state->source_lifetime = composite_subscription();
+ state->lifetime_token = state->out.add(state->source_lifetime);
+
+ state->source.subscribe(
+ state->out,
+ state->source_lifetime,
+ // on_next
+ [state](T t) {
+ state->out.on_next(t);
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ EventHandlers::on_error(state, e);
+ },
+ // on_completed
+ [state]() {
+ EventHandlers::on_completed(state);
+ }
+ );
+ }
+
+ composite_subscription source_lifetime;
+ output_type out;
+ composite_subscription::weak_subscription lifetime_token;
+ };
+
+ // Finite case (explicitely limited with the number of times)
+ template <class EventHandlers, class T, class Observable, class Count>
+ struct finite : public operator_base<T> {
+ typedef rxu::decay_t<Observable> source_type;
+ typedef rxu::decay_t<Count> count_type;
+
+ struct values {
+ values(source_type s, count_type t)
+ : source(std::move(s)),
+ remaining_(std::move(t)) {
+ }
+
+ inline bool completed_predicate() const {
+ // Return true if we are completed
+ return remaining_ <= 0;
+ }
+
+ inline void update() {
+ // Decrement counter
+ --remaining_;
+ }
+
+ source_type source;
+
+ private:
+ // Counter to hold number of times remaining to complete
+ count_type remaining_;
+ };
+
+ finite(source_type s, count_type t)
+ : initial_(std::move(s), std::move(t)) {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+ typedef state_type<values, Subscriber, EventHandlers, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ if (initial_.completed_predicate()) {
+ // return completed
+ state->out.on_completed();
+ } else {
+ // start the first iteration
+ state->do_subscribe();
+ }
+ }
+
+ private:
+ values initial_;
+ };
+
+ // Infinite case
+ template <class EventHandlers, class T, class Observable>
+ struct infinite : public operator_base<T> {
+ typedef rxu::decay_t<Observable> source_type;
+
+ struct values {
+ values(source_type s)
+ : source(std::move(s)) {
+ }
+
+ static inline bool completed_predicate() {
+ // Infinite never completes
+ return false;
+ }
+
+ static inline void update() {
+ // Infinite does not need to update state
+ }
+
+ source_type source;
+ };
+
+ infinite(source_type s) : initial_(std::move(s)) {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+ typedef state_type<values, Subscriber, EventHandlers, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ // start the first iteration
+ state->do_subscribe();
+ }
+
+ private:
+ values initial_;
+ };
+
+
+ }
+ }
+ }
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-retry.hpp b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
index 073aa6c..3cee7d3 100644
--- a/Rx/v2/src/rxcpp/operators/rx-retry.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
@@ -21,6 +21,7 @@
#define RXCPP_OPERATORS_RX_RETRY_HPP
#include "../rx-includes.hpp"
+#include "rx-retry-repeat-common.hpp"
namespace rxcpp {
@@ -33,153 +34,43 @@ struct retry_invalid_arguments {};
template<class... AN>
struct retry_invalid : public rxo::operator_base<retry_invalid_arguments<AN...>> {
- using type = observable<retry_invalid_arguments<AN...>, retry_invalid<AN...>>;
+ using type = observable<retry_invalid_arguments<AN...>, retry_invalid<AN...>>;
};
template<class... AN>
using retry_invalid_t = typename retry_invalid<AN...>::type;
// Contain retry variations in a namespace
namespace retry {
- // Structure to perform general retry operations on state
- template <class ValuesType, class Subscriber, class T>
- struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>,
- public ValuesType {
- typedef Subscriber output_type;
- state_type(const ValuesType& i, const output_type& oarg)
- : ValuesType(i)
- , source_lifetime(composite_subscription::empty())
- , out(oarg) {
- }
-
- void do_subscribe() {
- auto state = this->shared_from_this();
-
- state->out.remove(state->lifetime_token);
- state->source_lifetime.unsubscribe();
-
- state->source_lifetime = composite_subscription();
- state->lifetime_token = state->out.add(state->source_lifetime);
-
-
- state->source.subscribe(
- state->out,
- state->source_lifetime,
- // on_next
- [state](T t) {
- state->out.on_next(t);
- },
- // on_error
- [state](std::exception_ptr e) {
- state->update();
- // Use specialized predicate for finite/infinte case
- if (state->completed_predicate()) {
- state->out.on_error(e);
- } else {
- state->do_subscribe();
- }
- },
- // on_completed
- [state]() {
- // JEP: never appeears to be called?
- state->out.on_completed();
- }
- );
- }
-
- composite_subscription source_lifetime;
- output_type out;
- composite_subscription::weak_subscription lifetime_token;
- };
-
- // Finite retry case (explicitly limited with the number of times)
- template<class T, class Observable, class Count>
- struct finite : public operator_base<T> {
- typedef rxu::decay_t<Observable> source_type;
- typedef rxu::decay_t<Count> count_type;
-
- struct values {
- values(source_type s, count_type t)
- : source(std::move(s)), remaining_(std::move(t)) {
- }
-
- inline bool completed_predicate() const {
- // Return true if we are completed
- return remaining_ <= 0;
- }
-
- inline void update() {
- // Decrement counter
- --remaining_;
- }
-
- source_type source;
-
- private:
- count_type remaining_;
- };
-
- finite(source_type s, count_type t)
- : initial_(std::move(s), std::move(t)) {
- }
-
- template<class Subscriber>
- void on_subscribe(const Subscriber& s) const {
- typedef state_type<values, Subscriber, T> state_t;
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_t>(initial_, s);
- if (initial_.completed_predicate()) {
- // Should we call it here in retry?
- state->out.on_completed();
+ struct event_handlers {
+ template <typename State>
+ static inline void on_error(State& state, std::exception_ptr& e) {
+ state->update();
+ // Use specialized predicate for finite/infinte case
+ if (state->completed_predicate()) {
+ state->out.on_error(e);
} else {
- // start the first iteration
state->do_subscribe();
}
}
-
- private:
- values initial_;
- };
-
- // Infinite retry case
- template<class T, class Observable>
- struct infinite : public operator_base<T> {
- typedef rxu::decay_t<Observable> source_type;
-
- struct values {
- values(source_type s)
- : source(std::move(s)) {
- }
-
- static inline bool completed_predicate() {
- // Infinite retry never stops ignoring errors
- return false;
- }
-
- static inline void update() {
- // Infinite retry does not need to update state
- }
-
- source_type source;
- };
-
- infinite(source_type s) : initial_(std::move(s)) {
- }
-
- template<class Subscriber>
- void on_subscribe(const Subscriber& s) const {
- typedef state_type<values, Subscriber, T> state_t;
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_t>(initial_, s);
- // start the first iteration
- state->do_subscribe();
+
+ template <typename State>
+ static inline void on_completed(State& state) {
+ state->out.on_completed();
}
-
- private:
- values initial_;
};
+ // Finite repeat case (explicitely limited with the number of times)
+ template <class T, class Observable, class Count>
+ using finite = ::rxcpp::operators::detail::retry_repeat_common::finite
+ <event_handlers, T, Observable, Count>;
+
+ // Infinite repeat case
+ template <class T, class Observable>
+ using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite
+ <event_handlers, T, Observable>;
+
}
-}
+} // detail
/*! @copydoc rx-retry.hpp
*/