From 6f4ca1198079ed2a439f53865d494729b975bbb7 Mon Sep 17 00:00:00 2001 From: Grigoriy Chudnov Date: Tue, 24 Jan 2017 19:39:47 +0300 Subject: decouple observe_on from observable (#335) * decouple observe_on from observable * decouple observe_on from observable - fix msvc --- Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 73 ++++++++++++++++++++++------- Rx/v2/src/rxcpp/rx-includes.hpp | 1 + Rx/v2/src/rxcpp/rx-observable.hpp | 23 ++------- Rx/v2/src/rxcpp/rx-operators.hpp | 8 +++- 4 files changed, 68 insertions(+), 37 deletions(-) (limited to 'Rx/v2/src/rxcpp') diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index f4b69ff..99de4c3 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -2,6 +2,24 @@ #pragma once +/*! \file rx-observe_on.hpp + + \brief All values are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler. + + \param cn the scheduler to notify observers on. + + \return The source observable modified so that its observers are notified on the specified scheduler. + + \sample + \snippet observe_on.cpp observe_on sample + \snippet output.txt observe_on sample + + Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: + \snippet output.txt subscribe_on sample +*/ + #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP) #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP @@ -13,6 +31,16 @@ namespace operators { namespace detail { +template +struct observe_on_invalid_arguments {}; + +template +struct observe_on_invalid : public rxo::operator_base> { + using type = observable, observe_on_invalid>; +}; +template +using observe_on_invalid_t = typename observe_on_invalid::type; + template struct observe_on { @@ -195,30 +223,39 @@ struct observe_on } }; -template -class observe_on_factory -{ - typedef rxu::decay_t coordination_type; - coordination_type coordination; -public: - observe_on_factory(coordination_type cn) : coordination(std::move(cn)) {} - template - auto operator()(Observable&& source) - -> decltype(source.template lift>>(observe_on>, coordination_type>(coordination))) { - return source.template lift>>(observe_on>, coordination_type>(coordination)); - } -}; +} +/*! @copydoc rx-observe_on.hpp +*/ +template +auto observe_on(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } -template -auto observe_on(Coordination cn) - -> detail::observe_on_factory { - return detail::observe_on_factory(std::move(cn)); } +template<> +struct member_overload +{ + template, + is_coordination>, + class SourceValue = rxu::value_type_t, + class ObserveOn = rxo::detail::observe_on>> + static auto member(Observable&& o, Coordination&& cn) + -> decltype(o.template lift(ObserveOn(std::forward(cn)))) { + return o.template lift(ObserveOn(std::forward(cn))); + } -} + template + static operators::detail::observe_on_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)"); + } +}; class observe_on_one_worker : public coordination_base { diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 3a5a7ab..7027648 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -203,6 +203,7 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" +#include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" #include "operators/rx-reduce.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index b29dc52..f121cf7 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1509,28 +1509,15 @@ public: return observable_member(subscribe_on_tag{}, *this, std::forward(an)...); } - /*! All values are queued and delivered using the scheduler from the supplied coordination. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to notify observers on - - \return The source observable modified so that its observers are notified on the specified scheduler. - - \sample - \snippet observe_on.cpp observe_on sample - \snippet output.txt observe_on sample - - Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: - \snippet output.txt subscribe_on sample + /*! @copydoc rx-observe_on.hpp */ - template - auto observe_on(Coordination cn) const + template + auto observe_on(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift(rxo::detail::observe_on(std::move(cn)))) + -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return lift(rxo::detail::observe_on(std::move(cn))); + return observable_member(observe_on_tag{}, *this, std::forward(an)...); } /*! @copydoc rx-reduce.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4fbf34f..086315b 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -98,7 +98,6 @@ public: #include "operators/rx-connect_forever.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-multicast.hpp" -#include "operators/rx-observe_on.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-replay.hpp" @@ -258,6 +257,13 @@ struct merge_tag { }; }; +struct observe_on_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct on_error_resume_next_tag { template struct include_header{ -- cgit v1.2.3