diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-13 19:41:28 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-13 19:41:28 -0700 |
commit | 64de52450d8a449257d06d2d33540acefa5a8728 (patch) | |
tree | 42a7fd23d50f5ba13ca86d90e5e2c44fd8eb4ccf /Rx | |
parent | 52d6a9cd64ec5256d7320f0dab106e1c944a8df7 (diff) | |
download | RxCpp-64de52450d8a449257d06d2d33540acefa5a8728.tar.gz |
add override support to make_subscriber
make_subscriber can take an existing subscriber and the some overrides
and produce a new subscriber that reuses what has not been overridden.
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 6 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observer.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 112 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/observer.cpp | 5 |
5 files changed, 117 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 2dff6a0..6a9c02f 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -118,10 +118,12 @@ private: template<class U, class SO> friend class observable; - template<class I> - auto detail_subscribe(observer<T, I> o, tag_observer&&) const + template<class Observer> + auto detail_subscribe(Observer o, tag_observer&&) const -> decltype(make_subscription(o)) { + static_assert(is_observer<Observer>::value, "subscribe must be passed an observer"); + if (!o.is_subscribed()) { return make_subscription(o); } diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp index 369339d..c5cb7a2 100644 --- a/Rx/v2/src/rxcpp/rx-observer.hpp +++ b/Rx/v2/src/rxcpp/rx-observer.hpp @@ -489,6 +489,9 @@ public: ~observer() { } + observer() + { + } observer(inner_t inner) : inner(std::move(inner)) { diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 5234c5d..1d53ab5 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -9,8 +9,25 @@ namespace rxcpp { +struct tag_subscriber {}; +template<class T> +struct subscriber_base : public observer_root<T>, public subscription_base, public resumption_base +{ + typedef tag_subscriber subscriber_tag; +}; +template<class T> +class is_subscriber +{ + template<class C> + static typename C::subscriber_tag* check(int); + template<class C> + static void check(...); +public: + static const bool value = std::is_convertible<decltype(check<T>(0)), tag_subscriber*>::value; +}; + template<class T, class Observer = observer<T>> -class subscriber : public observer_root<T>, public subscription_base +class subscriber : public subscriber_base<T> { composite_subscription lifetime; resumption controller; @@ -101,21 +118,97 @@ auto make_observer_resolved(ResolvedArgSet&& rs) static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order"); } +template<class T, bool subscriber_is_arg, bool observer_is_arg, bool onnext_is_arg> +struct observer_selector; + +template<class T, bool subscriber_is_arg> +struct observer_selector<T, subscriber_is_arg, true, false> +{ + template<class Set> + static auto get_observer(Set&& rs) + -> decltype(std::get<5>(std::forward<Set>(rs))) { + return std::get<5>(std::forward<Set>(rs)); + } +}; +template<class T, bool subscriber_is_arg> +struct observer_selector<T, subscriber_is_arg, false, true> +{ + template<class Set> + static auto get_observer(Set&& rs) + -> decltype(make_observer_resolved<T>(std::forward<Set>(rs))) { + return make_observer_resolved<T>(std::forward<Set>(rs)); + } +}; +template<class T> +struct observer_selector<T, true, false, false> +{ + template<class Set> + static auto get_observer(Set&& rs) + -> decltype(std::get<6>(std::forward<Set>(rs)).value.get_observer()) { + return std::get<6>(std::forward<Set>(rs)).value.get_observer(); + } +}; + template<class T, class ResolvedArgSet> -auto make_subscriber_resolved(ResolvedArgSet&& rs) - -> subscriber<T, decltype(make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs)))> { - return subscriber<T, decltype(make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs)))>(std::move(std::get<0>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<1>(std::forward<ResolvedArgSet>(rs)).value), make_observer_resolved<T>(std::forward<ResolvedArgSet>(rs))); +auto select_observer(ResolvedArgSet&& rs) + -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) { + return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs)); typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rr_t; + typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t; + typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t; + typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t; + typedef typename std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type ro_t; + typedef typename std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type rs_t; - static_assert(rr_t::is_arg, "resumption is a required parameter"); + static_assert(rs_t::is_arg || ro_t::is_arg || rn_t::is_arg, "at least one of; onnext, observer or subscriber is required"); + static_assert(int(ro_t::is_arg) + int(rn_t::is_arg) < 2, "onnext, onerror and oncompleted not allowed with an observer"); } +template<class T, class ResolvedArgSet> +auto make_subscriber_resolved(ResolvedArgSet&& rs) + -> subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))> { + auto rsub = std::get<0>(std::forward<ResolvedArgSet>(rs)); + auto rr = std::get<1>(std::forward<ResolvedArgSet>(rs)); + const auto& rscrbr = std::get<6>(std::forward<ResolvedArgSet>(rs)); + auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : std::move(rr.value); + auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : std::move(rsub.value); + return subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))>( + std::move(s), std::move(r), select_observer<T>(std::forward<ResolvedArgSet>(rs))); + + typedef typename std::decay<decltype(rr)>::type rr_t; + typedef typename std::decay<decltype(rscrbr)>::type rs_t; + + static_assert(rs_t::is_arg || rr_t::is_arg, "at least one of; resumption or subscriber is a required parameter"); +} + +template<class T> +struct tag_subscriber_resolution +{ + template<class LHS> + struct predicate : public is_subscriber<LHS> + { + }; + typedef subscriber<T, observer<T, void>> default_type; +}; + +template<class T> +struct tag_observer_resolution +{ + template<class LHS> + struct predicate + { + static const bool value = !is_subscriber<LHS>::value && is_observer<LHS>::value; + }; + typedef observer<T, void> default_type; +}; + struct tag_subscription_resolution { template<class LHS> - struct predicate : public is_subscription<LHS> + struct predicate { + static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value; }; typedef composite_subscription default_type; }; @@ -123,8 +216,9 @@ struct tag_subscription_resolution struct tag_resumption_resolution { template<class LHS> - struct predicate : public is_resumption<LHS> + struct predicate { + static const bool value = !is_subscriber<LHS>::value && is_resumption<LHS>::value; }; typedef resumption default_type; }; @@ -172,7 +266,9 @@ struct tag_subscriber_set rxu::detail::tag_set<tag_resumption_resolution, rxu::detail::tag_set<tag_onnext_resolution<T>, rxu::detail::tag_set<tag_onerror_resolution, - rxu::detail::tag_set<tag_oncompleted_resolution>>>>> + rxu::detail::tag_set<tag_oncompleted_resolution, + rxu::detail::tag_set<tag_observer_resolution<T>, + rxu::detail::tag_set<tag_subscriber_resolution<T>>>>>>>> { }; diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 91074d8..53f1b3c 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -310,8 +310,8 @@ struct arg_resolver_n<-1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5 struct tag_unresolvable {}; template<template<class Arg> class Predicate, class Default, class Arg0 = tag_unresolvable, class Arg1 = tag_unresolvable, class Arg2 = tag_unresolvable, class Arg3 = tag_unresolvable, class Arg4 = tag_unresolvable, class Arg5 = tag_unresolvable> struct arg_resolver - : public arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> { + typedef typename arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type type; }; diff --git a/Rx/v2/test/subscriptions/observer.cpp b/Rx/v2/test/subscriptions/observer.cpp index 5536dc0..588ffab 100644 --- a/Rx/v2/test/subscriptions/observer.cpp +++ b/Rx/v2/test/subscriptions/observer.cpp @@ -14,11 +14,16 @@ SCENARIO("subscriber traits", "[observer][traits]"){ // auto arg = rx::rxu::detail::resolve_arg<rx::tag_resumption_resolution::template predicate, typename rx::tag_resumption_resolution::default_type>(rx::resumption(), next, error, completed); // auto argset = rx::rxu::detail::resolve_arg_set(rx::tag_subscriber_set<int>(), rx::resumption(), next, error, completed); // auto o = rx::make_observer_resolved<int>(argset); +// auto o = rx::select_observer<int>(argset); // auto scrbResult = rx::subscriber<int, decltype(o)>(std::move(std::get<0>(argset).value), std::move(std::get<1>(argset).value), o); // static_assert(std::tuple_element<1, decltype(argset)>::type::is_arg, "resumption is a required parameter"); // auto scrbResult = rx::make_subscriber_resolved<int>(rx::rxu::detail::resolve_arg_set(rx::tag_subscriber_set<int>(), rx::resumption(), next, error, completed)); // auto scrbResult = rx::make_subscriber_resolved<int>(argset); auto scrbResult = rx::make_subscriber<int>(rx::resumption(), next, error, completed); + auto scrbdup = rx::make_subscriber<int>(scrbResult); + auto scrbop = rx::make_subscriber<int>(scrbResult, next, error, completed); + auto scrbsharelifetime = rx::make_subscriber<int>(scrbResult, scrbop.get_resumption(), scrbop.get_observer()); + auto scrbuniquelifetime = rx::make_subscriber<int>(scrbResult, rx::composite_subscription()); auto emptyNext = [](int){}; auto scrb = rx::make_subscriber<int>(rx::resumption(), emptyNext); |