diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-27 03:01:57 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-26 16:01:57 -0800 |
commit | 4b6a2f541dece7256271374ca6555ce12a76a99b (patch) | |
tree | 3e5ea5ddc20ee999bd5d32e146be96b43d694ff0 /Rx | |
parent | ad101cf093028f874a5d882967bdb4ccb8032eb8 (diff) | |
download | RxCpp-4b6a2f541dece7256271374ca6555ce12a76a99b.tar.gz |
decouple publish, publish_synchronized, multicast from observable (#338)
* decouple publish, publish_synchronized, multicast from observable
* decouple publish, publish_synchronized, multicast from observable - restore include structure
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-multicast.hpp | 68 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-publish.hpp | 141 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 116 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 20 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 2 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/subscription.cpp | 1 |
7 files changed, 211 insertions, 139 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp index e970bfe..193a11a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp @@ -2,6 +2,15 @@ #pragma once +/*! \file rx-multicast.hpp + + \brief allows connections to the source to be independent of subscriptions. + + \tparam Subject the subject to multicast the source Observable. + + \param sub the subject. +*/ + #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP) #define RXCPP_OPERATORS_RX_MULTICAST_HPP @@ -13,6 +22,16 @@ namespace operators { namespace detail { +template<class... AN> +struct multicast_invalid_arguments {}; + +template<class... AN> +struct multicast_invalid : public rxo::operator_base<multicast_invalid_arguments<AN...>> { + using type = observable<multicast_invalid_arguments<AN...>, multicast_invalid<AN...>>; +}; +template<class... AN> +using multicast_invalid_t = typename multicast_invalid<AN...>::type; + template<class T, class Observable, class Subject> struct multicast : public operator_base<T> { @@ -65,33 +84,40 @@ struct multicast : public operator_base<T> } }; -template<class Subject> -class multicast_factory -{ - Subject caster; -public: - multicast_factory(Subject sub) - : caster(std::move(sub)) - { - } - template<class Observable> - auto operator()(Observable&& source) - -> connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>> { - return connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>>( - multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>(std::forward<Observable>(source), caster)); - } -}; - } -template<class Subject> -inline auto multicast(Subject sub) - -> detail::multicast_factory<Subject> { - return detail::multicast_factory<Subject>(std::move(sub)); +/*! @copydoc rx-multicast.hpp +*/ +template<class... AN> +auto multicast(AN&&... an) + -> operator_factory<multicast_tag, AN...> { + return operator_factory<multicast_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } } +template<> +struct member_overload<multicast_tag> +{ + template<class Observable, class Subject, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Subject>>, + class Value = rxu::value_type_t<Multicast>, + class Result = connectable_observable<Value, Multicast>> + static Result member(Observable&& o, Subject&& sub) { + return Result(Multicast(std::forward<Observable>(o), std::forward<Subject>(sub))); + } + + template<class... AN> + static operators::detail::multicast_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp index 3589cc9..dc38191 100644 --- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp @@ -2,10 +2,32 @@ #pragma once +/*! \file rx-publish.hpp + + \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. + + \tparam T the type of the emitted item (optional). + + \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional). + \param cs the subscription to control lifetime (optional). + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. + + \sample + \snippet publish.cpp publish subject sample + \snippet output.txt publish subject sample + + \sample + \snippet publish.cpp publish behavior sample + \snippet output.txt publish behavior sample +*/ + #if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP) #define RXCPP_OPERATORS_RX_PUBLISH_HPP #include "../rx-includes.hpp" +#include "./rx-multicast.hpp" namespace rxcpp { @@ -13,29 +35,118 @@ namespace operators { namespace detail { -template<template<class T> class Subject> -class publish_factory -{ -public: - publish_factory() {} - template<class Observable> - auto operator()(Observable&& source) - -> connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>> { - return connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>>( - multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>( - std::forward<Observable>(source), Subject<rxu::value_type_t<rxu::decay_t<Observable>>>())); - } +template<class... AN> +struct publish_invalid_arguments {}; + +template<class... AN> +struct publish_invalid : public rxo::operator_base<publish_invalid_arguments<AN...>> { + using type = observable<publish_invalid_arguments<AN...>, publish_invalid<AN...>>; }; +template<class... AN> +using publish_invalid_t = typename publish_invalid<AN...>::type; + +} +/*! @copydoc rx-publish.hpp +*/ +template<class... AN> +auto publish(AN&&... an) + -> operator_factory<publish_tag, AN...> { + return operator_factory<publish_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } -inline auto publish() - -> detail::publish_factory<rxsub::subject> { - return detail::publish_factory<rxsub::subject>(); +/*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + + \tparam Coordination the type of the scheduler. + + \param cn a scheduler all values are queued and delivered on. + \param cs the subscription to control lifetime (optional). + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler. + + \sample + \snippet publish.cpp publish_synchronized sample + \snippet output.txt publish_synchronized sample +*/ +template<class... AN> +auto publish_synchronized(AN&&... an) + -> operator_factory<publish_synchronized_tag, AN...> { + return operator_factory<publish_synchronized_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } } +template<> +struct member_overload<publish_tag> +{ + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::subject<SourceValue>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o) { + return Result(Multicast(std::forward<Observable>(o), Subject(composite_subscription()))); + } + + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::subject<SourceValue>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, composite_subscription cs) { + return Result(Multicast(std::forward<Observable>(o), Subject(cs))); + } + + template<class Observable, class T, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::behavior<SourceValue>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(first, cs))); + } + + template<class... AN> + static operators::detail::publish_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)"); + } +}; + +template<> +struct member_overload<publish_synchronized_tag> +{ + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::synchronize<SourceValue, rxu::decay_t<Coordination>>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs))); + } + + template<class... AN> + static operators::detail::publish_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index c3b7d9d..e8c9b8c 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -203,9 +203,11 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" +#include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" +#include "operators/rx-publish.hpp" #include "operators/rx-reduce.hpp" #include "operators/rx-repeat.hpp" #include "operators/rx-replay.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 0a5ccbe..7a950c5 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1161,123 +1161,37 @@ public: return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...); } - /// \cond SHOW_SERVICE_MEMBERS - /// multicast -> - /// allows connections to the source to be independent of subscriptions - /// - template<class Subject> - auto multicast(Subject sub) const - -> connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>> { - return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>( - rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub))); - } - /// \endcond - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler. - - \sample - \snippet publish.cpp publish_synchronized sample - \snippet output.txt publish_synchronized sample - */ - template<class Coordination> - auto publish_synchronized(Coordination cn, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs))) - /// \endcond - { - return multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs)); - } - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish subject sample - \snippet output.txt publish subject sample - */ - template<class... AN> - auto publish(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(composite_subscription()))) - /// \endcond - { - composite_subscription cs; - return multicast(rxsub::subject<T>(cs)); - static_assert(sizeof...(AN) == 0, "publish() was passed too many arguments."); - } - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish subject sample - \snippet output.txt publish subject sample - */ + /*! @copydoc rx-muticast.hpp + */ template<class... AN> - auto publish(composite_subscription cs, AN**...) const + auto multicast(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs))) + -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return multicast(rxsub::subject<T>(cs)); - static_assert(sizeof...(AN) == 0, "publish(composite_subscription) was passed too many arguments."); + return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...); } - /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam T the type of the emitted item - - \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish behavior sample - \snippet output.txt publish behavior sample - */ + /*! @copydoc rx-publish.hpp + */ template<class... AN> - auto publish(T first, AN**...) const + auto publish(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, composite_subscription()))) + -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - composite_subscription cs; - return multicast(rxsub::behavior<T>(first, cs)); - static_assert(sizeof...(AN) == 0, "publish(value_type) was passed too many arguments."); + return observable_member(publish_tag{}, *this, std::forward<AN>(an)...); } - /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam T the type of the emitted item - - \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish behavior sample - \snippet output.txt publish behavior sample - */ + /*! @copydoc rxcpp::operators::publish_synchronized + */ template<class... AN> - auto publish(T first, composite_subscription cs, AN**...) const + auto publish_synchronized(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs))) + -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return multicast(rxsub::behavior<T>(first, cs)); - static_assert(sizeof...(AN) == 0, "publish(value_type, composite_subscription) was passed too many arguments."); + return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-replay.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index aeffac9..dda27f7 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -97,8 +97,6 @@ public: #include "operators/rx-connect_forever.hpp" #include "operators/rx-lift.hpp" -#include "operators/rx-multicast.hpp" -#include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-subscribe.hpp" @@ -256,6 +254,13 @@ struct merge_tag { }; }; +struct multicast_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-multicast.hpp>"); + }; +}; + struct observe_on_tag { template<class Included> struct include_header{ @@ -297,6 +302,14 @@ struct pairwise_tag { }; }; +struct publish_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-publish.hpp>"); + }; +}; +struct publish_synchronized_tag : publish_tag {}; + struct repeat_tag { template<class Included> struct include_header{ @@ -489,4 +502,7 @@ struct zip_tag { } +#include "operators/rx-multicast.hpp" +#include "operators/rx-publish.hpp" + #endif diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 62d81ff..316f275 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -1,4 +1,6 @@ #include "../test.h" +#include <rxcpp/operators/rx-publish.hpp> + SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){ GIVEN("a range"){ diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index a4ce52d..fb351bd 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -3,6 +3,7 @@ #include "rxcpp/operators/rx-map.hpp" #include "rxcpp/operators/rx-take.hpp" #include "rxcpp/operators/rx-observe_on.hpp" +#include "rxcpp/operators/rx-publish.hpp" SCENARIO("observe subscription", "[hide]"){ GIVEN("observable of ints"){ |