diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-24 19:39:47 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-24 08:39:47 -0800 |
commit | 6f4ca1198079ed2a439f53865d494729b975bbb7 (patch) | |
tree | 37da80a3e6fd7e8ffdeb96d33faf5ac986c84b06 /Rx | |
parent | ff18d4640e5f2f96eba2fe17fd5d91132f44dc0f (diff) | |
download | RxCpp-6f4ca1198079ed2a439f53865d494729b975bbb7.tar.gz |
decouple observe_on from observable (#335)
* decouple observe_on from observable
* decouple observe_on from observable - fix msvc
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 73 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 23 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/group_by.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/merge.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/observe_on.cpp | 91 | ||||
-rw-r--r-- | Rx/v2/test/operators/subscribe_on.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/subscription.cpp | 1 |
12 files changed, 166 insertions, 37 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index f4b69ff..99de4c3 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -2,6 +2,24 @@ #pragma once +/*! \file rx-observe_on.hpp + + \brief All values are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler. + + \param cn the scheduler to notify observers on. + + \return The source observable modified so that its observers are notified on the specified scheduler. + + \sample + \snippet observe_on.cpp observe_on sample + \snippet output.txt observe_on sample + + Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: + \snippet output.txt subscribe_on sample +*/ + #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP) #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP @@ -13,6 +31,16 @@ namespace operators { namespace detail { +template<class... AN> +struct observe_on_invalid_arguments {}; + +template<class... AN> +struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> { + using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>; +}; +template<class... AN> +using observe_on_invalid_t = typename observe_on_invalid<AN...>::type; + template<class T, class Coordination> struct observe_on { @@ -195,30 +223,39 @@ struct observe_on } }; -template<class Coordination> -class observe_on_factory -{ - typedef rxu::decay_t<Coordination> coordination_type; - coordination_type coordination; -public: - observe_on_factory(coordination_type cn) : coordination(std::move(cn)) {} - template<class Observable> - auto operator()(Observable&& source) - -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(observe_on<rxu::value_type_t<rxu::decay_t<Observable>>, coordination_type>(coordination))) { - return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(observe_on<rxu::value_type_t<rxu::decay_t<Observable>>, coordination_type>(coordination)); - } -}; +} +/*! @copydoc rx-observe_on.hpp +*/ +template<class... AN> +auto observe_on(AN&&... an) + -> operator_factory<observe_on_tag, AN...> { + return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } -template<class Coordination> -auto observe_on(Coordination cn) - -> detail::observe_on_factory<Coordination> { - return detail::observe_on_factory<Coordination>(std::move(cn)); } +template<> +struct member_overload<observe_on_tag> +{ + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>> + static auto member(Observable&& o, Coordination&& cn) + -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) { + return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn))); + } -} + template<class... AN> + static operators::detail::observe_on_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)"); + } +}; class observe_on_one_worker : public coordination_base { diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 3a5a7ab..7027648 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -203,6 +203,7 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" +#include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" #include "operators/rx-reduce.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index b29dc52..f121cf7 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1509,28 +1509,15 @@ public: return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...); } - /*! All values are queued and delivered using the scheduler from the supplied coordination. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to notify observers on - - \return The source observable modified so that its observers are notified on the specified scheduler. - - \sample - \snippet observe_on.cpp observe_on sample - \snippet output.txt observe_on sample - - Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: - \snippet output.txt subscribe_on sample + /*! @copydoc rx-observe_on.hpp */ - template<class Coordination> - auto observe_on(Coordination cn) const + template<class... AN> + auto observe_on(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) + -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))); + return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-reduce.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4fbf34f..086315b 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -98,7 +98,6 @@ public: #include "operators/rx-connect_forever.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-multicast.hpp" -#include "operators/rx-observe_on.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-replay.hpp" @@ -258,6 +257,13 @@ struct merge_tag { }; }; +struct observe_on_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-observe_on.hpp>"); + }; +}; + struct on_error_resume_next_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/concat.cpp b/Rx/v2/test/operators/concat.cpp index c8ff234..86f4a7e 100644 --- a/Rx/v2/test/operators/concat.cpp +++ b/Rx/v2/test/operators/concat.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-concat.hpp> #include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 1000000; diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp index 761fdff..0a3ee6b 100644 --- a/Rx/v2/test/operators/concat_map.cpp +++ b/Rx/v2/test/operators/concat_map.cpp @@ -4,6 +4,7 @@ #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-concat_map.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 03cb36e..e2e3cf8 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -4,6 +4,7 @@ #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-flat_map.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp index 2704aa7..606d7df 100644 --- a/Rx/v2/test/operators/group_by.cpp +++ b/Rx/v2/test/operators/group_by.cpp @@ -6,6 +6,7 @@ #include <rxcpp/operators/rx-merge.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-start_with.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> #include <locale> diff --git a/Rx/v2/test/operators/merge.cpp b/Rx/v2/test/operators/merge.cpp index 60e51a2..9a7f28c 100644 --- a/Rx/v2/test/operators/merge.cpp +++ b/Rx/v2/test/operators/merge.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-merge.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 1000000; diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp index 5d56448..644ab93 100644 --- a/Rx/v2/test/operators/observe_on.cpp +++ b/Rx/v2/test/operators/observe_on.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include <rxcpp/operators/rx-take.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 100000; @@ -46,3 +47,93 @@ SCENARIO("range observed on new_thread", "[hide][range][observe_on_debug][observ } } } + +SCENARIO("observe_on", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("subscribe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + .observe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("stream observe_on", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("observe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + | rxo::observe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +}
\ No newline at end of file diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp index ded1ea4..baa66e2 100644 --- a/Rx/v2/test/operators/subscribe_on.cpp +++ b/Rx/v2/test/operators/subscribe_on.cpp @@ -2,6 +2,7 @@ #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-subscribe_on.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_subscriptions = 50000; diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index 37c9a43..a4ce52d 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -2,6 +2,7 @@ #include "rxcpp/operators/rx-combine_latest.hpp" #include "rxcpp/operators/rx-map.hpp" #include "rxcpp/operators/rx-take.hpp" +#include "rxcpp/operators/rx-observe_on.hpp" SCENARIO("observe subscription", "[hide]"){ GIVEN("observable of ints"){ |