From ff18d4640e5f2f96eba2fe17fd5d91132f44dc0f Mon Sep 17 00:00:00 2001 From: Grigoriy Chudnov Date: Mon, 23 Jan 2017 01:08:25 +0300 Subject: decouple subscribe_on from observable --- Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp | 80 ++++++++++++++++------- Rx/v2/src/rxcpp/rx-includes.hpp | 1 + Rx/v2/src/rxcpp/rx-observable.hpp | 26 ++------ Rx/v2/src/rxcpp/rx-operators.hpp | 8 ++- Rx/v2/test/operators/subscribe_on.cpp | 91 +++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 44 deletions(-) diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp index fb38902..f026b59 100644 --- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp @@ -2,6 +2,24 @@ #pragma once +/*! \file rx-subscribe_on.hpp + + \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler. + + \param cn the scheduler to perform subscription actions on. + + \return The source observable modified so that its subscriptions happen on the specified scheduler. + + \sample + \snippet subscribe_on.cpp subscribe_on sample + \snippet output.txt subscribe_on sample + + Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results: + \snippet output.txt observe_on sample +*/ + #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP) #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP @@ -13,6 +31,16 @@ namespace operators { namespace detail { +template +struct subscribe_on_invalid_arguments {}; + +template +struct subscribe_on_invalid : public rxo::operator_base> { + using type = observable, subscribe_on_invalid>; +}; +template +using subscribe_on_invalid_t = typename subscribe_on_invalid::type; + template struct subscribe_on : public operator_base { @@ -114,35 +142,41 @@ private: subscribe_on& operator=(subscribe_on o) RXCPP_DELETE; }; -template -class subscribe_on_factory -{ - typedef rxu::decay_t coordination_type; - - coordination_type coordination; -public: - subscribe_on_factory(coordination_type sf) - : coordination(std::move(sf)) - { - } - template - auto operator()(Observable&& source) - -> observable>, subscribe_on>, Observable, Coordination>> { - return observable>, subscribe_on>, Observable, Coordination>>( - subscribe_on>, Observable, Coordination>(std::forward(source), coordination)); - } -}; - } -template -auto subscribe_on(Coordination sf) - -> detail::subscribe_on_factory { - return detail::subscribe_on_factory(std::move(sf)); +/*! @copydoc rx-subscribe_on.hpp +*/ +template +auto subscribe_on(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } } +template<> +struct member_overload +{ + template, + is_coordination>, + class SourceValue = rxu::value_type_t, + class SubscribeOn = rxo::detail::subscribe_on, rxu::decay_t>, + class Value = rxu::value_type_t, + class Result = observable> + static Result member(Observable&& o, Coordination&& cn) { + return Result(SubscribeOn(std::forward(o), std::forward(cn))); + } + + template + static operators::detail::subscribe_on_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index d43dc3b..3a5a7ab 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -215,6 +215,7 @@ #include "operators/rx-skip_last.hpp" #include "operators/rx-skip_until.hpp" #include "operators/rx-start_with.hpp" +#include "operators/rx-subscribe_on.hpp" #include "operators/rx-switch_if_empty.hpp" #include "operators/rx-switch_on_next.hpp" #include "operators/rx-take.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 2863d43..b29dc52 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1498,29 +1498,15 @@ public: return multicast(rxsub::replay(count, period, std::move(cn), cs)); } - /*! Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to perform subscription actions on - - \return The source observable modified so that its subscriptions happen on the specified scheduler. - - \sample - \snippet subscribe_on.cpp subscribe_on sample - \snippet output.txt subscribe_on sample - - Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results: - \snippet output.txt observe_on sample - */ - template - auto subscribe_on(Coordination cn) const + /*! @copydoc rx-subscribe_on.hpp + */ + template + auto subscribe_on(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> observable>, rxo::detail::subscribe_on> + -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return observable>, rxo::detail::subscribe_on>( - rxo::detail::subscribe_on(*this, std::move(cn))); + return observable_member(subscribe_on_tag{}, *this, std::forward(an)...); } /*! All values are queued and delivered using the scheduler from the supplied coordination. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 30c5e5b..4fbf34f 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -103,7 +103,6 @@ public: #include "operators/rx-ref_count.hpp" #include "operators/rx-replay.hpp" #include "operators/rx-subscribe.hpp" -#include "operators/rx-subscribe_on.hpp" namespace rxcpp { @@ -356,6 +355,13 @@ struct start_with_tag { }; }; +struct subscribe_on_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct switch_if_empty_tag { template struct include_header{ diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp index 80e9b2c..ded1ea4 100644 --- a/Rx/v2/test/operators/subscribe_on.cpp +++ b/Rx/v2/test/operators/subscribe_on.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include #include +#include static const int static_subscriptions = 50000; @@ -84,3 +85,93 @@ SCENARIO("for loop subscribes to map with subscribe_on", "[hide][subscribe_on_on } } } + +SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("subscribe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + .subscribe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(201, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("subscribe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + | rxo::subscribe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(201, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} -- cgit v1.2.3