diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-observable.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 97 |
1 files changed, 35 insertions, 62 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 806e2c4..944d230 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -46,38 +46,6 @@ struct has_on_subscribe_for static const bool value = std::is_same<detail_result, void>::value; }; -struct lift_function { -template<class SourceObservable, class OperatorFactory> -static auto chain(const SourceObservable& source, OperatorFactory&& of) - -> decltype(source.lift(std::forward<OperatorFactory>(of))) { - return source.lift(std::forward<OperatorFactory>(of)); -} -}; -struct operator_factory { -template<class SourceObservable, class OperatorFactory> -static auto chain(const SourceObservable& source, OperatorFactory&& of) - -> decltype(source.op(std::forward<OperatorFactory>(of))) { - return source.op(std::forward<OperatorFactory>(of)); -} -}; -struct not_chainable { -}; - -template<class T, class SourceObservable, class OperatorFactory> -struct select_chain -{ - typedef - typename std::conditional< - rxcpp::detail::is_lift_function_for<subscriber<T>, OperatorFactory>::value, - lift_function, - typename std::conditional< - rxcpp::detail::is_operator_factory_for<SourceObservable, OperatorFactory>::value, - operator_factory, - not_chainable - >::type - >::type type; -}; - } template<class T> @@ -131,9 +99,9 @@ public: } template<class Subscriber> - typename std::enable_if<!std::is_same<Subscriber, observer<T>>::value, void>::type + typename std::enable_if<is_subscriber<Subscriber>::value, void>::type on_subscribe(Subscriber o) const { - state->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer()))); + state->on_subscribe(o.as_dynamic()); } }; @@ -237,7 +205,7 @@ private: typedef typename std::decay<Subscriber>::type subscriber_type; - static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer"); + static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber"); static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible"); static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); @@ -337,12 +305,12 @@ public: /// this is intended to allow externally defined operators, that use make_subscriber, to be connected /// into the expression. /// - template<class Operator> + template<class ResultType, class Operator> auto lift(Operator&& op) const - -> observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>> { - return observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>>( - rxo::detail::lift<source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); - static_assert(detail::is_lift_function_for<subscriber<T>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); + -> observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>> { + return observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>>( + rxo::detail::lift<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); + static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); } /// @@ -366,16 +334,16 @@ public: /// template<class Predicate> auto filter(Predicate p) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::filter<T, Predicate>(std::move(p)))) { - return lift(rxo::detail::filter<T, Predicate>(std::move(p))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) { + return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p))); } /// finally () -> /// template<class LastCall> auto finally(LastCall lc) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::finally<T, LastCall>(std::move(lc)))) { - return lift(rxo::detail::finally<T, LastCall>(std::move(lc))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) { + return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))); } /// map (AKA Select) -> @@ -383,48 +351,48 @@ public: /// template<class Selector> auto map(Selector&& s) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::map<T, Selector>(std::move(s)))) { - return lift(rxo::detail::map<T, Selector>(std::move(s))); + -> decltype(EXPLICIT_THIS lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)))) { + return lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s))); } /// distinct_until_changed -> /// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned. /// auto distinct_until_changed() const - -> decltype(EXPLICIT_THIS lift(rxo::detail::distinct_until_changed<T>())) { - return lift(rxo::detail::distinct_until_changed<T>()); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) { + return lift<T>(rxo::detail::distinct_until_changed<T>()); } /// window -> /// produce observables containing count items emitted by this observable /// auto window(int count) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, count))) { - return lift(rxo::detail::window<T>(count, count)); + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) { + return lift<observable<T>>(rxo::detail::window<T>(count, count)); } /// window -> /// produce observables containing count items emitted by this observable /// auto window(int count, int skip) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, skip))) { - return lift(rxo::detail::window<T>(count, skip)); + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) { + return lift<observable<T>>(rxo::detail::window<T>(count, skip)); } /// buffer -> /// collect count items from this observable and produce a vector of them to emit from the new observable that is returned. /// auto buffer(int count) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, count))) { - return lift(rxo::detail::buffer_count<T>(count, count)); + -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) { + return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count)); } /// buffer -> /// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned. /// auto buffer(int count, int skip) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, skip))) { - return lift(rxo::detail::buffer_count<T>(count, skip)); + -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) { + return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip)); } template<class Coordination> @@ -747,8 +715,8 @@ public: /// template<class Coordination> auto observe_on(Coordination cn) const - -> decltype(EXPLICIT_THIS lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) { - return lift(rxo::detail::observe_on<T, Coordination>(std::move(cn))); + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) { + return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))); } /// reduce -> @@ -960,6 +928,11 @@ class observable<void, void> { ~observable(); public: + template<class T, class OnSubscribe> + static auto create(OnSubscribe os) + -> decltype(rxs::create<T>(std::move(os))) { + return rxs::create<T>(std::move(os)); + } template<class T> static auto range(T first = 0, T last = std::numeric_limits<T>::max(), ptrdiff_t step = 1) -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) { @@ -1087,8 +1060,8 @@ public: // template<class T, class SourceOperator, class OperatorFactory> auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } // @@ -1097,8 +1070,8 @@ auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFa // template<class T, class SourceOperator, class OperatorFactory> auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) { - return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of)); + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); } #endif |