diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-19 18:11:40 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-19 18:11:40 -0700 |
commit | cb42db9eaa8dc288e42584d1ae892b8b811a28f9 (patch) | |
tree | e70eab217c912b45910e914fd6f6c257b4a444a1 /Rx/v2/src | |
parent | 64de52450d8a449257d06d2d33540acefa5a8728 (diff) | |
download | RxCpp-cb42db9eaa8dc288e42584d1ae892b8b811a28f9.tar.gz |
switch subscribe from observer to subscriber
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-filter.hpp | 33 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-flat_map.hpp | 72 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-map.hpp | 34 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-subscribe.hpp | 145 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-notification.hpp | 69 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 172 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observer.hpp | 446 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-predef.hpp | 13 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 204 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscription.hpp | 27 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-test.hpp | 46 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 81 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-test.hpp | 121 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-range.hpp | 4 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-subject.hpp | 10 |
16 files changed, 675 insertions, 804 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-filter.hpp b/Rx/v2/src/rxcpp/operators/rx-filter.hpp index ee61646..28f2caa 100644 --- a/Rx/v2/src/rxcpp/operators/rx-filter.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-filter.hpp @@ -16,23 +16,25 @@ namespace detail { template<class T, class Observable, class Predicate> struct filter : public operator_base<T> { - Observable source; - Predicate test; + typedef typename std::decay<Observable>::type source_type; + typedef typename std::decay<Predicate>::type test_type; + source_type source; + test_type test; template<class CT, class CP> static auto check(int) -> decltype((*(CP*)nullptr)(*(CT*)nullptr)); template<class CT, class CP> static void check(...); - filter(Observable o, Predicate p) + filter(source_type o, test_type p) : source(std::move(o)) , test(std::move(p)) { - static_assert(std::is_convertible<decltype(check<T, Predicate>(0)), bool>::value, "filter Predicate must be a function with the signature bool(T)"); + static_assert(std::is_convertible<decltype(check<T, test_type>(0)), bool>::value, "filter Predicate must be a function with the signature bool(T)"); } - template<class I> - void on_subscribe(observer<T, I> o) { + template<class Subscriber> + void on_subscribe(Subscriber o) { source.subscribe( - o.get_subscription(), + o, // on_next [this, o](T t) { bool filtered = false; @@ -61,23 +63,24 @@ struct filter : public operator_base<T> template<class Predicate> class filter_factory { - Predicate predicate; + typedef typename std::decay<Predicate>::type test_type; + test_type predicate; public: - filter_factory(Predicate p) : predicate(std::move(p)) {} + filter_factory(test_type p) : predicate(std::move(p)) {} template<class Observable> - auto operator()(Observable source) - -> observable<typename Observable::value_type, filter<typename Observable::value_type, Observable, Predicate>> { - return observable<typename Observable::value_type, filter<typename Observable::value_type, Observable, Predicate>>( - filter<typename Observable::value_type, Observable, Predicate>(source, std::move(predicate))); + auto operator()(Observable&& source) + -> observable<typename std::decay<Observable>::type::value_type, filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>> { + return observable<typename std::decay<Observable>::type::value_type, filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>>( + filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>(std::forward<Observable>(source), std::move(predicate))); } }; } template<class Predicate> -auto filter(Predicate p) +auto filter(Predicate&& p) -> detail::filter_factory<Predicate> { - return detail::filter_factory<Predicate>(std::move(p)); + return detail::filter_factory<Predicate>(std::forward<Predicate>(p)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index 7ff88e1..a52d3c8 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -15,7 +15,11 @@ namespace detail { template<class Observable, class CollectionSelector, class ResultSelector> struct flat_map_traits { - typedef typename Observable::value_type source_value_type; + typedef typename std::decay<Observable>::type source_type; + typedef typename std::decay<CollectionSelector>::type collection_selector_type; + typedef typename std::decay<ResultSelector>::type result_selector_type; + + typedef typename source_type::value_type source_value_type; struct tag_not_valid {}; template<class CV, class CCS> @@ -23,9 +27,9 @@ struct flat_map_traits { template<class CV, class CCS> static tag_not_valid collection_check(...); - static_assert(!std::is_same<decltype(collection_check<source_value_type, CollectionSelector>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)"); + static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)"); - typedef decltype((*(CollectionSelector*)nullptr)((*(source_value_type*)nullptr))) collection_type; + typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type; static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable"); @@ -36,9 +40,9 @@ struct flat_map_traits { template<class CV, class CCV, class CRS> static tag_not_valid result_check(...); - static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, ResultSelector>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)"); + static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)"); - typedef decltype((*(ResultSelector*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; + typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; }; template<class Observable, class CollectionSelector, class ResultSelector> @@ -48,39 +52,46 @@ struct flat_map typedef flat_map<Observable, CollectionSelector, ResultSelector> this_type; typedef flat_map_traits<Observable, CollectionSelector, ResultSelector> traits; + typedef typename traits::source_type source_type; + typedef typename traits::collection_selector_type collection_selector_type; + typedef typename traits::result_selector_type result_selector_type; + + typedef typename traits::source_value_type source_value_type; + typedef typename traits::collection_type collection_type; + typedef typename traits::collection_value_type collection_value_type; + struct values { - values(Observable o, CollectionSelector s, ResultSelector rs) + values(source_type o, collection_selector_type s, result_selector_type rs) : source(std::move(o)) , selectCollection(std::move(s)) , selectResult(std::move(rs)) { } - Observable source; - CollectionSelector selectCollection; - ResultSelector selectResult; + source_type source; + collection_selector_type selectCollection; + result_selector_type selectResult; }; values initial; - typedef typename traits::source_value_type source_value_type; - typedef typename traits::collection_type collection_type; - typedef typename traits::collection_value_type collection_value_type; - - flat_map(Observable o, CollectionSelector s, ResultSelector rs) + flat_map(source_type o, collection_selector_type s, result_selector_type rs) : initial(std::move(o), std::move(s), std::move(rs)) { } - template<class I> - void on_subscribe(observer<typename this_type::value_type, I> o) { + template<class Observer> + void on_subscribe(Observer&& o) { + static_assert(is_observer<Observer>::value, "subscribe must be passed an observer"); + + typedef typename std::decay<Observer>::type output_type; - typedef observer<typename this_type::value_type, I> output_type; struct state_type : public std::enable_shared_from_this<state_type> , public values { state_type(values i, output_type oarg) : values(std::move(i)) + , pendingCompletions(0) , out(std::move(oarg)) { } @@ -95,19 +106,20 @@ struct flat_map output_type out; }; // take a copy of the values for each subscription - auto state = std::shared_ptr<state_type>(new state_type(initial, std::move(o))); + auto state = std::shared_ptr<state_type>(new state_type(initial, std::forward<Observer>(o))); composite_subscription outercs; // when the out observer is unsubscribed all the // inner subscriptions are unsubscribed as well - state->out.get_subscription().add(outercs); + state->out.add(outercs); ++state->pendingCompletions; // this subscribe does not share the observer subscription // so that when it is unsubscribed the observer can be called // until the inner subscriptions have finished state->source.subscribe( + state->out, outercs, // on_next [state](source_value_type st) { @@ -124,16 +136,17 @@ struct flat_map // when the out observer is unsubscribed all the // inner subscriptions are unsubscribed as well - auto innercstoken = state->out.get_subscription().add(innercs); + auto innercstoken = state->out.add(innercs); innercs.add(make_subscription([state, innercstoken](){ - state->out.get_subscription().remove(innercstoken); + state->out.remove(innercstoken); })); ++state->pendingCompletions; // this subscribe does not share the source subscription // so that when it is unsubscribed the source will continue selectedCollection->subscribe( + state->out, innercs, // on_next [state, st](collection_value_type ct) { @@ -181,29 +194,32 @@ struct flat_map template<class CollectionSelector, class ResultSelector> class flat_map_factory { - CollectionSelector selectorCollection; - ResultSelector selectorResult; + typedef typename std::decay<CollectionSelector>::type collection_selector_type; + typedef typename std::decay<ResultSelector>::type result_selector_type; + + collection_selector_type selectorCollection; + result_selector_type selectorResult; public: - flat_map_factory(CollectionSelector s, ResultSelector rs) + flat_map_factory(collection_selector_type s, result_selector_type rs) : selectorCollection(std::move(rs)) , selectorResult(std::move(s)) { } template<class Observable> - auto operator()(Observable source) + auto operator()(Observable&& source) -> observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>> { return observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>>( - flat_map<Observable, CollectionSelector, ResultSelector>(source, std::move(selectorCollection), std::move(selectorResult))); + flat_map<Observable, CollectionSelector, ResultSelector>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult))); } }; } template<class CollectionSelector, class ResultSelector> -auto flat_map(CollectionSelector s, ResultSelector rs) +auto flat_map(CollectionSelector&& s, ResultSelector&& rs) -> detail::flat_map_factory<CollectionSelector, ResultSelector> { - return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::move(s), std::move(rs)); + return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp index 939d6e3..91d6673 100644 --- a/Rx/v2/src/rxcpp/operators/rx-map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp @@ -19,15 +19,18 @@ struct map { typedef map<Observable, Selector> this_type; + typedef typename std::decay<Observable>::type source_type; + typedef typename std::decay<Selector>::type select_type; + struct values { - values(Observable o, Selector s) + values(source_type o, select_type s) : source(std::move(o)) , select(std::move(s)) { } - Observable source; - Selector select; + source_type source; + select_type select; }; values initial; @@ -39,17 +42,17 @@ struct map template<class CF, class CP> static tag_not_valid check(...); - static_assert(!std::is_same<decltype(check<source_value_type, Selector>(0)), tag_not_valid>::value, "map Selector must be a function with the signature map::value_type(map::source_value_type)"); + static_assert(!std::is_same<decltype(check<source_value_type, select_type>(0)), tag_not_valid>::value, "map Selector must be a function with the signature map::value_type(map::source_value_type)"); - map(Observable o, Selector s) + map(source_type o, select_type s) : initial(std::move(o), std::move(s)) { } - template<class I> - void on_subscribe(observer<typename this_type::value_type, I> o) { + template<class Subscriber> + void on_subscribe(Subscriber o) { - typedef observer<typename this_type::value_type, I> output_type; + typedef Subscriber output_type; struct state_type : public std::enable_shared_from_this<state_type> , public values @@ -65,7 +68,7 @@ struct map auto state = std::shared_ptr<state_type>(new state_type(initial, std::move(o))); state->source.subscribe( - state->out.get_subscription(), + state->out, // on_next [state](typename this_type::source_value_type st) { util::detail::maybe<typename this_type::value_type> selected; @@ -92,23 +95,24 @@ struct map template<class Selector> class map_factory { - Selector selector; + typedef typename std::decay<Selector>::type select_type; + select_type selector; public: - map_factory(Selector p) : selector(std::move(p)) {} + map_factory(select_type p) : selector(std::move(p)) {} template<class Observable> - auto operator()(Observable source) + auto operator()(Observable&& source) -> observable<typename map<Observable, Selector>::value_type, map<Observable, Selector>> { return observable<typename map<Observable, Selector>::value_type, map<Observable, Selector>>( - map<Observable, Selector>(source, std::move(selector))); + map<Observable, Selector>(std::forward<Observable>(source), std::move(selector))); } }; } template<class Selector> -auto map(Selector p) +auto map(Selector&& p) -> detail::map_factory<Selector> { - return detail::map_factory<Selector>(std::move(p)); + return detail::map_factory<Selector>(std::forward<Selector>(p)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp index 0a53734..cc25c1d 100644 --- a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp @@ -13,118 +13,61 @@ namespace operators { namespace detail { -template<class Observer> -class subscribe_to_observer_factory -{ - Observer observer; -public: - subscribe_to_observer_factory(Observer o) : observer(std::move(o)) {} - template<class Observable> - auto operator()(Observable source) - -> decltype(source.subscribe(std::move(observer))) { - return source.subscribe(std::move(observer)); - } -}; -template<class OnNext, class OnError, class OnCompleted> -class subscribe_factory -{ - OnNext onnext; - OnError onerror; - OnCompleted oncompleted; -public: - subscribe_factory(OnNext n, OnError e, OnCompleted c) - : onnext(std::move(n)) - , onerror(std::move(e)) - , oncompleted(std::move(c)) - {} - template<class Observable> - auto operator()(Observable source) - -> decltype(source.subscribe(make_observer<typename Observable::value_type>(std::move(onnext), std::move(onerror), std::move(oncompleted)))) { - return source.subscribe(make_observer<typename Observable::value_type>(std::move(onnext), std::move(onerror), std::move(oncompleted))); - } -}; -template<class OnNext, class OnError, class OnCompleted> -class subscribe_factory_chained +template<class Subscriber> +class subscribe_factory; + +template<class T, class I> +class subscribe_factory<subscriber<T, I>> { - composite_subscription cs; - OnNext onnext; - OnError onerror; - OnCompleted oncompleted; + subscriber<T, I> scrbr; public: - subscribe_factory_chained(OnNext n, OnError e, OnCompleted c) - : cs(std::move(cs)) - , onnext(std::move(n)) - , onerror(std::move(e)) - , oncompleted(std::move(c)) + subscribe_factory(subscriber<T, I> s) + : scrbr(std::move(s)) {} template<class Observable> - auto operator()(Observable source) - -> decltype(source.subscribe(make_observer<typename Observable::value_type>(std::move(cs), std::move(onnext), std::move(onerror), std::move(oncompleted)))) { - return source.subscribe(make_observer<typename Observable::value_type>(std::move(cs), std::move(onnext), std::move(onerror), std::move(oncompleted))); + auto operator()(Observable&& source) + -> decltype(std::forward<Observable>(source).subscribe(std::move(scrbr))) { + return std::forward<Observable>(source).subscribe(std::move(scrbr)); } }; -template<class Observer> -auto subscribe(Observer o, tag_observer&&) - -> subscribe_to_observer_factory<Observer> { - return subscribe_to_observer_factory<Observer>(std::move(o)); } -struct tag_function {}; -template<class OnNext> -auto subscribe(OnNext n, tag_function&&) - -> subscribe_factory<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty> { - return subscribe_factory<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty>(std::move(n), rxcpp::detail::OnErrorEmpty(), rxcpp::detail::OnCompletedEmpty()); +template<class T, class Arg0> +auto subscribe(Arg0&& a0) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> + (make_subscriber<T>(std::forward<Arg0>(a0))); } - -template<class OnNext, class OnError> -auto subscribe(OnNext n, OnError e, tag_function&&) - -> subscribe_factory<OnNext, OnError, rxcpp::detail::OnCompletedEmpty> { - return subscribe_factory<OnNext, OnError, rxcpp::detail::OnCompletedEmpty>(std::move(n), std::move(e), rxcpp::detail::OnCompletedEmpty()); +template<class T, class Arg0, class Arg1> +auto subscribe(Arg0&& a0, Arg1&& a1) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> + (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1))); } - -template<class OnNext, class OnError, class OnCompleted> -auto subscribe(OnNext n, OnError e, OnCompleted c, tag_function&&) - -> subscribe_factory<OnNext, OnError, OnCompleted> { - return subscribe_factory<OnNext, OnError, OnCompleted>(std::move(n), std::move(e), std::move(c)); -} - -template<class OnNext> -auto subscribe(composite_subscription cs, OnNext n, tag_subscription&&) - -> subscribe_factory_chained<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty> { - return subscribe_factory_chained<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty>(std::move(cs), std::move(n), rxcpp::detail::OnErrorEmpty(), rxcpp::detail::OnCompletedEmpty()); -} - -template<class OnNext, class OnError> -auto subscribe(composite_subscription cs, OnNext n, OnError e, tag_subscription&&) - -> subscribe_factory_chained<OnNext, OnError, rxcpp::detail::OnCompletedEmpty> { - return subscribe_factory_chained<OnNext, OnError, rxcpp::detail::OnCompletedEmpty>(std::move(cs), std::move(n), std::move(e), rxcpp::detail::OnCompletedEmpty()); -} - -} - -template<class Arg> -auto subscribe(Arg a) - -> decltype(detail::subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_function>::type())) { - return detail::subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_function>::type()); +template<class T, class Arg0, class Arg1, class Arg2> +auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> + (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); } - -template<class Arg1, class Arg2> -auto subscribe(Arg1 a1, Arg2 a2) - -> decltype(detail::subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) { - return detail::subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type()); +template<class T, class Arg0, class Arg1, class Arg2, class Arg3> +auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> + (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); } - -template<class Arg1, class Arg2, class Arg3> -auto subscribe(Arg1 a1, Arg2 a2, Arg3 a3) - -> decltype(detail::subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) { - return detail::subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type()); +template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4> +auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> + (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4))); } - -template<class OnNext, class OnError, class OnCompleted> -auto subscribe(composite_subscription cs, OnNext n, OnError e, OnCompleted c) - -> detail::subscribe_factory<OnNext, OnError, OnCompleted> { - return detail::subscribe_factory<OnNext, OnError, OnCompleted>(std::move(cs), std::move(n), std::move(e), std::move(c)); +template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5> +auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> + (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))); } namespace detail { @@ -133,9 +76,9 @@ class dynamic_factory { public: template<class Observable> - auto operator()(Observable source) - -> observable<typename Observable::value_type> { - return observable<typename Observable::value_type>(source); + auto operator()(Observable&& source) + -> observable<typename std::decay<Observable>::type::value_type> { + return observable<typename std::decay<Observable>::type::value_type>(std::forward<Observable>(source)); } }; diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index e210800..9131716 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -94,10 +94,10 @@ #include "rx-predef.hpp" #include "rx-subscription.hpp" #include "rx-observer.hpp" -#include "rx-notification.hpp" #include "rx-scheduler.hpp" #include "rx-regulator.hpp" #include "rx-subscriber.hpp" +#include "rx-notification.hpp" #include "rx-sources.hpp" #include "rx-subjects.hpp" #include "rx-operators.hpp" diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp index 503c392..dd1ff78 100644 --- a/Rx/v2/src/rxcpp/rx-notification.hpp +++ b/Rx/v2/src/rxcpp/rx-notification.hpp @@ -46,7 +46,7 @@ template<typename T> struct notification_base : public std::enable_shared_from_this<notification_base<T>> { - typedef observer<T> observer_type; + typedef subscriber<T> observer_type; typedef std::shared_ptr<notification_base<T>> type; virtual ~notification_base() {} @@ -75,9 +75,9 @@ private: } virtual bool equals(const typename base::type& other) const { bool result = false; - other->accept(make_observer_dynamic<T>([this, &result](T value) { + other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T value) { result = this->value == value; - })); + }))); return result; } virtual void accept(const typename base::observer_type& o) const { @@ -103,9 +103,9 @@ private: virtual bool equals(const typename base::type& other) const { bool result = false; // not trying to compare exceptions - other->accept(make_observer_dynamic<T>(nullptr, [&result](std::exception_ptr){ + other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](std::exception_ptr){ result = true; - })); + }))); return result; } virtual void accept(const typename base::observer_type& o) const { @@ -122,9 +122,9 @@ private: } virtual bool equals(const typename base::type& other) const { bool result = false; - other->accept(make_observer_dynamic<T>(nullptr, nullptr, [&result](){ + other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){ result = true; - })); + }))); return result; } virtual void accept(const typename base::observer_type& o) const { @@ -154,26 +154,49 @@ private: return std::make_shared<on_error_notification>(ep); } + struct on_next_factory + { + type operator()(T value) const { + return std::make_shared<on_next_notification>(std::move(value)); + } + }; + + struct on_completed_factory + { + type operator()() const { + return std::make_shared<on_completed_notification>(); + } + }; + + struct on_error_factory + { + template<typename Exception> + type operator()(Exception&& e) const { + return make_on_error(typename std::conditional< + std::is_same<typename std::decay<Exception>::type, std::exception_ptr>::value, + exception_ptr_tag, exception_tag>::type(), + std::forward<Exception>(e)); + } + }; public: - static - type make_on_next(T value) { - return std::make_shared<on_next_notification>(std::move(value)); - } - static - type make_on_completed() { - return std::make_shared<on_completed_notification>(); - } - template<typename Exception> - static - type make_on_error(Exception&& e) { - return make_on_error(typename std::conditional< - std::is_same<typename std::decay<Exception>::type, std::exception_ptr>::value, - exception_ptr_tag, exception_tag>::type(), - std::forward<Exception>(e)); - } + const static on_next_factory on_next; + const static on_completed_factory on_completed; + const static on_error_factory on_error; }; template<class T> +//static +RXCPP_SELECT_ANY const typename notification<T>::on_next_factory notification<T>::on_next = notification<T>::on_next_factory(); + +template<class T> +//static +RXCPP_SELECT_ANY const typename notification<T>::on_completed_factory notification<T>::on_completed = notification<T>::on_completed_factory(); + +template<class T> +//static +RXCPP_SELECT_ANY const typename notification<T>::on_error_factory notification<T>::on_error = notification<T>::on_error_factory(); + +template<class T> bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) { if (!lhs && !rhs) {return true;} if (!lhs || !rhs) {return false;} diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 6a9c02f..7b1f336 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -20,10 +20,23 @@ struct is_operator_factory_for template<class CS, class CF> static not_void check(...); - typedef decltype(check<Source, F>(0)) detail_result; + typedef decltype(check<typename std::decay<Source>::type, typename std::decay<F>::type>(0)) detail_result; static const bool value = !std::is_same<detail_result, not_void>::value; }; +template<class Subscriber, class T> +struct has_on_subscribe_for +{ + struct not_void {}; + template<class CS, class CT> + static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr)); + template<class CS, class CT> + static not_void check(...); + + typedef decltype(check<typename std::decay<Subscriber>::type, T>(0)) detail_result; + static const bool value = std::is_same<detail_result, void>::value; +}; + } template<class T> @@ -33,7 +46,7 @@ class dynamic_observable struct state_type : public std::enable_shared_from_this<state_type> { - typedef std::function<void(observer<T>)> onsubscribe_type; + typedef std::function<void(subscriber<T>)> onsubscribe_type; onsubscribe_type on_subscribe; }; @@ -48,8 +61,9 @@ class dynamic_observable } template<class SO> - void construct(SO so, rxs::tag_source&&) { - state->on_subscribe = [so](observer<T> o) mutable { + void construct(SO&& source, rxs::tag_source&&) { + typename std::decay<SO>::type so = std::forward<SO>(source); + state->on_subscribe = [so](subscriber<T> o) mutable { so.on_subscribe(std::move(o)); }; } @@ -74,28 +88,29 @@ public: typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type()); } - void on_subscribe(observer<T> o) const { + void on_subscribe(subscriber<T> o) const { state->on_subscribe(std::move(o)); } - template<class Observer> - typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type - on_subscribe(Observer o) const { - auto so = std::make_shared<Observer>(o); - state->on_subscribe(make_observer_dynamic<T>( - so->get_subscription(), - // on_next - [so](T t){ - so->on_next(t); - }, - // on_error - [so](std::exception_ptr e){ - so->on_error(e); - }, - // on_completed - [so](){ - so->on_completed(); - })); + template<class Subscriber> + typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type + on_subscribe(Subscriber&& o) const { + auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); + state->on_subscribe(make_subscriber<T>( + *so, + make_observer_dynamic<T>( + // on_next + [so](T t){ + so->on_next(t); + }, + // on_error + [so](std::exception_ptr e){ + so->on_error(e); + }, + // on_completed + [so](){ + so->on_completed(); + }))); } }; @@ -112,17 +127,23 @@ class observable protected: typedef observable<T, SourceOperator> this_type; - mutable SourceOperator source_operator; + typedef typename std::decay<SourceOperator>::type source_operator_type; + mutable source_operator_type source_operator; private: template<class U, class SO> friend class observable; - template<class Observer> - auto detail_subscribe(Observer o, tag_observer&&) const - -> decltype(make_subscription(o)) { + template<class Subscriber> + auto detail_subscribe(Subscriber&& scrbr) const + -> decltype(make_subscription(*(typename std::decay<Subscriber>::type*)nullptr)) { - static_assert(is_observer<Observer>::value, "subscribe must be passed an observer"); + typedef typename std::decay<Subscriber>::type subscriber_type; + subscriber_type o = std::forward<Subscriber>(scrbr); + + static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer"); + static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible"); + static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); if (!o.is_subscribed()) { return make_subscription(o); @@ -154,51 +175,20 @@ private: return make_subscription(o); } - struct tag_function {}; - template<class OnNext> - auto detail_subscribe(OnNext n, tag_function&&) const - -> decltype(make_subscription( make_observer<T>(std::move(n)))) { - return subscribe( make_observer<T>(std::move(n))); - } - - template<class OnNext, class OnError> - auto detail_subscribe(OnNext n, OnError e, tag_function&&) const - -> decltype(make_subscription( make_observer<T>(std::move(n), std::move(e)))) { - return subscribe( make_observer<T>(std::move(n), std::move(e))); - } - - template<class OnNext, class OnError, class OnCompleted> - auto detail_subscribe(OnNext n, OnError e, OnCompleted c, tag_function&&) const - -> decltype(make_subscription( make_observer<T>(std::move(n), std::move(e), std::move(c)))) { - return subscribe( make_observer<T>(std::move(n), std::move(e), std::move(c))); - } - - template<class OnNext> - auto detail_subscribe(composite_subscription cs, OnNext n, tag_subscription&&) const - -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n)))) { - return subscribe( make_observer<T>(std::move(cs), std::move(n))); - } - - template<class OnNext, class OnError> - auto detail_subscribe(composite_subscription cs, OnNext n, OnError e, tag_subscription&&) const - -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n), std::move(e)))) { - return subscribe( make_observer<T>(std::move(cs), std::move(n), std::move(e))); - } - public: typedef T value_type; - static_assert(rxo::is_operator<SourceOperator>::value || rxs::is_source<SourceOperator>::value, "observable must wrap an operator or source"); + static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source"); observable() { } - explicit observable(const SourceOperator& o) + explicit observable(const source_operator_type& o) : source_operator(o) { } - explicit observable(SourceOperator&& o) + explicit observable(source_operator_type&& o) : source_operator(std::move(o)) { } @@ -228,54 +218,66 @@ public: return *this; } - template<class Arg> - auto subscribe(Arg a) const - -> decltype(detail_subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, tag_function>::type())) { - return detail_subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, tag_function>::type()); + template<class Arg0> + auto subscribe(Arg0&& a0) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0))); + } + + template<class Arg0, class Arg1> + auto subscribe(Arg0&& a0, Arg1&& a1) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1))); + } + + template<class Arg0, class Arg1, class Arg2> + auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); } - template<class Arg1, class Arg2> - auto subscribe(Arg1 a1, Arg2 a2) const - -> decltype(detail_subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type())) { - return detail_subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type()); + template<class Arg0, class Arg1, class Arg2, class Arg3> + auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); } - template<class Arg1, class Arg2, class Arg3> - auto subscribe(Arg1 a1, Arg2 a2, Arg3 a3) const - -> decltype(detail_subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type())) { - return detail_subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type()); + template<class Arg0, class Arg1, class Arg2, class Arg3, class Arg4> + auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4))); } - template<class OnNext, class OnError, class OnCompleted> - auto subscribe(composite_subscription cs, OnNext n, OnError e, OnCompleted c) const - -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c)))) { - return subscribe( make_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c))); + template<class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5> + auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) const + -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) { + return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))); } template<class Predicate> - auto filter(Predicate p) const + auto filter(Predicate&& p) const -> observable<T, rxo::detail::filter<T, observable, Predicate>> { return observable<T, rxo::detail::filter<T, observable, Predicate>>( - rxo::detail::filter<T, observable, Predicate>(*this, std::move(p))); + rxo::detail::filter<T, observable, Predicate>(*this, std::forward<Predicate>(p))); } template<class Selector> - auto map(Selector s) const + auto map(Selector&& s) const -> observable<typename rxo::detail::map<observable, Selector>::value_type, rxo::detail::map<observable, Selector>> { return observable<typename rxo::detail::map<observable, Selector>::value_type, rxo::detail::map<observable, Selector>>( - rxo::detail::map<observable, Selector>(*this, std::move(s))); + rxo::detail::map<observable, Selector>(*this, std::forward<Selector>(s))); } template<class CollectionSelector, class ResultSelector> - auto flat_map(CollectionSelector s, ResultSelector rs) const + auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const -> observable<typename rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>> { return observable<typename rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>>( - rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>(*this, std::move(s), std::move(rs))); + rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs))); } template<class OperatorFactory> auto op(OperatorFactory&& of) const - -> decltype(of(*(this_type*)nullptr)) { + -> decltype(of(*(const this_type*)nullptr)) { static_assert(detail::is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); return of(*this); } diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp index c5cb7a2..8d2ddcf 100644 --- a/Rx/v2/src/rxcpp/rx-observer.hpp +++ b/Rx/v2/src/rxcpp/rx-observer.hpp @@ -51,7 +51,7 @@ public: using std::swap; swap(s, o.s); } - typename this_type::subscription_type get_subscription() { + const typename this_type::subscription_type& get_subscription() const { return s; } bool is_subscribed() const { @@ -75,7 +75,7 @@ class is_observer template<class C> static void check(...); public: - static const bool value = std::is_convertible<decltype(check<T>(0)), tag_observer>::value; + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_observer>::value; }; namespace detail { @@ -102,7 +102,7 @@ struct is_on_next_of template<class CT, class CF> static not_void check(...); - typedef decltype(check<T, F>(0)) detail_result; + typedef decltype(check<T, typename std::decay<F>::type>(0)) detail_result; static const bool value = std::is_same<detail_result, void>::value; }; @@ -115,7 +115,7 @@ struct is_on_error template<class CF> static not_void check(...); - static const bool value = std::is_same<decltype(check<F>(0)), void>::value; + static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value; }; template<class F> @@ -127,7 +127,7 @@ struct is_on_completed template<class CF> static not_void check(...); - static const bool value = std::is_same<decltype(check<F>(0)), void>::value; + static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value; }; } @@ -147,215 +147,17 @@ private: on_error_t onerror; on_completed_t oncompleted; - struct tag_this_type {}; - struct tag_function {}; - - template<class Arg> - struct resolve_tag - { - typedef typename std::conditional<std::is_same<typename std::decay<Arg>::type, dynamic_observer<T>>::value, tag_this_type, - typename std::conditional<is_subscription<Arg>::value, tag_subscription, tag_function>::type - >::type type; - }; - - template<class Arg, class Tag = typename resolve_tag<Arg>::type> - struct resolve_cs; - template<class Arg> - struct resolve_cs<Arg, tag_this_type> - { - auto operator()(const Arg& a) - -> decltype(a.cs) { - return a.cs; - } - auto operator()(Arg&& a) - -> decltype(a.cs) { - return std::move(a.cs); - } - }; - template<class Arg> - struct resolve_cs<Arg, tag_subscription> - { - Arg operator()(Arg a) { - return std::move(a); - } - }; - template<class Arg> - struct resolve_cs<Arg, tag_function> - { - composite_subscription operator()(const Arg& a) { - return composite_subscription(); - } - }; - - template<class Arg, class Tag = typename resolve_tag<Arg>::type> - struct resolve_onnext; - template<class Arg> - struct resolve_onnext<Arg, tag_this_type> - { - auto operator()(const Arg& a) - -> decltype(a.onnext) { - return a.onnext; - } - auto operator()(Arg&& a) - -> decltype(a.onnext) { - return std::move(a.onnext); - } - }; - template<class Arg> - struct resolve_onnext<Arg, tag_subscription> - { - template<class Arg1> - on_next_t operator()(const Arg1& a1) { - return on_next_t(); - } - template<class Arg1, class Arg2> - Arg2 operator()(const Arg1&, Arg2 a2) { - static_assert(detail::is_on_next_of<T, Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value, - "Function supplied for on_next must be a function with the signature void(T);"); - return std::move(a2); - } - }; - template<class Arg> - struct resolve_onnext<Arg, tag_function> - { - template<class Arg1> - Arg1 operator()(Arg1 a1) { - static_assert(detail::is_on_next_of<T, Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value, - "Function supplied for on_next must be a function with the signature void(T);"); - return std::move(a1); - } - template<class Arg1, class Arg2> - Arg operator()(Arg1 a1, const Arg2&) { - static_assert(detail::is_on_next_of<T, Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value, - "Function supplied for on_next must be a function with the signature void(T);"); - return std::move(a1); - } - }; - - template<class Arg, class Tag = typename resolve_tag<Arg>::type> - struct resolve_onerror; - template<class Arg> - struct resolve_onerror<Arg, tag_this_type> - { - auto operator()(const Arg& a) - -> decltype(a.onerror) { - return a.onerror; - } - auto operator()(Arg&& a) - -> decltype(a.onerror) { - return std::move(a.onerror); - } - }; - template<class Arg> - struct resolve_onerror<Arg, tag_subscription> - { - template<class Arg1> - on_error_t operator()(const Arg1& a1) { - return on_error_t(); - } - template<class Arg1, class Arg2> - Arg2 operator()(const Arg1&, Arg2 a2) { - static_assert(detail::is_on_error<Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value, - "Function supplied for on_error must be a function with the signature void(std::exception_ptr);"); - return std::move(a2); - } - }; - template<class Arg> - struct resolve_onerror<Arg, tag_function> - { - template<class Arg1> - on_error_t operator()(const Arg1& a1) { - return on_error_t(); - } - template<class Arg1, class Arg2> - Arg1 operator()(Arg1 a1, const Arg2&) { - static_assert(detail::is_on_error<Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value, - "Function supplied for on_error must be a function with the signature void(std::exception_ptr);"); - return std::move(a1); - } - }; - - template<class Arg, class Tag = typename resolve_tag<Arg>::type> - struct resolve_oncompleted; - template<class Arg> - struct resolve_oncompleted<Arg, tag_this_type> - { - auto operator()(const Arg& a) - -> decltype(a.oncompleted) { - return a.oncompleted; - } - auto operator()(Arg&& a) - -> decltype(a.oncompleted) { - return std::move(a.oncompleted); - } - }; - template<class Arg> - struct resolve_oncompleted<Arg, tag_subscription> - { - template<class Arg1> - on_completed_t operator()(const Arg1& a1) { - return on_completed_t(); - } - template<class Arg1, class Arg2> - Arg2 operator()(const Arg1&, Arg2 a2) { - static_assert(detail::is_on_completed<Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value, - "Function supplied for on_completed must be a function with the signature void();"); - return std::move(a2); - } - }; - template<class Arg> - struct resolve_oncompleted<Arg, tag_function> - { - template<class Arg1> - on_completed_t operator()(const Arg1& a1) { - return on_completed_t(); - } - template<class Arg1, class Arg2> - Arg1 operator()(Arg1 a1, const Arg2&) { - static_assert(detail::is_on_completed<Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value, - "Function supplied for on_completed must be a function with the signature void();"); - return std::move(a1); - } - }; - public: dynamic_observer() { } - template<class Arg> - explicit dynamic_observer(Arg a) - : base_type(resolve_cs<Arg>()(a)) - , onnext(resolve_onnext<Arg>()(a)) - , onerror(resolve_onerror<Arg>()(a)) - , oncompleted(resolve_oncompleted<Arg>()(a)) - { - } - - template<class Arg1, class Arg2> - dynamic_observer(Arg1 a1, Arg2 a2) - : base_type(resolve_cs<Arg1>()(a1)) - , onnext(resolve_onnext<Arg1>()(a1, a2)) - , onerror(resolve_onerror<Arg1>()(a2, on_error_t())) - , oncompleted(resolve_oncompleted<Arg1>()(on_completed_t(), on_completed_t())) - { - } - - template<class Arg1, class Arg2, class Arg3> - dynamic_observer(Arg1 a1, Arg2 a2, Arg3 a3) - : base_type(resolve_cs<Arg1>()(a1)) - , onnext(resolve_onnext<Arg1>()(a1, a2)) - , onerror(resolve_onerror<Arg1>()(a2, a3)) - , oncompleted(resolve_oncompleted<Arg1>()(a3, on_completed_t())) - { - } - template<class OnNext, class OnError, class OnCompleted> - dynamic_observer(composite_subscription cs, OnNext n, OnError e, OnCompleted c) + dynamic_observer(composite_subscription cs, OnNext&& n, OnError&& e, OnCompleted&& c) : base_type(std::move(cs)) - , onnext(std::move(n)) - , onerror(std::move(e)) - , oncompleted(std::move(c)) + , onnext(std::forward<OnNext>(n)) + , onerror(std::forward<OnError>(e)) + , oncompleted(std::forward<OnCompleted>(c)) { static_assert(detail::is_on_next_of<T, OnNext>::value || std::is_same<OnNext, std::nullptr_t>::value, "Function supplied for on_next must be a function with the signature void(T);"); @@ -394,13 +196,14 @@ public: } }; -template<class T, class OnNext, class OnError = detail::OnErrorEmpty, class OnCompleted = detail::OnCompletedEmpty> +template<class T, class OnNext, class OnError, class OnCompleted> class static_observer : public observer_base<T> { public: - typedef OnNext on_next_t; - typedef OnError on_error_t; - typedef OnCompleted on_completed_t; + typedef static_observer<T, OnNext, OnError, OnCompleted> this_type; + typedef typename std::decay<OnNext>::type on_next_t; + typedef typename std::decay<OnError>::type on_error_t; + typedef typename std::decay<OnCompleted>::type on_completed_t; private: on_next_t onnext; @@ -412,39 +215,32 @@ public: static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(std::exception_ptr);"); static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();"); - explicit static_observer(on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) - : observer_base<T>(composite_subscription()) - , onnext(std::move(n)) - , onerror(std::move(e)) - , oncompleted(std::move(c)) - { - } - explicit static_observer(composite_subscription cs, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) + explicit static_observer(composite_subscription cs, on_next_t n, on_error_t e, on_completed_t c) : observer_base<T>(std::move(cs)) , onnext(std::move(n)) , onerror(std::move(e)) , oncompleted(std::move(c)) { } - static_observer(const static_observer& o) + static_observer(const this_type& o) : observer_base<T>(o) , onnext(o.onnext) , onerror(o.onerror) , oncompleted(o.oncompleted) { } - static_observer(static_observer&& o) + static_observer(this_type&& o) : observer_base<T>(std::move(o)) , onnext(std::move(o.onnext)) , onerror(std::move(o.onerror)) , oncompleted(std::move(o.oncompleted)) { } - static_observer& operator=(static_observer o) { + static_observer& operator=(this_type o) { swap(o); return *this; } - void swap(static_observer& o) { + void swap(this_type& o) { using std::swap; observer_base<T>::swap(o); swap(onnext, o.onnext); @@ -466,8 +262,8 @@ public: template<class T, class I> class observer : public observer_root<T> { - typedef observer this_type; - typedef typename std::conditional<is_observer<I>::value, I, dynamic_observer<T>>::type inner_t; + typedef observer<T, I> this_type; + typedef typename std::conditional<is_observer<I>::value, typename std::decay<I>::type, dynamic_observer<T>>::type inner_t; struct detacher { @@ -515,7 +311,7 @@ public: inner.on_completed(); } } - typename this_type::subscription_type get_subscription() { + const typename this_type::subscription_type& get_subscription() const { return inner.get_subscription(); } bool is_subscribed() const { @@ -545,7 +341,7 @@ public: } void on_completed() const { } - typename this_type::subscription_type get_subscription() { + const typename this_type::subscription_type& get_subscription() const { static typename this_type::subscription_type result; result.unsubscribe(); return result; @@ -571,115 +367,135 @@ auto make_observer() namespace detail { -struct tag_require_observer {}; - -struct tag_require_function {}; - -template<class I> -auto make_observer(I i, tag_observer&&) - -> observer<typename I::value_type, I> { - return observer<typename I::value_type, I>(std::move(i)); +template<class T, class ResolvedArgSet> +auto make_observer_resolved(ResolvedArgSet&& rs) + -> observer<T, static_observer<T, typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type::result_type>> { + return make_observer_resolved<T, static_observer<T, typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type::result_type>>(std::forward<ResolvedArgSet>(rs)); } - -struct tag_function {}; -template<class T, class OnNext> -auto make_observer(OnNext n, tag_function&&) - -> observer<T, static_observer<T, OnNext>> { - return observer<T, static_observer<T, OnNext>>( - static_observer<T, OnNext>(std::move(n))); +template<class T, class ResolvedArgSet> +auto make_observer_dynamic_resolved(ResolvedArgSet&& rs) + -> observer<T, dynamic_observer<T>> { + return make_observer_resolved<T, dynamic_observer<T>>(std::forward<ResolvedArgSet>(rs)); } -template<class T, class OnNext, class OnError> -auto make_observer(OnNext n, OnError e, tag_function&&) - -> observer<T, static_observer<T, OnNext, OnError>> { - return observer<T, static_observer<T, OnNext, OnError>>( - static_observer<T, OnNext, OnError>(std::move(n), std::move(e))); +template<class T, class I, class ResolvedArgSet> +auto make_observer_resolved(ResolvedArgSet&& rs) + -> observer<T, I> { + typedef I inner_type; + return observer<T, inner_type>(inner_type( + std::get<0>(std::forward<ResolvedArgSet>(rs)).value, + std::move(std::get<1>(std::forward<ResolvedArgSet>(rs)).value), + std::move(std::get<2>(std::forward<ResolvedArgSet>(rs)).value), + std::move(std::get<3>(std::forward<ResolvedArgSet>(rs)).value))); + + typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rn_t; + typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type re_t; + typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type rc_t; + + static_assert(rn_t::is_arg, "onnext is a required parameter"); + static_assert(!(rn_t::is_arg && re_t::is_arg) || rn_t::n + 1 == re_t::n, "onnext, onerror parameters must be together and in order"); + static_assert(!(re_t::is_arg && rc_t::is_arg) || re_t::n + 1 == rc_t::n, "onerror, oncompleted parameters must be together and in order"); + static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order"); } -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_observer(OnNext n, OnError e, OnCompleted c, tag_function&&) - -> observer<T, static_observer<T, OnNext, OnError, OnCompleted>> { - return observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( - static_observer<T, OnNext, OnError, OnCompleted>(std::move(n), std::move(e), std::move(c))); -} +struct tag_subscription_resolution +{ + template<class LHS> + struct predicate + { + static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value; + }; + typedef composite_subscription default_type; +}; -template<class T, class OnNext> -auto make_observer(composite_subscription cs, OnNext n, tag_subscription&&) - -> observer<T, static_observer<T, OnNext>> { - return observer<T, static_observer<T, OnNext>>( - static_observer<T, OnNext>(std::move(cs), std::move(n))); -} +template<class T> +struct tag_onnext_resolution +{ + template<class LHS> + struct predicate + { + static const bool value = is_on_next_of<T, LHS>::value; + }; + typedef OnNextEmpty<T> default_type; +}; -template<class T, class OnNext, class OnError> -auto make_observer(composite_subscription cs, OnNext n, OnError e, tag_subscription&&) - -> observer<T, static_observer<T, OnNext, OnError>> { - return observer<T, static_observer<T, OnNext, OnError>>( - static_observer<T, OnNext, OnError>(std::move(cs), std::move(n), std::move(e))); -} +struct tag_onerror_resolution +{ + template<class LHS> + struct predicate + { + static const bool value = is_on_error<LHS>::value; + }; + typedef OnErrorEmpty default_type; +}; -} +struct tag_oncompleted_resolution +{ + template<class LHS> + struct predicate + { + static const bool value = is_on_completed<LHS>::value; + }; + typedef OnCompletedEmpty default_type; +}; -template<class Arg> -auto make_observer(Arg a) - -> decltype(detail::make_observer(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_require_observer>::type())) { - return detail::make_observer(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_require_observer>::type()); -} +// types to disambiguate +// on_next and optional on_error, on_completed + +// optional subscription +// -template<class T, class Arg> -auto make_observer(Arg a) - -> decltype(detail::make_observer<T>(std::move(a), typename std::conditional<is_observer<Arg>::value, detail::tag_require_function, detail::tag_function>::type())) { - return detail::make_observer<T>(std::move(a), typename std::conditional<is_observer<Arg>::value, detail::tag_require_function, detail::tag_function>::type()); -} +template<class T> +struct tag_observer_set + : public rxu::detail::tag_set<tag_subscription_resolution, + rxu::detail::tag_set<tag_onnext_resolution<T>, + rxu::detail::tag_set<tag_onerror_resolution, + rxu::detail::tag_set<tag_oncompleted_resolution>>>> +{ +}; -template<class T, class Arg1, class Arg2> -auto make_observer(Arg1 a1, Arg2 a2) - -> decltype(detail::make_observer<T>(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) { - return detail::make_observer<T>(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type()); -} -template<class T, class Arg1, class Arg2, class Arg3> -auto make_observer(Arg1 a1, Arg2 a2, Arg3 a3) - -> decltype(detail::make_observer<T>(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) { - return detail::make_observer<T>(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type()); } -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_observer(composite_subscription cs, OnNext n, OnError e, OnCompleted c) - -> observer<T, static_observer<T, OnNext, OnError, OnCompleted>> { - return observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( - static_observer<T, OnNext, OnError, OnCompleted>(std::move(cs), std::move(n), std::move(e), std::move(c))); +template<class T, class Arg0> +auto make_observer(Arg0&& a0) + -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)))) { + return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0))); } - -template<class T, class OnNext> -auto make_observer_dynamic(OnNext n) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(n))); +template<class T, class Arg0, class Arg1> +auto make_observer(Arg0&& a0, Arg1&& a1) + -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) { + return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1))); } -template<class T, class OnNext, class OnError> -auto make_observer_dynamic(OnNext n, OnError e) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(n), std::move(e))); +template<class T, class Arg0, class Arg1, class Arg2> +auto make_observer(Arg0&& a0, Arg1&& a1, Arg2&& a2) + -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) { + return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); } -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_observer_dynamic(OnNext n, OnError e, OnCompleted c) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(n), std::move(e), std::move(c))); +template<class T, class Arg0, class Arg1, class Arg2, class Arg3> +auto make_observer(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) + -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) { + return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); } -template<class T, class OnNext> -auto make_observer_dynamic(composite_subscription cs, OnNext n) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n))); +template<class T, class Arg0> +auto make_observer_dynamic(Arg0&& a0) + -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)))) { + return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0))); } -template<class T, class OnNext, class OnError> -auto make_observer_dynamic(composite_subscription cs, OnNext n, OnError e) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n), std::move(e))); +template<class T, class Arg0, class Arg1> +auto make_observer_dynamic(Arg0&& a0, Arg1&& a1) + -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) { + return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1))); } -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_observer_dynamic(composite_subscription cs, OnNext n, OnError e, OnCompleted c) - -> observer<T> { - return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c))); +template<class T, class Arg0, class Arg1, class Arg2> +auto make_observer_dynamic(Arg0&& a0, Arg1&& a1, Arg2&& a2) + -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) { + return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); +} +template<class T, class Arg0, class Arg1, class Arg2, class Arg3> +auto make_observer_dynamic(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) + -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) { + return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); } } diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp index 81c2292..0a25131 100644 --- a/Rx/v2/src/rxcpp/rx-predef.hpp +++ b/Rx/v2/src/rxcpp/rx-predef.hpp @@ -15,6 +15,19 @@ class dynamic_observer; template<class T, class I = dynamic_observer<T>> class observer; +struct tag_subscriber {}; +template<class T> +class is_subscriber +{ + struct not_void {}; + template<class C> + static typename C::subscriber_tag* check(int); + template<class C> + static not_void check(...); +public: + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_subscriber*>::value; +}; + template<class T> class dynamic_observable; diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 1d53ab5..7bca7ad 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -9,29 +9,36 @@ namespace rxcpp { -struct tag_subscriber {}; template<class T> struct subscriber_base : public observer_root<T>, public subscription_base, public resumption_base { typedef tag_subscriber subscriber_tag; }; -template<class T> -class is_subscriber -{ - template<class C> - static typename C::subscriber_tag* check(int); - template<class C> - static void check(...); -public: - static const bool value = std::is_convertible<decltype(check<T>(0)), tag_subscriber*>::value; -}; template<class T, class Observer = observer<T>> class subscriber : public subscriber_base<T> { + typedef subscriber<T, Observer> this_type; + typedef typename std::decay<Observer>::type observer_type; + composite_subscription lifetime; resumption controller; - Observer destination; + observer_type destination; + + struct detacher + { + ~detacher() + { + if (that) { + that->unsubscribe(); + } + } + detacher(const this_type* that) + : that(that) + { + } + const this_type* that; + }; public: typedef typename composite_subscription::weak_subscription weak_subscription; @@ -40,20 +47,30 @@ public: subscriber() { } - subscriber(composite_subscription cs, resumption r, Observer o) + template<class U> + subscriber(composite_subscription cs, resumption r, U&& o) : lifetime(std::move(cs)) - , destination(std::move(o)) , controller(std::move(r)) + , destination(std::forward<U>(o)) { } - Observer get_observer() const { + const observer_type& get_observer() const { + return destination; + } + observer_type& get_observer() { return destination; } - resumption get_resumption() const { + const resumption& get_resumption() const { return controller; } - composite_subscription get_subscription() const { + resumption& get_resumption() { + return controller; + } + const composite_subscription& get_subscription() const { + return lifetime; + } + composite_subscription& get_subscription() { return lifetime; } @@ -69,13 +86,23 @@ public: // observer // void on_next(T t) const { - destination.on_next(std::move(t)); + if (is_subscribed()) { + detacher protect(this); + destination.on_next(std::move(t)); + protect.that = nullptr; + } } void on_error(std::exception_ptr e) const { - destination.on_error(e); + if (is_subscribed()) { + detacher protect(this); + destination.on_error(e); + } } void on_completed() const { - destination.on_completed(); + if (is_subscribed()) { + detacher protect(this); + destination.on_completed(); + } } // composite_subscription @@ -101,22 +128,7 @@ public: }; -template<class T, class ResolvedArgSet> -auto make_observer_resolved(ResolvedArgSet&& rs) - -> observer<T, static_observer<T, decltype (std::get<2>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<3>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<4>(std::forward<ResolvedArgSet>(rs)).value)>> { - typedef static_observer<T, decltype (std::get<2>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<3>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<4>(std::forward<ResolvedArgSet>(rs)).value)> inner_type; - return observer<T, inner_type>(inner_type( - std::move(std::get<2>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<3>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<4>(std::forward<ResolvedArgSet>(rs)).value))); - - typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t; - typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t; - typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t; - - static_assert(rn_t::is_arg, "onnext is a required parameter"); - static_assert(!(rn_t::is_arg && re_t::is_arg) || rn_t::n + 1 == re_t::n, "onnext, onerror parameters must be together and in order"); - static_assert(!(re_t::is_arg && rc_t::is_arg) || re_t::n + 1 == rc_t::n, "onerror, oncompleted parameters must be together and in order"); - static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order"); -} +namespace detail { template<class T, bool subscriber_is_arg, bool observer_is_arg, bool onnext_is_arg> struct observer_selector; @@ -126,8 +138,8 @@ struct observer_selector<T, subscriber_is_arg, true, false> { template<class Set> static auto get_observer(Set&& rs) - -> decltype(std::get<5>(std::forward<Set>(rs))) { - return std::get<5>(std::forward<Set>(rs)); + -> decltype(std::get<5>(std::forward<Set>(rs)).value) { + return std::get<5>(std::forward<Set>(rs)).value; } }; template<class T, bool subscriber_is_arg> @@ -136,6 +148,9 @@ struct observer_selector<T, subscriber_is_arg, false, true> template<class Set> static auto get_observer(Set&& rs) -> decltype(make_observer_resolved<T>(std::forward<Set>(rs))) { + if (!std::get<0>(std::forward<Set>(rs)).value.is_subscribed()) { + abort(); + } return make_observer_resolved<T>(std::forward<Set>(rs)); } }; @@ -144,20 +159,23 @@ struct observer_selector<T, true, false, false> { template<class Set> static auto get_observer(Set&& rs) - -> decltype(std::get<6>(std::forward<Set>(rs)).value.get_observer()) { - return std::get<6>(std::forward<Set>(rs)).value.get_observer(); + -> const decltype( std::get<6>(std::forward<Set>(rs)).value.get_observer())& { + return std::get<6>(std::forward<Set>(rs)).value.get_observer(); } }; template<class T, class ResolvedArgSet> auto select_observer(ResolvedArgSet&& rs) - -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) { - return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs)); + -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) { + if (!std::get<0>(std::forward<ResolvedArgSet>(rs)).value.is_subscribed()) { + abort(); + } + return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs)); - typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rr_t; - typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t; - typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t; - typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t; + typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rr_t; + typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rn_t; + typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type re_t; + typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type rc_t; typedef typename std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type ro_t; typedef typename std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type rs_t; @@ -166,20 +184,27 @@ auto select_observer(ResolvedArgSet&& rs) } template<class T, class ResolvedArgSet> -auto make_subscriber_resolved(ResolvedArgSet&& rs) - -> subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))> { - auto rsub = std::get<0>(std::forward<ResolvedArgSet>(rs)); - auto rr = std::get<1>(std::forward<ResolvedArgSet>(rs)); - const auto& rscrbr = std::get<6>(std::forward<ResolvedArgSet>(rs)); - auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : std::move(rr.value); - auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : std::move(rsub.value); - return subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))>( - std::move(s), std::move(r), select_observer<T>(std::forward<ResolvedArgSet>(rs))); - +auto make_subscriber_resolved(ResolvedArgSet&& rsArg) + -> subscriber<T, decltype( select_observer<T>(std::move(rsArg)))> { + const auto rs = std::forward<ResolvedArgSet>(rsArg); + if (!std::get<0>(rs).value.is_subscribed()) { + abort(); + } + const auto rsub = std::get<0>(rs); + const auto rr = std::get<4>(rs); + const auto rscrbr = std::get<6>(rs); + const auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : rr.value; + const auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : rsub.value; + return subscriber<T, decltype( select_observer<T>(std::move(rsArg)))>( + s, r, select_observer<T>(std::move(rs))); + +// at least for now do not enforce resolver +#if 0 typedef typename std::decay<decltype(rr)>::type rr_t; typedef typename std::decay<decltype(rscrbr)>::type rs_t; static_assert(rs_t::is_arg || rr_t::is_arg, "at least one of; resumption or subscriber is a required parameter"); +#endif } template<class T> @@ -203,16 +228,6 @@ struct tag_observer_resolution typedef observer<T, void> default_type; }; -struct tag_subscription_resolution -{ - template<class LHS> - struct predicate - { - static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value; - }; - typedef composite_subscription default_type; -}; - struct tag_resumption_resolution { template<class LHS> @@ -224,34 +239,6 @@ struct tag_resumption_resolution }; -template<class T> -struct tag_onnext_resolution -{ - template<class LHS> - struct predicate : public detail::is_on_next_of<T, LHS> - { - }; - typedef detail::OnNextEmpty<T> default_type; -}; - -struct tag_onerror_resolution -{ - template<class LHS> - struct predicate : public detail::is_on_error<LHS> - { - }; - typedef detail::OnErrorEmpty default_type; -}; - -struct tag_oncompleted_resolution -{ - template<class LHS> - struct predicate : public detail::is_on_completed<LHS> - { - }; - typedef detail::OnCompletedEmpty default_type; -}; - // types to disambiguate // subscriber with override for subscription, observer, resumption | // optional subscriber + @@ -262,45 +249,48 @@ struct tag_oncompleted_resolution template<class T> struct tag_subscriber_set + // the first four must be the same as tag_observer_set or the indexing will fail : public rxu::detail::tag_set<tag_subscription_resolution, - rxu::detail::tag_set<tag_resumption_resolution, rxu::detail::tag_set<tag_onnext_resolution<T>, rxu::detail::tag_set<tag_onerror_resolution, rxu::detail::tag_set<tag_oncompleted_resolution, + rxu::detail::tag_set<tag_resumption_resolution, rxu::detail::tag_set<tag_observer_resolution<T>, rxu::detail::tag_set<tag_subscriber_resolution<T>>>>>>>> { }; +} + template<class T, class Arg0> auto make_subscriber(Arg0&& a0) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0))); } template<class T, class Arg0, class Arg1> auto make_subscriber(Arg0&& a0, Arg1&& a1) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1))); } template<class T, class Arg0, class Arg1, class Arg2> auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); } template<class T, class Arg0, class Arg1, class Arg2, class Arg3> auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); } template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4> auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4))); } template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5> auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) - -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) { - return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))); + -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) { + return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))); } } diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index ccb8e06..6adba54 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -57,7 +57,7 @@ public: template<class Unsubscribe> class static_subscription : public subscription_base { - typedef Unsubscribe unsubscribe_call_type; + typedef typename std::decay<Unsubscribe>::type unsubscribe_call_type; unsubscribe_call_type unsubscribe_call; static_subscription() { @@ -83,7 +83,7 @@ public: template<class I> class subscription : public subscription_base { - typedef I inner_t; + typedef typename std::decay<I>::type inner_t; inner_t inner; mutable bool issubscribed; public: @@ -120,17 +120,17 @@ inline auto make_subscription() return subscription<void>(); } template<class I> -auto make_subscription(I i) +auto make_subscription(I&& i) -> typename std::enable_if<is_subscription<I>::value, subscription<I>>::type { - return subscription<I>(std::move(i)); + return subscription<I>(std::forward<I>(i)); } template<class Unsubscribe> -auto make_subscription(Unsubscribe u) +auto make_subscription(Unsubscribe&& u) -> typename std::enable_if<!is_subscription<Unsubscribe>::value, subscription< static_subscription<Unsubscribe>>>::type { return subscription< static_subscription<Unsubscribe>>( - static_subscription<Unsubscribe>(std::move(u))); + static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u))); } class composite_subscription : public subscription_base @@ -222,24 +222,39 @@ public: composite_subscription() : state(std::make_shared<state_t>()) { + if (!state) { + abort(); + } } composite_subscription(const composite_subscription& o) : state(o.state) { + if (!state) { + abort(); + } } composite_subscription(composite_subscription&& o) : state(std::move(o.state)) { + if (!state) { + abort(); + } } composite_subscription& operator=(const composite_subscription& o) { state = o.state; + if (!state) { + abort(); + } return *this; } composite_subscription& operator=(composite_subscription&& o) { state = std::move(o.state); + if (!state) { + abort(); + } return *this; } diff --git a/Rx/v2/src/rxcpp/rx-test.hpp b/Rx/v2/src/rxcpp/rx-test.hpp index 946ec03..e519cf8 100644 --- a/Rx/v2/src/rxcpp/rx-test.hpp +++ b/Rx/v2/src/rxcpp/rx-test.hpp @@ -18,7 +18,7 @@ struct test_subject_base typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; typedef std::shared_ptr<test_subject_base<T>> type; - virtual void on_subscribe(observer<T>) const =0; + virtual void on_subscribe(subscriber<T>) const =0; virtual std::vector<recorded_type> messages() const =0; virtual std::vector<rxn::subscription> subscriptions() const =0; }; @@ -30,29 +30,35 @@ struct test_source explicit test_source(typename test_subject_base<T>::type ts) : ts(std::move(ts)) { + if (!this->ts) abort(); } typename test_subject_base<T>::type ts; - void on_subscribe(observer<T> o) const { + void on_subscribe(subscriber<T> o) const { ts->on_subscribe(std::move(o)); } - template<class Observer> - typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type - on_subscribe(Observer o) const { - auto so = std::make_shared<Observer>(o); - ts->on_subscribe(make_observer_dynamic<T>( - so->get_subscription(), - // on_next - [so](T t){ - so->on_next(t); - }, - // on_error - [so](std::exception_ptr e){ - so->on_error(e); - }, - // on_completed - [so](){ - so->on_completed(); - })); + template<class Subscriber> + typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, subscriber<T>>::value, void>::type + on_subscribe(Subscriber&& o) const { + + static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber."); + + auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); + ts->on_subscribe( + make_subscriber<T>( + *so, + make_observer_dynamic<T>( + // on_next + [so](T t){ + so->on_next(t); + }, + // on_error + [so](std::exception_ptr e){ + so->on_error(e); + }, + // on_completed + [so](){ + so->on_completed(); + }))); } }; diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 53f1b3c..fa6efe9 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -139,16 +139,15 @@ public: if (is_set) { is_set = false; reinterpret_cast<T*>(&storage)->~T(); + std::fill_n(reinterpret_cast<char*>(&storage), sizeof(T), 0); } } - void reset(T value) { - if (is_set) { - *reinterpret_cast<T*>(&storage) = std::move(value); - } else { - new (reinterpret_cast<T*>(&storage)) T(std::move(value)); - is_set = true; - } + template<class U> + void reset(U&& value) { + reset(); + new (reinterpret_cast<T*>(&storage)) T(std::forward<U>(value)); + is_set = true; } maybe& operator=(const T& other) { @@ -212,8 +211,9 @@ struct arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, const Arg4&, result_type a) - : value(std::move(a)) { + template<class R> + arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, const Arg4&, R&& a) + : value(std::forward<R>(a)) { } }; @@ -227,8 +227,9 @@ struct arg_resolver_n<4, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, result_type a, const Arg5&) - : value(std::move(a)) { + template<class R> + arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, R&& a, const Arg5&) + : value(std::forward<R>(a)) { } }; @@ -242,8 +243,9 @@ struct arg_resolver_n<3, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, result_type a, const Arg4&, const Arg5&) - : value(std::move(a)) { + template<class R> + arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, R&& a, const Arg4&, const Arg5&) + : value(std::forward<R>(a)) { } }; @@ -257,8 +259,9 @@ struct arg_resolver_n<2, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(const Arg0&, const Arg1&, result_type a, const Arg3&, const Arg4&, const Arg5&) - : value(std::move(a)) { + template<class R> + arg_resolver_n(const Arg0&, const Arg1&, R&& a, const Arg3&, const Arg4&, const Arg5&) + : value(std::forward<R>(a)) { } }; @@ -272,8 +275,9 @@ struct arg_resolver_n<1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(const Arg0&, result_type a, const Arg2&, const Arg3&, const Arg4&, const Arg5&) - : value(std::move(a)) { + template<class R> + arg_resolver_n(const Arg0&, R&& a, const Arg2&, const Arg3&, const Arg4&, const Arg5&) + : value(std::forward<R>(a)) { } }; @@ -287,8 +291,9 @@ struct arg_resolver_n<0, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type; typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type; result_type value; - arg_resolver_n(result_type a, const Arg1&, const Arg2&, const Arg3&, const Arg4&, const Arg5&) - : value(std::move(a)) { + template<class R> + arg_resolver_n(R&& a, const Arg1&, const Arg2&, const Arg3&, const Arg4&, const Arg5&) + : value(std::forward<R>(a)) { } }; @@ -311,7 +316,7 @@ struct tag_unresolvable {}; template<template<class Arg> class Predicate, class Default, class Arg0 = tag_unresolvable, class Arg1 = tag_unresolvable, class Arg2 = tag_unresolvable, class Arg3 = tag_unresolvable, class Arg4 = tag_unresolvable, class Arg5 = tag_unresolvable> struct arg_resolver { - typedef typename arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type type; + typedef typename arg_resolver_n<5, Predicate, Default, typename std::decay<Arg0>::type, typename std::decay<Arg1>::type, typename std::decay<Arg2>::type, typename std::decay<Arg3>::type, typename std::decay<Arg4>::type, typename std::decay<Arg5>::type>::type type; }; @@ -323,54 +328,54 @@ auto resolve_arg() template<template<class Arg> class Predicate, class Default, class Arg0> -auto resolve_arg(Arg0 a0) --> decltype(typename arg_resolver<Predicate, Default, Arg0>::type(std::move(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { - return typename arg_resolver<Predicate, Default, Arg0>::type(std::move(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); +auto resolve_arg(Arg0&& a0) +-> decltype(typename arg_resolver<Predicate, Default, Arg0>::type(std::forward<Arg0>(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { + return typename arg_resolver<Predicate, Default, Arg0>::type(std::forward<Arg0>(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); } template<template<class Arg> class Predicate, class Default, class Arg0, class Arg1> -auto resolve_arg(Arg0 a0, Arg1 a1) +auto resolve_arg(Arg0&& a0, Arg1&& a1) -> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1>::type( - std::move(a0), std::move(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { + std::forward<Arg0>(a0), std::forward<Arg1>(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { return typename arg_resolver<Predicate, Default, Arg0, Arg1>::type( - std::move(a0), std::move(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); + std::forward<Arg0>(a0), std::forward<Arg1>(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); } template<template<class Arg> class Predicate, class Default, class Arg0, class Arg1, class Arg2> -auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2) +auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2) -> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2>::type( - std::move(a0), std::move(a1), std::move(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) { return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2>::type( - std::move(a0), std::move(a1), std::move(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable()); } template<template<class Arg> class Predicate, class Default, class Arg0, class Arg1, class Arg2, class Arg3> -auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3) +auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) -> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), tag_unresolvable(), tag_unresolvable())) { + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), tag_unresolvable(), tag_unresolvable())) { return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), tag_unresolvable(), tag_unresolvable()); + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), tag_unresolvable(), tag_unresolvable()); } template<template<class Arg> class Predicate, class Default, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4> -auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3, Arg4 a4) +auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) -> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), tag_unresolvable())) { + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), tag_unresolvable())) { return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), tag_unresolvable()); + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), tag_unresolvable()); } template<template<class Arg> class Predicate, class Default, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5> -auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3, Arg4 a4, Arg5 a5) +auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) -> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), std::move(a5))) { + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))) { return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type( - std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), std::move(a5)); + std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)); } struct arg_resolver_term {}; diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp index 250dbf6..cd3b7ff 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp @@ -28,30 +28,37 @@ public: typedef typename rxn::notification<T> notification_type; typedef rxn::recorded<typename notification_type::type> recorded_type; - static - recorded_type on_next(long ticks, T value) + struct on_next_factory { - return recorded_type(ticks, notification_type::make_on_next(value)); - } - - static - recorded_type on_completed(long ticks) + recorded_type operator()(long ticks, T value) const { + return recorded_type(ticks, notification_type::on_next(value)); + } + }; + struct on_completed_factory { - return recorded_type(ticks, notification_type::make_on_completed()); - } - - template<class Exception> - static - recorded_type on_error(long ticks, Exception e) + recorded_type operator()(long ticks) const { + return recorded_type(ticks, notification_type::on_completed()); + } + }; + struct on_error_factory { - return recorded_type(ticks, notification_type::make_on_error(e)); - } + template<class Exception> + recorded_type operator()(long ticks, Exception&& e) const { + return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e))); + } + }; + + static const on_next_factory on_next; + static const on_completed_factory on_completed; + static const on_error_factory on_error; - static - rxn::subscription subscribe(long subscribe, long unsubscribe) + struct subscribe_factory { - return rxn::subscription(subscribe, unsubscribe); - } + rxn::subscription operator()(long subscribe, long unsubscribe) const { + return rxn::subscription(subscribe, unsubscribe); + } + }; + static const subscribe_factory subscribe; private: ~messages(); @@ -86,27 +93,29 @@ public: using base::start; template<class T, class F> - auto start(F createSource, long created, long subscribed, long unsubscribed) - -> rxt::testable_observer<T> + auto start(F&& createSource, long created, long subscribed, long unsubscribed) + -> subscriber<T, rxt::testable_observer<T>> { + typename std::decay<F>::type createSrc = std::forward<F>(createSource); + struct state_type : public std::enable_shared_from_this<state_type> { - typedef decltype(createSource()) source_type; + typedef decltype(std::forward<F>(createSrc)()) source_type; std::unique_ptr<source_type> source; - rxt::testable_observer<T> o; + subscriber<T, rxt::testable_observer<T>> o; - explicit state_type(rxt::testable_observer<T> o) + explicit state_type(subscriber<T, rxt::testable_observer<T>> o) : source() , o(o) { } }; - std::shared_ptr<state_type> state(new state_type(this->make_observer<T>())); + std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>())); - schedule_absolute(created, make_action([createSource, state](action, scheduler) { - state->source.reset(new typename state_type::source_type(createSource())); + schedule_absolute(created, make_action([createSrc, state](action, scheduler) { + state->source.reset(new typename state_type::source_type(createSrc())); return make_action_empty(); })); schedule_absolute(subscribed, make_action([state](action, scheduler) { @@ -124,17 +133,17 @@ public: } template<class T, class F> - auto start(F createSource, long unsubscribed) - -> rxt::testable_observer<T> + auto start(F&& createSource, long unsubscribed) + -> subscriber<T, rxt::testable_observer<T>> { - return start<T>(std::move(createSource), created_time, subscribed_time, unsubscribed); + return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed); } template<class T, class F> - auto start(F createSource) - -> rxt::testable_observer<T> + auto start(F&& createSource) + -> subscriber<T, rxt::testable_observer<T>> { - return start<T>(std::move(createSource), created_time, subscribed_time, unsubscribed_time); + return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time); } template<class T> @@ -157,9 +166,28 @@ public: template<class T> rxt::testable_observer<T> make_observer(); + + template<class T> + subscriber<T, rxt::testable_observer<T>> make_subscriber(); }; template<class T> +//static +RXCPP_SELECT_ANY const typename test::messages<T>::on_next_factory test::messages<T>::on_next = test::messages<T>::on_next_factory(); + +template<class T> +//static +RXCPP_SELECT_ANY const typename test::messages<T>::on_completed_factory test::messages<T>::on_completed = test::messages<T>::on_completed_factory(); + +template<class T> +//static +RXCPP_SELECT_ANY const typename test::messages<T>::on_error_factory test::messages<T>::on_error = test::messages<T>::on_error_factory(); + +template<class T> +//static +RXCPP_SELECT_ANY const typename test::messages<T>::subscribe_factory test::messages<T>::subscribe = test::messages<T>::subscribe_factory(); + +template<class T> class mock_observer : public rxt::detail::test_subject_base<T> { @@ -175,7 +203,7 @@ public: typename test::shared sc; std::vector<recorded_type> m; - virtual void on_subscribe(observer<T>) const { + virtual void on_subscribe(subscriber<T>) const { abort(); } virtual std::vector<rxn::subscription> subscriptions() const { @@ -193,31 +221,38 @@ rxt::testable_observer<T> test::make_observer() typedef typename rxn::notification<T> notification_type; typedef rxn::recorded<typename notification_type::type> recorded_type; - auto ts = std::make_shared<mock_observer<T>>( - std::static_pointer_cast<test>(shared_from_this())); + std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>( + std::static_pointer_cast<test>(shared_from_this()))); return rxt::testable_observer<T>(ts, make_observer_dynamic<T>( // on_next [ts](T value) { ts->m.push_back( - recorded_type(ts->sc->clock(), notification_type::make_on_next(value))); + recorded_type(ts->sc->clock(), notification_type::on_next(value))); }, // on_error [ts](std::exception_ptr e) { ts->m.push_back( - recorded_type(ts->sc->clock(), notification_type::make_on_error(e))); + recorded_type(ts->sc->clock(), notification_type::on_error(e))); }, // on_completed [ts]() { ts->m.push_back( - recorded_type(ts->sc->clock(), notification_type::make_on_completed())); + recorded_type(ts->sc->clock(), notification_type::on_completed())); })); } template<class T> +subscriber<T, rxt::testable_observer<T>> test::make_subscriber() +{ + auto to = this->make_observer<T>(); + return rxcpp::make_subscriber<T>(to.get_subscription(), to); +} + +template<class T> class cold_observable : public rxt::detail::test_subject_base<T> { @@ -242,7 +277,7 @@ public: { } - virtual void on_subscribe(observer<T> o) const { + virtual void on_subscribe(subscriber<T> o) const { sv.push_back(rxn::subscription(sc->clock())); auto index = sv.size() - 1; @@ -272,8 +307,8 @@ public: template<class T> rxt::testable_observable<T> test::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) { - return rxt::testable_observable<T>(std::make_shared<cold_observable<T>>( - std::static_pointer_cast<test>(shared_from_this()), std::move(messages))); + auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(std::static_pointer_cast<test>(shared_from_this()), std::move(messages))); + return rxt::testable_observable<T>(co); } template<class T> @@ -283,7 +318,7 @@ class hot_observable typedef hot_observable<T> this_type; typename test::shared sc; typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; - typedef observer<T> observer_type; + typedef subscriber<T> observer_type; mutable std::vector<recorded_type> mv; mutable std::vector<rxn::subscription> sv; mutable std::vector<observer_type> observers; diff --git a/Rx/v2/src/rxcpp/sources/rx-range.hpp b/Rx/v2/src/rxcpp/sources/rx-range.hpp index 9a44ba9..418c292 100644 --- a/Rx/v2/src/rxcpp/sources/rx-range.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-range.hpp @@ -31,8 +31,8 @@ struct range : public source_base<T> init.step = s; init.sc = sc; } - template<class I> - void on_subscribe(observer<T, I> o) { + template<class Subscriber> + void on_subscribe(Subscriber o) { auto state = std::make_shared<state_type>(init); state->sc->schedule(o.get_subscription(), [=](rxsc::action that, rxsc::scheduler){ if (state->remaining == 0) { diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index fafdb14..82b05e7 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -18,7 +18,7 @@ class multicast_observer : public observer_base<T> { typedef observer_base<T> base; - typedef observer<T> observer_type; + typedef subscriber<T> observer_type; typedef std::vector<observer_type> list_type; struct mode @@ -66,7 +66,7 @@ class multicast_observer std::copy_if( old->observers.begin(), old->observers.end(), std::inserter(observers, observers.end()), - [](const observer<T>& o){ + [](const observer_type& o){ return o.is_subscribed(); }); } @@ -108,7 +108,7 @@ public: bool has_observers() const { return b->state->has_observers; } - void add(observer<T> o) const { + void add(observer_type o) const { std::unique_lock<std::mutex> guard(b->state->lock); switch (b->state->current) { case mode::Casting: @@ -148,7 +148,7 @@ public: c = b->completer; } for (auto& o : c->observers) { - o.on_next(std::move(t)); + o.on_next(t); } } } @@ -206,7 +206,7 @@ public: return observer<T, detail::multicast_observer<T>>(s); } observable<T> get_observable() const { - return make_dynamic_observable<T>([this](observer<T> o){ + return make_dynamic_observable<T>([this](subscriber<T> o){ this->s.add(std::move(o)); }); } |