summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelelel <elel@3wh.net>2017-02-21 03:10:05 +0000
committerKirk Shoop <kirk.shoop@microsoft.com>2017-02-20 19:10:05 -0800
commit7811809ebb9d4226420ad51d11a922704b56b7a9 (patch)
tree869a20e9ef51449bd9ba07253baf64ab98c35185
parent35a805ef522b061d0a89f524e99134d6360f05ca (diff)
downloadRxCpp-7811809ebb9d4226420ad51d11a922704b56b7a9.tar.gz
Rewrite repeat operator to handle 0 case correctly and not rely on magic numbers (#356)
* Sketch interface for finite/inifinite variants * CRTP deriving finite/infinite from base * Fully rewrite repeat implementation * Fix description and comments in repeat * Test repeat(0) case * Make 0 handling with completion when the input sequence complete * Return immidiately empty sequence instead on first on_completed * Return immidiately empty sequence instead on first on_completed * Update param description for repeat 0 case * repeat(0): never call on.next(), but call on.completed() * Test: no subscriptions are made when repeat(0)
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-repeat.hpp263
-rw-r--r--Rx/v2/test/operators/repeat.cpp41
2 files changed, 201 insertions, 103 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index dcbc161..0caad71 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -8,7 +8,7 @@
\tparam Count the type of the counter (optional).
- \param t the number of times the source observable items are repeated (optional). If not specified or 0, infinitely repeats the source observable.
+ \param t The number of times the source observable items are repeated (optional). If not specified, infinitely repeats the source observable. Specifying 0 returns an empty sequence immediately
\return An observable that repeats the sequence of items emitted by the source observable for t times.
@@ -42,87 +42,147 @@ struct repeat_invalid : public rxo::operator_base<repeat_invalid_arguments<AN...
template<class... AN>
using repeat_invalid_t = typename repeat_invalid<AN...>::type;
-template<class T, class Observable, class Count>
-struct repeat : public operator_base<T>
-{
+// Contain repeat variations in a namespace
+namespace repeat {
+ // Structure to perform general repeat operations on state
+ template <class ValuesType, class Subscriber, class T>
+ struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>,
+ public ValuesType {
+
+ typedef Subscriber output_type;
+ state_type(const ValuesType& i, const output_type& oarg)
+ : ValuesType(i),
+ source_lifetime(composite_subscription::empty()),
+ out(oarg) {
+ }
+
+ void do_subscribe() {
+ auto state = this->shared_from_this();
+
+ state->out.remove(state->lifetime_token);
+ state->source_lifetime.unsubscribe();
+
+ state->source_lifetime = composite_subscription();
+ state->lifetime_token = state->out.add(state->source_lifetime);
+
+ state->source.subscribe(
+ state->out,
+ state->source_lifetime,
+ // on_next
+ [state](T t) {
+ state->out.on_next(t);
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->out.on_error(e);
+ },
+ // on_completed
+ [state]() {
+ state->on_completed();
+ // Use specialized predicate for finite/infinte case
+ if (state->completed_predicate()) {
+ state->out.on_completed();
+ } else {
+ state->do_subscribe();
+ }
+ }
+ );
+ }
+
+ composite_subscription source_lifetime;
+ output_type out;
+ composite_subscription::weak_subscription lifetime_token;
+ };
+
+ // Finite repeat case (explicitely limited with the number of times)
+ template <class T, class Observable, class Count>
+ struct finite : public operator_base<T> {
typedef rxu::decay_t<Observable> source_type;
typedef rxu::decay_t<Count> count_type;
- struct values
- {
- values(source_type s, count_type t)
- : source(std::move(s))
- , remaining(std::move(t))
- , repeat_infinitely(t == 0)
- {
- }
- source_type source;
- count_type remaining;
- bool repeat_infinitely;
- };
- values initial;
- repeat(source_type s, count_type t)
- : initial(std::move(s), std::move(t))
- {
+ struct values {
+ values(source_type s, count_type t)
+ : source(std::move(s)),
+ remaining_(std::move(t)) {
+ }
+
+ inline bool completed_predicate() const {
+ // Return true if we are completed
+ return remaining_ <= 0;
+ }
+
+ inline void on_completed() {
+ // Decrement counter
+ --remaining_;
+ }
+
+ source_type source;
+
+ private:
+ // Counter to hold number of times remaining to complete
+ count_type remaining_;
+ };
+
+ finite(source_type s, count_type t)
+ : initial_(std::move(s), std::move(t)) {
}
template<class Subscriber>
void on_subscribe(const Subscriber& s) const {
-
- typedef Subscriber output_type;
- struct state_type
- : public std::enable_shared_from_this<state_type>
- , public values
- {
- state_type(const values& i, const output_type& oarg)
- : values(i)
- , source_lifetime(composite_subscription::empty())
- , out(oarg)
- {
- }
- composite_subscription source_lifetime;
- output_type out;
- composite_subscription::weak_subscription lifetime_token;
-
- void do_subscribe() {
- auto state = this->shared_from_this();
-
- state->out.remove(state->lifetime_token);
- state->source_lifetime.unsubscribe();
-
- state->source_lifetime = composite_subscription();
- state->lifetime_token = state->out.add(state->source_lifetime);
-
- state->source.subscribe(
- state->out,
- state->source_lifetime,
- // on_next
- [state](T t) {
- state->out.on_next(t);
- },
- // on_error
- [state](std::exception_ptr e) {
- state->out.on_error(e);
- },
- // on_completed
- [state]() {
- if (state->repeat_infinitely || (--state->remaining > 0)) {
- state->do_subscribe();
- } else {
- state->out.on_completed();
- }
- }
- );
- }
- };
-
- // take a copy of the values for each subscription
- auto state = std::make_shared<state_type>(initial, s);
-
+ typedef state_type<values, Subscriber, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ if (initial_.completed_predicate()) {
+ // return completed
+ state->out.on_completed();
+ } else {
// start the first iteration
state->do_subscribe();
+ }
}
-};
+
+ private:
+ values initial_;
+ };
+
+ // Infinite repeat case
+ template <class T, class Observable>
+ struct infinite : public operator_base<T> {
+ typedef rxu::decay_t<Observable> source_type;
+
+ struct values {
+ values(source_type s)
+ : source(std::move(s)) {
+ }
+
+ static inline bool completed_predicate() {
+ // Infinite repeat never completes
+ return false;
+ }
+
+ static inline void on_completed() {
+ // Infinite repeat does not need to update state
+ }
+
+ source_type source;
+ };
+
+ infinite(source_type s) : initial_(std::move(s)) {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+ typedef state_type<values, Subscriber, T> state_t;
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_t>(initial_, s);
+ // start the first iteration
+ state->do_subscribe();
+ }
+
+ private:
+ values initial_;
+ };
+}
}
@@ -137,37 +197,34 @@ auto repeat(AN&&... an)
}
template<>
-struct member_overload<repeat_tag>
-{
- template<class Observable,
- class Enabled = rxu::enable_if_all_true_type_t<
- is_observable<Observable>>,
- class SourceValue = rxu::value_type_t<Observable>,
- class Repeat = rxo::detail::repeat<SourceValue, rxu::decay_t<Observable>, int>,
- class Value = rxu::value_type_t<Repeat>,
- class Result = observable<Value, Repeat>>
- static Result member(Observable&& o) {
- return Result(Repeat(std::forward<Observable>(o), 0));
- }
-
- template<class Observable,
- class Count,
- class Enabled = rxu::enable_if_all_true_type_t<
- is_observable<Observable>>,
- class SourceValue = rxu::value_type_t<Observable>,
- class Repeat = rxo::detail::repeat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
- class Value = rxu::value_type_t<Repeat>,
- class Result = observable<Value, Repeat>>
- static Result member(Observable&& o, Count&& c) {
- return Result(Repeat(std::forward<Observable>(o), std::forward<Count>(c)));
- }
-
- template<class... AN>
- static operators::detail::repeat_invalid_t<AN...> member(AN...) {
- std::terminate();
- return {};
- static_assert(sizeof...(AN) == 10000, "repeat takes (optional Count)");
- }
+struct member_overload<repeat_tag> {
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Repeat = rxo::detail::repeat::infinite<SourceValue, rxu::decay_t<Observable>>,
+ class Value = rxu::value_type_t<Repeat>,
+ class Result = observable<Value, Repeat>>
+ static Result member(Observable&& o) {
+ return Result(Repeat(std::forward<Observable>(o)));
+ }
+
+ template<class Observable,
+ class Count,
+ class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Repeat = rxo::detail::repeat::finite<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
+ class Value = rxu::value_type_t<Repeat>,
+ class Result = observable<Value, Repeat>>
+ static Result member(Observable&& o, Count&& c) {
+ return Result(Repeat(std::forward<Observable>(o), std::forward<Count>(c)));
+ }
+
+ template<class... AN>
+ static operators::detail::repeat_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "repeat takes (optional Count)");
+ }
};
}
diff --git a/Rx/v2/test/operators/repeat.cpp b/Rx/v2/test/operators/repeat.cpp
index 80c3c87..0aa9598 100644
--- a/Rx/v2/test/operators/repeat.cpp
+++ b/Rx/v2/test/operators/repeat.cpp
@@ -56,6 +56,47 @@ SCENARIO("repeat, basic test", "[repeat][operators]"){
}
}
+SCENARIO("repeat, 0 times case", "[repeat][operators]"){
+ GIVEN("cold observable of 3 ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_cold_observable({
+ on.next(100, 1),
+ on.next(150, 2),
+ on.next(200, 3),
+ on.completed(250)
+ });
+
+ WHEN("repeat zero times is launched"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ | rxo::repeat(0)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output should be empty"){
+ auto required = rxu::to_vector({
+ on.completed(200)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("no subscriptions in repeat(0) variant that skips on.next()"){
+ auto required = std::vector<rxcpp::notifications::subscription>();
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
SCENARIO("repeat, infinite observable test", "[repeat][operators]"){
GIVEN("cold observable of 3 ints that never completes."){
auto sc = rxsc::make_test();