summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/rx-observable.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-observable.hpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp97
1 files changed, 35 insertions, 62 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 806e2c4..944d230 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -46,38 +46,6 @@ struct has_on_subscribe_for
static const bool value = std::is_same<detail_result, void>::value;
};
-struct lift_function {
-template<class SourceObservable, class OperatorFactory>
-static auto chain(const SourceObservable& source, OperatorFactory&& of)
- -> decltype(source.lift(std::forward<OperatorFactory>(of))) {
- return source.lift(std::forward<OperatorFactory>(of));
-}
-};
-struct operator_factory {
-template<class SourceObservable, class OperatorFactory>
-static auto chain(const SourceObservable& source, OperatorFactory&& of)
- -> decltype(source.op(std::forward<OperatorFactory>(of))) {
- return source.op(std::forward<OperatorFactory>(of));
-}
-};
-struct not_chainable {
-};
-
-template<class T, class SourceObservable, class OperatorFactory>
-struct select_chain
-{
- typedef
- typename std::conditional<
- rxcpp::detail::is_lift_function_for<subscriber<T>, OperatorFactory>::value,
- lift_function,
- typename std::conditional<
- rxcpp::detail::is_operator_factory_for<SourceObservable, OperatorFactory>::value,
- operator_factory,
- not_chainable
- >::type
- >::type type;
-};
-
}
template<class T>
@@ -131,9 +99,9 @@ public:
}
template<class Subscriber>
- typename std::enable_if<!std::is_same<Subscriber, observer<T>>::value, void>::type
+ typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const {
- state->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer())));
+ state->on_subscribe(o.as_dynamic());
}
};
@@ -237,7 +205,7 @@ private:
typedef typename std::decay<Subscriber>::type subscriber_type;
- static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer");
+ static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
@@ -337,12 +305,12 @@ public:
/// this is intended to allow externally defined operators, that use make_subscriber, to be connected
/// into the expression.
///
- template<class Operator>
+ template<class ResultType, class Operator>
auto lift(Operator&& op) const
- -> observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>> {
- return observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>>(
- rxo::detail::lift<source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
- static_assert(detail::is_lift_function_for<subscriber<T>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
+ -> observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>> {
+ return observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>>(
+ rxo::detail::lift<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
+ static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
}
///
@@ -366,16 +334,16 @@ public:
///
template<class Predicate>
auto filter(Predicate p) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::filter<T, Predicate>(std::move(p)))) {
- return lift(rxo::detail::filter<T, Predicate>(std::move(p)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) {
+ return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)));
}
/// finally () ->
///
template<class LastCall>
auto finally(LastCall lc) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::finally<T, LastCall>(std::move(lc)))) {
- return lift(rxo::detail::finally<T, LastCall>(std::move(lc)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) {
+ return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)));
}
/// map (AKA Select) ->
@@ -383,48 +351,48 @@ public:
///
template<class Selector>
auto map(Selector&& s) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::map<T, Selector>(std::move(s)))) {
- return lift(rxo::detail::map<T, Selector>(std::move(s)));
+ -> decltype(EXPLICIT_THIS lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)))) {
+ return lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)));
}
/// distinct_until_changed ->
/// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned.
///
auto distinct_until_changed() const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::distinct_until_changed<T>())) {
- return lift(rxo::detail::distinct_until_changed<T>());
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) {
+ return lift<T>(rxo::detail::distinct_until_changed<T>());
}
/// window ->
/// produce observables containing count items emitted by this observable
///
auto window(int count) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, count))) {
- return lift(rxo::detail::window<T>(count, count));
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) {
+ return lift<observable<T>>(rxo::detail::window<T>(count, count));
}
/// window ->
/// produce observables containing count items emitted by this observable
///
auto window(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, skip))) {
- return lift(rxo::detail::window<T>(count, skip));
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) {
+ return lift<observable<T>>(rxo::detail::window<T>(count, skip));
}
/// buffer ->
/// collect count items from this observable and produce a vector of them to emit from the new observable that is returned.
///
auto buffer(int count) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, count))) {
- return lift(rxo::detail::buffer_count<T>(count, count));
+ -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) {
+ return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count));
}
/// buffer ->
/// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned.
///
auto buffer(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, skip))) {
- return lift(rxo::detail::buffer_count<T>(count, skip));
+ -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) {
+ return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip));
}
template<class Coordination>
@@ -747,8 +715,8 @@ public:
///
template<class Coordination>
auto observe_on(Coordination cn) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) {
- return lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) {
+ return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
}
/// reduce ->
@@ -960,6 +928,11 @@ class observable<void, void>
{
~observable();
public:
+ template<class T, class OnSubscribe>
+ static auto create(OnSubscribe os)
+ -> decltype(rxs::create<T>(std::move(os))) {
+ return rxs::create<T>(std::move(os));
+ }
template<class T>
static auto range(T first = 0, T last = std::numeric_limits<T>::max(), ptrdiff_t step = 1)
-> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
@@ -1087,8 +1060,8 @@ public:
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
//
@@ -1097,8 +1070,8 @@ auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFa
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
#endif