summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-24 19:39:47 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-24 08:39:47 -0800
commit6f4ca1198079ed2a439f53865d494729b975bbb7 (patch)
tree37da80a3e6fd7e8ffdeb96d33faf5ac986c84b06 /Rx/v2/src/rxcpp
parentff18d4640e5f2f96eba2fe17fd5d91132f44dc0f (diff)
downloadRxCpp-6f4ca1198079ed2a439f53865d494729b975bbb7.tar.gz
decouple observe_on from observable (#335)
* decouple observe_on from observable * decouple observe_on from observable - fix msvc
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp73
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp23
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 68 insertions, 37 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index f4b69ff..99de4c3 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -2,6 +2,24 @@
#pragma once
+/*! \file rx-observe_on.hpp
+
+ \brief All values are queued and delivered using the scheduler from the supplied coordination.
+
+ \tparam Coordination the type of the scheduler.
+
+ \param cn the scheduler to notify observers on.
+
+ \return The source observable modified so that its observers are notified on the specified scheduler.
+
+ \sample
+ \snippet observe_on.cpp observe_on sample
+ \snippet output.txt observe_on sample
+
+ Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
+ \snippet output.txt subscribe_on sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
#define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
@@ -13,6 +31,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct observe_on_invalid_arguments {};
+
+template<class... AN>
+struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
+ using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
+};
+template<class... AN>
+using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
+
template<class T, class Coordination>
struct observe_on
{
@@ -195,30 +223,39 @@ struct observe_on
}
};
-template<class Coordination>
-class observe_on_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
- coordination_type coordination;
-public:
- observe_on_factory(coordination_type cn) : coordination(std::move(cn)) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(observe_on<rxu::value_type_t<rxu::decay_t<Observable>>, coordination_type>(coordination))) {
- return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(observe_on<rxu::value_type_t<rxu::decay_t<Observable>>, coordination_type>(coordination));
- }
-};
+}
+/*! @copydoc rx-observe_on.hpp
+*/
+template<class... AN>
+auto observe_on(AN&&... an)
+ -> operator_factory<observe_on_tag, AN...> {
+ return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-template<class Coordination>
-auto observe_on(Coordination cn)
- -> detail::observe_on_factory<Coordination> {
- return detail::observe_on_factory<Coordination>(std::move(cn));
}
+template<>
+struct member_overload<observe_on_tag>
+{
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
+ static auto member(Observable&& o, Coordination&& cn)
+ -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
+ return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
+ }
-}
+ template<class... AN>
+ static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
+ }
+};
class observe_on_one_worker : public coordination_base
{
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 3a5a7ab..7027648 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -203,6 +203,7 @@
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
+#include "operators/rx-observe_on.hpp"
#include "operators/rx-on_error_resume_next.hpp"
#include "operators/rx-pairwise.hpp"
#include "operators/rx-reduce.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index b29dc52..f121cf7 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1509,28 +1509,15 @@ public:
return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
}
- /*! All values are queued and delivered using the scheduler from the supplied coordination.
-
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to notify observers on
-
- \return The source observable modified so that its observers are notified on the specified scheduler.
-
- \sample
- \snippet observe_on.cpp observe_on sample
- \snippet output.txt observe_on sample
-
- Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
- \snippet output.txt subscribe_on sample
+ /*! @copydoc rx-observe_on.hpp
*/
- template<class Coordination>
- auto observe_on(Coordination cn) const
+ template<class... AN>
+ auto observe_on(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))))
+ -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
+ return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-reduce.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 4fbf34f..086315b 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -98,7 +98,6 @@ public:
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-multicast.hpp"
-#include "operators/rx-observe_on.hpp"
#include "operators/rx-publish.hpp"
#include "operators/rx-ref_count.hpp"
#include "operators/rx-replay.hpp"
@@ -258,6 +257,13 @@ struct merge_tag {
};
};
+struct observe_on_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-observe_on.hpp>");
+ };
+};
+
struct on_error_resume_next_tag {
template<class Included>
struct include_header{