summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp80
1 files changed, 57 insertions, 23 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
index fb38902..f026b59 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
@@ -2,6 +2,24 @@
#pragma once
+/*! \file rx-subscribe_on.hpp
+
+ \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
+
+ \tparam Coordination the type of the scheduler.
+
+ \param cn the scheduler to perform subscription actions on.
+
+ \return The source observable modified so that its subscriptions happen on the specified scheduler.
+
+ \sample
+ \snippet subscribe_on.cpp subscribe_on sample
+ \snippet output.txt subscribe_on sample
+
+ Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
+ \snippet output.txt observe_on sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
#define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
@@ -13,6 +31,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct subscribe_on_invalid_arguments {};
+
+template<class... AN>
+struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
+ using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>;
+};
+template<class... AN>
+using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type;
+
template<class T, class Observable, class Coordination>
struct subscribe_on : public operator_base<T>
{
@@ -114,35 +142,41 @@ private:
subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
};
-template<class Coordination>
-class subscribe_on_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
-
- coordination_type coordination;
-public:
- subscribe_on_factory(coordination_type sf)
- : coordination(std::move(sf))
- {
- }
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>>(
- subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>(std::forward<Observable>(source), coordination));
- }
-};
-
}
-template<class Coordination>
-auto subscribe_on(Coordination sf)
- -> detail::subscribe_on_factory<Coordination> {
- return detail::subscribe_on_factory<Coordination>(std::move(sf));
+/*! @copydoc rx-subscribe_on.hpp
+*/
+template<class... AN>
+auto subscribe_on(AN&&... an)
+ -> operator_factory<subscribe_on_tag, AN...> {
+ return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<subscribe_on_tag>
+{
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SubscribeOn>,
+ class Result = observable<Value, SubscribeOn>>
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)");
+ }
+};
+
}
#endif