diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp | 80 |
1 files changed, 57 insertions, 23 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp index fb38902..f026b59 100644 --- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp @@ -2,6 +2,24 @@ #pragma once +/*! \file rx-subscribe_on.hpp + + \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler. + + \param cn the scheduler to perform subscription actions on. + + \return The source observable modified so that its subscriptions happen on the specified scheduler. + + \sample + \snippet subscribe_on.cpp subscribe_on sample + \snippet output.txt subscribe_on sample + + Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results: + \snippet output.txt observe_on sample +*/ + #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP) #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP @@ -13,6 +31,16 @@ namespace operators { namespace detail { +template<class... AN> +struct subscribe_on_invalid_arguments {}; + +template<class... AN> +struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> { + using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>; +}; +template<class... AN> +using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type; + template<class T, class Observable, class Coordination> struct subscribe_on : public operator_base<T> { @@ -114,35 +142,41 @@ private: subscribe_on& operator=(subscribe_on o) RXCPP_DELETE; }; -template<class Coordination> -class subscribe_on_factory -{ - typedef rxu::decay_t<Coordination> coordination_type; - - coordination_type coordination; -public: - subscribe_on_factory(coordination_type sf) - : coordination(std::move(sf)) - { - } - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>> { - return observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>>( - subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>(std::forward<Observable>(source), coordination)); - } -}; - } -template<class Coordination> -auto subscribe_on(Coordination sf) - -> detail::subscribe_on_factory<Coordination> { - return detail::subscribe_on_factory<Coordination>(std::move(sf)); +/*! @copydoc rx-subscribe_on.hpp +*/ +template<class... AN> +auto subscribe_on(AN&&... an) + -> operator_factory<subscribe_on_tag, AN...> { + return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } } +template<> +struct member_overload<subscribe_on_tag> +{ + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SubscribeOn>, + class Result = observable<Value, SubscribeOn>> + static Result member(Observable&& o, Coordination&& cn) { + return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)"); + } +}; + } #endif |