diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-28 12:41:20 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-28 09:44:06 -0800 |
commit | 69abcd24d9ddc3490b5adeac213089318d05e4b8 (patch) | |
tree | 87a36fc091dc8a9367af6c03f5acd93a94d32441 | |
parent | 00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4 (diff) | |
download | RxCpp-69abcd24d9ddc3490b5adeac213089318d05e4b8.tar.gz |
decouple ref_count from connectable_observable
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-ref_count.hpp | 70 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 19 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 9 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 2 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/subscription.cpp | 1 |
5 files changed, 68 insertions, 33 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp index 886c605..55dde05 100644 --- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp @@ -2,6 +2,14 @@ #pragma once +/*! \file rx-ref_count.hpp + + \brief takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source. + The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection. + + \return An observable that emitting the items from its source. + */ + #if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP) #define RXCPP_OPERATORS_RX_REF_COUNT_HPP @@ -13,6 +21,16 @@ namespace operators { namespace detail { +template<class... AN> +struct ref_count_invalid_arguments {}; + +template<class... AN> +struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments<AN...>> { + using type = observable<ref_count_invalid_arguments<AN...>, ref_count_invalid<AN...>>; +}; +template<class... AN> +using ref_count_invalid_t = typename ref_count_invalid<AN...>::type; + template<class T, class ConnectableObservable> struct ref_count : public operator_base<T> { @@ -59,33 +77,41 @@ struct ref_count : public operator_base<T> } }; -class ref_count_factory -{ -public: - ref_count_factory() {} - template<class... TN> - auto operator()(connectable_observable<TN...>&& source) - -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { - return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( - ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source))); - } - template<class... TN> - auto operator()(const connectable_observable<TN...>& source) - -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { - return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( - ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source)); - } -}; - } -inline auto ref_count() - -> detail::ref_count_factory { - return detail::ref_count_factory(); +/*! @copydoc rx-ref_count.hpp +*/ +template<class... AN> +auto ref_count(AN&&... an) + -> operator_factory<ref_count_tag, AN...> { + return operator_factory<ref_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + } -} +template<> +struct member_overload<ref_count_tag> +{ + template<class ConnectableObservable, + class Enabled = rxu::enable_if_all_true_type_t< + is_connectable_observable<ConnectableObservable>>, + class SourceValue = rxu::value_type_t<ConnectableObservable>, + class RefCount = rxo::detail::ref_count<SourceValue, rxu::decay_t<ConnectableObservable>>, + class Value = rxu::value_type_t<RefCount>, + class Result = observable<Value, RefCount> + > + static Result member(ConnectableObservable&& o) { + return Result(RefCount(std::forward<ConnectableObservable>(o))); + } + template<class... AN> + static operators::detail::ref_count_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 4c86994..7038e24 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -162,16 +162,15 @@ public: return cs; } - /// ref_count -> - /// takes a connectable_observable source and uses a ref_count of the subscribers - /// to control the connection to the published source. The first subscription - /// will cause a call to connect() and the last unsubscribe will unsubscribe the - /// connection. - /// - auto ref_count() const - -> observable<T, rxo::detail::ref_count<T, this_type>> { - return observable<T, rxo::detail::ref_count<T, this_type>>( - rxo::detail::ref_count<T, this_type>(*this)); + /*! @copydoc rx-ref_count.hpp + */ + template<class... AN> + auto ref_count(AN... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond + { + return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-connect_forever.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4d05dc2..0fa5d58 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -96,7 +96,6 @@ public: } #include "operators/rx-lift.hpp" -#include "operators/rx-ref_count.hpp" #include "operators/rx-subscribe.hpp" namespace rxcpp { @@ -301,6 +300,13 @@ struct average_tag : reduce_tag {}; struct min_tag : reduce_tag {}; struct max_tag : reduce_tag {}; +struct ref_count_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-ref_count.hpp>"); + }; +}; + struct pairwise_tag { template<class Included> struct include_header{ @@ -510,5 +516,6 @@ struct zip_tag { #include "operators/rx-multicast.hpp" #include "operators/rx-publish.hpp" +#include "operators/rx-ref_count.hpp" #endif diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index e86374b..87c8fff 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -1,6 +1,8 @@ #include "../test.h" #include <rxcpp/operators/rx-publish.hpp> #include <rxcpp/operators/rx-connect_forever.hpp> +#include <rxcpp/operators/rx-ref_count.hpp> + SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){ GIVEN("a range"){ diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index fb351bd..2a9f15d 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -4,6 +4,7 @@ #include "rxcpp/operators/rx-take.hpp" #include "rxcpp/operators/rx-observe_on.hpp" #include "rxcpp/operators/rx-publish.hpp" +#include "rxcpp/operators/rx-ref_count.hpp" SCENARIO("observe subscription", "[hide]"){ GIVEN("observable of ints"){ |