diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-17 16:17:57 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-17 09:05:38 -0800 |
commit | dda07ac5ef860f384463a3f6b27e4d5096a45532 (patch) | |
tree | 0ad8306b22aceff1680508e1e0dc3de23d955889 | |
parent | 6a35d221de3054885b69f1c82a78c99c83a8f47a (diff) | |
download | RxCpp-dda07ac5ef860f384463a3f6b27e4d5096a45532.tar.gz |
decouple switch_on_next from observable
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp | 89 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 50 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/switch_on_next.cpp | 4 |
5 files changed, 81 insertions, 71 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp index 35b8e67..b18963a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp @@ -2,6 +2,21 @@ #pragma once +/*! \file rx-switch_on_next.hpp + + \brief Return observable that emits the items emitted by the observable most recently emitted by the source observable. + + \tparam Coordination the type of the scheduler (optional). + + \param cn the scheduler to synchronize sources from different contexts (optional). + + \return Observable that emits the items emitted by the observable most recently emitted by the source observable. + + \sample + \snippet switch_on_next.cpp switch_on_next sample + \snippet output.txt switch_on_next sample +*/ + #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP) #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP @@ -13,6 +28,16 @@ namespace operators { namespace detail { +template<class... AN> +struct switch_on_next_invalid_arguments {}; + +template<class... AN> +struct switch_on_next_invalid : public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> { + using type = observable<switch_on_next_invalid_arguments<AN...>, switch_on_next_invalid<AN...>>; +}; +template<class... AN> +using switch_on_next_invalid_t = typename switch_on_next_invalid<AN...>::type; + template<class T, class Observable, class Coordination> struct switch_on_next : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> @@ -169,36 +194,54 @@ struct switch_on_next } }; -template<class Coordination> -class switch_on_next_factory +} + +/*! @copydoc rx-switch_on_next.hpp +*/ +template<class... AN> +auto switch_on_next(AN&&... an) + -> operator_factory<switch_on_next_tag, AN...> { + return operator_factory<switch_on_next_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + +} + +template<> +struct member_overload<switch_on_next_tag> { - typedef rxu::decay_t<Coordination> coordination_type; + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, SwitchOnNext> + > + static Result member(Observable&& o) { + return Result(SwitchOnNext(std::forward<Observable>(o), identity_current_thread())); + } - coordination_type coordination; -public: - switch_on_next_factory(coordination_type sf) - : coordination(std::move(sf)) - { + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, SwitchOnNext> + > + static Result member(Observable&& o, Coordination&& cn) { + return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn))); } - template<class Observable> - auto operator()(Observable source) - -> observable<rxu::value_type_t<switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>, switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>> { - return observable<rxu::value_type_t<switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>, switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>( - switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination)); + template<class... AN> + static operators::detail::switch_on_next_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)"); } }; } -template<class Coordination> -auto switch_on_next(Coordination&& sf) - -> detail::switch_on_next_factory<Coordination> { - return detail::switch_on_next_factory<Coordination>(std::forward<Coordination>(sf)); -} - -} - -} - #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 3255fd9..1d47997 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -213,6 +213,7 @@ #include "operators/rx-skip_last.hpp" #include "operators/rx-skip_until.hpp" #include "operators/rx-switch_if_empty.hpp" +#include "operators/rx-switch_on_next.hpp" #include "operators/rx-take.hpp" #include "operators/rx-take_last.hpp" #include "operators/rx-take_until.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 89d3daa..9f90663 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1039,57 +1039,15 @@ public: return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); } - /// \cond SHOW_SERVICE_MEMBERS - template<class Coordination> - struct defer_switch_on_next : public defer_observable< - is_observable<value_type>, - this_type, - rxo::detail::switch_on_next, value_type, observable<value_type>, Coordination> - { - }; - /// \endcond - - /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable. - - \return Observable that emits the items emitted by the observable most recently emitted by the source observable. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet switch_on_next.cpp switch_on_next sample - \snippet output.txt switch_on_next sample + /*! @copydoc rx-switch_on_next.hpp */ template<class... AN> - auto switch_on_next(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename defer_switch_on_next<identity_one_worker>::observable_type - /// \endcond - { - return defer_switch_on_next<identity_one_worker>::make(*this, *this, identity_current_thread()); - static_assert(sizeof...(AN) == 0, "switch_on_next() was passed too many arguments."); - } - - /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to synchronize sources from different contexts - - \return Observable that emits the items emitted by the observable most recently emitted by the source observable. - - \sample - \snippet switch_on_next.cpp threaded switch_on_next sample - \snippet output.txt threaded switch_on_next sample - */ - template<class Coordination> - auto switch_on_next(Coordination cn) const + auto switch_on_next(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_switch_on_next<Coordination>::value, - typename defer_switch_on_next<Coordination>::observable_type>::type + -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return defer_switch_on_next<Coordination>::make(*this, *this, std::move(cn)); + return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-merge.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 208b3ef..e5a3eaf 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -107,7 +107,6 @@ public: #include "operators/rx-start_with.hpp" #include "operators/rx-subscribe.hpp" #include "operators/rx-subscribe_on.hpp" -#include "operators/rx-switch_on_next.hpp" namespace rxcpp { @@ -347,6 +346,13 @@ struct switch_if_empty_tag { }; struct default_if_empty_tag : switch_if_empty_tag {}; +struct switch_on_next_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-switch_on_next.hpp>"); + }; +}; + struct take_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/switch_on_next.cpp b/Rx/v2/test/operators/switch_on_next.cpp index d4c0f60..f3515ba 100644 --- a/Rx/v2/test/operators/switch_on_next.cpp +++ b/Rx/v2/test/operators/switch_on_next.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include <rxcpp/operators/rx-switch_on_next.hpp> SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){ GIVEN("a source"){ @@ -44,7 +45,8 @@ SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){ auto res = w.start( [xs]() { - return xs.switch_on_next(); + return xs + | rxo::switch_on_next(); } ); |