From 4b6a2f541dece7256271374ca6555ce12a76a99b Mon Sep 17 00:00:00 2001 From: Grigoriy Chudnov Date: Fri, 27 Jan 2017 03:01:57 +0300 Subject: decouple publish, publish_synchronized, multicast from observable (#338) * decouple publish, publish_synchronized, multicast from observable * decouple publish, publish_synchronized, multicast from observable - restore include structure --- Rx/v2/src/rxcpp/operators/rx-multicast.hpp | 68 +++++++++----- Rx/v2/src/rxcpp/operators/rx-publish.hpp | 141 ++++++++++++++++++++++++++--- Rx/v2/src/rxcpp/rx-includes.hpp | 2 + Rx/v2/src/rxcpp/rx-observable.hpp | 116 +++--------------------- Rx/v2/src/rxcpp/rx-operators.hpp | 20 +++- Rx/v2/test/operators/publish.cpp | 2 + Rx/v2/test/subscriptions/subscription.cpp | 1 + 7 files changed, 211 insertions(+), 139 deletions(-) (limited to 'Rx/v2') diff --git a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp index e970bfe..193a11a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp @@ -2,6 +2,15 @@ #pragma once +/*! \file rx-multicast.hpp + + \brief allows connections to the source to be independent of subscriptions. + + \tparam Subject the subject to multicast the source Observable. + + \param sub the subject. +*/ + #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP) #define RXCPP_OPERATORS_RX_MULTICAST_HPP @@ -13,6 +22,16 @@ namespace operators { namespace detail { +template +struct multicast_invalid_arguments {}; + +template +struct multicast_invalid : public rxo::operator_base> { + using type = observable, multicast_invalid>; +}; +template +using multicast_invalid_t = typename multicast_invalid::type; + template struct multicast : public operator_base { @@ -65,33 +84,40 @@ struct multicast : public operator_base } }; -template -class multicast_factory -{ - Subject caster; -public: - multicast_factory(Subject sub) - : caster(std::move(sub)) - { - } - template - auto operator()(Observable&& source) - -> connectable_observable>, multicast>, Observable, Subject>> { - return connectable_observable>, multicast>, Observable, Subject>>( - multicast>, Observable, Subject>(std::forward(source), caster)); - } -}; - } -template -inline auto multicast(Subject sub) - -> detail::multicast_factory { - return detail::multicast_factory(std::move(sub)); +/*! @copydoc rx-multicast.hpp +*/ +template +auto multicast(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } } +template<> +struct member_overload +{ + template>, + class SourceValue = rxu::value_type_t, + class Multicast = rxo::detail::multicast, rxu::decay_t>, + class Value = rxu::value_type_t, + class Result = connectable_observable> + static Result member(Observable&& o, Subject&& sub) { + return Result(Multicast(std::forward(o), std::forward(sub))); + } + + template + static operators::detail::multicast_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp index 3589cc9..dc38191 100644 --- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp @@ -2,10 +2,32 @@ #pragma once +/*! \file rx-publish.hpp + + \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. + + \tparam T the type of the emitted item (optional). + + \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional). + \param cs the subscription to control lifetime (optional). + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. + + \sample + \snippet publish.cpp publish subject sample + \snippet output.txt publish subject sample + + \sample + \snippet publish.cpp publish behavior sample + \snippet output.txt publish behavior sample +*/ + #if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP) #define RXCPP_OPERATORS_RX_PUBLISH_HPP #include "../rx-includes.hpp" +#include "./rx-multicast.hpp" namespace rxcpp { @@ -13,29 +35,118 @@ namespace operators { namespace detail { -template class Subject> -class publish_factory -{ -public: - publish_factory() {} - template - auto operator()(Observable&& source) - -> connectable_observable>, multicast>, Observable, Subject>>>> { - return connectable_observable>, multicast>, Observable, Subject>>>>( - multicast>, Observable, Subject>>>( - std::forward(source), Subject>>())); - } +template +struct publish_invalid_arguments {}; + +template +struct publish_invalid : public rxo::operator_base> { + using type = observable, publish_invalid>; }; +template +using publish_invalid_t = typename publish_invalid::type; + +} +/*! @copydoc rx-publish.hpp +*/ +template +auto publish(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } -inline auto publish() - -> detail::publish_factory { - return detail::publish_factory(); +/*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + + \tparam Coordination the type of the scheduler. + + \param cn a scheduler all values are queued and delivered on. + \param cs the subscription to control lifetime (optional). + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler. + + \sample + \snippet publish.cpp publish_synchronized sample + \snippet output.txt publish_synchronized sample +*/ +template +auto publish_synchronized(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } } +template<> +struct member_overload +{ + template>, + class SourceValue = rxu::value_type_t, + class Subject = rxsub::subject, + class Multicast = rxo::detail::multicast, Subject>, + class Result = connectable_observable + > + static Result member(Observable&& o) { + return Result(Multicast(std::forward(o), Subject(composite_subscription()))); + } + + template>, + class SourceValue = rxu::value_type_t, + class Subject = rxsub::subject, + class Multicast = rxo::detail::multicast, Subject>, + class Result = connectable_observable + > + static Result member(Observable&& o, composite_subscription cs) { + return Result(Multicast(std::forward(o), Subject(cs))); + } + + template>, + class SourceValue = rxu::value_type_t, + class Subject = rxsub::behavior, + class Multicast = rxo::detail::multicast, Subject>, + class Result = connectable_observable + > + static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward(o), Subject(first, cs))); + } + + template + static operators::detail::publish_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)"); + } +}; + +template<> +struct member_overload +{ + template, + is_coordination>, + class SourceValue = rxu::value_type_t, + class Subject = rxsub::synchronize>, + class Multicast = rxo::detail::multicast, Subject>, + class Result = connectable_observable + > + static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward(o), Subject(std::forward(cn), cs))); + } + + template + static operators::detail::publish_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index c3b7d9d..e8c9b8c 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -203,9 +203,11 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" +#include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" +#include "operators/rx-publish.hpp" #include "operators/rx-reduce.hpp" #include "operators/rx-repeat.hpp" #include "operators/rx-replay.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 0a5ccbe..7a950c5 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1161,123 +1161,37 @@ public: return observable_member(ignore_elements_tag{}, *this, std::forward(an)...); } - /// \cond SHOW_SERVICE_MEMBERS - /// multicast -> - /// allows connections to the source to be independent of subscriptions - /// - template - auto multicast(Subject sub) const - -> connectable_observable> { - return connectable_observable>( - rxo::detail::multicast(*this, std::move(sub))); - } - /// \endcond - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler. - - \sample - \snippet publish.cpp publish_synchronized sample - \snippet output.txt publish_synchronized sample - */ - template - auto publish_synchronized(Coordination cn, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize(std::move(cn), cs))) - /// \endcond - { - return multicast(rxsub::synchronize(std::move(cn), cs)); - } - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish subject sample - \snippet output.txt publish subject sample - */ - template - auto publish(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::subject(composite_subscription()))) - /// \endcond - { - composite_subscription cs; - return multicast(rxsub::subject(cs)); - static_assert(sizeof...(AN) == 0, "publish() was passed too many arguments."); - } - - /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. - - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish subject sample - \snippet output.txt publish subject sample - */ + /*! @copydoc rx-muticast.hpp + */ template - auto publish(composite_subscription cs, AN**...) const + auto multicast(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::subject(cs))) + -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return multicast(rxsub::subject(cs)); - static_assert(sizeof...(AN) == 0, "publish(composite_subscription) was passed too many arguments."); + return observable_member(multicast_tag{}, *this, std::forward(an)...); } - /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam T the type of the emitted item - - \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish behavior sample - \snippet output.txt publish behavior sample - */ + /*! @copydoc rx-publish.hpp + */ template - auto publish(T first, AN**...) const + auto publish(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::behavior(first, composite_subscription()))) + -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - composite_subscription cs; - return multicast(rxsub::behavior(first, cs)); - static_assert(sizeof...(AN) == 0, "publish(value_type) was passed too many arguments."); + return observable_member(publish_tag{}, *this, std::forward(an)...); } - /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam T the type of the emitted item - - \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. - - \sample - \snippet publish.cpp publish behavior sample - \snippet output.txt publish behavior sample - */ + /*! @copydoc rxcpp::operators::publish_synchronized + */ template - auto publish(T first, composite_subscription cs, AN**...) const + auto publish_synchronized(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::behavior(first, cs))) + -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return multicast(rxsub::behavior(first, cs)); - static_assert(sizeof...(AN) == 0, "publish(value_type, composite_subscription) was passed too many arguments."); + return observable_member(publish_synchronized_tag{}, *this, std::forward(an)...); } /*! @copydoc rx-replay.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index aeffac9..dda27f7 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -97,8 +97,6 @@ public: #include "operators/rx-connect_forever.hpp" #include "operators/rx-lift.hpp" -#include "operators/rx-multicast.hpp" -#include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-subscribe.hpp" @@ -256,6 +254,13 @@ struct merge_tag { }; }; +struct multicast_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct observe_on_tag { template struct include_header{ @@ -297,6 +302,14 @@ struct pairwise_tag { }; }; +struct publish_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; +struct publish_synchronized_tag : publish_tag {}; + struct repeat_tag { template struct include_header{ @@ -489,4 +502,7 @@ struct zip_tag { } +#include "operators/rx-multicast.hpp" +#include "operators/rx-publish.hpp" + #endif diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 62d81ff..316f275 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -1,4 +1,6 @@ #include "../test.h" +#include + SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){ GIVEN("a range"){ diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index a4ce52d..fb351bd 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -3,6 +3,7 @@ #include "rxcpp/operators/rx-map.hpp" #include "rxcpp/operators/rx-take.hpp" #include "rxcpp/operators/rx-observe_on.hpp" +#include "rxcpp/operators/rx-publish.hpp" SCENARIO("observe subscription", "[hide]"){ GIVEN("observable of ints"){ -- cgit v1.2.3