diff options
author | elelel <elel@3wh.net> | 2017-02-21 03:10:05 +0000 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-02-20 19:10:05 -0800 |
commit | 7811809ebb9d4226420ad51d11a922704b56b7a9 (patch) | |
tree | 869a20e9ef51449bd9ba07253baf64ab98c35185 | |
parent | 35a805ef522b061d0a89f524e99134d6360f05ca (diff) | |
download | RxCpp-7811809ebb9d4226420ad51d11a922704b56b7a9.tar.gz |
Rewrite repeat operator to handle 0 case correctly and not rely on magic numbers (#356)
* Sketch interface for finite/inifinite variants
* CRTP deriving finite/infinite from base
* Fully rewrite repeat implementation
* Fix description and comments in repeat
* Test repeat(0) case
* Make 0 handling with completion when the input sequence complete
* Return immidiately empty sequence instead on first on_completed
* Return immidiately empty sequence instead on first on_completed
* Update param description for repeat 0 case
* repeat(0): never call on.next(), but call on.completed()
* Test: no subscriptions are made when repeat(0)
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-repeat.hpp | 263 | ||||
-rw-r--r-- | Rx/v2/test/operators/repeat.cpp | 41 |
2 files changed, 201 insertions, 103 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index dcbc161..0caad71 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -8,7 +8,7 @@ \tparam Count the type of the counter (optional). - \param t the number of times the source observable items are repeated (optional). If not specified or 0, infinitely repeats the source observable. + \param t The number of times the source observable items are repeated (optional). If not specified, infinitely repeats the source observable. Specifying 0 returns an empty sequence immediately \return An observable that repeats the sequence of items emitted by the source observable for t times. @@ -42,87 +42,147 @@ struct repeat_invalid : public rxo::operator_base<repeat_invalid_arguments<AN... template<class... AN> using repeat_invalid_t = typename repeat_invalid<AN...>::type; -template<class T, class Observable, class Count> -struct repeat : public operator_base<T> -{ +// Contain repeat variations in a namespace +namespace repeat { + // Structure to perform general repeat operations on state + template <class ValuesType, class Subscriber, class T> + struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>, + public ValuesType { + + typedef Subscriber output_type; + state_type(const ValuesType& i, const output_type& oarg) + : ValuesType(i), + source_lifetime(composite_subscription::empty()), + out(oarg) { + } + + void do_subscribe() { + auto state = this->shared_from_this(); + + state->out.remove(state->lifetime_token); + state->source_lifetime.unsubscribe(); + + state->source_lifetime = composite_subscription(); + state->lifetime_token = state->out.add(state->source_lifetime); + + state->source.subscribe( + state->out, + state->source_lifetime, + // on_next + [state](T t) { + state->out.on_next(t); + }, + // on_error + [state](std::exception_ptr e) { + state->out.on_error(e); + }, + // on_completed + [state]() { + state->on_completed(); + // Use specialized predicate for finite/infinte case + if (state->completed_predicate()) { + state->out.on_completed(); + } else { + state->do_subscribe(); + } + } + ); + } + + composite_subscription source_lifetime; + output_type out; + composite_subscription::weak_subscription lifetime_token; + }; + + // Finite repeat case (explicitely limited with the number of times) + template <class T, class Observable, class Count> + struct finite : public operator_base<T> { typedef rxu::decay_t<Observable> source_type; typedef rxu::decay_t<Count> count_type; - struct values - { - values(source_type s, count_type t) - : source(std::move(s)) - , remaining(std::move(t)) - , repeat_infinitely(t == 0) - { - } - source_type source; - count_type remaining; - bool repeat_infinitely; - }; - values initial; - repeat(source_type s, count_type t) - : initial(std::move(s), std::move(t)) - { + struct values { + values(source_type s, count_type t) + : source(std::move(s)), + remaining_(std::move(t)) { + } + + inline bool completed_predicate() const { + // Return true if we are completed + return remaining_ <= 0; + } + + inline void on_completed() { + // Decrement counter + --remaining_; + } + + source_type source; + + private: + // Counter to hold number of times remaining to complete + count_type remaining_; + }; + + finite(source_type s, count_type t) + : initial_(std::move(s), std::move(t)) { } template<class Subscriber> void on_subscribe(const Subscriber& s) const { - - typedef Subscriber output_type; - struct state_type - : public std::enable_shared_from_this<state_type> - , public values - { - state_type(const values& i, const output_type& oarg) - : values(i) - , source_lifetime(composite_subscription::empty()) - , out(oarg) - { - } - composite_subscription source_lifetime; - output_type out; - composite_subscription::weak_subscription lifetime_token; - - void do_subscribe() { - auto state = this->shared_from_this(); - - state->out.remove(state->lifetime_token); - state->source_lifetime.unsubscribe(); - - state->source_lifetime = composite_subscription(); - state->lifetime_token = state->out.add(state->source_lifetime); - - state->source.subscribe( - state->out, - state->source_lifetime, - // on_next - [state](T t) { - state->out.on_next(t); - }, - // on_error - [state](std::exception_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - if (state->repeat_infinitely || (--state->remaining > 0)) { - state->do_subscribe(); - } else { - state->out.on_completed(); - } - } - ); - } - }; - - // take a copy of the values for each subscription - auto state = std::make_shared<state_type>(initial, s); - + typedef state_type<values, Subscriber, T> state_t; + // take a copy of the values for each subscription + auto state = std::make_shared<state_t>(initial_, s); + if (initial_.completed_predicate()) { + // return completed + state->out.on_completed(); + } else { // start the first iteration state->do_subscribe(); + } } -}; + + private: + values initial_; + }; + + // Infinite repeat case + template <class T, class Observable> + struct infinite : public operator_base<T> { + typedef rxu::decay_t<Observable> source_type; + + struct values { + values(source_type s) + : source(std::move(s)) { + } + + static inline bool completed_predicate() { + // Infinite repeat never completes + return false; + } + + static inline void on_completed() { + // Infinite repeat does not need to update state + } + + source_type source; + }; + + infinite(source_type s) : initial_(std::move(s)) { + } + + template<class Subscriber> + void on_subscribe(const Subscriber& s) const { + typedef state_type<values, Subscriber, T> state_t; + // take a copy of the values for each subscription + auto state = std::make_shared<state_t>(initial_, s); + // start the first iteration + state->do_subscribe(); + } + + private: + values initial_; + }; +} } @@ -137,37 +197,34 @@ auto repeat(AN&&... an) } template<> -struct member_overload<repeat_tag> -{ - template<class Observable, - class Enabled = rxu::enable_if_all_true_type_t< - is_observable<Observable>>, - class SourceValue = rxu::value_type_t<Observable>, - class Repeat = rxo::detail::repeat<SourceValue, rxu::decay_t<Observable>, int>, - class Value = rxu::value_type_t<Repeat>, - class Result = observable<Value, Repeat>> - static Result member(Observable&& o) { - return Result(Repeat(std::forward<Observable>(o), 0)); - } - - template<class Observable, - class Count, - class Enabled = rxu::enable_if_all_true_type_t< - is_observable<Observable>>, - class SourceValue = rxu::value_type_t<Observable>, - class Repeat = rxo::detail::repeat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>, - class Value = rxu::value_type_t<Repeat>, - class Result = observable<Value, Repeat>> - static Result member(Observable&& o, Count&& c) { - return Result(Repeat(std::forward<Observable>(o), std::forward<Count>(c))); - } - - template<class... AN> - static operators::detail::repeat_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "repeat takes (optional Count)"); - } +struct member_overload<repeat_tag> { + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Repeat = rxo::detail::repeat::infinite<SourceValue, rxu::decay_t<Observable>>, + class Value = rxu::value_type_t<Repeat>, + class Result = observable<Value, Repeat>> + static Result member(Observable&& o) { + return Result(Repeat(std::forward<Observable>(o))); + } + + template<class Observable, + class Count, + class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Repeat = rxo::detail::repeat::finite<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>, + class Value = rxu::value_type_t<Repeat>, + class Result = observable<Value, Repeat>> + static Result member(Observable&& o, Count&& c) { + return Result(Repeat(std::forward<Observable>(o), std::forward<Count>(c))); + } + + template<class... AN> + static operators::detail::repeat_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "repeat takes (optional Count)"); + } }; } diff --git a/Rx/v2/test/operators/repeat.cpp b/Rx/v2/test/operators/repeat.cpp index 80c3c87..0aa9598 100644 --- a/Rx/v2/test/operators/repeat.cpp +++ b/Rx/v2/test/operators/repeat.cpp @@ -56,6 +56,47 @@ SCENARIO("repeat, basic test", "[repeat][operators]"){ } } +SCENARIO("repeat, 0 times case", "[repeat][operators]"){ + GIVEN("cold observable of 3 ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_cold_observable({ + on.next(100, 1), + on.next(150, 2), + on.next(200, 3), + on.completed(250) + }); + + WHEN("repeat zero times is launched"){ + + auto res = w.start( + [&]() { + return xs + | rxo::repeat(0) + // forget type to workaround lambda deduction bug on msvc 2013 + | rxo::as_dynamic(); + } + ); + + THEN("the output should be empty"){ + auto required = rxu::to_vector({ + on.completed(200) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("no subscriptions in repeat(0) variant that skips on.next()"){ + auto required = std::vector<rxcpp::notifications::subscription>(); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + SCENARIO("repeat, infinite observable test", "[repeat][operators]"){ GIVEN("cold observable of 3 ints that never completes."){ auto sc = rxsc::make_test(); |