diff options
29 files changed, 790 insertions, 238 deletions
diff --git a/Rx/v2/examples/println/main.cpp b/Rx/v2/examples/println/main.cpp index 6904d7f..efe09ca 100644 --- a/Rx/v2/examples/println/main.cpp +++ b/Rx/v2/examples/println/main.cpp @@ -64,5 +64,6 @@ int main(int argc, char** argv) hello_tpl().subscribe(rxu::println(std::cout)); + hello_tpl().subscribe(rxu::print_delimited_by(std::cout, " and "), rxu::endline(std::cout)); return 0; } diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp index 1a9029a..e3a2970 100644 --- a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp @@ -84,9 +84,9 @@ struct buffer_count dest.on_completed(); } - static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, buffer_count_values v) { + static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) { auto cs = d.get_subscription(); - return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(v))); + return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v))); } }; @@ -105,8 +105,8 @@ public: buffer_count_factory(int c, int s) : count(c), skip(s) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip))) { - return source.lift(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip)); + -> decltype(source.template lift<std::vector<typename std::decay<Observable>::type::value_type>>(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip))) { + return source.template lift<std::vector<typename std::decay<Observable>::type::value_type>>(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp index a5ad360..910edda 100644 --- a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp @@ -62,8 +62,8 @@ class distinct_until_changed_factory public: template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(distinct_until_changed<typename std::decay<Observable>::type>::value_type)) { - return source.lift(distinct_until_changed<typename std::decay<Observable>::type>::value_type); + -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(distinct_until_changed<typename std::decay<Observable>::type>::value_type)) { + return source.template lift<typename std::decay<Observable>::type::value_type>(distinct_until_changed<typename std::decay<Observable>::type>::value_type); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-filter.hpp b/Rx/v2/src/rxcpp/operators/rx-filter.hpp index 6dc727c..775bc16 100644 --- a/Rx/v2/src/rxcpp/operators/rx-filter.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-filter.hpp @@ -79,8 +79,8 @@ public: filter_factory(test_type p) : predicate(std::move(p)) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate))) { - return source.lift(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate)); + -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate))) { + return source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-finally.hpp b/Rx/v2/src/rxcpp/operators/rx-finally.hpp index a5d7ef9..6e2b7ad 100644 --- a/Rx/v2/src/rxcpp/operators/rx-finally.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-finally.hpp @@ -76,8 +76,8 @@ public: finally_factory(last_call_type lc) : last_call(std::move(lc)) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call))) { - return source.lift(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call)); + -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call))) { + return source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-lift.hpp b/Rx/v2/src/rxcpp/operators/rx-lift.hpp index 1800984..ad2ec4f 100644 --- a/Rx/v2/src/rxcpp/operators/rx-lift.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-lift.hpp @@ -11,7 +11,7 @@ namespace rxcpp { namespace detail { -template<class S, class F> +template<class V, class S, class F> struct is_lift_function_for { struct tag_not_valid {}; @@ -23,7 +23,7 @@ struct is_lift_function_for { typedef typename std::decay<S>::type for_type; typedef typename std::decay<F>::type func_type; typedef decltype(check<for_type, func_type>(0)) detail_result; - static const bool value = is_subscriber<detail_result>::value && is_subscriber<for_type>::value; + static const bool value = is_subscriber<detail_result>::value && is_subscriber<for_type>::value && std::is_convertible<V, typename detail_result::value_type>::value; }; } @@ -32,25 +32,22 @@ namespace operators { namespace detail { -template<class SourceOperator, class Operator> +template<class ResultType, class SourceOperator, class Operator> struct lift_traits { + typedef typename std::decay<ResultType>::type result_value_type; typedef typename std::decay<SourceOperator>::type source_operator_type; typedef typename std::decay<Operator>::type operator_type; typedef typename source_operator_type::value_type source_value_type; - static_assert(rxcpp::detail::is_lift_function_for<subscriber<source_value_type>, operator_type>::value, "lift Operator must be a function with the signature subscriber<...>(subscriber<source_value_type, ...>)"); - - typedef decltype((*(operator_type*)nullptr)(*(subscriber<source_value_type>*)nullptr)) result_for_dynamic_source_subscriber_type; - - typedef typename result_for_dynamic_source_subscriber_type::value_type result_value_type; + static_assert(rxcpp::detail::is_lift_function_for<source_value_type, subscriber<result_value_type>, operator_type>::value, "lift Operator must be a function with the signature subscriber<source_value_type, ...>(subscriber<result_value_type, ...>)"); }; -template<class SourceOperator, class Operator> -struct lift : public operator_base<typename lift_traits<SourceOperator, Operator>::result_value_type> +template<class ResultType, class SourceOperator, class Operator> +struct lift : public operator_base<typename lift_traits<ResultType, SourceOperator, Operator>::result_value_type> { - typedef lift_traits<SourceOperator, Operator> traits; + typedef lift_traits<ResultType, SourceOperator, Operator> traits; typedef typename traits::source_operator_type source_operator_type; typedef typename traits::operator_type operator_type; source_operator_type source; @@ -70,6 +67,26 @@ struct lift : public operator_base<typename lift_traits<SourceOperator, Operator } }; +template<class ResultType, class Operator> +class lift_factory +{ + typedef typename std::decay<Operator>::type operator_type; + operator_type chain; +public: + lift_factory(operator_type op) : chain(std::move(op)) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<ResultType>(chain)) { + return source.template lift<ResultType>(chain); + } +}; + +} + +template<class ResultType, class Operator> +auto lift(Operator&& op) + -> detail::lift_factory<ResultType, Operator> { + return detail::lift_factory<ResultType, Operator>(std::forward<Operator>(op)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp index 19e31d2..1de7602 100644 --- a/Rx/v2/src/rxcpp/operators/rx-map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp @@ -19,6 +19,7 @@ struct map { typedef typename std::decay<T>::type source_value_type; typedef typename std::decay<Selector>::type select_type; + typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type; select_type selector; map(select_type s) @@ -32,7 +33,7 @@ struct map typedef map_observer<Subscriber> this_type; typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type; typedef typename std::decay<Subscriber>::type dest_type; - typedef observer<value_type, this_type> observer_type; + typedef observer<T, this_type> observer_type; dest_type dest; select_type selector; @@ -58,9 +59,9 @@ struct map dest.on_completed(); } - static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, select_type s) { + static subscriber<T, observer_type> make(dest_type d, select_type s) { auto cs = d.get_subscription(); - return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(s))); + return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(s)))); } }; @@ -80,8 +81,8 @@ public: map_factory(select_type s) : selector(std::move(s)) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(map<typename std::decay<Observable>::type::value_type, select_type>(selector))) { - return source.lift(map<typename std::decay<Observable>::type::value_type, select_type>(selector)); + -> decltype(source.lift<typename map<typename std::decay<Observable>::type::value_type, select_type>::value_type>(map<typename std::decay<Observable>::type::value_type, select_type>(selector))) { + return source.lift<typename map<typename std::decay<Observable>::type::value_type, select_type>::value_type>(map<typename std::decay<Observable>::type::value_type, select_type>(selector)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index 8e6c18d..c1157de 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -193,8 +193,8 @@ public: observe_on_factory(coordination_type cn) : coordination(std::move(cn)) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination))) { - return source.lift(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination)); + -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination))) { + return source.template lift<typename std::decay<Observable>::type::value_type>(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-window.hpp b/Rx/v2/src/rxcpp/operators/rx-window.hpp index d87285d..c2bf220 100644 --- a/Rx/v2/src/rxcpp/operators/rx-window.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-window.hpp @@ -35,22 +35,6 @@ struct window { } - struct window_observable : public sources::source_base<T> - { - mutable rxcpp::subjects::subject<T> subj; - - window_observable(rxcpp::subjects::subject<T> s) - : subj(std::move(s)) - { - } - - template<class Subscriber> - void on_subscribe(Subscriber o) const { - static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); - subj.get_observable().subscribe(o); - } - }; - template<class Subscriber> struct window_observer : public window_values, public observer_base<observable<T>> { @@ -58,6 +42,7 @@ struct window typedef observer_base<observable<T>> base_type; typedef typename base_type::value_type value_type; typedef typename std::decay<Subscriber>::type dest_type; + typedef observer<T, this_type> observer_type; dest_type dest; mutable int cursor; mutable std::deque<rxcpp::subjects::subject<T>> subj; @@ -68,7 +53,7 @@ struct window , cursor(0) { subj.push_back(rxcpp::subjects::subject<T>()); - dest.on_next(observable<T, window_observable>(window_observable(subj[0]))); + dest.on_next(subj[0].get_observable().as_dynamic()); } void on_next(T v) const { for (auto s : subj) { @@ -83,7 +68,7 @@ struct window if (++cursor % this->skip == 0) { subj.push_back(rxcpp::subjects::subject<T>()); - dest.on_next(observable<T, window_observable>(window_observable(subj[subj.size() - 1]))); + dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic()); } } @@ -101,9 +86,9 @@ struct window dest.on_completed(); } - static subscriber<value_type, this_type> make(dest_type d, window_values v) { + static subscriber<T, observer_type> make(dest_type d, window_values v) { auto cs = d.get_subscription(); - return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(v))); + return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v)))); } }; @@ -122,8 +107,8 @@ public: window_factory(int c, int s) : count(c), skip(s) {} template<class Observable> auto operator()(Observable&& source) - -> decltype(source.lift(window<typename std::decay<Observable>::type::value_type>(count, skip))) { - return source.lift(window<typename std::decay<Observable>::type::value_type>(count, skip)); + -> decltype(source.template lift<observable<typename std::decay<Observable>::type::value_type>>(window<typename std::decay<Observable>::type::value_type>(count, skip))) { + return source.template lift<observable<typename std::decay<Observable>::type::value_type>>(window<typename std::decay<Observable>::type::value_type>(count, skip)); } }; diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 445e261..0352605 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -206,8 +206,8 @@ public: // template<class T, class SourceOperator, class OperatorFactory> auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } // @@ -216,8 +216,8 @@ auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source // template<class T, class SourceOperator, class OperatorFactory> auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } #endif diff --git a/Rx/v2/src/rxcpp/rx-coordination.hpp b/Rx/v2/src/rxcpp/rx-coordination.hpp index cbe030d..abd1506 100644 --- a/Rx/v2/src/rxcpp/rx-coordination.hpp +++ b/Rx/v2/src/rxcpp/rx-coordination.hpp @@ -195,13 +195,12 @@ class serialize_one_worker : public coordination_base } }; - template<class Subscriber> - struct serialize_observer : public observer_base<typename Subscriber::value_type> + template<class Observer> + struct serialize_observer { - typedef serialize_observer<Subscriber> this_type; - typedef observer_base<typename Subscriber::value_type> base_type; - typedef typename base_type::value_type value_type; - typedef typename std::decay<Subscriber>::type dest_type; + typedef serialize_observer<Observer> this_type; + typedef typename std::decay<Observer>::type dest_type; + typedef typename dest_type::value_type value_type; typedef observer<value_type, this_type> observer_type; dest_type dest; std::shared_ptr<std::mutex> lock; @@ -227,8 +226,9 @@ class serialize_one_worker : public coordination_base dest.on_completed(); } - static subscriber<value_type, this_type> make(dest_type d, std::shared_ptr<std::mutex> m) { - return make_subscriber<value_type>(d, this_type(d, std::move(m))); + template<class Subscriber> + static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) { + return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m)))); } }; @@ -259,9 +259,9 @@ class serialize_one_worker : public coordination_base return std::move(o); } template<class Subscriber> - auto out(Subscriber s) const - -> decltype(serialize_observer<Subscriber>::make(std::move(s), lock)) { - return serialize_observer<Subscriber>::make(std::move(s), lock); + auto out(const Subscriber& s) const + -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) { + return serialize_observer<decltype(s.get_observer())>::make(s, lock); } template<class F> auto act(F f) const diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 806e2c4..944d230 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -46,38 +46,6 @@ struct has_on_subscribe_for static const bool value = std::is_same<detail_result, void>::value; }; -struct lift_function { -template<class SourceObservable, class OperatorFactory> -static auto chain(const SourceObservable& source, OperatorFactory&& of) - -> decltype(source.lift(std::forward<OperatorFactory>(of))) { - return source.lift(std::forward<OperatorFactory>(of)); -} -}; -struct operator_factory { -template<class SourceObservable, class OperatorFactory> -static auto chain(const SourceObservable& source, OperatorFactory&& of) - -> decltype(source.op(std::forward<OperatorFactory>(of))) { - return source.op(std::forward<OperatorFactory>(of)); -} -}; -struct not_chainable { -}; - -template<class T, class SourceObservable, class OperatorFactory> -struct select_chain -{ - typedef - typename std::conditional< - rxcpp::detail::is_lift_function_for<subscriber<T>, OperatorFactory>::value, - lift_function, - typename std::conditional< - rxcpp::detail::is_operator_factory_for<SourceObservable, OperatorFactory>::value, - operator_factory, - not_chainable - >::type - >::type type; -}; - } template<class T> @@ -131,9 +99,9 @@ public: } template<class Subscriber> - typename std::enable_if<!std::is_same<Subscriber, observer<T>>::value, void>::type + typename std::enable_if<is_subscriber<Subscriber>::value, void>::type on_subscribe(Subscriber o) const { - state->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer()))); + state->on_subscribe(o.as_dynamic()); } }; @@ -237,7 +205,7 @@ private: typedef typename std::decay<Subscriber>::type subscriber_type; - static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer"); + static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber"); static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible"); static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); @@ -337,12 +305,12 @@ public: /// this is intended to allow externally defined operators, that use make_subscriber, to be connected /// into the expression. /// - template<class Operator> + template<class ResultType, class Operator> auto lift(Operator&& op) const - -> observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>> { - return observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>>( - rxo::detail::lift<source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); - static_assert(detail::is_lift_function_for<subscriber<T>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); + -> observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>> { + return observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>>( + rxo::detail::lift<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); + static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); } /// @@ -366,16 +334,16 @@ public: /// template<class Predicate> auto filter(Predicate p) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::filter<T, Predicate>(std::move(p)))) { - return lift(rxo::detail::filter<T, Predicate>(std::move(p))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) { + return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p))); } /// finally () -> /// template<class LastCall> auto finally(LastCall lc) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::finally<T, LastCall>(std::move(lc)))) { - return lift(rxo::detail::finally<T, LastCall>(std::move(lc))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) { + return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))); } /// map (AKA Select) -> @@ -383,48 +351,48 @@ public: /// template<class Selector> auto map(Selector&& s) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::map<T, Selector>(std::move(s)))) { - return lift(rxo::detail::map<T, Selector>(std::move(s))); + -> decltype(EXPLICIT_THIS lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)))) { + return lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s))); } /// distinct_until_changed -> /// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned. /// auto distinct_until_changed() const - -> decltype(EXPLICIT_THIS lift(rxo::detail::distinct_until_changed<T>())) { - return lift(rxo::detail::distinct_until_changed<T>()); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) { + return lift<T>(rxo::detail::distinct_until_changed<T>()); } /// window -> /// produce observables containing count items emitted by this observable /// auto window(int count) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, count))) { - return lift(rxo::detail::window<T>(count, count)); + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) { + return lift<observable<T>>(rxo::detail::window<T>(count, count)); } /// window -> /// produce observables containing count items emitted by this observable /// auto window(int count, int skip) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, skip))) { - return lift(rxo::detail::window<T>(count, skip)); + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) { + return lift<observable<T>>(rxo::detail::window<T>(count, skip)); } /// buffer -> /// collect count items from this observable and produce a vector of them to emit from the new observable that is returned. /// auto buffer(int count) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, count))) { - return lift(rxo::detail::buffer_count<T>(count, count)); + -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) { + return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count)); } /// buffer -> /// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned. /// auto buffer(int count, int skip) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, skip))) { - return lift(rxo::detail::buffer_count<T>(count, skip)); + -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) { + return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip)); } template<class Coordination> @@ -747,8 +715,8 @@ public: /// template<class Coordination> auto observe_on(Coordination cn) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) { - return lift(rxo::detail::observe_on<T, Coordination>(std::move(cn))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) { + return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))); } /// reduce -> @@ -960,6 +928,11 @@ class observable<void, void> { ~observable(); public: + template<class T, class OnSubscribe> + static auto create(OnSubscribe os) + -> decltype(rxs::create<T>(std::move(os))) { + return rxs::create<T>(std::move(os)); + } template<class T> static auto range(T first = 0, T last = std::numeric_limits<T>::max(), ptrdiff_t step = 1) -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) { @@ -1087,8 +1060,8 @@ public: // template<class T, class SourceOperator, class OperatorFactory> auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } // @@ -1097,8 +1070,8 @@ auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFa // template<class T, class SourceOperator, class OperatorFactory> auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } #endif diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp index 805cb94..1b82f40 100644 --- a/Rx/v2/src/rxcpp/rx-predef.hpp +++ b/Rx/v2/src/rxcpp/rx-predef.hpp @@ -7,14 +7,12 @@ #include "rx-includes.hpp" -auto rxcpp_trace_activity(...) -> rxcpp::trace_noop; - namespace rxcpp { // // create a typedef for rxcpp_trace_type to override the default // -auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))& { +inline auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))& { static decltype(rxcpp_trace_activity(trace_tag())) trace; return trace; } diff --git a/Rx/v2/src/rxcpp/rx-sources.hpp b/Rx/v2/src/rxcpp/rx-sources.hpp index b4205ff..c9fb73f 100644 --- a/Rx/v2/src/rxcpp/rx-sources.hpp +++ b/Rx/v2/src/rxcpp/rx-sources.hpp @@ -35,6 +35,7 @@ namespace rxs=sources; } +#include "sources/rx-create.hpp" #include "sources/rx-range.hpp" #include "sources/rx-iterate.hpp" #include "sources/rx-interval.hpp" diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 53fbd92..dbb3615 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -18,32 +18,37 @@ struct subscriber_base : public observer_base<T>, public subscription_base template<class T, class Observer = observer<T>> class subscriber : public subscriber_base<T> { + static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers"); + static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>"); typedef subscriber<T, Observer> this_type; typedef typename std::decay<Observer>::type observer_type; composite_subscription lifetime; observer_type destination; + trace_id id; struct nextdetacher { ~nextdetacher() { trace_activity().on_next_return(*that); - if (that) { + if (do_unsubscribe) { that->unsubscribe(); } } nextdetacher(const this_type* that) : that(that) + , do_unsubscribe(true) { } template<class U> void operator()(U u) { trace_activity().on_next_enter(*that, u); that->destination.on_next(std::move(u)); - that = nullptr; + do_unsubscribe = false; } const this_type* that; + bool do_unsubscribe; }; struct errordetacher @@ -89,24 +94,31 @@ public: subscriber(const this_type& o) : lifetime(o.lifetime) , destination(o.destination) + , id(o.id) { } subscriber(this_type&& o) : lifetime(std::move(o.lifetime)) , destination(std::move(o.destination)) + , id(std::move(o.id)) { } template<class U> - subscriber(composite_subscription cs, U&& o) + subscriber(trace_id id, composite_subscription cs, U&& o) : lifetime(std::move(cs)) , destination(std::forward<U>(o)) + , id(std::move(id)) { + static_assert(!is_subscriber<U>::value, "cannot nest subscribers"); + static_assert(is_observer<U>::value, "must pass observer to subscriber"); + trace_activity().create_subscriber(*this); } this_type& operator=(this_type o) { lifetime = std::move(o.lifetime); destination = std::move(o.destination); + id = std::move(o.id); return *this; } @@ -122,9 +134,12 @@ public: composite_subscription& get_subscription() { return lifetime; } + trace_id get_id() const { + return id; + } subscriber<T> as_dynamic() const { - return subscriber<T>(lifetime, destination.as_dynamic()); + return subscriber<T>(id, lifetime, destination.as_dynamic()); } // observer @@ -192,7 +207,7 @@ auto make_subscriber() -> typename std::enable_if< detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value, subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>>::type { - return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, detail::OnNextEmpty<T>>>( static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); } @@ -201,14 +216,14 @@ template<class T, class I> auto make_subscriber( const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(composite_subscription(), o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class Observer> auto make_subscriber(const Observer& o) -> typename std::enable_if< is_observer<Observer>::value, subscriber<T, Observer>>::type { - return subscriber<T, Observer>(composite_subscription(), o); + return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class Observer> auto make_subscriber(const Observer& o) @@ -218,14 +233,14 @@ auto make_subscriber(const Observer& o) !is_subscription<Observer>::value && !is_observer<Observer>::value, subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(composite_subscription(), o); + return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class OnNext> auto make_subscriber(const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); } @@ -235,7 +250,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe) detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); } @@ -245,7 +260,7 @@ auto make_subscriber(const OnNext& on, const OnCompleted& oc) detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); } @@ -256,7 +271,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); } @@ -267,7 +282,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) template<class T> auto make_subscriber(const composite_subscription& cs) -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { - return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(cs, + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, detail::OnNextEmpty<T>>>( static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); } @@ -276,14 +291,21 @@ template<class T, class I> auto make_subscriber(const composite_subscription& cs, const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(cs, o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); +} +template<class T, class I> +auto make_subscriber(const composite_subscription& cs, + const subscriber<T, I>& s) + -> subscriber<T, I> { + return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer()); } template<class T, class Observer> auto make_subscriber(const composite_subscription& cs, const Observer& o) -> typename std::enable_if< + !is_subscriber<Observer>::value && is_observer<Observer>::value, subscriber<T, Observer>>::type { - return subscriber<T, Observer>(cs, o); + return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); } template<class T, class Observer> auto make_subscriber(const composite_subscription& cs, const Observer& o) @@ -293,14 +315,14 @@ auto make_subscriber(const composite_subscription& cs, const Observer& o) !is_subscription<Observer>::value && !is_observer<Observer>::value, subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(cs, o); + return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o)); } template<class T, class OnNext> auto make_subscriber(const composite_subscription& cs, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); } @@ -310,7 +332,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); } @@ -320,7 +342,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); } @@ -331,7 +353,153 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); +} + +// explicit id +// + +template<class T> +auto make_subscriber(trace_id id) + -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, detail::OnNextEmpty<T>>>( + static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); +} + +template<class T> +auto make_subscriber(trace_id id, const composite_subscription& cs) + -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), cs, + observer<T, static_observer<T, detail::OnNextEmpty<T>>>( + static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); +} + +template<class T, class I> +auto make_subscriber(trace_id id, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o); +} +template<class T, class I> +auto make_subscriber(trace_id id, const composite_subscription& cs, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), cs, o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + return subscriber<T, Observer>(std::move(id), composite_subscription(), o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + return subscriber<T, Observer>(std::move(id), cs, o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + return subscriber<T, observer<T, Observer>>(std::move(id), cs, o); +} +template<class T, class OnNext> +auto make_subscriber(trace_id id, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); +} +template<class T, class OnNext> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); +} +template<class T, class OnNext, class OnError> +auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); +} +template<class T, class OnNext, class OnError> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); +} +template<class T, class OnNext, class OnCompleted> +auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); +} +template<class T, class OnNext, class OnCompleted> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); +} +template<class T, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); +} +template<class T, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); } @@ -342,15 +510,37 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O template<class T, class OtherT, class OtherObserver, class I> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(scbr.get_subscription(), o); + -> subscriber<T, observer<T, I>> { + auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class I> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) -> typename std::enable_if< + !is_subscription<Observer>::value && is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(scbr.get_subscription(), o); + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) @@ -359,37 +549,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observ !is_subscriber<Observer>::value && !is_subscription<Observer>::value && !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(scbr.get_subscription(), o); + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o)); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc) @@ -397,24 +642,56 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class I> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(cs, o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); +} +template<class T, class OtherT, class OtherObserver, class I> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), cs, o); } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) -> typename std::enable_if< is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(cs, o); + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(std::move(id), cs, o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) @@ -423,37 +700,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos !is_subscriber<Observer>::value && !is_subscription<Observer>::value && !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(cs, o); + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs, observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) @@ -461,18 +793,48 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; } -// override lifetime -// template<class T, class Observer> auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs) -> subscriber<T, Observer> { - return subscriber<T, Observer>(cs, scbr.get_observer()); + auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class Observer> +auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs) + -> subscriber<T, Observer> { + auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; +} + +template<class T, class Observer> +auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id) + -> subscriber<T, Observer> { + auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; } } diff --git a/Rx/v2/src/rxcpp/rx-test.hpp b/Rx/v2/src/rxcpp/rx-test.hpp index cf4841b..542c55b 100644 --- a/Rx/v2/src/rxcpp/rx-test.hpp +++ b/Rx/v2/src/rxcpp/rx-test.hpp @@ -42,7 +42,7 @@ struct test_source static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber."); - ts->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer()))); + ts->on_subscribe(o.as_dynamic()); } }; @@ -111,8 +111,8 @@ namespace rxt=test; // template<class T, class OperatorFactory> auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } // @@ -121,8 +121,8 @@ auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFac // template<class T, class OperatorFactory> auto operator | (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } #include "schedulers/rx-test.hpp" diff --git a/Rx/v2/src/rxcpp/rx-trace.hpp b/Rx/v2/src/rxcpp/rx-trace.hpp index 3da4ec8..e72caaf 100644 --- a/Rx/v2/src/rxcpp/rx-trace.hpp +++ b/Rx/v2/src/rxcpp/rx-trace.hpp @@ -5,10 +5,40 @@ #if !defined(RXCPP_RX_TRACE_HPP) #define RXCPP_RX_TRACE_HPP +#include <iostream> #include <exception> +#include <atomic> namespace rxcpp { +struct trace_id +{ + static inline trace_id make_next_id_subscriber() { + static std::atomic<unsigned long> id(0xB0000000); + return trace_id{++id}; + } + unsigned long id; +}; + +inline bool operator==(const trace_id& lhs, const trace_id& rhs) { + return lhs.id == rhs.id; +} +inline bool operator!=(const trace_id& lhs, const trace_id& rhs) { + return !(lhs==rhs); +} + +inline bool operator<(const trace_id& lhs, const trace_id& rhs) { + if ((lhs.id & 0xF0000000) != (rhs.id & 0xF0000000)) std::terminate(); + return lhs.id < rhs.id; +} +inline bool operator>(const trace_id& lhs, const trace_id& rhs) { + return rhs<lhs; +} + +inline std::ostream& operator<< (std::ostream& os, const trace_id& id) { + return os << std::hex << id.id << std::dec; +} + struct trace_noop { template<class Worker, class Schedulable> @@ -32,6 +62,9 @@ struct trace_noop template<class Observable> inline void subscribe_return(const Observable& o) {} + template<class SubscriberFrom, class SubscriberTo> + inline void connect(const SubscriberFrom&, const SubscriberTo&) {} + template<class OperatorSource, class OperatorChain, class Subscriber, class SubscriberLifted> inline void lift_enter(const OperatorSource&, const OperatorChain&, const Subscriber&, const SubscriberLifted&) {} template<class OperatorSource, class OperatorChain> @@ -52,6 +85,9 @@ struct trace_noop template<class SubscriptionState> inline void subscription_remove_return(const SubscriptionState&) {} + template<class Subscriber> + inline void create_subscriber(const Subscriber&) {} + template<class Subscriber, class T> inline void on_next_enter(const Subscriber&, const T&) {} template<class Subscriber> @@ -72,7 +108,7 @@ struct trace_tag {}; } -auto rxcpp_trace_activity(...) -> rxcpp::trace_noop; +inline auto rxcpp_trace_activity(...) -> rxcpp::trace_noop; #endif diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 06da93e..9e613c4 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -261,27 +261,85 @@ struct plus { return std::forward<LHS>(lhs) + std::forward<RHS>(rhs); } }; -template<class OStream> -struct println_function +namespace detail { +template<class OStream, class Delimit> +struct print_function { OStream& os; - println_function(OStream& os) : os(os) {} + Delimit delimit; + print_function(OStream& os, Delimit d) : os(os), delimit(std::move(d)) {} template<class... TN> void operator()(const TN&... tn) const { bool inserts[] = {(os << tn, true)...}; - os << std::endl; + delimit(); } template<class... TN> void operator()(const std::tuple<TN...>& tpl) const { - apply(tpl, *this); + rxcpp::util::apply(tpl, *this); } }; + +template<class OStream> +struct endline +{ + OStream& os; + endline(OStream& os) : os(os) {} + void operator()() const { + os << std::endl; + } +}; + +template<class OStream, class ValueType> +struct insert_value +{ + OStream& os; + ValueType value; + insert_value(OStream& os, ValueType v) : os(os), value(std::move(v)) {} + void operator()() const { + os << value; + } +}; + +template<class OStream, class Function> +struct insert_function +{ + OStream& os; + Function call; + insert_function(OStream& os, Function f) : os(os), call(std::move(f)) {} + void operator()() const { + call(os); + } +}; + +} + +template<class OStream> +auto endline(OStream& os) + -> detail::endline<OStream> { + return detail::endline<OStream>(os); +} + +template<class OStream, class Delimit> +auto print(OStream& os, Delimit d) + -> decltype(d(), detail::print_function<OStream, Delimit>(os, std::move(d))) { + return detail::print_function<OStream, Delimit>(os, std::move(d)); +} +template<class OStream, class Delimit> +auto print(OStream& os, Delimit d) + -> decltype(d(os), detail::print_function<OStream, detail::insert_function<OStream, Delimit>>(os, detail::insert_function<OStream, Delimit>(os, std::move(d)))) { + return detail::print_function<OStream, detail::insert_function<OStream, Delimit>>(os, detail::insert_function<OStream, Delimit>(os, std::move(d))); +} template<class OStream> auto println(OStream& os) - -> println_function<OStream> { - return println_function<OStream>(os); + -> decltype(print(os, endline(os))) { + return print(os, endline(os)); +} +template<class OStream, class DelimitValue> +auto print_delimited_by(OStream& os, DelimitValue dv) + -> detail::print_function<OStream, detail::insert_value<OStream, DelimitValue>> { + return detail::print_function<OStream, detail::insert_value<OStream, DelimitValue>>(os, detail::insert_value<OStream, DelimitValue>(os, std::move(dv))); } namespace detail { diff --git a/Rx/v2/src/rxcpp/sources/rx-create.hpp b/Rx/v2/src/rxcpp/sources/rx-create.hpp new file mode 100644 index 0000000..2c50e98 --- /dev/null +++ b/Rx/v2/src/rxcpp/sources/rx-create.hpp @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_SOURCES_RX_CREATE_HPP) +#define RXCPP_SOURCES_RX_CREATE_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace sources { + +namespace detail { + +template<class T, class OnSubscribe> +struct create : public source_base<T> +{ + typedef create<T, OnSubscribe> this_type; + + typedef typename std::decay<OnSubscribe>::type on_subscribe_type; + + on_subscribe_type on_subscribe_function; + + create(on_subscribe_type os) + : on_subscribe_function(std::move(os)) + { + } + + template<class Subscriber> + void on_subscribe(Subscriber o) const { + + on_exception( + [&](){ + this->on_subscribe_function(o); + return true; + }, + o); + } +}; + +} + +template<class T, class OnSubscribe> +auto create(OnSubscribe os) + -> observable<T, detail::create<T, OnSubscribe>> { + return observable<T, detail::create<T, OnSubscribe>>( + detail::create<T, OnSubscribe>(std::move(os))); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/sources/rx-error.hpp b/Rx/v2/src/rxcpp/sources/rx-error.hpp index 4b2c4bb..5014c7c 100644 --- a/Rx/v2/src/rxcpp/sources/rx-error.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-error.hpp @@ -44,7 +44,7 @@ struct error : public source_base<T> // creates a worker whose lifetime is the same as this subscription auto coordinator = initial.coordination.create_coordinator(o.get_subscription()); - auto controller = coordinator.get_output().get_worker(); + auto controller = coordinator.get_worker(); auto exception = initial.exception; auto producer = [=](const rxsc::schedulable&){ diff --git a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp index 539f8d6..ff3becb 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp @@ -49,6 +49,10 @@ public: { } + subscriber<T> get_subscriber() const { + return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic(); + } + T get_value() const { return state->get(); } @@ -65,13 +69,11 @@ public: template<class T> class behavior { - composite_subscription lifetime; detail::behavior_observer<T> s; public: explicit behavior(T f, composite_subscription cs = composite_subscription()) - : lifetime(std::move(cs)) - , s(std::move(f), lifetime) + : s(std::move(f), cs) { } @@ -84,15 +86,16 @@ public: } subscriber<T> get_subscriber() const { - return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::behavior_observer<T>>(s))); + return s.get_subscriber(); } observable<T> get_observable() const { - return make_observable_dynamic<T>([this](subscriber<T> o){ - if (lifetime.is_subscribed()) { + auto keepAlive = s; + return make_observable_dynamic<T>([=](subscriber<T> o){ + if (keepAlive.get_subscription().is_subscribed()) { o.on_next(get_value()); } - this->s.add(std::move(o)); + keepAlive.add(s.get_subscriber(), std::move(o)); }); } }; diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index eff91c9..e07cc69 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -77,12 +77,15 @@ class multicast_observer { explicit binder_type(composite_subscription cs) : state(std::make_shared<state_type>(cs)) + , id(trace_id::make_next_id_subscriber()) , current_generation(0) { } std::shared_ptr<state_type> state; + trace_id id; + // used to avoid taking lock in on_next mutable int current_generation; mutable std::shared_ptr<completer_type> current_completer; @@ -93,18 +96,29 @@ class multicast_observer std::shared_ptr<binder_type> b; - - public: + typedef subscriber<T, observer<T, detail::multicast_observer<T>>> input_subscriber_type; + explicit multicast_observer(composite_subscription cs) : b(std::make_shared<binder_type>(cs)) { } + trace_id get_id() const { + return b->id; + } + composite_subscription get_subscription() const { + return b->state->lifetime; + } + input_subscriber_type get_subscriber() const { + return make_subscriber<T>(get_id(), get_subscription(), observer<T, detail::multicast_observer<T>>(*this)); + } bool has_observers() const { std::unique_lock<std::mutex> guard(b->state->lock); return b->current_completer && !b->current_completer->observers.empty(); } - void add(observer_type o) const { + template<class SubscriberFrom> + void add(const SubscriberFrom& sf, observer_type o) const { + trace_activity().connect(sf, o); std::unique_lock<std::mutex> guard(b->state->lock); switch (b->state->current) { case mode::Casting: @@ -197,17 +211,15 @@ public: template<class T> class subject { - composite_subscription lifetime; detail::multicast_observer<T> s; public: subject() - : s(lifetime) + : s(composite_subscription()) { } explicit subject(composite_subscription cs) - : lifetime(cs) - , s(cs) + : s(cs) { } @@ -216,13 +228,13 @@ public: } subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const { - return make_subscriber<T>(lifetime, observer<T, detail::multicast_observer<T>>(s)); + return s.get_subscriber(); } observable<T> get_observable() const { auto keepAlive = s; return make_observable_dynamic<T>([=](subscriber<T> o){ - keepAlive.add(std::move(o)); + keepAlive.add(s.get_subscriber(), std::move(o)); }); } }; diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp index c28fa91..564f564 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp @@ -143,6 +143,10 @@ public: state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o)); } + subscriber<T> get_subscriber() const { + return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic(); + } + template<class V> void on_next(V v) const { state->on_next(std::move(v)); @@ -160,13 +164,11 @@ public: template<class T, class Coordination> class synchronize { - composite_subscription lifetime; detail::synchronize_observer<T, Coordination> s; public: explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription()) - : lifetime(composite_subscription()) - , s(std::move(cn), std::move(cs), lifetime) + : s(std::move(cn), std::move(cs), composite_subscription()) { } @@ -175,12 +177,13 @@ public: } subscriber<T> get_subscriber() const { - return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::synchronize_observer<T, Coordination>>(s))); + return s.get_subscriber(); } observable<T> get_observable() const { - return make_observable_dynamic<T>([this](subscriber<T> o){ - this->s.add(std::move(o)); + auto keepAlive = s; + return make_observable_dynamic<T>([=](subscriber<T> o){ + keepAlive.add(s.get_subscriber(), std::move(o)); }); } }; diff --git a/Rx/v2/test/operators/lift.cpp b/Rx/v2/test/operators/lift.cpp index 7cf35b6..0496019 100644 --- a/Rx/v2/test/operators/lift.cpp +++ b/Rx/v2/test/operators/lift.cpp @@ -57,8 +57,8 @@ struct liftfilter dest.on_completed(); } - static rx::subscriber<value_type, this_type> make(const dest_type& d, const test_type& t) { - return rx::make_subscriber<value_type>(d, this_type(d, t)); + static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) { + return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t))); } }; @@ -120,7 +120,7 @@ SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]" auto res = w.start( [&xs, &invoked]() { return xs - .lift(liftfilter([&invoked](int x) { + .lift<int>(liftfilter([&invoked](int x) { invoked++; return IsPrime(x); })) @@ -183,10 +183,10 @@ SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stre auto res = w.start( [&xs, &invoked]() { return xs - >> liftfilter([&invoked](int x) { + >> rxo::lift<int>(liftfilter([&invoked](int x) { invoked++; return IsPrime(x); - }) + })) // forget type to workaround lambda deduction bug on msvc 2013 >> rxo::as_dynamic(); }, @@ -250,7 +250,7 @@ SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][ return IsPrime(x); }; return xs - .lift([=](rx::subscriber<int> dest){ + .lift<int>([=](rx::subscriber<int> dest){ // VS2013 deduction issue requires dynamic (type-forgetting) return rx::make_subscriber<int>( dest, diff --git a/Rx/v2/test/sources/create.cpp b/Rx/v2/test/sources/create.cpp new file mode 100644 index 0000000..063ff7e --- /dev/null +++ b/Rx/v2/test/sources/create.cpp @@ -0,0 +1,46 @@ +#include "rxcpp/rx.hpp" +namespace rx=rxcpp; +namespace rxu=rxcpp::util; +namespace rxsc=rxcpp::schedulers; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("create stops on completion", "[create][sources]"){ + GIVEN("a test cold observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + long invoked = 0; + + WHEN("created"){ + + auto res = w.start( + [&]() { + return rx::observable<>::create<int>( + [&](const rx::subscriber<int>& s){ + invoked++; + s.on_next(1); + s.on_next(2); + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains all items"){ + auto required = rxu::to_vector({ + on.on_next(200, 1), + on.on_next(200, 2) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("create was called until completed"){ + REQUIRE(1 == invoked); + } + } + } +} diff --git a/Rx/v2/test/sources/defer.cpp b/Rx/v2/test/sources/defer.cpp index bf4779e..e9e62c2 100644 --- a/Rx/v2/test/sources/defer.cpp +++ b/Rx/v2/test/sources/defer.cpp @@ -6,7 +6,7 @@ namespace rxsc=rxcpp::schedulers; #include "rxcpp/rx-test.hpp" #include "catch.hpp" -SCENARIO("defer stops on completion", "[defer][operators]"){ +SCENARIO("defer stops on completion", "[defer][sources]"){ GIVEN("a test cold observable of ints"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); diff --git a/Rx/v2/test/sources/interval.cpp b/Rx/v2/test/sources/interval.cpp index 49d68e2..ae1a149 100644 --- a/Rx/v2/test/sources/interval.cpp +++ b/Rx/v2/test/sources/interval.cpp @@ -7,7 +7,7 @@ namespace rxsc=rxcpp::schedulers; #include "catch.hpp" -SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf]"){ +SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf][sources]"){ GIVEN("schedule_periodically"){ WHEN("the period is 1sec and the initial is 2sec"){ using namespace std::chrono; @@ -29,7 +29,7 @@ SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf]") } } -SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][long][perf]"){ +SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][long][perf][sources]"){ GIVEN("schedule_periodically_duration"){ WHEN("the period is 1sec and the initial is 2sec"){ using namespace std::chrono; @@ -72,7 +72,7 @@ SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][l } } -SCENARIO("intervals", "[hide][periodically][interval][scheduler][long][perf]"){ +SCENARIO("intervals", "[hide][periodically][interval][scheduler][long][perf][sources]"){ GIVEN("10 intervals of 1 seconds"){ WHEN("the period is 1sec and the initial is 2sec"){ using namespace std::chrono; diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index b979c4f..6919b63 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -139,11 +139,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug std::atomic<int> v(0); auto s0 = rxs::range(1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .publish_synchronized(es) .ref_count() - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ @@ -154,11 +154,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug })); auto s1 = rxs::range(values + 1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .publish_synchronized(es) .ref_count() - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ @@ -169,11 +169,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug })); auto s2 = rxs::range((values * 2) + 1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .publish_synchronized(es) .ref_count() - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ @@ -249,10 +249,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o std::atomic<int> v(0); auto s0 = rxs::range(1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .observe_on(es) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ @@ -263,10 +263,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o })); auto s1 = rxs::range(values + 1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .observe_on(es) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ @@ -277,10 +277,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o })); auto s2 = rxs::range((values * 2) + 1, es) .take(values) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .as_dynamic() .observe_on(es) - .lift(liftrequirecompletion) + .lift<int>(liftrequirecompletion) .subscribe( rx::make_observer_dynamic<int>( [&](int i){ diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 9e34654..3741630 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -33,6 +33,7 @@ set(TEST_SOURCES ${TEST_DIR}/subscriptions/observer.cpp ${TEST_DIR}/subscriptions/subscription.cpp ${TEST_DIR}/subjects/subject.cpp + ${TEST_DIR}/sources/create.cpp ${TEST_DIR}/sources/defer.cpp ${TEST_DIR}/sources/interval.cpp ${TEST_DIR}/operators/buffer.cpp |