From dda07ac5ef860f384463a3f6b27e4d5096a45532 Mon Sep 17 00:00:00 2001 From: Grigoriy Chudnov Date: Tue, 17 Jan 2017 16:17:57 +0300 Subject: decouple switch_on_next from observable --- Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp | 89 ++++++++++++++++++------- Rx/v2/src/rxcpp/rx-includes.hpp | 1 + Rx/v2/src/rxcpp/rx-observable.hpp | 50 ++------------ Rx/v2/src/rxcpp/rx-operators.hpp | 8 ++- Rx/v2/test/operators/switch_on_next.cpp | 4 +- 5 files changed, 81 insertions(+), 71 deletions(-) diff --git a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp index 35b8e67..b18963a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp @@ -2,6 +2,21 @@ #pragma once +/*! \file rx-switch_on_next.hpp + + \brief Return observable that emits the items emitted by the observable most recently emitted by the source observable. + + \tparam Coordination the type of the scheduler (optional). + + \param cn the scheduler to synchronize sources from different contexts (optional). + + \return Observable that emits the items emitted by the observable most recently emitted by the source observable. + + \sample + \snippet switch_on_next.cpp switch_on_next sample + \snippet output.txt switch_on_next sample +*/ + #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP) #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP @@ -13,6 +28,16 @@ namespace operators { namespace detail { +template +struct switch_on_next_invalid_arguments {}; + +template +struct switch_on_next_invalid : public rxo::operator_base> { + using type = observable, switch_on_next_invalid>; +}; +template +using switch_on_next_invalid_t = typename switch_on_next_invalid::type; + template struct switch_on_next : public operator_base>> @@ -169,36 +194,54 @@ struct switch_on_next } }; -template -class switch_on_next_factory +} + +/*! @copydoc rx-switch_on_next.hpp +*/ +template +auto switch_on_next(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); +} + +} + +template<> +struct member_overload { - typedef rxu::decay_t coordination_type; + template>, + class SourceValue = rxu::value_type_t, + class SwitchOnNext = rxo::detail::switch_on_next, identity_one_worker>, + class Value = rxu::value_type_t, + class Result = observable + > + static Result member(Observable&& o) { + return Result(SwitchOnNext(std::forward(o), identity_current_thread())); + } - coordination_type coordination; -public: - switch_on_next_factory(coordination_type sf) - : coordination(std::move(sf)) - { + template, + is_coordination>, + class SourceValue = rxu::value_type_t, + class SwitchOnNext = rxo::detail::switch_on_next, rxu::decay_t>, + class Value = rxu::value_type_t, + class Result = observable + > + static Result member(Observable&& o, Coordination&& cn) { + return Result(SwitchOnNext(std::forward(o), std::forward(cn))); } - template - auto operator()(Observable source) - -> observable, Observable, Coordination>>, switch_on_next, Observable, Coordination>> { - return observable, Observable, Coordination>>, switch_on_next, Observable, Coordination>>( - switch_on_next, Observable, Coordination>(std::move(source), coordination)); + template + static operators::detail::switch_on_next_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)"); } }; } -template -auto switch_on_next(Coordination&& sf) - -> detail::switch_on_next_factory { - return detail::switch_on_next_factory(std::forward(sf)); -} - -} - -} - #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 3255fd9..1d47997 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -213,6 +213,7 @@ #include "operators/rx-skip_last.hpp" #include "operators/rx-skip_until.hpp" #include "operators/rx-switch_if_empty.hpp" +#include "operators/rx-switch_on_next.hpp" #include "operators/rx-take.hpp" #include "operators/rx-take_last.hpp" #include "operators/rx-take_until.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 89d3daa..9f90663 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1039,57 +1039,15 @@ public: return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward(an)...); } - /// \cond SHOW_SERVICE_MEMBERS - template - struct defer_switch_on_next : public defer_observable< - is_observable, - this_type, - rxo::detail::switch_on_next, value_type, observable, Coordination> - { - }; - /// \endcond - - /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable. - - \return Observable that emits the items emitted by the observable most recently emitted by the source observable. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet switch_on_next.cpp switch_on_next sample - \snippet output.txt switch_on_next sample + /*! @copydoc rx-switch_on_next.hpp */ template - auto switch_on_next(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename defer_switch_on_next::observable_type - /// \endcond - { - return defer_switch_on_next::make(*this, *this, identity_current_thread()); - static_assert(sizeof...(AN) == 0, "switch_on_next() was passed too many arguments."); - } - - /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to synchronize sources from different contexts - - \return Observable that emits the items emitted by the observable most recently emitted by the source observable. - - \sample - \snippet switch_on_next.cpp threaded switch_on_next sample - \snippet output.txt threaded switch_on_next sample - */ - template - auto switch_on_next(Coordination cn) const + auto switch_on_next(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_switch_on_next::value, - typename defer_switch_on_next::observable_type>::type + -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return defer_switch_on_next::make(*this, *this, std::move(cn)); + return observable_member(switch_on_next_tag{}, *this, std::forward(an)...); } /*! @copydoc rx-merge.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 208b3ef..e5a3eaf 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -107,7 +107,6 @@ public: #include "operators/rx-start_with.hpp" #include "operators/rx-subscribe.hpp" #include "operators/rx-subscribe_on.hpp" -#include "operators/rx-switch_on_next.hpp" namespace rxcpp { @@ -347,6 +346,13 @@ struct switch_if_empty_tag { }; struct default_if_empty_tag : switch_if_empty_tag {}; +struct switch_on_next_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct take_tag { template struct include_header{ diff --git a/Rx/v2/test/operators/switch_on_next.cpp b/Rx/v2/test/operators/switch_on_next.cpp index d4c0f60..f3515ba 100644 --- a/Rx/v2/test/operators/switch_on_next.cpp +++ b/Rx/v2/test/operators/switch_on_next.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){ GIVEN("a source"){ @@ -44,7 +45,8 @@ SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){ auto res = w.start( [xs]() { - return xs.switch_on_next(); + return xs + | rxo::switch_on_next(); } ); -- cgit v1.2.3