summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/examples/println/main.cpp1
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp8
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-filter.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-finally.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-lift.hpp39
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-map.hpp11
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window.hpp29
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp8
-rw-r--r--Rx/v2/src/rxcpp/rx-coordination.hpp22
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp97
-rw-r--r--Rx/v2/src/rxcpp/rx-predef.hpp4
-rw-r--r--Rx/v2/src/rxcpp/rx-sources.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp462
-rw-r--r--Rx/v2/src/rxcpp/rx-test.hpp10
-rw-r--r--Rx/v2/src/rxcpp/rx-trace.hpp38
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp72
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-create.hpp55
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-error.hpp2
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-behavior.hpp17
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp30
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp15
-rw-r--r--Rx/v2/test/operators/lift.cpp12
-rw-r--r--Rx/v2/test/sources/create.cpp46
-rw-r--r--Rx/v2/test/sources/defer.cpp2
-rw-r--r--Rx/v2/test/sources/interval.cpp6
-rw-r--r--Rx/v2/test/subscriptions/subscription.cpp24
-rw-r--r--projects/CMake/CMakeLists.txt1
29 files changed, 790 insertions, 238 deletions
diff --git a/Rx/v2/examples/println/main.cpp b/Rx/v2/examples/println/main.cpp
index 6904d7f..efe09ca 100644
--- a/Rx/v2/examples/println/main.cpp
+++ b/Rx/v2/examples/println/main.cpp
@@ -64,5 +64,6 @@ int main(int argc, char** argv)
hello_tpl().subscribe(rxu::println(std::cout));
+ hello_tpl().subscribe(rxu::print_delimited_by(std::cout, " and "), rxu::endline(std::cout));
return 0;
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
index 1a9029a..e3a2970 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
@@ -84,9 +84,9 @@ struct buffer_count
dest.on_completed();
}
- static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, buffer_count_values v) {
+ static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
auto cs = d.get_subscription();
- return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(v)));
+ return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
}
};
@@ -105,8 +105,8 @@ public:
buffer_count_factory(int c, int s) : count(c), skip(s) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip))) {
- return source.lift(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip));
+ -> decltype(source.template lift<std::vector<typename std::decay<Observable>::type::value_type>>(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip))) {
+ return source.template lift<std::vector<typename std::decay<Observable>::type::value_type>>(buffer_count<typename std::decay<Observable>::type::value_type>(count, skip));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
index a5ad360..910edda 100644
--- a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
@@ -62,8 +62,8 @@ class distinct_until_changed_factory
public:
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(distinct_until_changed<typename std::decay<Observable>::type>::value_type)) {
- return source.lift(distinct_until_changed<typename std::decay<Observable>::type>::value_type);
+ -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(distinct_until_changed<typename std::decay<Observable>::type>::value_type)) {
+ return source.template lift<typename std::decay<Observable>::type::value_type>(distinct_until_changed<typename std::decay<Observable>::type>::value_type);
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-filter.hpp b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
index 6dc727c..775bc16 100644
--- a/Rx/v2/src/rxcpp/operators/rx-filter.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
@@ -79,8 +79,8 @@ public:
filter_factory(test_type p) : predicate(std::move(p)) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate))) {
- return source.lift(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate));
+ -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate))) {
+ return source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, test_type>(predicate));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-finally.hpp b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
index a5d7ef9..6e2b7ad 100644
--- a/Rx/v2/src/rxcpp/operators/rx-finally.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
@@ -76,8 +76,8 @@ public:
finally_factory(last_call_type lc) : last_call(std::move(lc)) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call))) {
- return source.lift(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call));
+ -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call))) {
+ return source.template lift<typename std::decay<Observable>::type::value_type>(filter<typename std::decay<Observable>::type::value_type, last_call_type>(last_call));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-lift.hpp b/Rx/v2/src/rxcpp/operators/rx-lift.hpp
index 1800984..ad2ec4f 100644
--- a/Rx/v2/src/rxcpp/operators/rx-lift.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-lift.hpp
@@ -11,7 +11,7 @@ namespace rxcpp {
namespace detail {
-template<class S, class F>
+template<class V, class S, class F>
struct is_lift_function_for {
struct tag_not_valid {};
@@ -23,7 +23,7 @@ struct is_lift_function_for {
typedef typename std::decay<S>::type for_type;
typedef typename std::decay<F>::type func_type;
typedef decltype(check<for_type, func_type>(0)) detail_result;
- static const bool value = is_subscriber<detail_result>::value && is_subscriber<for_type>::value;
+ static const bool value = is_subscriber<detail_result>::value && is_subscriber<for_type>::value && std::is_convertible<V, typename detail_result::value_type>::value;
};
}
@@ -32,25 +32,22 @@ namespace operators {
namespace detail {
-template<class SourceOperator, class Operator>
+template<class ResultType, class SourceOperator, class Operator>
struct lift_traits
{
+ typedef typename std::decay<ResultType>::type result_value_type;
typedef typename std::decay<SourceOperator>::type source_operator_type;
typedef typename std::decay<Operator>::type operator_type;
typedef typename source_operator_type::value_type source_value_type;
- static_assert(rxcpp::detail::is_lift_function_for<subscriber<source_value_type>, operator_type>::value, "lift Operator must be a function with the signature subscriber<...>(subscriber<source_value_type, ...>)");
-
- typedef decltype((*(operator_type*)nullptr)(*(subscriber<source_value_type>*)nullptr)) result_for_dynamic_source_subscriber_type;
-
- typedef typename result_for_dynamic_source_subscriber_type::value_type result_value_type;
+ static_assert(rxcpp::detail::is_lift_function_for<source_value_type, subscriber<result_value_type>, operator_type>::value, "lift Operator must be a function with the signature subscriber<source_value_type, ...>(subscriber<result_value_type, ...>)");
};
-template<class SourceOperator, class Operator>
-struct lift : public operator_base<typename lift_traits<SourceOperator, Operator>::result_value_type>
+template<class ResultType, class SourceOperator, class Operator>
+struct lift : public operator_base<typename lift_traits<ResultType, SourceOperator, Operator>::result_value_type>
{
- typedef lift_traits<SourceOperator, Operator> traits;
+ typedef lift_traits<ResultType, SourceOperator, Operator> traits;
typedef typename traits::source_operator_type source_operator_type;
typedef typename traits::operator_type operator_type;
source_operator_type source;
@@ -70,6 +67,26 @@ struct lift : public operator_base<typename lift_traits<SourceOperator, Operator
}
};
+template<class ResultType, class Operator>
+class lift_factory
+{
+ typedef typename std::decay<Operator>::type operator_type;
+ operator_type chain;
+public:
+ lift_factory(operator_type op) : chain(std::move(op)) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<ResultType>(chain)) {
+ return source.template lift<ResultType>(chain);
+ }
+};
+
+}
+
+template<class ResultType, class Operator>
+auto lift(Operator&& op)
+ -> detail::lift_factory<ResultType, Operator> {
+ return detail::lift_factory<ResultType, Operator>(std::forward<Operator>(op));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp
index 19e31d2..1de7602 100644
--- a/Rx/v2/src/rxcpp/operators/rx-map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp
@@ -19,6 +19,7 @@ struct map
{
typedef typename std::decay<T>::type source_value_type;
typedef typename std::decay<Selector>::type select_type;
+ typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type;
select_type selector;
map(select_type s)
@@ -32,7 +33,7 @@ struct map
typedef map_observer<Subscriber> this_type;
typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type;
typedef typename std::decay<Subscriber>::type dest_type;
- typedef observer<value_type, this_type> observer_type;
+ typedef observer<T, this_type> observer_type;
dest_type dest;
select_type selector;
@@ -58,9 +59,9 @@ struct map
dest.on_completed();
}
- static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, select_type s) {
+ static subscriber<T, observer_type> make(dest_type d, select_type s) {
auto cs = d.get_subscription();
- return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(s)));
+ return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(s))));
}
};
@@ -80,8 +81,8 @@ public:
map_factory(select_type s) : selector(std::move(s)) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(map<typename std::decay<Observable>::type::value_type, select_type>(selector))) {
- return source.lift(map<typename std::decay<Observable>::type::value_type, select_type>(selector));
+ -> decltype(source.lift<typename map<typename std::decay<Observable>::type::value_type, select_type>::value_type>(map<typename std::decay<Observable>::type::value_type, select_type>(selector))) {
+ return source.lift<typename map<typename std::decay<Observable>::type::value_type, select_type>::value_type>(map<typename std::decay<Observable>::type::value_type, select_type>(selector));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index 8e6c18d..c1157de 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -193,8 +193,8 @@ public:
observe_on_factory(coordination_type cn) : coordination(std::move(cn)) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination))) {
- return source.lift(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination));
+ -> decltype(source.template lift<typename std::decay<Observable>::type::value_type>(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination))) {
+ return source.template lift<typename std::decay<Observable>::type::value_type>(observe_on<typename std::decay<Observable>::type::value_type, coordination_type>(coordination));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-window.hpp b/Rx/v2/src/rxcpp/operators/rx-window.hpp
index d87285d..c2bf220 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window.hpp
@@ -35,22 +35,6 @@ struct window
{
}
- struct window_observable : public sources::source_base<T>
- {
- mutable rxcpp::subjects::subject<T> subj;
-
- window_observable(rxcpp::subjects::subject<T> s)
- : subj(std::move(s))
- {
- }
-
- template<class Subscriber>
- void on_subscribe(Subscriber o) const {
- static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
- subj.get_observable().subscribe(o);
- }
- };
-
template<class Subscriber>
struct window_observer : public window_values, public observer_base<observable<T>>
{
@@ -58,6 +42,7 @@ struct window
typedef observer_base<observable<T>> base_type;
typedef typename base_type::value_type value_type;
typedef typename std::decay<Subscriber>::type dest_type;
+ typedef observer<T, this_type> observer_type;
dest_type dest;
mutable int cursor;
mutable std::deque<rxcpp::subjects::subject<T>> subj;
@@ -68,7 +53,7 @@ struct window
, cursor(0)
{
subj.push_back(rxcpp::subjects::subject<T>());
- dest.on_next(observable<T, window_observable>(window_observable(subj[0])));
+ dest.on_next(subj[0].get_observable().as_dynamic());
}
void on_next(T v) const {
for (auto s : subj) {
@@ -83,7 +68,7 @@ struct window
if (++cursor % this->skip == 0) {
subj.push_back(rxcpp::subjects::subject<T>());
- dest.on_next(observable<T, window_observable>(window_observable(subj[subj.size() - 1])));
+ dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic());
}
}
@@ -101,9 +86,9 @@ struct window
dest.on_completed();
}
- static subscriber<value_type, this_type> make(dest_type d, window_values v) {
+ static subscriber<T, observer_type> make(dest_type d, window_values v) {
auto cs = d.get_subscription();
- return make_subscriber<value_type>(std::move(cs), this_type(std::move(d), std::move(v)));
+ return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
}
};
@@ -122,8 +107,8 @@ public:
window_factory(int c, int s) : count(c), skip(s) {}
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.lift(window<typename std::decay<Observable>::type::value_type>(count, skip))) {
- return source.lift(window<typename std::decay<Observable>::type::value_type>(count, skip));
+ -> decltype(source.template lift<observable<typename std::decay<Observable>::type::value_type>>(window<typename std::decay<Observable>::type::value_type>(count, skip))) {
+ return source.template lift<observable<typename std::decay<Observable>::type::value_type>>(window<typename std::decay<Observable>::type::value_type>(count, skip));
}
};
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 445e261..0352605 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -206,8 +206,8 @@ public:
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
//
@@ -216,8 +216,8 @@ auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::connectable_observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-coordination.hpp b/Rx/v2/src/rxcpp/rx-coordination.hpp
index cbe030d..abd1506 100644
--- a/Rx/v2/src/rxcpp/rx-coordination.hpp
+++ b/Rx/v2/src/rxcpp/rx-coordination.hpp
@@ -195,13 +195,12 @@ class serialize_one_worker : public coordination_base
}
};
- template<class Subscriber>
- struct serialize_observer : public observer_base<typename Subscriber::value_type>
+ template<class Observer>
+ struct serialize_observer
{
- typedef serialize_observer<Subscriber> this_type;
- typedef observer_base<typename Subscriber::value_type> base_type;
- typedef typename base_type::value_type value_type;
- typedef typename std::decay<Subscriber>::type dest_type;
+ typedef serialize_observer<Observer> this_type;
+ typedef typename std::decay<Observer>::type dest_type;
+ typedef typename dest_type::value_type value_type;
typedef observer<value_type, this_type> observer_type;
dest_type dest;
std::shared_ptr<std::mutex> lock;
@@ -227,8 +226,9 @@ class serialize_one_worker : public coordination_base
dest.on_completed();
}
- static subscriber<value_type, this_type> make(dest_type d, std::shared_ptr<std::mutex> m) {
- return make_subscriber<value_type>(d, this_type(d, std::move(m)));
+ template<class Subscriber>
+ static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
+ return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
}
};
@@ -259,9 +259,9 @@ class serialize_one_worker : public coordination_base
return std::move(o);
}
template<class Subscriber>
- auto out(Subscriber s) const
- -> decltype(serialize_observer<Subscriber>::make(std::move(s), lock)) {
- return serialize_observer<Subscriber>::make(std::move(s), lock);
+ auto out(const Subscriber& s) const
+ -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
+ return serialize_observer<decltype(s.get_observer())>::make(s, lock);
}
template<class F>
auto act(F f) const
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 806e2c4..944d230 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -46,38 +46,6 @@ struct has_on_subscribe_for
static const bool value = std::is_same<detail_result, void>::value;
};
-struct lift_function {
-template<class SourceObservable, class OperatorFactory>
-static auto chain(const SourceObservable& source, OperatorFactory&& of)
- -> decltype(source.lift(std::forward<OperatorFactory>(of))) {
- return source.lift(std::forward<OperatorFactory>(of));
-}
-};
-struct operator_factory {
-template<class SourceObservable, class OperatorFactory>
-static auto chain(const SourceObservable& source, OperatorFactory&& of)
- -> decltype(source.op(std::forward<OperatorFactory>(of))) {
- return source.op(std::forward<OperatorFactory>(of));
-}
-};
-struct not_chainable {
-};
-
-template<class T, class SourceObservable, class OperatorFactory>
-struct select_chain
-{
- typedef
- typename std::conditional<
- rxcpp::detail::is_lift_function_for<subscriber<T>, OperatorFactory>::value,
- lift_function,
- typename std::conditional<
- rxcpp::detail::is_operator_factory_for<SourceObservable, OperatorFactory>::value,
- operator_factory,
- not_chainable
- >::type
- >::type type;
-};
-
}
template<class T>
@@ -131,9 +99,9 @@ public:
}
template<class Subscriber>
- typename std::enable_if<!std::is_same<Subscriber, observer<T>>::value, void>::type
+ typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const {
- state->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer())));
+ state->on_subscribe(o.as_dynamic());
}
};
@@ -237,7 +205,7 @@ private:
typedef typename std::decay<Subscriber>::type subscriber_type;
- static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer");
+ static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
@@ -337,12 +305,12 @@ public:
/// this is intended to allow externally defined operators, that use make_subscriber, to be connected
/// into the expression.
///
- template<class Operator>
+ template<class ResultType, class Operator>
auto lift(Operator&& op) const
- -> observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>> {
- return observable<typename rxo::detail::lift<source_operator_type, Operator>::value_type, rxo::detail::lift<source_operator_type, Operator>>(
- rxo::detail::lift<source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
- static_assert(detail::is_lift_function_for<subscriber<T>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
+ -> observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>> {
+ return observable<typename rxo::detail::lift<ResultType, source_operator_type, Operator>::value_type, rxo::detail::lift<ResultType, source_operator_type, Operator>>(
+ rxo::detail::lift<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
+ static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
}
///
@@ -366,16 +334,16 @@ public:
///
template<class Predicate>
auto filter(Predicate p) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::filter<T, Predicate>(std::move(p)))) {
- return lift(rxo::detail::filter<T, Predicate>(std::move(p)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) {
+ return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)));
}
/// finally () ->
///
template<class LastCall>
auto finally(LastCall lc) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::finally<T, LastCall>(std::move(lc)))) {
- return lift(rxo::detail::finally<T, LastCall>(std::move(lc)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) {
+ return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)));
}
/// map (AKA Select) ->
@@ -383,48 +351,48 @@ public:
///
template<class Selector>
auto map(Selector&& s) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::map<T, Selector>(std::move(s)))) {
- return lift(rxo::detail::map<T, Selector>(std::move(s)));
+ -> decltype(EXPLICIT_THIS lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)))) {
+ return lift<typename rxo::detail::map<T, Selector>::value_type>(rxo::detail::map<T, Selector>(std::move(s)));
}
/// distinct_until_changed ->
/// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned.
///
auto distinct_until_changed() const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::distinct_until_changed<T>())) {
- return lift(rxo::detail::distinct_until_changed<T>());
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) {
+ return lift<T>(rxo::detail::distinct_until_changed<T>());
}
/// window ->
/// produce observables containing count items emitted by this observable
///
auto window(int count) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, count))) {
- return lift(rxo::detail::window<T>(count, count));
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) {
+ return lift<observable<T>>(rxo::detail::window<T>(count, count));
}
/// window ->
/// produce observables containing count items emitted by this observable
///
auto window(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::window<T>(count, skip))) {
- return lift(rxo::detail::window<T>(count, skip));
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) {
+ return lift<observable<T>>(rxo::detail::window<T>(count, skip));
}
/// buffer ->
/// collect count items from this observable and produce a vector of them to emit from the new observable that is returned.
///
auto buffer(int count) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, count))) {
- return lift(rxo::detail::buffer_count<T>(count, count));
+ -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) {
+ return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, count));
}
/// buffer ->
/// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned.
///
auto buffer(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::buffer_count<T>(count, skip))) {
- return lift(rxo::detail::buffer_count<T>(count, skip));
+ -> decltype(EXPLICIT_THIS lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) {
+ return lift<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip));
}
template<class Coordination>
@@ -747,8 +715,8 @@ public:
///
template<class Coordination>
auto observe_on(Coordination cn) const
- -> decltype(EXPLICIT_THIS lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) {
- return lift(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) {
+ return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
}
/// reduce ->
@@ -960,6 +928,11 @@ class observable<void, void>
{
~observable();
public:
+ template<class T, class OnSubscribe>
+ static auto create(OnSubscribe os)
+ -> decltype(rxs::create<T>(std::move(os))) {
+ return rxs::create<T>(std::move(os));
+ }
template<class T>
static auto range(T first = 0, T last = std::numeric_limits<T>::max(), ptrdiff_t step = 1)
-> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
@@ -1087,8 +1060,8 @@ public:
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
//
@@ -1097,8 +1070,8 @@ auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFa
//
template<class T, class SourceOperator, class OperatorFactory>
auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::observable<T, SourceOperator>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp
index 805cb94..1b82f40 100644
--- a/Rx/v2/src/rxcpp/rx-predef.hpp
+++ b/Rx/v2/src/rxcpp/rx-predef.hpp
@@ -7,14 +7,12 @@
#include "rx-includes.hpp"
-auto rxcpp_trace_activity(...) -> rxcpp::trace_noop;
-
namespace rxcpp {
//
// create a typedef for rxcpp_trace_type to override the default
//
-auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))& {
+inline auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))& {
static decltype(rxcpp_trace_activity(trace_tag())) trace;
return trace;
}
diff --git a/Rx/v2/src/rxcpp/rx-sources.hpp b/Rx/v2/src/rxcpp/rx-sources.hpp
index b4205ff..c9fb73f 100644
--- a/Rx/v2/src/rxcpp/rx-sources.hpp
+++ b/Rx/v2/src/rxcpp/rx-sources.hpp
@@ -35,6 +35,7 @@ namespace rxs=sources;
}
+#include "sources/rx-create.hpp"
#include "sources/rx-range.hpp"
#include "sources/rx-iterate.hpp"
#include "sources/rx-interval.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 53fbd92..dbb3615 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -18,32 +18,37 @@ struct subscriber_base : public observer_base<T>, public subscription_base
template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T>
{
+ static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
+ static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
typedef subscriber<T, Observer> this_type;
typedef typename std::decay<Observer>::type observer_type;
composite_subscription lifetime;
observer_type destination;
+ trace_id id;
struct nextdetacher
{
~nextdetacher()
{
trace_activity().on_next_return(*that);
- if (that) {
+ if (do_unsubscribe) {
that->unsubscribe();
}
}
nextdetacher(const this_type* that)
: that(that)
+ , do_unsubscribe(true)
{
}
template<class U>
void operator()(U u) {
trace_activity().on_next_enter(*that, u);
that->destination.on_next(std::move(u));
- that = nullptr;
+ do_unsubscribe = false;
}
const this_type* that;
+ bool do_unsubscribe;
};
struct errordetacher
@@ -89,24 +94,31 @@ public:
subscriber(const this_type& o)
: lifetime(o.lifetime)
, destination(o.destination)
+ , id(o.id)
{
}
subscriber(this_type&& o)
: lifetime(std::move(o.lifetime))
, destination(std::move(o.destination))
+ , id(std::move(o.id))
{
}
template<class U>
- subscriber(composite_subscription cs, U&& o)
+ subscriber(trace_id id, composite_subscription cs, U&& o)
: lifetime(std::move(cs))
, destination(std::forward<U>(o))
+ , id(std::move(id))
{
+ static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
+ static_assert(is_observer<U>::value, "must pass observer to subscriber");
+ trace_activity().create_subscriber(*this);
}
this_type& operator=(this_type o) {
lifetime = std::move(o.lifetime);
destination = std::move(o.destination);
+ id = std::move(o.id);
return *this;
}
@@ -122,9 +134,12 @@ public:
composite_subscription& get_subscription() {
return lifetime;
}
+ trace_id get_id() const {
+ return id;
+ }
subscriber<T> as_dynamic() const {
- return subscriber<T>(lifetime, destination.as_dynamic());
+ return subscriber<T>(id, lifetime, destination.as_dynamic());
}
// observer
@@ -192,7 +207,7 @@ auto make_subscriber()
-> typename std::enable_if<
detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>>::type {
- return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
}
@@ -201,14 +216,14 @@ template<class T, class I>
auto make_subscriber(
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(composite_subscription(), o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class Observer>
auto make_subscriber(const Observer& o)
-> typename std::enable_if<
is_observer<Observer>::value,
subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(composite_subscription(), o);
+ return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class Observer>
auto make_subscriber(const Observer& o)
@@ -218,14 +233,14 @@ auto make_subscriber(const Observer& o)
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(composite_subscription(), o);
+ return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class OnNext>
auto make_subscriber(const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
}
@@ -235,7 +250,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe)
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
}
@@ -245,7 +260,7 @@ auto make_subscriber(const OnNext& on, const OnCompleted& oc)
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
}
@@ -256,7 +271,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
}
@@ -267,7 +282,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
template<class T>
auto make_subscriber(const composite_subscription& cs)
-> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
- return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
}
@@ -276,14 +291,21 @@ template<class T, class I>
auto make_subscriber(const composite_subscription& cs,
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(cs, o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
+}
+template<class T, class I>
+auto make_subscriber(const composite_subscription& cs,
+ const subscriber<T, I>& s)
+ -> subscriber<T, I> {
+ return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
}
template<class T, class Observer>
auto make_subscriber(const composite_subscription& cs, const Observer& o)
-> typename std::enable_if<
+ !is_subscriber<Observer>::value &&
is_observer<Observer>::value,
subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(cs, o);
+ return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
}
template<class T, class Observer>
auto make_subscriber(const composite_subscription& cs, const Observer& o)
@@ -293,14 +315,14 @@ auto make_subscriber(const composite_subscription& cs, const Observer& o)
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(cs, o);
+ return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o));
}
template<class T, class OnNext>
auto make_subscriber(const composite_subscription& cs, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
}
@@ -310,7 +332,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
}
@@ -320,7 +342,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
}
@@ -331,7 +353,153 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+}
+
+// explicit id
+//
+
+template<class T>
+auto make_subscriber(trace_id id)
+ -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
+ static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
+}
+
+template<class T>
+auto make_subscriber(trace_id id, const composite_subscription& cs)
+ -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), cs,
+ observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
+ static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
+}
+
+template<class T, class I>
+auto make_subscriber(trace_id id,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o);
+}
+template<class T, class I>
+auto make_subscriber(trace_id id, const composite_subscription& cs,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), cs, o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ return subscriber<T, Observer>(std::move(id), composite_subscription(), o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ return subscriber<T, Observer>(std::move(id), cs, o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ return subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
+}
+template<class T, class OnNext>
+auto make_subscriber(trace_id id, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+}
+template<class T, class OnNext>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+}
+template<class T, class OnNext, class OnError>
+auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+}
+template<class T, class OnNext, class OnError>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+}
+template<class T, class OnNext, class OnCompleted>
+auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+}
+template<class T, class OnNext, class OnCompleted>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+}
+template<class T, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+}
+template<class T, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
}
@@ -342,15 +510,37 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
template<class T, class OtherT, class OtherObserver, class I>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr,
const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(scbr.get_subscription(), o);
+ -> subscriber<T, observer<T, I>> {
+ auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class I>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
-> typename std::enable_if<
+ !is_subscription<Observer>::value &&
is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(scbr.get_subscription(), o);
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
@@ -359,37 +549,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observ
!is_subscriber<Observer>::value &&
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(scbr.get_subscription(), o);
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
@@ -397,24 +642,56 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class I>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs,
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(cs, o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
+}
+template<class T, class OtherT, class OtherObserver, class I>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), cs, o);
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
-> typename std::enable_if<
is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(cs, o);
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(std::move(id), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
@@ -423,37 +700,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos
!is_subscriber<Observer>::value &&
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(cs, o);
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
@@ -461,18 +793,48 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
-// override lifetime
-//
template<class T, class Observer>
auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs)
-> subscriber<T, Observer> {
- return subscriber<T, Observer>(cs, scbr.get_observer());
+ auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class Observer>
+auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs)
+ -> subscriber<T, Observer> {
+ auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
+}
+
+template<class T, class Observer>
+auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id)
+ -> subscriber<T, Observer> {
+ auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
}
}
diff --git a/Rx/v2/src/rxcpp/rx-test.hpp b/Rx/v2/src/rxcpp/rx-test.hpp
index cf4841b..542c55b 100644
--- a/Rx/v2/src/rxcpp/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/rx-test.hpp
@@ -42,7 +42,7 @@ struct test_source
static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber.");
- ts->on_subscribe(make_subscriber<T>(o, make_observer_dynamic<T>(o.get_observer())));
+ ts->on_subscribe(o.as_dynamic());
}
};
@@ -111,8 +111,8 @@ namespace rxt=test;
//
template<class T, class OperatorFactory>
auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
//
@@ -121,8 +121,8 @@ auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFac
//
template<class T, class OperatorFactory>
auto operator | (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
- -> decltype(rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of))) {
- return rxcpp::detail::select_chain<T, rxcpp::test::testable_observable<T>, OperatorFactory>::type::chain(source, std::forward<OperatorFactory>(of));
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
}
#include "schedulers/rx-test.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-trace.hpp b/Rx/v2/src/rxcpp/rx-trace.hpp
index 3da4ec8..e72caaf 100644
--- a/Rx/v2/src/rxcpp/rx-trace.hpp
+++ b/Rx/v2/src/rxcpp/rx-trace.hpp
@@ -5,10 +5,40 @@
#if !defined(RXCPP_RX_TRACE_HPP)
#define RXCPP_RX_TRACE_HPP
+#include <iostream>
#include <exception>
+#include <atomic>
namespace rxcpp {
+struct trace_id
+{
+ static inline trace_id make_next_id_subscriber() {
+ static std::atomic<unsigned long> id(0xB0000000);
+ return trace_id{++id};
+ }
+ unsigned long id;
+};
+
+inline bool operator==(const trace_id& lhs, const trace_id& rhs) {
+ return lhs.id == rhs.id;
+}
+inline bool operator!=(const trace_id& lhs, const trace_id& rhs) {
+ return !(lhs==rhs);
+}
+
+inline bool operator<(const trace_id& lhs, const trace_id& rhs) {
+ if ((lhs.id & 0xF0000000) != (rhs.id & 0xF0000000)) std::terminate();
+ return lhs.id < rhs.id;
+}
+inline bool operator>(const trace_id& lhs, const trace_id& rhs) {
+ return rhs<lhs;
+}
+
+inline std::ostream& operator<< (std::ostream& os, const trace_id& id) {
+ return os << std::hex << id.id << std::dec;
+}
+
struct trace_noop
{
template<class Worker, class Schedulable>
@@ -32,6 +62,9 @@ struct trace_noop
template<class Observable>
inline void subscribe_return(const Observable& o) {}
+ template<class SubscriberFrom, class SubscriberTo>
+ inline void connect(const SubscriberFrom&, const SubscriberTo&) {}
+
template<class OperatorSource, class OperatorChain, class Subscriber, class SubscriberLifted>
inline void lift_enter(const OperatorSource&, const OperatorChain&, const Subscriber&, const SubscriberLifted&) {}
template<class OperatorSource, class OperatorChain>
@@ -52,6 +85,9 @@ struct trace_noop
template<class SubscriptionState>
inline void subscription_remove_return(const SubscriptionState&) {}
+ template<class Subscriber>
+ inline void create_subscriber(const Subscriber&) {}
+
template<class Subscriber, class T>
inline void on_next_enter(const Subscriber&, const T&) {}
template<class Subscriber>
@@ -72,7 +108,7 @@ struct trace_tag {};
}
-auto rxcpp_trace_activity(...) -> rxcpp::trace_noop;
+inline auto rxcpp_trace_activity(...) -> rxcpp::trace_noop;
#endif
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 06da93e..9e613c4 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -261,27 +261,85 @@ struct plus
{ return std::forward<LHS>(lhs) + std::forward<RHS>(rhs); }
};
-template<class OStream>
-struct println_function
+namespace detail {
+template<class OStream, class Delimit>
+struct print_function
{
OStream& os;
- println_function(OStream& os) : os(os) {}
+ Delimit delimit;
+ print_function(OStream& os, Delimit d) : os(os), delimit(std::move(d)) {}
template<class... TN>
void operator()(const TN&... tn) const {
bool inserts[] = {(os << tn, true)...};
- os << std::endl;
+ delimit();
}
template<class... TN>
void operator()(const std::tuple<TN...>& tpl) const {
- apply(tpl, *this);
+ rxcpp::util::apply(tpl, *this);
}
};
+
+template<class OStream>
+struct endline
+{
+ OStream& os;
+ endline(OStream& os) : os(os) {}
+ void operator()() const {
+ os << std::endl;
+ }
+};
+
+template<class OStream, class ValueType>
+struct insert_value
+{
+ OStream& os;
+ ValueType value;
+ insert_value(OStream& os, ValueType v) : os(os), value(std::move(v)) {}
+ void operator()() const {
+ os << value;
+ }
+};
+
+template<class OStream, class Function>
+struct insert_function
+{
+ OStream& os;
+ Function call;
+ insert_function(OStream& os, Function f) : os(os), call(std::move(f)) {}
+ void operator()() const {
+ call(os);
+ }
+};
+
+}
+
+template<class OStream>
+auto endline(OStream& os)
+ -> detail::endline<OStream> {
+ return detail::endline<OStream>(os);
+}
+
+template<class OStream, class Delimit>
+auto print(OStream& os, Delimit d)
+ -> decltype(d(), detail::print_function<OStream, Delimit>(os, std::move(d))) {
+ return detail::print_function<OStream, Delimit>(os, std::move(d));
+}
+template<class OStream, class Delimit>
+auto print(OStream& os, Delimit d)
+ -> decltype(d(os), detail::print_function<OStream, detail::insert_function<OStream, Delimit>>(os, detail::insert_function<OStream, Delimit>(os, std::move(d)))) {
+ return detail::print_function<OStream, detail::insert_function<OStream, Delimit>>(os, detail::insert_function<OStream, Delimit>(os, std::move(d)));
+}
template<class OStream>
auto println(OStream& os)
- -> println_function<OStream> {
- return println_function<OStream>(os);
+ -> decltype(print(os, endline(os))) {
+ return print(os, endline(os));
+}
+template<class OStream, class DelimitValue>
+auto print_delimited_by(OStream& os, DelimitValue dv)
+ -> detail::print_function<OStream, detail::insert_value<OStream, DelimitValue>> {
+ return detail::print_function<OStream, detail::insert_value<OStream, DelimitValue>>(os, detail::insert_value<OStream, DelimitValue>(os, std::move(dv)));
}
namespace detail {
diff --git a/Rx/v2/src/rxcpp/sources/rx-create.hpp b/Rx/v2/src/rxcpp/sources/rx-create.hpp
new file mode 100644
index 0000000..2c50e98
--- /dev/null
+++ b/Rx/v2/src/rxcpp/sources/rx-create.hpp
@@ -0,0 +1,55 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_SOURCES_RX_CREATE_HPP)
+#define RXCPP_SOURCES_RX_CREATE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace sources {
+
+namespace detail {
+
+template<class T, class OnSubscribe>
+struct create : public source_base<T>
+{
+ typedef create<T, OnSubscribe> this_type;
+
+ typedef typename std::decay<OnSubscribe>::type on_subscribe_type;
+
+ on_subscribe_type on_subscribe_function;
+
+ create(on_subscribe_type os)
+ : on_subscribe_function(std::move(os))
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(Subscriber o) const {
+
+ on_exception(
+ [&](){
+ this->on_subscribe_function(o);
+ return true;
+ },
+ o);
+ }
+};
+
+}
+
+template<class T, class OnSubscribe>
+auto create(OnSubscribe os)
+ -> observable<T, detail::create<T, OnSubscribe>> {
+ return observable<T, detail::create<T, OnSubscribe>>(
+ detail::create<T, OnSubscribe>(std::move(os)));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/sources/rx-error.hpp b/Rx/v2/src/rxcpp/sources/rx-error.hpp
index 4b2c4bb..5014c7c 100644
--- a/Rx/v2/src/rxcpp/sources/rx-error.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-error.hpp
@@ -44,7 +44,7 @@ struct error : public source_base<T>
// creates a worker whose lifetime is the same as this subscription
auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
- auto controller = coordinator.get_output().get_worker();
+ auto controller = coordinator.get_worker();
auto exception = initial.exception;
auto producer = [=](const rxsc::schedulable&){
diff --git a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp
index 539f8d6..ff3becb 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp
@@ -49,6 +49,10 @@ public:
{
}
+ subscriber<T> get_subscriber() const {
+ return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic();
+ }
+
T get_value() const {
return state->get();
}
@@ -65,13 +69,11 @@ public:
template<class T>
class behavior
{
- composite_subscription lifetime;
detail::behavior_observer<T> s;
public:
explicit behavior(T f, composite_subscription cs = composite_subscription())
- : lifetime(std::move(cs))
- , s(std::move(f), lifetime)
+ : s(std::move(f), cs)
{
}
@@ -84,15 +86,16 @@ public:
}
subscriber<T> get_subscriber() const {
- return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::behavior_observer<T>>(s)));
+ return s.get_subscriber();
}
observable<T> get_observable() const {
- return make_observable_dynamic<T>([this](subscriber<T> o){
- if (lifetime.is_subscribed()) {
+ auto keepAlive = s;
+ return make_observable_dynamic<T>([=](subscriber<T> o){
+ if (keepAlive.get_subscription().is_subscribed()) {
o.on_next(get_value());
}
- this->s.add(std::move(o));
+ keepAlive.add(s.get_subscriber(), std::move(o));
});
}
};
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index eff91c9..e07cc69 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -77,12 +77,15 @@ class multicast_observer
{
explicit binder_type(composite_subscription cs)
: state(std::make_shared<state_type>(cs))
+ , id(trace_id::make_next_id_subscriber())
, current_generation(0)
{
}
std::shared_ptr<state_type> state;
+ trace_id id;
+
// used to avoid taking lock in on_next
mutable int current_generation;
mutable std::shared_ptr<completer_type> current_completer;
@@ -93,18 +96,29 @@ class multicast_observer
std::shared_ptr<binder_type> b;
-
-
public:
+ typedef subscriber<T, observer<T, detail::multicast_observer<T>>> input_subscriber_type;
+
explicit multicast_observer(composite_subscription cs)
: b(std::make_shared<binder_type>(cs))
{
}
+ trace_id get_id() const {
+ return b->id;
+ }
+ composite_subscription get_subscription() const {
+ return b->state->lifetime;
+ }
+ input_subscriber_type get_subscriber() const {
+ return make_subscriber<T>(get_id(), get_subscription(), observer<T, detail::multicast_observer<T>>(*this));
+ }
bool has_observers() const {
std::unique_lock<std::mutex> guard(b->state->lock);
return b->current_completer && !b->current_completer->observers.empty();
}
- void add(observer_type o) const {
+ template<class SubscriberFrom>
+ void add(const SubscriberFrom& sf, observer_type o) const {
+ trace_activity().connect(sf, o);
std::unique_lock<std::mutex> guard(b->state->lock);
switch (b->state->current) {
case mode::Casting:
@@ -197,17 +211,15 @@ public:
template<class T>
class subject
{
- composite_subscription lifetime;
detail::multicast_observer<T> s;
public:
subject()
- : s(lifetime)
+ : s(composite_subscription())
{
}
explicit subject(composite_subscription cs)
- : lifetime(cs)
- , s(cs)
+ : s(cs)
{
}
@@ -216,13 +228,13 @@ public:
}
subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const {
- return make_subscriber<T>(lifetime, observer<T, detail::multicast_observer<T>>(s));
+ return s.get_subscriber();
}
observable<T> get_observable() const {
auto keepAlive = s;
return make_observable_dynamic<T>([=](subscriber<T> o){
- keepAlive.add(std::move(o));
+ keepAlive.add(s.get_subscriber(), std::move(o));
});
}
};
diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
index c28fa91..564f564 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
@@ -143,6 +143,10 @@ public:
state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
}
+ subscriber<T> get_subscriber() const {
+ return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
+ }
+
template<class V>
void on_next(V v) const {
state->on_next(std::move(v));
@@ -160,13 +164,11 @@ public:
template<class T, class Coordination>
class synchronize
{
- composite_subscription lifetime;
detail::synchronize_observer<T, Coordination> s;
public:
explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription())
- : lifetime(composite_subscription())
- , s(std::move(cn), std::move(cs), lifetime)
+ : s(std::move(cn), std::move(cs), composite_subscription())
{
}
@@ -175,12 +177,13 @@ public:
}
subscriber<T> get_subscriber() const {
- return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::synchronize_observer<T, Coordination>>(s)));
+ return s.get_subscriber();
}
observable<T> get_observable() const {
- return make_observable_dynamic<T>([this](subscriber<T> o){
- this->s.add(std::move(o));
+ auto keepAlive = s;
+ return make_observable_dynamic<T>([=](subscriber<T> o){
+ keepAlive.add(s.get_subscriber(), std::move(o));
});
}
};
diff --git a/Rx/v2/test/operators/lift.cpp b/Rx/v2/test/operators/lift.cpp
index 7cf35b6..0496019 100644
--- a/Rx/v2/test/operators/lift.cpp
+++ b/Rx/v2/test/operators/lift.cpp
@@ -57,8 +57,8 @@ struct liftfilter
dest.on_completed();
}
- static rx::subscriber<value_type, this_type> make(const dest_type& d, const test_type& t) {
- return rx::make_subscriber<value_type>(d, this_type(d, t));
+ static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
+ return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
}
};
@@ -120,7 +120,7 @@ SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"
auto res = w.start(
[&xs, &invoked]() {
return xs
- .lift(liftfilter([&invoked](int x) {
+ .lift<int>(liftfilter([&invoked](int x) {
invoked++;
return IsPrime(x);
}))
@@ -183,10 +183,10 @@ SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stre
auto res = w.start(
[&xs, &invoked]() {
return xs
- >> liftfilter([&invoked](int x) {
+ >> rxo::lift<int>(liftfilter([&invoked](int x) {
invoked++;
return IsPrime(x);
- })
+ }))
// forget type to workaround lambda deduction bug on msvc 2013
>> rxo::as_dynamic();
},
@@ -250,7 +250,7 @@ SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][
return IsPrime(x);
};
return xs
- .lift([=](rx::subscriber<int> dest){
+ .lift<int>([=](rx::subscriber<int> dest){
// VS2013 deduction issue requires dynamic (type-forgetting)
return rx::make_subscriber<int>(
dest,
diff --git a/Rx/v2/test/sources/create.cpp b/Rx/v2/test/sources/create.cpp
new file mode 100644
index 0000000..063ff7e
--- /dev/null
+++ b/Rx/v2/test/sources/create.cpp
@@ -0,0 +1,46 @@
+#include "rxcpp/rx.hpp"
+namespace rx=rxcpp;
+namespace rxu=rxcpp::util;
+namespace rxsc=rxcpp::schedulers;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("create stops on completion", "[create][sources]"){
+ GIVEN("a test cold observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ long invoked = 0;
+
+ WHEN("created"){
+
+ auto res = w.start(
+ [&]() {
+ return rx::observable<>::create<int>(
+ [&](const rx::subscriber<int>& s){
+ invoked++;
+ s.on_next(1);
+ s.on_next(2);
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains all items"){
+ auto required = rxu::to_vector({
+ on.on_next(200, 1),
+ on.on_next(200, 2)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("create was called until completed"){
+ REQUIRE(1 == invoked);
+ }
+ }
+ }
+}
diff --git a/Rx/v2/test/sources/defer.cpp b/Rx/v2/test/sources/defer.cpp
index bf4779e..e9e62c2 100644
--- a/Rx/v2/test/sources/defer.cpp
+++ b/Rx/v2/test/sources/defer.cpp
@@ -6,7 +6,7 @@ namespace rxsc=rxcpp::schedulers;
#include "rxcpp/rx-test.hpp"
#include "catch.hpp"
-SCENARIO("defer stops on completion", "[defer][operators]"){
+SCENARIO("defer stops on completion", "[defer][sources]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
diff --git a/Rx/v2/test/sources/interval.cpp b/Rx/v2/test/sources/interval.cpp
index 49d68e2..ae1a149 100644
--- a/Rx/v2/test/sources/interval.cpp
+++ b/Rx/v2/test/sources/interval.cpp
@@ -7,7 +7,7 @@ namespace rxsc=rxcpp::schedulers;
#include "catch.hpp"
-SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf]"){
+SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf][sources]"){
GIVEN("schedule_periodically"){
WHEN("the period is 1sec and the initial is 2sec"){
using namespace std::chrono;
@@ -29,7 +29,7 @@ SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf]")
}
}
-SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][long][perf]"){
+SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][long][perf][sources]"){
GIVEN("schedule_periodically_duration"){
WHEN("the period is 1sec and the initial is 2sec"){
using namespace std::chrono;
@@ -72,7 +72,7 @@ SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][l
}
}
-SCENARIO("intervals", "[hide][periodically][interval][scheduler][long][perf]"){
+SCENARIO("intervals", "[hide][periodically][interval][scheduler][long][perf][sources]"){
GIVEN("10 intervals of 1 seconds"){
WHEN("the period is 1sec and the initial is 2sec"){
using namespace std::chrono;
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index b979c4f..6919b63 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -139,11 +139,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug
std::atomic<int> v(0);
auto s0 = rxs::range(1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
@@ -154,11 +154,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug
}));
auto s1 = rxs::range(values + 1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
@@ -169,11 +169,11 @@ SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug
}));
auto s2 = rxs::range((values * 2) + 1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
@@ -249,10 +249,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o
std::atomic<int> v(0);
auto s0 = rxs::range(1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
@@ -263,10 +263,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o
}));
auto s1 = rxs::range(values + 1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
@@ -277,10 +277,10 @@ SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][o
}));
auto s2 = rxs::range((values * 2) + 1, es)
.take(values)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
- .lift(liftrequirecompletion)
+ .lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int i){
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 9e34654..3741630 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -33,6 +33,7 @@ set(TEST_SOURCES
${TEST_DIR}/subscriptions/observer.cpp
${TEST_DIR}/subscriptions/subscription.cpp
${TEST_DIR}/subjects/subject.cpp
+ ${TEST_DIR}/sources/create.cpp
${TEST_DIR}/sources/defer.cpp
${TEST_DIR}/sources/interval.cpp
${TEST_DIR}/operators/buffer.cpp