diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-27 22:56:57 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-27 14:07:26 -0800 |
commit | 00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4 (patch) | |
tree | ee7a7189cbf32225fd2cba4d228725f70446b4ee | |
parent | 4b6a2f541dece7256271374ca6555ce12a76a99b (diff) | |
download | RxCpp-00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4.tar.gz |
decouple connect_forever from connectable_observable
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp | 68 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 19 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 2 |
5 files changed, 65 insertions, 35 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp index 1117a48..bcbe830 100644 --- a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp @@ -2,6 +2,14 @@ #pragma once +/*! \file rx-connect_forever.hpp + + \brief takes a connectable_observable source and calls connect during the construction of the expression. + This means that the source starts running without any subscribers and continues running after all subscriptions have been unsubscribed. + + \return An observable that emitting the items from its source. + */ + #if !defined(RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP) #define RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP @@ -13,6 +21,16 @@ namespace operators { namespace detail { +template<class... AN> +struct connect_forever_invalid_arguments {}; + +template<class... AN> +struct connect_forever_invalid : public rxo::operator_base<connect_forever_invalid_arguments<AN...>> { + using type = observable<connect_forever_invalid_arguments<AN...>, connect_forever_invalid<AN...>>; +}; +template<class... AN> +using connect_forever_invalid_t = typename connect_forever_invalid<AN...>::type; + template<class T, class ConnectableObservable> struct connect_forever : public operator_base<T> { @@ -32,33 +50,41 @@ struct connect_forever : public operator_base<T> } }; -class connect_forever_factory -{ -public: - connect_forever_factory() {} - template<class... TN> - auto operator()(connectable_observable<TN...>&& source) - -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { - return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( - connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source))); - } - template<class... TN> - auto operator()(const connectable_observable<TN...>& source) - -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { - return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( - connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source)); - } -}; - } -inline auto connect_forever() - -> detail::connect_forever_factory { - return detail::connect_forever_factory(); +/*! @copydoc rx-connect_forever.hpp +*/ +template<class... AN> +auto connect_forever(AN&&... an) +-> operator_factory<connect_forever_tag, AN...> { + return operator_factory<connect_forever_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } } +template<> +struct member_overload<connect_forever_tag> +{ + template<class ConnectableObservable, + class Enabled = rxu::enable_if_all_true_type_t< + is_connectable_observable<ConnectableObservable>>, + class SourceValue = rxu::value_type_t<ConnectableObservable>, + class ConnectForever = rxo::detail::connect_forever<SourceValue, rxu::decay_t<ConnectableObservable>>, + class Value = rxu::value_type_t<ConnectForever>, + class Result = observable<Value, ConnectForever> + > + static Result member(ConnectableObservable&& o) { + return Result(ConnectForever(std::forward<ConnectableObservable>(o))); + } + + template<class... AN> + static operators::detail::connect_forever_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "connect_forever takes no arguments"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 373c16c..4c86994 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -174,16 +174,15 @@ public: rxo::detail::ref_count<T, this_type>(*this)); } - /// connect_forever -> - /// takes a connectable_observable source and calls connect during - /// the construction of the expression. This means that the source - /// starts running without any subscribers and continues running - /// after all subscriptions have been unsubscribed. - /// - auto connect_forever() const - -> observable<T, rxo::detail::connect_forever<T, this_type>> { - return observable<T, rxo::detail::connect_forever<T, this_type>>( - rxo::detail::connect_forever<T, this_type>(*this)); + /*! @copydoc rx-connect_forever.hpp + */ + template<class... AN> + auto connect_forever(AN... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond + { + return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...); } }; diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index e8c9b8c..66f5537 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -191,6 +191,7 @@ #include "operators/rx-combine_latest.hpp" #include "operators/rx-concat.hpp" #include "operators/rx-concat_map.hpp" +#include "operators/rx-connect_forever.hpp" #include "operators/rx-debounce.hpp" #include "operators/rx-delay.hpp" #include "operators/rx-distinct.hpp" @@ -203,11 +204,9 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" -#include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" -#include "operators/rx-publish.hpp" #include "operators/rx-reduce.hpp" #include "operators/rx-repeat.hpp" #include "operators/rx-replay.hpp" diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index dda27f7..4d05dc2 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -95,7 +95,6 @@ public: } -#include "operators/rx-connect_forever.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-subscribe.hpp" @@ -170,6 +169,13 @@ struct concat_map_tag { }; }; +struct connect_forever_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-connect_forever.hpp>"); + }; +}; + struct debounce_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 316f275..e86374b 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -1,6 +1,6 @@ #include "../test.h" #include <rxcpp/operators/rx-publish.hpp> - +#include <rxcpp/operators/rx-connect_forever.hpp> SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){ GIVEN("a range"){ |