summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-23 01:08:25 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-22 15:56:57 -0800
commitff18d4640e5f2f96eba2fe17fd5d91132f44dc0f (patch)
treec7472d71eb1bf05058b781a1c81155da92672600
parentc7ce40f6a2dd1ead5587c089dcc61a276649e592 (diff)
downloadRxCpp-ff18d4640e5f2f96eba2fe17fd5d91132f44dc0f.tar.gz
decouple subscribe_on from observable
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp80
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp26
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/subscribe_on.cpp91
5 files changed, 162 insertions, 44 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
index fb38902..f026b59 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
@@ -2,6 +2,24 @@
#pragma once
+/*! \file rx-subscribe_on.hpp
+
+ \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
+
+ \tparam Coordination the type of the scheduler.
+
+ \param cn the scheduler to perform subscription actions on.
+
+ \return The source observable modified so that its subscriptions happen on the specified scheduler.
+
+ \sample
+ \snippet subscribe_on.cpp subscribe_on sample
+ \snippet output.txt subscribe_on sample
+
+ Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
+ \snippet output.txt observe_on sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
#define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
@@ -13,6 +31,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct subscribe_on_invalid_arguments {};
+
+template<class... AN>
+struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
+ using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>;
+};
+template<class... AN>
+using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type;
+
template<class T, class Observable, class Coordination>
struct subscribe_on : public operator_base<T>
{
@@ -114,35 +142,41 @@ private:
subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
};
-template<class Coordination>
-class subscribe_on_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
-
- coordination_type coordination;
-public:
- subscribe_on_factory(coordination_type sf)
- : coordination(std::move(sf))
- {
- }
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>>(
- subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>(std::forward<Observable>(source), coordination));
- }
-};
-
}
-template<class Coordination>
-auto subscribe_on(Coordination sf)
- -> detail::subscribe_on_factory<Coordination> {
- return detail::subscribe_on_factory<Coordination>(std::move(sf));
+/*! @copydoc rx-subscribe_on.hpp
+*/
+template<class... AN>
+auto subscribe_on(AN&&... an)
+ -> operator_factory<subscribe_on_tag, AN...> {
+ return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<subscribe_on_tag>
+{
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SubscribeOn>,
+ class Result = observable<Value, SubscribeOn>>
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index d43dc3b..3a5a7ab 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -215,6 +215,7 @@
#include "operators/rx-skip_last.hpp"
#include "operators/rx-skip_until.hpp"
#include "operators/rx-start_with.hpp"
+#include "operators/rx-subscribe_on.hpp"
#include "operators/rx-switch_if_empty.hpp"
#include "operators/rx-switch_on_next.hpp"
#include "operators/rx-take.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 2863d43..b29dc52 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1498,29 +1498,15 @@ public:
return multicast(rxsub::replay<T, Coordination>(count, period, std::move(cn), cs));
}
- /*! Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
-
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to perform subscription actions on
-
- \return The source observable modified so that its subscriptions happen on the specified scheduler.
-
- \sample
- \snippet subscribe_on.cpp subscribe_on sample
- \snippet output.txt subscribe_on sample
-
- Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
- \snippet output.txt observe_on sample
- */
- template<class Coordination>
- auto subscribe_on(Coordination cn) const
+ /*! @copydoc rx-subscribe_on.hpp
+ */
+ template<class... AN>
+ auto subscribe_on(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>>
+ -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>>(
- rxo::detail::subscribe_on<T, this_type, Coordination>(*this, std::move(cn)));
+ return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
}
/*! All values are queued and delivered using the scheduler from the supplied coordination.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 30c5e5b..4fbf34f 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -103,7 +103,6 @@ public:
#include "operators/rx-ref_count.hpp"
#include "operators/rx-replay.hpp"
#include "operators/rx-subscribe.hpp"
-#include "operators/rx-subscribe_on.hpp"
namespace rxcpp {
@@ -356,6 +355,13 @@ struct start_with_tag {
};
};
+struct subscribe_on_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-subscribe_on.hpp>");
+ };
+};
+
struct switch_if_empty_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
index 80e9b2c..ded1ea4 100644
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ b/Rx/v2/test/operators/subscribe_on.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>
+#include <rxcpp/operators/rx-subscribe_on.hpp>
static const int static_subscriptions = 50000;
@@ -84,3 +85,93 @@ SCENARIO("for loop subscribes to map with subscribe_on", "[hide][subscribe_on_on
}
}
}
+
+SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("subscribe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ .subscribe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(201, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("subscribe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::subscribe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(201, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}