summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-03-13 19:41:28 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-03-13 19:41:28 -0700
commit64de52450d8a449257d06d2d33540acefa5a8728 (patch)
tree42a7fd23d50f5ba13ca86d90e5e2c44fd8eb4ccf /Rx
parent52d6a9cd64ec5256d7320f0dab106e1c944a8df7 (diff)
downloadRxCpp-64de52450d8a449257d06d2d33540acefa5a8728.tar.gz
add override support to make_subscriber
make_subscriber can take an existing subscriber and the some overrides and produce a new subscriber that reuses what has not been overridden.
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp6
-rw-r--r--Rx/v2/src/rxcpp/rx-observer.hpp3
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp112
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp2
-rw-r--r--Rx/v2/test/subscriptions/observer.cpp5
5 files changed, 117 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 2dff6a0..6a9c02f 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -118,10 +118,12 @@ private:
template<class U, class SO>
friend class observable;
- template<class I>
- auto detail_subscribe(observer<T, I> o, tag_observer&&) const
+ template<class Observer>
+ auto detail_subscribe(Observer o, tag_observer&&) const
-> decltype(make_subscription(o)) {
+ static_assert(is_observer<Observer>::value, "subscribe must be passed an observer");
+
if (!o.is_subscribed()) {
return make_subscription(o);
}
diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp
index 369339d..c5cb7a2 100644
--- a/Rx/v2/src/rxcpp/rx-observer.hpp
+++ b/Rx/v2/src/rxcpp/rx-observer.hpp
@@ -489,6 +489,9 @@ public:
~observer()
{
}
+ observer()
+ {
+ }
observer(inner_t inner)
: inner(std::move(inner))
{
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 5234c5d..1d53ab5 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -9,8 +9,25 @@
namespace rxcpp {
+struct tag_subscriber {};
+template<class T>
+struct subscriber_base : public observer_root<T>, public subscription_base, public resumption_base
+{
+ typedef tag_subscriber subscriber_tag;
+};
+template<class T>
+class is_subscriber
+{
+ template<class C>
+ static typename C::subscriber_tag* check(int);
+ template<class C>
+ static void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<T>(0)), tag_subscriber*>::value;
+};
+
template<class T, class Observer = observer<T>>
-class subscriber : public observer_root<T>, public subscription_base
+class subscriber : public subscriber_base<T>
{
composite_subscription lifetime;
resumption controller;
@@ -101,21 +118,97 @@ auto make_observer_resolved(ResolvedArgSet&& rs)
static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order");
}
+template<class T, bool subscriber_is_arg, bool observer_is_arg, bool onnext_is_arg>
+struct observer_selector;
+
+template<class T, bool subscriber_is_arg>
+struct observer_selector<T, subscriber_is_arg, true, false>
+{
+ template<class Set>
+ static auto get_observer(Set&& rs)
+ -> decltype(std::get<5>(std::forward<Set>(rs))) {
+ return std::get<5>(std::forward<Set>(rs));
+ }
+};
+template<class T, bool subscriber_is_arg>
+struct observer_selector<T, subscriber_is_arg, false, true>
+{
+ template<class Set>
+ static auto get_observer(Set&& rs)
+ -> decltype(make_observer_resolved<T>(std::forward<Set>(rs))) {
+ return make_observer_resolved<T>(std::forward<Set>(rs));
+ }
+};
+template<class T>
+struct observer_selector<T, true, false, false>
+{
+ template<class Set>
+ static auto get_observer(Set&& rs)
+ -> decltype(std::get<6>(std::forward<Set>(rs)).value.get_observer()) {
+ return std::get<6>(std::forward<Set>(rs)).value.get_observer();
+ }
+};
+
template<class T, class ResolvedArgSet>
-auto make_subscriber_resolved(ResolvedArgSet&& rs)
- -> subscriber<T, decltype(make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs)))> {
- return subscriber<T, decltype(make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs)))>(std::move(std::get<0>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<1>(std::forward<ResolvedArgSet>(rs)).value), make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs)));
+auto select_observer(ResolvedArgSet&& rs)
+ -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) {
+ return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs));
typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rr_t;
+ typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t;
+ typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t;
+ typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t;
+ typedef typename std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type ro_t;
+ typedef typename std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type rs_t;
- static_assert(rr_t::is_arg, "resumption is a required parameter");
+ static_assert(rs_t::is_arg || ro_t::is_arg || rn_t::is_arg, "at least one of; onnext, observer or subscriber is required");
+ static_assert(int(ro_t::is_arg) + int(rn_t::is_arg) < 2, "onnext, onerror and oncompleted not allowed with an observer");
}
+template<class T, class ResolvedArgSet>
+auto make_subscriber_resolved(ResolvedArgSet&& rs)
+ -> subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))> {
+ auto rsub = std::get<0>(std::forward<ResolvedArgSet>(rs));
+ auto rr = std::get<1>(std::forward<ResolvedArgSet>(rs));
+ const auto& rscrbr = std::get<6>(std::forward<ResolvedArgSet>(rs));
+ auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : std::move(rr.value);
+ auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : std::move(rsub.value);
+ return subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))>(
+ std::move(s), std::move(r), select_observer<T>(std::forward<ResolvedArgSet>(rs)));
+
+ typedef typename std::decay<decltype(rr)>::type rr_t;
+ typedef typename std::decay<decltype(rscrbr)>::type rs_t;
+
+ static_assert(rs_t::is_arg || rr_t::is_arg, "at least one of; resumption or subscriber is a required parameter");
+}
+
+template<class T>
+struct tag_subscriber_resolution
+{
+ template<class LHS>
+ struct predicate : public is_subscriber<LHS>
+ {
+ };
+ typedef subscriber<T, observer<T, void>> default_type;
+};
+
+template<class T>
+struct tag_observer_resolution
+{
+ template<class LHS>
+ struct predicate
+ {
+ static const bool value = !is_subscriber<LHS>::value && is_observer<LHS>::value;
+ };
+ typedef observer<T, void> default_type;
+};
+
struct tag_subscription_resolution
{
template<class LHS>
- struct predicate : public is_subscription<LHS>
+ struct predicate
{
+ static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value;
};
typedef composite_subscription default_type;
};
@@ -123,8 +216,9 @@ struct tag_subscription_resolution
struct tag_resumption_resolution
{
template<class LHS>
- struct predicate : public is_resumption<LHS>
+ struct predicate
{
+ static const bool value = !is_subscriber<LHS>::value && is_resumption<LHS>::value;
};
typedef resumption default_type;
};
@@ -172,7 +266,9 @@ struct tag_subscriber_set
rxu::detail::tag_set<tag_resumption_resolution,
rxu::detail::tag_set<tag_onnext_resolution<T>,
rxu::detail::tag_set<tag_onerror_resolution,
- rxu::detail::tag_set<tag_oncompleted_resolution>>>>>
+ rxu::detail::tag_set<tag_oncompleted_resolution,
+ rxu::detail::tag_set<tag_observer_resolution<T>,
+ rxu::detail::tag_set<tag_subscriber_resolution<T>>>>>>>>
{
};
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 91074d8..53f1b3c 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -310,8 +310,8 @@ struct arg_resolver_n<-1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5
struct tag_unresolvable {};
template<template<class Arg> class Predicate, class Default, class Arg0 = tag_unresolvable, class Arg1 = tag_unresolvable, class Arg2 = tag_unresolvable, class Arg3 = tag_unresolvable, class Arg4 = tag_unresolvable, class Arg5 = tag_unresolvable>
struct arg_resolver
- : public arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
{
+ typedef typename arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type type;
};
diff --git a/Rx/v2/test/subscriptions/observer.cpp b/Rx/v2/test/subscriptions/observer.cpp
index 5536dc0..588ffab 100644
--- a/Rx/v2/test/subscriptions/observer.cpp
+++ b/Rx/v2/test/subscriptions/observer.cpp
@@ -14,11 +14,16 @@ SCENARIO("subscriber traits", "[observer][traits]"){
// auto arg = rx::rxu::detail::resolve_arg<rx::tag_resumption_resolution::template predicate, typename rx::tag_resumption_resolution::default_type>(rx::resumption(), next, error, completed);
// auto argset = rx::rxu::detail::resolve_arg_set(rx::tag_subscriber_set<int>(), rx::resumption(), next, error, completed);
// auto o = rx::make_observer_resolved<int>(argset);
+// auto o = rx::select_observer<int>(argset);
// auto scrbResult = rx::subscriber<int, decltype(o)>(std::move(std::get<0>(argset).value), std::move(std::get<1>(argset).value), o);
// static_assert(std::tuple_element<1, decltype(argset)>::type::is_arg, "resumption is a required parameter");
// auto scrbResult = rx::make_subscriber_resolved<int>(rx::rxu::detail::resolve_arg_set(rx::tag_subscriber_set<int>(), rx::resumption(), next, error, completed));
// auto scrbResult = rx::make_subscriber_resolved<int>(argset);
auto scrbResult = rx::make_subscriber<int>(rx::resumption(), next, error, completed);
+ auto scrbdup = rx::make_subscriber<int>(scrbResult);
+ auto scrbop = rx::make_subscriber<int>(scrbResult, next, error, completed);
+ auto scrbsharelifetime = rx::make_subscriber<int>(scrbResult, scrbop.get_resumption(), scrbop.get_observer());
+ auto scrbuniquelifetime = rx::make_subscriber<int>(scrbResult, rx::composite_subscription());
auto emptyNext = [](int){};
auto scrb = rx::make_subscriber<int>(rx::resumption(), emptyNext);