summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-17 16:17:57 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-17 09:05:38 -0800
commitdda07ac5ef860f384463a3f6b27e4d5096a45532 (patch)
tree0ad8306b22aceff1680508e1e0dc3de23d955889
parent6a35d221de3054885b69f1c82a78c99c83a8f47a (diff)
downloadRxCpp-dda07ac5ef860f384463a3f6b27e4d5096a45532.tar.gz
decouple switch_on_next from observable
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp89
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp50
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/switch_on_next.cpp4
5 files changed, 81 insertions, 71 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
index 35b8e67..b18963a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
@@ -2,6 +2,21 @@
#pragma once
+/*! \file rx-switch_on_next.hpp
+
+ \brief Return observable that emits the items emitted by the observable most recently emitted by the source observable.
+
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param cn the scheduler to synchronize sources from different contexts (optional).
+
+ \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
+
+ \sample
+ \snippet switch_on_next.cpp switch_on_next sample
+ \snippet output.txt switch_on_next sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP)
#define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP
@@ -13,6 +28,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct switch_on_next_invalid_arguments {};
+
+template<class... AN>
+struct switch_on_next_invalid : public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> {
+ using type = observable<switch_on_next_invalid_arguments<AN...>, switch_on_next_invalid<AN...>>;
+};
+template<class... AN>
+using switch_on_next_invalid_t = typename switch_on_next_invalid<AN...>::type;
+
template<class T, class Observable, class Coordination>
struct switch_on_next
: public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
@@ -169,36 +194,54 @@ struct switch_on_next
}
};
-template<class Coordination>
-class switch_on_next_factory
+}
+
+/*! @copydoc rx-switch_on_next.hpp
+*/
+template<class... AN>
+auto switch_on_next(AN&&... an)
+ -> operator_factory<switch_on_next_tag, AN...> {
+ return operator_factory<switch_on_next_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
+}
+
+template<>
+struct member_overload<switch_on_next_tag>
{
- typedef rxu::decay_t<Coordination> coordination_type;
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, SwitchOnNext>
+ >
+ static Result member(Observable&& o) {
+ return Result(SwitchOnNext(std::forward<Observable>(o), identity_current_thread()));
+ }
- coordination_type coordination;
-public:
- switch_on_next_factory(coordination_type sf)
- : coordination(std::move(sf))
- {
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, SwitchOnNext>
+ >
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn)));
}
- template<class Observable>
- auto operator()(Observable source)
- -> observable<rxu::value_type_t<switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>, switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>> {
- return observable<rxu::value_type_t<switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>, switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>>(
- switch_on_next<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination));
+ template<class... AN>
+ static operators::detail::switch_on_next_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)");
}
};
}
-template<class Coordination>
-auto switch_on_next(Coordination&& sf)
- -> detail::switch_on_next_factory<Coordination> {
- return detail::switch_on_next_factory<Coordination>(std::forward<Coordination>(sf));
-}
-
-}
-
-}
-
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 3255fd9..1d47997 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -213,6 +213,7 @@
#include "operators/rx-skip_last.hpp"
#include "operators/rx-skip_until.hpp"
#include "operators/rx-switch_if_empty.hpp"
+#include "operators/rx-switch_on_next.hpp"
#include "operators/rx-take.hpp"
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 89d3daa..9f90663 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1039,57 +1039,15 @@ public:
return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
}
- /// \cond SHOW_SERVICE_MEMBERS
- template<class Coordination>
- struct defer_switch_on_next : public defer_observable<
- is_observable<value_type>,
- this_type,
- rxo::detail::switch_on_next, value_type, observable<value_type>, Coordination>
- {
- };
- /// \endcond
-
- /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable.
-
- \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet switch_on_next.cpp switch_on_next sample
- \snippet output.txt switch_on_next sample
+ /*! @copydoc rx-switch_on_next.hpp
*/
template<class... AN>
- auto switch_on_next(AN**...) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename defer_switch_on_next<identity_one_worker>::observable_type
- /// \endcond
- {
- return defer_switch_on_next<identity_one_worker>::make(*this, *this, identity_current_thread());
- static_assert(sizeof...(AN) == 0, "switch_on_next() was passed too many arguments.");
- }
-
- /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to synchronize sources from different contexts
-
- \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
-
- \sample
- \snippet switch_on_next.cpp threaded switch_on_next sample
- \snippet output.txt threaded switch_on_next sample
- */
- template<class Coordination>
- auto switch_on_next(Coordination cn) const
+ auto switch_on_next(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_switch_on_next<Coordination>::value,
- typename defer_switch_on_next<Coordination>::observable_type>::type
+ -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return defer_switch_on_next<Coordination>::make(*this, *this, std::move(cn));
+ return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-merge.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 208b3ef..e5a3eaf 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -107,7 +107,6 @@ public:
#include "operators/rx-start_with.hpp"
#include "operators/rx-subscribe.hpp"
#include "operators/rx-subscribe_on.hpp"
-#include "operators/rx-switch_on_next.hpp"
namespace rxcpp {
@@ -347,6 +346,13 @@ struct switch_if_empty_tag {
};
struct default_if_empty_tag : switch_if_empty_tag {};
+struct switch_on_next_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-switch_on_next.hpp>");
+ };
+};
+
struct take_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/switch_on_next.cpp b/Rx/v2/test/operators/switch_on_next.cpp
index d4c0f60..f3515ba 100644
--- a/Rx/v2/test/operators/switch_on_next.cpp
+++ b/Rx/v2/test/operators/switch_on_next.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include <rxcpp/operators/rx-switch_on_next.hpp>
SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){
GIVEN("a source"){
@@ -44,7 +45,8 @@ SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){
auto res = w.start(
[xs]() {
- return xs.switch_on_next();
+ return xs
+ | rxo::switch_on_next();
}
);