summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorelelel <elel@3wh.net>2017-02-23 15:29:10 +0000
committerKirk Shoop <kirk.shoop@microsoft.com>2017-02-23 07:29:10 -0800
commitb7939f8971d18dcedc1666f6466a868f7ed04794 (patch)
tree2d4dddcc0dc7ac5b5b6746ad5fad29bd09c45ea1 /Rx/v2
parent7919ac2aa228515456b0d46367ac7d92a50d05ce (diff)
downloadRxCpp-b7939f8971d18dcedc1666f6466a868f7ed04794.tar.gz
Retry(0)/Retry() operator fix (#358)
* Change retry description to conform with other Rx implementations * Retry operator common state * Repeat operator: finite case * Retry operator: infinite case * Fix misc dev-stage errors * Retry: tests and fixes
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-retry.hpp247
-rw-r--r--Rx/v2/test/operators/retry.cpp44
2 files changed, 196 insertions, 95 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-retry.hpp b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
index e7cd7c7..f901faa 100644
--- a/Rx/v2/src/rxcpp/operators/rx-retry.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
@@ -8,7 +8,7 @@
\tparam Count the type of the counter (optional)
- \param t the number of retries (optional) If not specified or 0, infinitely retries the source observable.
+ \param t the number of retries (optional) If not specified, infinitely retries the source observable. Sepcifying returns immediately without subscribing
\return An observable that mirrors the source observable, resubscribing to it if it calls on_error up to a specified number of retries.
@@ -38,82 +38,143 @@ struct retry_invalid : public rxo::operator_base<retry_invalid_arguments<AN...>>
template<class... AN>
using retry_invalid_t = typename retry_invalid<AN...>::type;
-template<class T, class Observable, class Count>
-struct retry : public operator_base<T> {
+// Contain retry variations in a namespace
+namespace retry {
+ // Structure to perform general retry operations on state
+ template <class ValuesType, class Subscriber, class T>
+ struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>,
+ public ValuesType {
+ typedef Subscriber output_type;
+ state_type(const ValuesType& i, const output_type& oarg)
+ : ValuesType(i)
+ , source_lifetime(composite_subscription::empty())
+ , out(oarg) {
+ }
+
+ void do_subscribe() {
+ auto state = this->shared_from_this();
+
+ state->source_lifetime = composite_subscription();
+ state->out.add(state->source_lifetime);
+
+ state->source.subscribe(
+ state->out,
+ state->source_lifetime,
+ // on_next
+ [state](T t) {
+ state->out.on_next(t);
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->update();
+ // Use specialized predicate for finite/infinte case
+ if (state->completed_predicate()) {
+ state->out.on_error(e);
+ } else {
+ state->do_subscribe();
+ }
+ },
+ // on_completed
+ [state]() {
+ // JEP: never appeears to be called?
+ state->out.on_completed();
+ }
+ );
+ }
+
+ composite_subscription source_lifetime;
+ output_type out;
+ };
+
+ // Finite retry case (explicitly limited with the number of times)
+ template<class T, class Observable, class Count>
+ struct finite : public operator_base<T> {
typedef rxu::decay_t<Observable> source_type;
typedef rxu::decay_t<Count> count_type;
struct values {
- values(source_type s, count_type t)
- : source(std::move(s))
- , remaining(std::move(t))
- , retry_infinitely(t == 0) {
- }
- source_type source;
- count_type remaining;
- bool retry_infinitely;
+ values(source_type s, count_type t)
+ : source(std::move(s)), remaining_(std::move(t)) {
+ if (remaining_ != 0) ++remaining_;
+ }
+
+ inline bool completed_predicate() const {
+ // Return true if we are completed
+ return remaining_ <= 0;
+ }
+
+ inline void update() {
+ // Decrement counter
+ --remaining_;
+ }
+
+ source_type source;
+
+ private:
+ count_type remaining_;
};
- values initial;
- retry(source_type s, count_type t)
- : initial(std::move(s), std::move(t)) {
+ finite(source_type s, count_type t)
+ : initial_(std::move(s), std::move(t)) {
}
template<class Subscriber>
void on_subscribe(const Subscriber& s) const {
-
- typedef Subscriber output_type;
- struct state_type
- : public std::enable_shared_from_this<state_type>
- , public values {
- state_type(const values& i, const output_type& oarg)
- : values(i)
- , source_lifetime(composite_subscription::empty())
- , out(oarg) {
- }
- composite_subscription source_lifetime;
- output_type out;
-
- void do_subscribe() {
- auto state = this->shared_from_this();
-
- state->source_lifetime = composite_subscription();
- state->out.add(state->source_lifetime);
-
- state->source.subscribe(
- state->out,
- state->source_lifetime,
- // on_next
- [state](T t) {
- state->out.on_next(t);
- },
- // on_error
- [state](std::exception_ptr e) {
- if (state->retry_infinitely || (--state->remaining >= 0)) {
- state->do_subscribe();
- } else {
- state->out.on_error(e);
- }
- },
- // on_completed
- [state]() {
-
- // JEP: never appeears to be called?
-
- state->out.on_completed();
- }
- );
- }
- };
-
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_type>(initial, s);
-
+ typedef state_type<values, Subscriber, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ if (initial_.completed_predicate()) {
+ // Should we call it here in retry?
+ state->out.on_completed();
+ } else {
// start the first iteration
state->do_subscribe();
+ }
+ }
+
+ private:
+ values initial_;
+ };
+
+ // Infinite retry case
+ template<class T, class Observable>
+ struct infinite : public operator_base<T> {
+ typedef rxu::decay_t<Observable> source_type;
+
+ struct values {
+ values(source_type s)
+ : source(std::move(s)) {
+ }
+
+ static inline bool completed_predicate() {
+ // Infinite retry never stops ignoring errors
+ return false;
+ }
+
+ static inline void update() {
+ // Infinite retry does not need to update state
+ }
+
+ source_type source;
+ };
+
+ infinite(source_type s) : initial_(std::move(s)) {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+ typedef state_type<values, Subscriber, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ // start the first iteration
+ state->do_subscribe();
}
-};
+ private:
+ values initial_;
+ };
+
+}
}
/*! @copydoc rx-retry.hpp
@@ -129,37 +190,35 @@ auto retry(AN&&... an)
template<>
struct member_overload<retry_tag>
{
- template<class Observable,
- class Enabled = rxu::enable_if_all_true_type_t<
- is_observable<Observable>>,
- class SourceValue = rxu::value_type_t<Observable>,
- class Retry = rxo::detail::retry<SourceValue, rxu::decay_t<Observable>, int>,
- class Value = rxu::value_type_t<Retry>,
- class Result = observable<Value, Retry>
- >
- static Result member(Observable&& o) {
- return Result(Retry(std::forward<Observable>(o), 0));
- }
-
- template<class Observable,
- class Count,
- class Enabled = rxu::enable_if_all_true_type_t<
- is_observable<Observable>>,
- class SourceValue = rxu::value_type_t<Observable>,
- class Retry = rxo::detail::retry<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
- class Value = rxu::value_type_t<Retry>,
- class Result = observable<Value, Retry>
- >
- static Result member(Observable&& o, Count&& c) {
- return Result(Retry(std::forward<Observable>(o), std::forward<Count>(c)));
- }
-
- template<class... AN>
- static operators::detail::retry_invalid_t<AN...> member(const AN&...) {
- std::terminate();
- return {};
- static_assert(sizeof...(AN) == 10000, "retry takes (optional Count)");
- }
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Retry = rxo::detail::retry::infinite<SourceValue, rxu::decay_t<Observable>>,
+ class Value = rxu::value_type_t<Retry>,
+ class Result = observable<Value, Retry>
+ >
+ static Result member(Observable&& o) {
+ return Result(Retry(std::forward<Observable>(o)));
+ }
+
+ template<class Observable,
+ class Count,
+ class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Retry = rxo::detail::retry::finite<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
+ class Value = rxu::value_type_t<Retry>,
+ class Result = observable<Value, Retry>
+ >
+ static Result member(Observable&& o, Count&& c) {
+ return Result(Retry(std::forward<Observable>(o), std::forward<Count>(c)));
+ }
+
+ template<class... AN>
+ static operators::detail::retry_invalid_t<AN...> member(const AN&...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "retry takes (optional Count)");
+ }
};
}
diff --git a/Rx/v2/test/operators/retry.cpp b/Rx/v2/test/operators/retry.cpp
index fa1b6f7..92c117f 100644
--- a/Rx/v2/test/operators/retry.cpp
+++ b/Rx/v2/test/operators/retry.cpp
@@ -1,7 +1,6 @@
#include "../test.h"
#include "rxcpp/operators/rx-retry.hpp"
-
SCENARIO("retry, basic test", "[retry][operators]") {
GIVEN("hot observable of 3x4x7 ints with errors inbetween the groups. Infinite retry.") {
auto sc = rxsc::make_test();
@@ -75,6 +74,49 @@ SCENARIO("retry, basic test", "[retry][operators]") {
}
}
+SCENARIO("retry 0, basic test", "[retry][operators]") {
+ GIVEN("hot observable of 3 ints. Infinite retry.") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ std::runtime_error ex("retry on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(100, 1),
+ on.next(150, 2),
+ on.next(200, 3),
+ });;
+
+ WHEN("retry is invoked with 0 times as argument") {
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ | rxo::retry(0)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output should be empty"){
+ auto required = rxu::to_vector({
+ on.completed(200)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("no subscriptions in retry(0)"){
+ auto required = std::vector<rxcpp::notifications::subscription>();
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+
+ }
+}
+
SCENARIO("retry with failure", "[retry][operators]") {
GIVEN("hot observable of 3x4x7 ints with errors inbetween the groups. Retry 1. Must fail.") {