summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-03-19 18:11:40 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-03-19 18:11:40 -0700
commitcb42db9eaa8dc288e42584d1ae892b8b811a28f9 (patch)
treee70eab217c912b45910e914fd6f6c257b4a444a1 /Rx/v2/src
parent64de52450d8a449257d06d2d33540acefa5a8728 (diff)
downloadRxCpp-cb42db9eaa8dc288e42584d1ae892b8b811a28f9.tar.gz
switch subscribe from observer to subscriber
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-filter.hpp33
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp72
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-map.hpp34
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-subscribe.hpp145
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-notification.hpp69
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp172
-rw-r--r--Rx/v2/src/rxcpp/rx-observer.hpp446
-rw-r--r--Rx/v2/src/rxcpp/rx-predef.hpp13
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp204
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp27
-rw-r--r--Rx/v2/src/rxcpp/rx-test.hpp46
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp81
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-test.hpp121
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-range.hpp4
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp10
16 files changed, 675 insertions, 804 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-filter.hpp b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
index ee61646..28f2caa 100644
--- a/Rx/v2/src/rxcpp/operators/rx-filter.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
@@ -16,23 +16,25 @@ namespace detail {
template<class T, class Observable, class Predicate>
struct filter : public operator_base<T>
{
- Observable source;
- Predicate test;
+ typedef typename std::decay<Observable>::type source_type;
+ typedef typename std::decay<Predicate>::type test_type;
+ source_type source;
+ test_type test;
template<class CT, class CP>
static auto check(int) -> decltype((*(CP*)nullptr)(*(CT*)nullptr));
template<class CT, class CP>
static void check(...);
- filter(Observable o, Predicate p)
+ filter(source_type o, test_type p)
: source(std::move(o))
, test(std::move(p))
{
- static_assert(std::is_convertible<decltype(check<T, Predicate>(0)), bool>::value, "filter Predicate must be a function with the signature bool(T)");
+ static_assert(std::is_convertible<decltype(check<T, test_type>(0)), bool>::value, "filter Predicate must be a function with the signature bool(T)");
}
- template<class I>
- void on_subscribe(observer<T, I> o) {
+ template<class Subscriber>
+ void on_subscribe(Subscriber o) {
source.subscribe(
- o.get_subscription(),
+ o,
// on_next
[this, o](T t) {
bool filtered = false;
@@ -61,23 +63,24 @@ struct filter : public operator_base<T>
template<class Predicate>
class filter_factory
{
- Predicate predicate;
+ typedef typename std::decay<Predicate>::type test_type;
+ test_type predicate;
public:
- filter_factory(Predicate p) : predicate(std::move(p)) {}
+ filter_factory(test_type p) : predicate(std::move(p)) {}
template<class Observable>
- auto operator()(Observable source)
- -> observable<typename Observable::value_type, filter<typename Observable::value_type, Observable, Predicate>> {
- return observable<typename Observable::value_type, filter<typename Observable::value_type, Observable, Predicate>>(
- filter<typename Observable::value_type, Observable, Predicate>(source, std::move(predicate)));
+ auto operator()(Observable&& source)
+ -> observable<typename std::decay<Observable>::type::value_type, filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>> {
+ return observable<typename std::decay<Observable>::type::value_type, filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>>(
+ filter<typename std::decay<Observable>::type::value_type, Observable, Predicate>(std::forward<Observable>(source), std::move(predicate)));
}
};
}
template<class Predicate>
-auto filter(Predicate p)
+auto filter(Predicate&& p)
-> detail::filter_factory<Predicate> {
- return detail::filter_factory<Predicate>(std::move(p));
+ return detail::filter_factory<Predicate>(std::forward<Predicate>(p));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index 7ff88e1..a52d3c8 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -15,7 +15,11 @@ namespace detail {
template<class Observable, class CollectionSelector, class ResultSelector>
struct flat_map_traits {
- typedef typename Observable::value_type source_value_type;
+ typedef typename std::decay<Observable>::type source_type;
+ typedef typename std::decay<CollectionSelector>::type collection_selector_type;
+ typedef typename std::decay<ResultSelector>::type result_selector_type;
+
+ typedef typename source_type::value_type source_value_type;
struct tag_not_valid {};
template<class CV, class CCS>
@@ -23,9 +27,9 @@ struct flat_map_traits {
template<class CV, class CCS>
static tag_not_valid collection_check(...);
- static_assert(!std::is_same<decltype(collection_check<source_value_type, CollectionSelector>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
+ static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
- typedef decltype((*(CollectionSelector*)nullptr)((*(source_value_type*)nullptr))) collection_type;
+ typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
@@ -36,9 +40,9 @@ struct flat_map_traits {
template<class CV, class CCV, class CRS>
static tag_not_valid result_check(...);
- static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, ResultSelector>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
+ static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
- typedef decltype((*(ResultSelector*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
+ typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
};
template<class Observable, class CollectionSelector, class ResultSelector>
@@ -48,39 +52,46 @@ struct flat_map
typedef flat_map<Observable, CollectionSelector, ResultSelector> this_type;
typedef flat_map_traits<Observable, CollectionSelector, ResultSelector> traits;
+ typedef typename traits::source_type source_type;
+ typedef typename traits::collection_selector_type collection_selector_type;
+ typedef typename traits::result_selector_type result_selector_type;
+
+ typedef typename traits::source_value_type source_value_type;
+ typedef typename traits::collection_type collection_type;
+ typedef typename traits::collection_value_type collection_value_type;
+
struct values
{
- values(Observable o, CollectionSelector s, ResultSelector rs)
+ values(source_type o, collection_selector_type s, result_selector_type rs)
: source(std::move(o))
, selectCollection(std::move(s))
, selectResult(std::move(rs))
{
}
- Observable source;
- CollectionSelector selectCollection;
- ResultSelector selectResult;
+ source_type source;
+ collection_selector_type selectCollection;
+ result_selector_type selectResult;
};
values initial;
- typedef typename traits::source_value_type source_value_type;
- typedef typename traits::collection_type collection_type;
- typedef typename traits::collection_value_type collection_value_type;
-
- flat_map(Observable o, CollectionSelector s, ResultSelector rs)
+ flat_map(source_type o, collection_selector_type s, result_selector_type rs)
: initial(std::move(o), std::move(s), std::move(rs))
{
}
- template<class I>
- void on_subscribe(observer<typename this_type::value_type, I> o) {
+ template<class Observer>
+ void on_subscribe(Observer&& o) {
+ static_assert(is_observer<Observer>::value, "subscribe must be passed an observer");
+
+ typedef typename std::decay<Observer>::type output_type;
- typedef observer<typename this_type::value_type, I> output_type;
struct state_type
: public std::enable_shared_from_this<state_type>
, public values
{
state_type(values i, output_type oarg)
: values(std::move(i))
+ , pendingCompletions(0)
, out(std::move(oarg))
{
}
@@ -95,19 +106,20 @@ struct flat_map
output_type out;
};
// take a copy of the values for each subscription
- auto state = std::shared_ptr<state_type>(new state_type(initial, std::move(o)));
+ auto state = std::shared_ptr<state_type>(new state_type(initial, std::forward<Observer>(o)));
composite_subscription outercs;
// when the out observer is unsubscribed all the
// inner subscriptions are unsubscribed as well
- state->out.get_subscription().add(outercs);
+ state->out.add(outercs);
++state->pendingCompletions;
// this subscribe does not share the observer subscription
// so that when it is unsubscribed the observer can be called
// until the inner subscriptions have finished
state->source.subscribe(
+ state->out,
outercs,
// on_next
[state](source_value_type st) {
@@ -124,16 +136,17 @@ struct flat_map
// when the out observer is unsubscribed all the
// inner subscriptions are unsubscribed as well
- auto innercstoken = state->out.get_subscription().add(innercs);
+ auto innercstoken = state->out.add(innercs);
innercs.add(make_subscription([state, innercstoken](){
- state->out.get_subscription().remove(innercstoken);
+ state->out.remove(innercstoken);
}));
++state->pendingCompletions;
// this subscribe does not share the source subscription
// so that when it is unsubscribed the source will continue
selectedCollection->subscribe(
+ state->out,
innercs,
// on_next
[state, st](collection_value_type ct) {
@@ -181,29 +194,32 @@ struct flat_map
template<class CollectionSelector, class ResultSelector>
class flat_map_factory
{
- CollectionSelector selectorCollection;
- ResultSelector selectorResult;
+ typedef typename std::decay<CollectionSelector>::type collection_selector_type;
+ typedef typename std::decay<ResultSelector>::type result_selector_type;
+
+ collection_selector_type selectorCollection;
+ result_selector_type selectorResult;
public:
- flat_map_factory(CollectionSelector s, ResultSelector rs)
+ flat_map_factory(collection_selector_type s, result_selector_type rs)
: selectorCollection(std::move(rs))
, selectorResult(std::move(s))
{
}
template<class Observable>
- auto operator()(Observable source)
+ auto operator()(Observable&& source)
-> observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>> {
return observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>>(
- flat_map<Observable, CollectionSelector, ResultSelector>(source, std::move(selectorCollection), std::move(selectorResult)));
+ flat_map<Observable, CollectionSelector, ResultSelector>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult)));
}
};
}
template<class CollectionSelector, class ResultSelector>
-auto flat_map(CollectionSelector s, ResultSelector rs)
+auto flat_map(CollectionSelector&& s, ResultSelector&& rs)
-> detail::flat_map_factory<CollectionSelector, ResultSelector> {
- return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::move(s), std::move(rs));
+ return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp
index 939d6e3..91d6673 100644
--- a/Rx/v2/src/rxcpp/operators/rx-map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp
@@ -19,15 +19,18 @@ struct map
{
typedef map<Observable, Selector> this_type;
+ typedef typename std::decay<Observable>::type source_type;
+ typedef typename std::decay<Selector>::type select_type;
+
struct values
{
- values(Observable o, Selector s)
+ values(source_type o, select_type s)
: source(std::move(o))
, select(std::move(s))
{
}
- Observable source;
- Selector select;
+ source_type source;
+ select_type select;
};
values initial;
@@ -39,17 +42,17 @@ struct map
template<class CF, class CP>
static tag_not_valid check(...);
- static_assert(!std::is_same<decltype(check<source_value_type, Selector>(0)), tag_not_valid>::value, "map Selector must be a function with the signature map::value_type(map::source_value_type)");
+ static_assert(!std::is_same<decltype(check<source_value_type, select_type>(0)), tag_not_valid>::value, "map Selector must be a function with the signature map::value_type(map::source_value_type)");
- map(Observable o, Selector s)
+ map(source_type o, select_type s)
: initial(std::move(o), std::move(s))
{
}
- template<class I>
- void on_subscribe(observer<typename this_type::value_type, I> o) {
+ template<class Subscriber>
+ void on_subscribe(Subscriber o) {
- typedef observer<typename this_type::value_type, I> output_type;
+ typedef Subscriber output_type;
struct state_type
: public std::enable_shared_from_this<state_type>
, public values
@@ -65,7 +68,7 @@ struct map
auto state = std::shared_ptr<state_type>(new state_type(initial, std::move(o)));
state->source.subscribe(
- state->out.get_subscription(),
+ state->out,
// on_next
[state](typename this_type::source_value_type st) {
util::detail::maybe<typename this_type::value_type> selected;
@@ -92,23 +95,24 @@ struct map
template<class Selector>
class map_factory
{
- Selector selector;
+ typedef typename std::decay<Selector>::type select_type;
+ select_type selector;
public:
- map_factory(Selector p) : selector(std::move(p)) {}
+ map_factory(select_type p) : selector(std::move(p)) {}
template<class Observable>
- auto operator()(Observable source)
+ auto operator()(Observable&& source)
-> observable<typename map<Observable, Selector>::value_type, map<Observable, Selector>> {
return observable<typename map<Observable, Selector>::value_type, map<Observable, Selector>>(
- map<Observable, Selector>(source, std::move(selector)));
+ map<Observable, Selector>(std::forward<Observable>(source), std::move(selector)));
}
};
}
template<class Selector>
-auto map(Selector p)
+auto map(Selector&& p)
-> detail::map_factory<Selector> {
- return detail::map_factory<Selector>(std::move(p));
+ return detail::map_factory<Selector>(std::forward<Selector>(p));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
index 0a53734..cc25c1d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
@@ -13,118 +13,61 @@ namespace operators {
namespace detail {
-template<class Observer>
-class subscribe_to_observer_factory
-{
- Observer observer;
-public:
- subscribe_to_observer_factory(Observer o) : observer(std::move(o)) {}
- template<class Observable>
- auto operator()(Observable source)
- -> decltype(source.subscribe(std::move(observer))) {
- return source.subscribe(std::move(observer));
- }
-};
-template<class OnNext, class OnError, class OnCompleted>
-class subscribe_factory
-{
- OnNext onnext;
- OnError onerror;
- OnCompleted oncompleted;
-public:
- subscribe_factory(OnNext n, OnError e, OnCompleted c)
- : onnext(std::move(n))
- , onerror(std::move(e))
- , oncompleted(std::move(c))
- {}
- template<class Observable>
- auto operator()(Observable source)
- -> decltype(source.subscribe(make_observer<typename Observable::value_type>(std::move(onnext), std::move(onerror), std::move(oncompleted)))) {
- return source.subscribe(make_observer<typename Observable::value_type>(std::move(onnext), std::move(onerror), std::move(oncompleted)));
- }
-};
-template<class OnNext, class OnError, class OnCompleted>
-class subscribe_factory_chained
+template<class Subscriber>
+class subscribe_factory;
+
+template<class T, class I>
+class subscribe_factory<subscriber<T, I>>
{
- composite_subscription cs;
- OnNext onnext;
- OnError onerror;
- OnCompleted oncompleted;
+ subscriber<T, I> scrbr;
public:
- subscribe_factory_chained(OnNext n, OnError e, OnCompleted c)
- : cs(std::move(cs))
- , onnext(std::move(n))
- , onerror(std::move(e))
- , oncompleted(std::move(c))
+ subscribe_factory(subscriber<T, I> s)
+ : scrbr(std::move(s))
{}
template<class Observable>
- auto operator()(Observable source)
- -> decltype(source.subscribe(make_observer<typename Observable::value_type>(std::move(cs), std::move(onnext), std::move(onerror), std::move(oncompleted)))) {
- return source.subscribe(make_observer<typename Observable::value_type>(std::move(cs), std::move(onnext), std::move(onerror), std::move(oncompleted)));
+ auto operator()(Observable&& source)
+ -> decltype(std::forward<Observable>(source).subscribe(std::move(scrbr))) {
+ return std::forward<Observable>(source).subscribe(std::move(scrbr));
}
};
-template<class Observer>
-auto subscribe(Observer o, tag_observer&&)
- -> subscribe_to_observer_factory<Observer> {
- return subscribe_to_observer_factory<Observer>(std::move(o));
}
-struct tag_function {};
-template<class OnNext>
-auto subscribe(OnNext n, tag_function&&)
- -> subscribe_factory<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty> {
- return subscribe_factory<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty>(std::move(n), rxcpp::detail::OnErrorEmpty(), rxcpp::detail::OnCompletedEmpty());
+template<class T, class Arg0>
+auto subscribe(Arg0&& a0)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0)));
}
-
-template<class OnNext, class OnError>
-auto subscribe(OnNext n, OnError e, tag_function&&)
- -> subscribe_factory<OnNext, OnError, rxcpp::detail::OnCompletedEmpty> {
- return subscribe_factory<OnNext, OnError, rxcpp::detail::OnCompletedEmpty>(std::move(n), std::move(e), rxcpp::detail::OnCompletedEmpty());
+template<class T, class Arg0, class Arg1>
+auto subscribe(Arg0&& a0, Arg1&& a1)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
}
-
-template<class OnNext, class OnError, class OnCompleted>
-auto subscribe(OnNext n, OnError e, OnCompleted c, tag_function&&)
- -> subscribe_factory<OnNext, OnError, OnCompleted> {
- return subscribe_factory<OnNext, OnError, OnCompleted>(std::move(n), std::move(e), std::move(c));
-}
-
-template<class OnNext>
-auto subscribe(composite_subscription cs, OnNext n, tag_subscription&&)
- -> subscribe_factory_chained<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty> {
- return subscribe_factory_chained<OnNext, rxcpp::detail::OnErrorEmpty, rxcpp::detail::OnCompletedEmpty>(std::move(cs), std::move(n), rxcpp::detail::OnErrorEmpty(), rxcpp::detail::OnCompletedEmpty());
-}
-
-template<class OnNext, class OnError>
-auto subscribe(composite_subscription cs, OnNext n, OnError e, tag_subscription&&)
- -> subscribe_factory_chained<OnNext, OnError, rxcpp::detail::OnCompletedEmpty> {
- return subscribe_factory_chained<OnNext, OnError, rxcpp::detail::OnCompletedEmpty>(std::move(cs), std::move(n), std::move(e), rxcpp::detail::OnCompletedEmpty());
-}
-
-}
-
-template<class Arg>
-auto subscribe(Arg a)
- -> decltype(detail::subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_function>::type())) {
- return detail::subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_function>::type());
+template<class T, class Arg0, class Arg1, class Arg2>
+auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
}
-
-template<class Arg1, class Arg2>
-auto subscribe(Arg1 a1, Arg2 a2)
- -> decltype(detail::subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) {
- return detail::subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type());
+template<class T, class Arg0, class Arg1, class Arg2, class Arg3>
+auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
}
-
-template<class Arg1, class Arg2, class Arg3>
-auto subscribe(Arg1 a1, Arg2 a2, Arg3 a3)
- -> decltype(detail::subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) {
- return detail::subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type());
+template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4>
+auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)));
}
-
-template<class OnNext, class OnError, class OnCompleted>
-auto subscribe(composite_subscription cs, OnNext n, OnError e, OnCompleted c)
- -> detail::subscribe_factory<OnNext, OnError, OnCompleted> {
- return detail::subscribe_factory<OnNext, OnError, OnCompleted>(std::move(cs), std::move(n), std::move(e), std::move(c));
+template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5>
+auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))>
+ (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)));
}
namespace detail {
@@ -133,9 +76,9 @@ class dynamic_factory
{
public:
template<class Observable>
- auto operator()(Observable source)
- -> observable<typename Observable::value_type> {
- return observable<typename Observable::value_type>(source);
+ auto operator()(Observable&& source)
+ -> observable<typename std::decay<Observable>::type::value_type> {
+ return observable<typename std::decay<Observable>::type::value_type>(std::forward<Observable>(source));
}
};
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index e210800..9131716 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -94,10 +94,10 @@
#include "rx-predef.hpp"
#include "rx-subscription.hpp"
#include "rx-observer.hpp"
-#include "rx-notification.hpp"
#include "rx-scheduler.hpp"
#include "rx-regulator.hpp"
#include "rx-subscriber.hpp"
+#include "rx-notification.hpp"
#include "rx-sources.hpp"
#include "rx-subjects.hpp"
#include "rx-operators.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp
index 503c392..dd1ff78 100644
--- a/Rx/v2/src/rxcpp/rx-notification.hpp
+++ b/Rx/v2/src/rxcpp/rx-notification.hpp
@@ -46,7 +46,7 @@ template<typename T>
struct notification_base
: public std::enable_shared_from_this<notification_base<T>>
{
- typedef observer<T> observer_type;
+ typedef subscriber<T> observer_type;
typedef std::shared_ptr<notification_base<T>> type;
virtual ~notification_base() {}
@@ -75,9 +75,9 @@ private:
}
virtual bool equals(const typename base::type& other) const {
bool result = false;
- other->accept(make_observer_dynamic<T>([this, &result](T value) {
+ other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T value) {
result = this->value == value;
- }));
+ })));
return result;
}
virtual void accept(const typename base::observer_type& o) const {
@@ -103,9 +103,9 @@ private:
virtual bool equals(const typename base::type& other) const {
bool result = false;
// not trying to compare exceptions
- other->accept(make_observer_dynamic<T>(nullptr, [&result](std::exception_ptr){
+ other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](std::exception_ptr){
result = true;
- }));
+ })));
return result;
}
virtual void accept(const typename base::observer_type& o) const {
@@ -122,9 +122,9 @@ private:
}
virtual bool equals(const typename base::type& other) const {
bool result = false;
- other->accept(make_observer_dynamic<T>(nullptr, nullptr, [&result](){
+ other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
result = true;
- }));
+ })));
return result;
}
virtual void accept(const typename base::observer_type& o) const {
@@ -154,26 +154,49 @@ private:
return std::make_shared<on_error_notification>(ep);
}
+ struct on_next_factory
+ {
+ type operator()(T value) const {
+ return std::make_shared<on_next_notification>(std::move(value));
+ }
+ };
+
+ struct on_completed_factory
+ {
+ type operator()() const {
+ return std::make_shared<on_completed_notification>();
+ }
+ };
+
+ struct on_error_factory
+ {
+ template<typename Exception>
+ type operator()(Exception&& e) const {
+ return make_on_error(typename std::conditional<
+ std::is_same<typename std::decay<Exception>::type, std::exception_ptr>::value,
+ exception_ptr_tag, exception_tag>::type(),
+ std::forward<Exception>(e));
+ }
+ };
public:
- static
- type make_on_next(T value) {
- return std::make_shared<on_next_notification>(std::move(value));
- }
- static
- type make_on_completed() {
- return std::make_shared<on_completed_notification>();
- }
- template<typename Exception>
- static
- type make_on_error(Exception&& e) {
- return make_on_error(typename std::conditional<
- std::is_same<typename std::decay<Exception>::type, std::exception_ptr>::value,
- exception_ptr_tag, exception_tag>::type(),
- std::forward<Exception>(e));
- }
+ const static on_next_factory on_next;
+ const static on_completed_factory on_completed;
+ const static on_error_factory on_error;
};
template<class T>
+//static
+RXCPP_SELECT_ANY const typename notification<T>::on_next_factory notification<T>::on_next = notification<T>::on_next_factory();
+
+template<class T>
+//static
+RXCPP_SELECT_ANY const typename notification<T>::on_completed_factory notification<T>::on_completed = notification<T>::on_completed_factory();
+
+template<class T>
+//static
+RXCPP_SELECT_ANY const typename notification<T>::on_error_factory notification<T>::on_error = notification<T>::on_error_factory();
+
+template<class T>
bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) {
if (!lhs && !rhs) {return true;}
if (!lhs || !rhs) {return false;}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 6a9c02f..7b1f336 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -20,10 +20,23 @@ struct is_operator_factory_for
template<class CS, class CF>
static not_void check(...);
- typedef decltype(check<Source, F>(0)) detail_result;
+ typedef decltype(check<typename std::decay<Source>::type, typename std::decay<F>::type>(0)) detail_result;
static const bool value = !std::is_same<detail_result, not_void>::value;
};
+template<class Subscriber, class T>
+struct has_on_subscribe_for
+{
+ struct not_void {};
+ template<class CS, class CT>
+ static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
+ template<class CS, class CT>
+ static not_void check(...);
+
+ typedef decltype(check<typename std::decay<Subscriber>::type, T>(0)) detail_result;
+ static const bool value = std::is_same<detail_result, void>::value;
+};
+
}
template<class T>
@@ -33,7 +46,7 @@ class dynamic_observable
struct state_type
: public std::enable_shared_from_this<state_type>
{
- typedef std::function<void(observer<T>)> onsubscribe_type;
+ typedef std::function<void(subscriber<T>)> onsubscribe_type;
onsubscribe_type on_subscribe;
};
@@ -48,8 +61,9 @@ class dynamic_observable
}
template<class SO>
- void construct(SO so, rxs::tag_source&&) {
- state->on_subscribe = [so](observer<T> o) mutable {
+ void construct(SO&& source, rxs::tag_source&&) {
+ typename std::decay<SO>::type so = std::forward<SO>(source);
+ state->on_subscribe = [so](subscriber<T> o) mutable {
so.on_subscribe(std::move(o));
};
}
@@ -74,28 +88,29 @@ public:
typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
}
- void on_subscribe(observer<T> o) const {
+ void on_subscribe(subscriber<T> o) const {
state->on_subscribe(std::move(o));
}
- template<class Observer>
- typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type
- on_subscribe(Observer o) const {
- auto so = std::make_shared<Observer>(o);
- state->on_subscribe(make_observer_dynamic<T>(
- so->get_subscription(),
- // on_next
- [so](T t){
- so->on_next(t);
- },
- // on_error
- [so](std::exception_ptr e){
- so->on_error(e);
- },
- // on_completed
- [so](){
- so->on_completed();
- }));
+ template<class Subscriber>
+ typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
+ on_subscribe(Subscriber&& o) const {
+ auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
+ state->on_subscribe(make_subscriber<T>(
+ *so,
+ make_observer_dynamic<T>(
+ // on_next
+ [so](T t){
+ so->on_next(t);
+ },
+ // on_error
+ [so](std::exception_ptr e){
+ so->on_error(e);
+ },
+ // on_completed
+ [so](){
+ so->on_completed();
+ })));
}
};
@@ -112,17 +127,23 @@ class observable
protected:
typedef observable<T, SourceOperator> this_type;
- mutable SourceOperator source_operator;
+ typedef typename std::decay<SourceOperator>::type source_operator_type;
+ mutable source_operator_type source_operator;
private:
template<class U, class SO>
friend class observable;
- template<class Observer>
- auto detail_subscribe(Observer o, tag_observer&&) const
- -> decltype(make_subscription(o)) {
+ template<class Subscriber>
+ auto detail_subscribe(Subscriber&& scrbr) const
+ -> decltype(make_subscription(*(typename std::decay<Subscriber>::type*)nullptr)) {
- static_assert(is_observer<Observer>::value, "subscribe must be passed an observer");
+ typedef typename std::decay<Subscriber>::type subscriber_type;
+ subscriber_type o = std::forward<Subscriber>(scrbr);
+
+ static_assert(is_observer<subscriber_type>::value, "subscribe must be passed an observer");
+ static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
+ static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
if (!o.is_subscribed()) {
return make_subscription(o);
@@ -154,51 +175,20 @@ private:
return make_subscription(o);
}
- struct tag_function {};
- template<class OnNext>
- auto detail_subscribe(OnNext n, tag_function&&) const
- -> decltype(make_subscription( make_observer<T>(std::move(n)))) {
- return subscribe( make_observer<T>(std::move(n)));
- }
-
- template<class OnNext, class OnError>
- auto detail_subscribe(OnNext n, OnError e, tag_function&&) const
- -> decltype(make_subscription( make_observer<T>(std::move(n), std::move(e)))) {
- return subscribe( make_observer<T>(std::move(n), std::move(e)));
- }
-
- template<class OnNext, class OnError, class OnCompleted>
- auto detail_subscribe(OnNext n, OnError e, OnCompleted c, tag_function&&) const
- -> decltype(make_subscription( make_observer<T>(std::move(n), std::move(e), std::move(c)))) {
- return subscribe( make_observer<T>(std::move(n), std::move(e), std::move(c)));
- }
-
- template<class OnNext>
- auto detail_subscribe(composite_subscription cs, OnNext n, tag_subscription&&) const
- -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n)))) {
- return subscribe( make_observer<T>(std::move(cs), std::move(n)));
- }
-
- template<class OnNext, class OnError>
- auto detail_subscribe(composite_subscription cs, OnNext n, OnError e, tag_subscription&&) const
- -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n), std::move(e)))) {
- return subscribe( make_observer<T>(std::move(cs), std::move(n), std::move(e)));
- }
-
public:
typedef T value_type;
- static_assert(rxo::is_operator<SourceOperator>::value || rxs::is_source<SourceOperator>::value, "observable must wrap an operator or source");
+ static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
observable()
{
}
- explicit observable(const SourceOperator& o)
+ explicit observable(const source_operator_type& o)
: source_operator(o)
{
}
- explicit observable(SourceOperator&& o)
+ explicit observable(source_operator_type&& o)
: source_operator(std::move(o))
{
}
@@ -228,54 +218,66 @@ public:
return *this;
}
- template<class Arg>
- auto subscribe(Arg a) const
- -> decltype(detail_subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, tag_function>::type())) {
- return detail_subscribe(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, tag_function>::type());
+ template<class Arg0>
+ auto subscribe(Arg0&& a0) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0)));
+ }
+
+ template<class Arg0, class Arg1>
+ auto subscribe(Arg0&& a0, Arg1&& a1) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
+ }
+
+ template<class Arg0, class Arg1, class Arg2>
+ auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
}
- template<class Arg1, class Arg2>
- auto subscribe(Arg1 a1, Arg2 a2) const
- -> decltype(detail_subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type())) {
- return detail_subscribe(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type());
+ template<class Arg0, class Arg1, class Arg2, class Arg3>
+ auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
}
- template<class Arg1, class Arg2, class Arg3>
- auto subscribe(Arg1 a1, Arg2 a2, Arg3 a3) const
- -> decltype(detail_subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type())) {
- return detail_subscribe(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, tag_function>::type());
+ template<class Arg0, class Arg1, class Arg2, class Arg3, class Arg4>
+ auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)));
}
- template<class OnNext, class OnError, class OnCompleted>
- auto subscribe(composite_subscription cs, OnNext n, OnError e, OnCompleted c) const
- -> decltype(make_subscription( make_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c)))) {
- return subscribe( make_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c)));
+ template<class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5>
+ auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) const
+ -> decltype(detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) {
+ return detail_subscribe(make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)));
}
template<class Predicate>
- auto filter(Predicate p) const
+ auto filter(Predicate&& p) const
-> observable<T, rxo::detail::filter<T, observable, Predicate>> {
return observable<T, rxo::detail::filter<T, observable, Predicate>>(
- rxo::detail::filter<T, observable, Predicate>(*this, std::move(p)));
+ rxo::detail::filter<T, observable, Predicate>(*this, std::forward<Predicate>(p)));
}
template<class Selector>
- auto map(Selector s) const
+ auto map(Selector&& s) const
-> observable<typename rxo::detail::map<observable, Selector>::value_type, rxo::detail::map<observable, Selector>> {
return observable<typename rxo::detail::map<observable, Selector>::value_type, rxo::detail::map<observable, Selector>>(
- rxo::detail::map<observable, Selector>(*this, std::move(s)));
+ rxo::detail::map<observable, Selector>(*this, std::forward<Selector>(s)));
}
template<class CollectionSelector, class ResultSelector>
- auto flat_map(CollectionSelector s, ResultSelector rs) const
+ auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const
-> observable<typename rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>> {
return observable<typename rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>>(
- rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>(*this, std::move(s), std::move(rs)));
+ rxo::detail::flat_map<observable, CollectionSelector, ResultSelector>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs)));
}
template<class OperatorFactory>
auto op(OperatorFactory&& of) const
- -> decltype(of(*(this_type*)nullptr)) {
+ -> decltype(of(*(const this_type*)nullptr)) {
static_assert(detail::is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
return of(*this);
}
diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp
index c5cb7a2..8d2ddcf 100644
--- a/Rx/v2/src/rxcpp/rx-observer.hpp
+++ b/Rx/v2/src/rxcpp/rx-observer.hpp
@@ -51,7 +51,7 @@ public:
using std::swap;
swap(s, o.s);
}
- typename this_type::subscription_type get_subscription() {
+ const typename this_type::subscription_type& get_subscription() const {
return s;
}
bool is_subscribed() const {
@@ -75,7 +75,7 @@ class is_observer
template<class C>
static void check(...);
public:
- static const bool value = std::is_convertible<decltype(check<T>(0)), tag_observer>::value;
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_observer>::value;
};
namespace detail {
@@ -102,7 +102,7 @@ struct is_on_next_of
template<class CT, class CF>
static not_void check(...);
- typedef decltype(check<T, F>(0)) detail_result;
+ typedef decltype(check<T, typename std::decay<F>::type>(0)) detail_result;
static const bool value = std::is_same<detail_result, void>::value;
};
@@ -115,7 +115,7 @@ struct is_on_error
template<class CF>
static not_void check(...);
- static const bool value = std::is_same<decltype(check<F>(0)), void>::value;
+ static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
};
template<class F>
@@ -127,7 +127,7 @@ struct is_on_completed
template<class CF>
static not_void check(...);
- static const bool value = std::is_same<decltype(check<F>(0)), void>::value;
+ static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
};
}
@@ -147,215 +147,17 @@ private:
on_error_t onerror;
on_completed_t oncompleted;
- struct tag_this_type {};
- struct tag_function {};
-
- template<class Arg>
- struct resolve_tag
- {
- typedef typename std::conditional<std::is_same<typename std::decay<Arg>::type, dynamic_observer<T>>::value, tag_this_type,
- typename std::conditional<is_subscription<Arg>::value, tag_subscription, tag_function>::type
- >::type type;
- };
-
- template<class Arg, class Tag = typename resolve_tag<Arg>::type>
- struct resolve_cs;
- template<class Arg>
- struct resolve_cs<Arg, tag_this_type>
- {
- auto operator()(const Arg& a)
- -> decltype(a.cs) {
- return a.cs;
- }
- auto operator()(Arg&& a)
- -> decltype(a.cs) {
- return std::move(a.cs);
- }
- };
- template<class Arg>
- struct resolve_cs<Arg, tag_subscription>
- {
- Arg operator()(Arg a) {
- return std::move(a);
- }
- };
- template<class Arg>
- struct resolve_cs<Arg, tag_function>
- {
- composite_subscription operator()(const Arg& a) {
- return composite_subscription();
- }
- };
-
- template<class Arg, class Tag = typename resolve_tag<Arg>::type>
- struct resolve_onnext;
- template<class Arg>
- struct resolve_onnext<Arg, tag_this_type>
- {
- auto operator()(const Arg& a)
- -> decltype(a.onnext) {
- return a.onnext;
- }
- auto operator()(Arg&& a)
- -> decltype(a.onnext) {
- return std::move(a.onnext);
- }
- };
- template<class Arg>
- struct resolve_onnext<Arg, tag_subscription>
- {
- template<class Arg1>
- on_next_t operator()(const Arg1& a1) {
- return on_next_t();
- }
- template<class Arg1, class Arg2>
- Arg2 operator()(const Arg1&, Arg2 a2) {
- static_assert(detail::is_on_next_of<T, Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value,
- "Function supplied for on_next must be a function with the signature void(T);");
- return std::move(a2);
- }
- };
- template<class Arg>
- struct resolve_onnext<Arg, tag_function>
- {
- template<class Arg1>
- Arg1 operator()(Arg1 a1) {
- static_assert(detail::is_on_next_of<T, Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value,
- "Function supplied for on_next must be a function with the signature void(T);");
- return std::move(a1);
- }
- template<class Arg1, class Arg2>
- Arg operator()(Arg1 a1, const Arg2&) {
- static_assert(detail::is_on_next_of<T, Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value,
- "Function supplied for on_next must be a function with the signature void(T);");
- return std::move(a1);
- }
- };
-
- template<class Arg, class Tag = typename resolve_tag<Arg>::type>
- struct resolve_onerror;
- template<class Arg>
- struct resolve_onerror<Arg, tag_this_type>
- {
- auto operator()(const Arg& a)
- -> decltype(a.onerror) {
- return a.onerror;
- }
- auto operator()(Arg&& a)
- -> decltype(a.onerror) {
- return std::move(a.onerror);
- }
- };
- template<class Arg>
- struct resolve_onerror<Arg, tag_subscription>
- {
- template<class Arg1>
- on_error_t operator()(const Arg1& a1) {
- return on_error_t();
- }
- template<class Arg1, class Arg2>
- Arg2 operator()(const Arg1&, Arg2 a2) {
- static_assert(detail::is_on_error<Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value,
- "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
- return std::move(a2);
- }
- };
- template<class Arg>
- struct resolve_onerror<Arg, tag_function>
- {
- template<class Arg1>
- on_error_t operator()(const Arg1& a1) {
- return on_error_t();
- }
- template<class Arg1, class Arg2>
- Arg1 operator()(Arg1 a1, const Arg2&) {
- static_assert(detail::is_on_error<Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value,
- "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
- return std::move(a1);
- }
- };
-
- template<class Arg, class Tag = typename resolve_tag<Arg>::type>
- struct resolve_oncompleted;
- template<class Arg>
- struct resolve_oncompleted<Arg, tag_this_type>
- {
- auto operator()(const Arg& a)
- -> decltype(a.oncompleted) {
- return a.oncompleted;
- }
- auto operator()(Arg&& a)
- -> decltype(a.oncompleted) {
- return std::move(a.oncompleted);
- }
- };
- template<class Arg>
- struct resolve_oncompleted<Arg, tag_subscription>
- {
- template<class Arg1>
- on_completed_t operator()(const Arg1& a1) {
- return on_completed_t();
- }
- template<class Arg1, class Arg2>
- Arg2 operator()(const Arg1&, Arg2 a2) {
- static_assert(detail::is_on_completed<Arg2>::value || std::is_same<Arg2, std::nullptr_t>::value,
- "Function supplied for on_completed must be a function with the signature void();");
- return std::move(a2);
- }
- };
- template<class Arg>
- struct resolve_oncompleted<Arg, tag_function>
- {
- template<class Arg1>
- on_completed_t operator()(const Arg1& a1) {
- return on_completed_t();
- }
- template<class Arg1, class Arg2>
- Arg1 operator()(Arg1 a1, const Arg2&) {
- static_assert(detail::is_on_completed<Arg1>::value || std::is_same<Arg1, std::nullptr_t>::value,
- "Function supplied for on_completed must be a function with the signature void();");
- return std::move(a1);
- }
- };
-
public:
dynamic_observer()
{
}
- template<class Arg>
- explicit dynamic_observer(Arg a)
- : base_type(resolve_cs<Arg>()(a))
- , onnext(resolve_onnext<Arg>()(a))
- , onerror(resolve_onerror<Arg>()(a))
- , oncompleted(resolve_oncompleted<Arg>()(a))
- {
- }
-
- template<class Arg1, class Arg2>
- dynamic_observer(Arg1 a1, Arg2 a2)
- : base_type(resolve_cs<Arg1>()(a1))
- , onnext(resolve_onnext<Arg1>()(a1, a2))
- , onerror(resolve_onerror<Arg1>()(a2, on_error_t()))
- , oncompleted(resolve_oncompleted<Arg1>()(on_completed_t(), on_completed_t()))
- {
- }
-
- template<class Arg1, class Arg2, class Arg3>
- dynamic_observer(Arg1 a1, Arg2 a2, Arg3 a3)
- : base_type(resolve_cs<Arg1>()(a1))
- , onnext(resolve_onnext<Arg1>()(a1, a2))
- , onerror(resolve_onerror<Arg1>()(a2, a3))
- , oncompleted(resolve_oncompleted<Arg1>()(a3, on_completed_t()))
- {
- }
-
template<class OnNext, class OnError, class OnCompleted>
- dynamic_observer(composite_subscription cs, OnNext n, OnError e, OnCompleted c)
+ dynamic_observer(composite_subscription cs, OnNext&& n, OnError&& e, OnCompleted&& c)
: base_type(std::move(cs))
- , onnext(std::move(n))
- , onerror(std::move(e))
- , oncompleted(std::move(c))
+ , onnext(std::forward<OnNext>(n))
+ , onerror(std::forward<OnError>(e))
+ , oncompleted(std::forward<OnCompleted>(c))
{
static_assert(detail::is_on_next_of<T, OnNext>::value || std::is_same<OnNext, std::nullptr_t>::value,
"Function supplied for on_next must be a function with the signature void(T);");
@@ -394,13 +196,14 @@ public:
}
};
-template<class T, class OnNext, class OnError = detail::OnErrorEmpty, class OnCompleted = detail::OnCompletedEmpty>
+template<class T, class OnNext, class OnError, class OnCompleted>
class static_observer : public observer_base<T>
{
public:
- typedef OnNext on_next_t;
- typedef OnError on_error_t;
- typedef OnCompleted on_completed_t;
+ typedef static_observer<T, OnNext, OnError, OnCompleted> this_type;
+ typedef typename std::decay<OnNext>::type on_next_t;
+ typedef typename std::decay<OnError>::type on_error_t;
+ typedef typename std::decay<OnCompleted>::type on_completed_t;
private:
on_next_t onnext;
@@ -412,39 +215,32 @@ public:
static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
- explicit static_observer(on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
- : observer_base<T>(composite_subscription())
- , onnext(std::move(n))
- , onerror(std::move(e))
- , oncompleted(std::move(c))
- {
- }
- explicit static_observer(composite_subscription cs, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
+ explicit static_observer(composite_subscription cs, on_next_t n, on_error_t e, on_completed_t c)
: observer_base<T>(std::move(cs))
, onnext(std::move(n))
, onerror(std::move(e))
, oncompleted(std::move(c))
{
}
- static_observer(const static_observer& o)
+ static_observer(const this_type& o)
: observer_base<T>(o)
, onnext(o.onnext)
, onerror(o.onerror)
, oncompleted(o.oncompleted)
{
}
- static_observer(static_observer&& o)
+ static_observer(this_type&& o)
: observer_base<T>(std::move(o))
, onnext(std::move(o.onnext))
, onerror(std::move(o.onerror))
, oncompleted(std::move(o.oncompleted))
{
}
- static_observer& operator=(static_observer o) {
+ static_observer& operator=(this_type o) {
swap(o);
return *this;
}
- void swap(static_observer& o) {
+ void swap(this_type& o) {
using std::swap;
observer_base<T>::swap(o);
swap(onnext, o.onnext);
@@ -466,8 +262,8 @@ public:
template<class T, class I>
class observer : public observer_root<T>
{
- typedef observer this_type;
- typedef typename std::conditional<is_observer<I>::value, I, dynamic_observer<T>>::type inner_t;
+ typedef observer<T, I> this_type;
+ typedef typename std::conditional<is_observer<I>::value, typename std::decay<I>::type, dynamic_observer<T>>::type inner_t;
struct detacher
{
@@ -515,7 +311,7 @@ public:
inner.on_completed();
}
}
- typename this_type::subscription_type get_subscription() {
+ const typename this_type::subscription_type& get_subscription() const {
return inner.get_subscription();
}
bool is_subscribed() const {
@@ -545,7 +341,7 @@ public:
}
void on_completed() const {
}
- typename this_type::subscription_type get_subscription() {
+ const typename this_type::subscription_type& get_subscription() const {
static typename this_type::subscription_type result;
result.unsubscribe();
return result;
@@ -571,115 +367,135 @@ auto make_observer()
namespace detail {
-struct tag_require_observer {};
-
-struct tag_require_function {};
-
-template<class I>
-auto make_observer(I i, tag_observer&&)
- -> observer<typename I::value_type, I> {
- return observer<typename I::value_type, I>(std::move(i));
+template<class T, class ResolvedArgSet>
+auto make_observer_resolved(ResolvedArgSet&& rs)
+ -> observer<T, static_observer<T, typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type::result_type>> {
+ return make_observer_resolved<T, static_observer<T, typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::result_type, typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type::result_type>>(std::forward<ResolvedArgSet>(rs));
}
-
-struct tag_function {};
-template<class T, class OnNext>
-auto make_observer(OnNext n, tag_function&&)
- -> observer<T, static_observer<T, OnNext>> {
- return observer<T, static_observer<T, OnNext>>(
- static_observer<T, OnNext>(std::move(n)));
+template<class T, class ResolvedArgSet>
+auto make_observer_dynamic_resolved(ResolvedArgSet&& rs)
+ -> observer<T, dynamic_observer<T>> {
+ return make_observer_resolved<T, dynamic_observer<T>>(std::forward<ResolvedArgSet>(rs));
}
-template<class T, class OnNext, class OnError>
-auto make_observer(OnNext n, OnError e, tag_function&&)
- -> observer<T, static_observer<T, OnNext, OnError>> {
- return observer<T, static_observer<T, OnNext, OnError>>(
- static_observer<T, OnNext, OnError>(std::move(n), std::move(e)));
+template<class T, class I, class ResolvedArgSet>
+auto make_observer_resolved(ResolvedArgSet&& rs)
+ -> observer<T, I> {
+ typedef I inner_type;
+ return observer<T, inner_type>(inner_type(
+ std::get<0>(std::forward<ResolvedArgSet>(rs)).value,
+ std::move(std::get<1>(std::forward<ResolvedArgSet>(rs)).value),
+ std::move(std::get<2>(std::forward<ResolvedArgSet>(rs)).value),
+ std::move(std::get<3>(std::forward<ResolvedArgSet>(rs)).value)));
+
+ typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rn_t;
+ typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type re_t;
+ typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type rc_t;
+
+ static_assert(rn_t::is_arg, "onnext is a required parameter");
+ static_assert(!(rn_t::is_arg && re_t::is_arg) || rn_t::n + 1 == re_t::n, "onnext, onerror parameters must be together and in order");
+ static_assert(!(re_t::is_arg && rc_t::is_arg) || re_t::n + 1 == rc_t::n, "onerror, oncompleted parameters must be together and in order");
+ static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order");
}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_observer(OnNext n, OnError e, OnCompleted c, tag_function&&)
- -> observer<T, static_observer<T, OnNext, OnError, OnCompleted>> {
- return observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
- static_observer<T, OnNext, OnError, OnCompleted>(std::move(n), std::move(e), std::move(c)));
-}
+struct tag_subscription_resolution
+{
+ template<class LHS>
+ struct predicate
+ {
+ static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value;
+ };
+ typedef composite_subscription default_type;
+};
-template<class T, class OnNext>
-auto make_observer(composite_subscription cs, OnNext n, tag_subscription&&)
- -> observer<T, static_observer<T, OnNext>> {
- return observer<T, static_observer<T, OnNext>>(
- static_observer<T, OnNext>(std::move(cs), std::move(n)));
-}
+template<class T>
+struct tag_onnext_resolution
+{
+ template<class LHS>
+ struct predicate
+ {
+ static const bool value = is_on_next_of<T, LHS>::value;
+ };
+ typedef OnNextEmpty<T> default_type;
+};
-template<class T, class OnNext, class OnError>
-auto make_observer(composite_subscription cs, OnNext n, OnError e, tag_subscription&&)
- -> observer<T, static_observer<T, OnNext, OnError>> {
- return observer<T, static_observer<T, OnNext, OnError>>(
- static_observer<T, OnNext, OnError>(std::move(cs), std::move(n), std::move(e)));
-}
+struct tag_onerror_resolution
+{
+ template<class LHS>
+ struct predicate
+ {
+ static const bool value = is_on_error<LHS>::value;
+ };
+ typedef OnErrorEmpty default_type;
+};
-}
+struct tag_oncompleted_resolution
+{
+ template<class LHS>
+ struct predicate
+ {
+ static const bool value = is_on_completed<LHS>::value;
+ };
+ typedef OnCompletedEmpty default_type;
+};
-template<class Arg>
-auto make_observer(Arg a)
- -> decltype(detail::make_observer(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_require_observer>::type())) {
- return detail::make_observer(std::move(a), typename std::conditional<is_observer<Arg>::value, tag_observer, detail::tag_require_observer>::type());
-}
+// types to disambiguate
+// on_next and optional on_error, on_completed +
+// optional subscription
+//
-template<class T, class Arg>
-auto make_observer(Arg a)
- -> decltype(detail::make_observer<T>(std::move(a), typename std::conditional<is_observer<Arg>::value, detail::tag_require_function, detail::tag_function>::type())) {
- return detail::make_observer<T>(std::move(a), typename std::conditional<is_observer<Arg>::value, detail::tag_require_function, detail::tag_function>::type());
-}
+template<class T>
+struct tag_observer_set
+ : public rxu::detail::tag_set<tag_subscription_resolution,
+ rxu::detail::tag_set<tag_onnext_resolution<T>,
+ rxu::detail::tag_set<tag_onerror_resolution,
+ rxu::detail::tag_set<tag_oncompleted_resolution>>>>
+{
+};
-template<class T, class Arg1, class Arg2>
-auto make_observer(Arg1 a1, Arg2 a2)
- -> decltype(detail::make_observer<T>(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) {
- return detail::make_observer<T>(std::move(a1), std::move(a2), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type());
-}
-template<class T, class Arg1, class Arg2, class Arg3>
-auto make_observer(Arg1 a1, Arg2 a2, Arg3 a3)
- -> decltype(detail::make_observer<T>(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type())) {
- return detail::make_observer<T>(std::move(a1), std::move(a2), std::move(a3), typename std::conditional<is_subscription<Arg1>::value, tag_subscription, detail::tag_function>::type());
}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_observer(composite_subscription cs, OnNext n, OnError e, OnCompleted c)
- -> observer<T, static_observer<T, OnNext, OnError, OnCompleted>> {
- return observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
- static_observer<T, OnNext, OnError, OnCompleted>(std::move(cs), std::move(n), std::move(e), std::move(c)));
+template<class T, class Arg0>
+auto make_observer(Arg0&& a0)
+ -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)))) {
+ return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)));
}
-
-template<class T, class OnNext>
-auto make_observer_dynamic(OnNext n)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(n)));
+template<class T, class Arg0, class Arg1>
+auto make_observer(Arg0&& a0, Arg1&& a1)
+ -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) {
+ return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
}
-template<class T, class OnNext, class OnError>
-auto make_observer_dynamic(OnNext n, OnError e)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(n), std::move(e)));
+template<class T, class Arg0, class Arg1, class Arg2>
+auto make_observer(Arg0&& a0, Arg1&& a1, Arg2&& a2)
+ -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) {
+ return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_observer_dynamic(OnNext n, OnError e, OnCompleted c)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(n), std::move(e), std::move(c)));
+template<class T, class Arg0, class Arg1, class Arg2, class Arg3>
+auto make_observer(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
+ -> decltype(detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) {
+ return detail::make_observer_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
}
-template<class T, class OnNext>
-auto make_observer_dynamic(composite_subscription cs, OnNext n)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n)));
+template<class T, class Arg0>
+auto make_observer_dynamic(Arg0&& a0)
+ -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)))) {
+ return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0)));
}
-template<class T, class OnNext, class OnError>
-auto make_observer_dynamic(composite_subscription cs, OnNext n, OnError e)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n), std::move(e)));
+template<class T, class Arg0, class Arg1>
+auto make_observer_dynamic(Arg0&& a0, Arg1&& a1)
+ -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) {
+ return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_observer_dynamic(composite_subscription cs, OnNext n, OnError e, OnCompleted c)
- -> observer<T> {
- return observer<T>(dynamic_observer<T>(std::move(cs), std::move(n), std::move(e), std::move(c)));
+template<class T, class Arg0, class Arg1, class Arg2>
+auto make_observer_dynamic(Arg0&& a0, Arg1&& a1, Arg2&& a2)
+ -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) {
+ return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
+}
+template<class T, class Arg0, class Arg1, class Arg2, class Arg3>
+auto make_observer_dynamic(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
+ -> decltype(detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) {
+ return detail::make_observer_dynamic_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_observer_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
}
}
diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp
index 81c2292..0a25131 100644
--- a/Rx/v2/src/rxcpp/rx-predef.hpp
+++ b/Rx/v2/src/rxcpp/rx-predef.hpp
@@ -15,6 +15,19 @@ class dynamic_observer;
template<class T, class I = dynamic_observer<T>>
class observer;
+struct tag_subscriber {};
+template<class T>
+class is_subscriber
+{
+ struct not_void {};
+ template<class C>
+ static typename C::subscriber_tag* check(int);
+ template<class C>
+ static not_void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_subscriber*>::value;
+};
+
template<class T>
class dynamic_observable;
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 1d53ab5..7bca7ad 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -9,29 +9,36 @@
namespace rxcpp {
-struct tag_subscriber {};
template<class T>
struct subscriber_base : public observer_root<T>, public subscription_base, public resumption_base
{
typedef tag_subscriber subscriber_tag;
};
-template<class T>
-class is_subscriber
-{
- template<class C>
- static typename C::subscriber_tag* check(int);
- template<class C>
- static void check(...);
-public:
- static const bool value = std::is_convertible<decltype(check<T>(0)), tag_subscriber*>::value;
-};
template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T>
{
+ typedef subscriber<T, Observer> this_type;
+ typedef typename std::decay<Observer>::type observer_type;
+
composite_subscription lifetime;
resumption controller;
- Observer destination;
+ observer_type destination;
+
+ struct detacher
+ {
+ ~detacher()
+ {
+ if (that) {
+ that->unsubscribe();
+ }
+ }
+ detacher(const this_type* that)
+ : that(that)
+ {
+ }
+ const this_type* that;
+ };
public:
typedef typename composite_subscription::weak_subscription weak_subscription;
@@ -40,20 +47,30 @@ public:
subscriber()
{
}
- subscriber(composite_subscription cs, resumption r, Observer o)
+ template<class U>
+ subscriber(composite_subscription cs, resumption r, U&& o)
: lifetime(std::move(cs))
- , destination(std::move(o))
, controller(std::move(r))
+ , destination(std::forward<U>(o))
{
}
- Observer get_observer() const {
+ const observer_type& get_observer() const {
+ return destination;
+ }
+ observer_type& get_observer() {
return destination;
}
- resumption get_resumption() const {
+ const resumption& get_resumption() const {
return controller;
}
- composite_subscription get_subscription() const {
+ resumption& get_resumption() {
+ return controller;
+ }
+ const composite_subscription& get_subscription() const {
+ return lifetime;
+ }
+ composite_subscription& get_subscription() {
return lifetime;
}
@@ -69,13 +86,23 @@ public:
// observer
//
void on_next(T t) const {
- destination.on_next(std::move(t));
+ if (is_subscribed()) {
+ detacher protect(this);
+ destination.on_next(std::move(t));
+ protect.that = nullptr;
+ }
}
void on_error(std::exception_ptr e) const {
- destination.on_error(e);
+ if (is_subscribed()) {
+ detacher protect(this);
+ destination.on_error(e);
+ }
}
void on_completed() const {
- destination.on_completed();
+ if (is_subscribed()) {
+ detacher protect(this);
+ destination.on_completed();
+ }
}
// composite_subscription
@@ -101,22 +128,7 @@ public:
};
-template<class T, class ResolvedArgSet>
-auto make_observer_resolved(ResolvedArgSet&& rs)
- -> observer<T, static_observer<T, decltype (std::get<2>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<3>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<4>(std::forward<ResolvedArgSet>(rs)).value)>> {
- typedef static_observer<T, decltype (std::get<2>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<3>(std::forward<ResolvedArgSet>(rs)).value), decltype (std::get<4>(std::forward<ResolvedArgSet>(rs)).value)> inner_type;
- return observer<T, inner_type>(inner_type(
- std::move(std::get<2>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<3>(std::forward<ResolvedArgSet>(rs)).value), std::move(std::get<4>(std::forward<ResolvedArgSet>(rs)).value)));
-
- typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t;
- typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t;
- typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t;
-
- static_assert(rn_t::is_arg, "onnext is a required parameter");
- static_assert(!(rn_t::is_arg && re_t::is_arg) || rn_t::n + 1 == re_t::n, "onnext, onerror parameters must be together and in order");
- static_assert(!(re_t::is_arg && rc_t::is_arg) || re_t::n + 1 == rc_t::n, "onerror, oncompleted parameters must be together and in order");
- static_assert(!(rn_t::is_arg && rc_t::is_arg && !re_t::is_arg) || rn_t::n + 1 == rc_t::n, "onnext, oncompleted parameters must be together and in order");
-}
+namespace detail {
template<class T, bool subscriber_is_arg, bool observer_is_arg, bool onnext_is_arg>
struct observer_selector;
@@ -126,8 +138,8 @@ struct observer_selector<T, subscriber_is_arg, true, false>
{
template<class Set>
static auto get_observer(Set&& rs)
- -> decltype(std::get<5>(std::forward<Set>(rs))) {
- return std::get<5>(std::forward<Set>(rs));
+ -> decltype(std::get<5>(std::forward<Set>(rs)).value) {
+ return std::get<5>(std::forward<Set>(rs)).value;
}
};
template<class T, bool subscriber_is_arg>
@@ -136,6 +148,9 @@ struct observer_selector<T, subscriber_is_arg, false, true>
template<class Set>
static auto get_observer(Set&& rs)
-> decltype(make_observer_resolved<T>(std::forward<Set>(rs))) {
+ if (!std::get<0>(std::forward<Set>(rs)).value.is_subscribed()) {
+ abort();
+ }
return make_observer_resolved<T>(std::forward<Set>(rs));
}
};
@@ -144,20 +159,23 @@ struct observer_selector<T, true, false, false>
{
template<class Set>
static auto get_observer(Set&& rs)
- -> decltype(std::get<6>(std::forward<Set>(rs)).value.get_observer()) {
- return std::get<6>(std::forward<Set>(rs)).value.get_observer();
+ -> const decltype( std::get<6>(std::forward<Set>(rs)).value.get_observer())& {
+ return std::get<6>(std::forward<Set>(rs)).value.get_observer();
}
};
template<class T, class ResolvedArgSet>
auto select_observer(ResolvedArgSet&& rs)
- -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) {
- return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs));
+ -> decltype(observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs))) {
+ if (!std::get<0>(std::forward<ResolvedArgSet>(rs)).value.is_subscribed()) {
+ abort();
+ }
+ return observer_selector<T, std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg, std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type::is_arg>::get_observer(std::forward<ResolvedArgSet>(rs));
- typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rr_t;
- typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type rn_t;
- typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type re_t;
- typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rc_t;
+ typedef typename std::decay<decltype(std::get<4>(std::forward<ResolvedArgSet>(rs)))>::type rr_t;
+ typedef typename std::decay<decltype(std::get<1>(std::forward<ResolvedArgSet>(rs)))>::type rn_t;
+ typedef typename std::decay<decltype(std::get<2>(std::forward<ResolvedArgSet>(rs)))>::type re_t;
+ typedef typename std::decay<decltype(std::get<3>(std::forward<ResolvedArgSet>(rs)))>::type rc_t;
typedef typename std::decay<decltype(std::get<5>(std::forward<ResolvedArgSet>(rs)))>::type ro_t;
typedef typename std::decay<decltype(std::get<6>(std::forward<ResolvedArgSet>(rs)))>::type rs_t;
@@ -166,20 +184,27 @@ auto select_observer(ResolvedArgSet&& rs)
}
template<class T, class ResolvedArgSet>
-auto make_subscriber_resolved(ResolvedArgSet&& rs)
- -> subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))> {
- auto rsub = std::get<0>(std::forward<ResolvedArgSet>(rs));
- auto rr = std::get<1>(std::forward<ResolvedArgSet>(rs));
- const auto& rscrbr = std::get<6>(std::forward<ResolvedArgSet>(rs));
- auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : std::move(rr.value);
- auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : std::move(rsub.value);
- return subscriber<T, decltype( select_observer<T>(std::forward<ResolvedArgSet>(rs)))>(
- std::move(s), std::move(r), select_observer<T>(std::forward<ResolvedArgSet>(rs)));
-
+auto make_subscriber_resolved(ResolvedArgSet&& rsArg)
+ -> subscriber<T, decltype( select_observer<T>(std::move(rsArg)))> {
+ const auto rs = std::forward<ResolvedArgSet>(rsArg);
+ if (!std::get<0>(rs).value.is_subscribed()) {
+ abort();
+ }
+ const auto rsub = std::get<0>(rs);
+ const auto rr = std::get<4>(rs);
+ const auto rscrbr = std::get<6>(rs);
+ const auto r = (rscrbr.is_arg && !rr.is_arg) ? rscrbr.value.get_resumption() : rr.value;
+ const auto s = (rscrbr.is_arg && !rsub.is_arg) ? rscrbr.value.get_subscription() : rsub.value;
+ return subscriber<T, decltype( select_observer<T>(std::move(rsArg)))>(
+ s, r, select_observer<T>(std::move(rs)));
+
+// at least for now do not enforce resolver
+#if 0
typedef typename std::decay<decltype(rr)>::type rr_t;
typedef typename std::decay<decltype(rscrbr)>::type rs_t;
static_assert(rs_t::is_arg || rr_t::is_arg, "at least one of; resumption or subscriber is a required parameter");
+#endif
}
template<class T>
@@ -203,16 +228,6 @@ struct tag_observer_resolution
typedef observer<T, void> default_type;
};
-struct tag_subscription_resolution
-{
- template<class LHS>
- struct predicate
- {
- static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value;
- };
- typedef composite_subscription default_type;
-};
-
struct tag_resumption_resolution
{
template<class LHS>
@@ -224,34 +239,6 @@ struct tag_resumption_resolution
};
-template<class T>
-struct tag_onnext_resolution
-{
- template<class LHS>
- struct predicate : public detail::is_on_next_of<T, LHS>
- {
- };
- typedef detail::OnNextEmpty<T> default_type;
-};
-
-struct tag_onerror_resolution
-{
- template<class LHS>
- struct predicate : public detail::is_on_error<LHS>
- {
- };
- typedef detail::OnErrorEmpty default_type;
-};
-
-struct tag_oncompleted_resolution
-{
- template<class LHS>
- struct predicate : public detail::is_on_completed<LHS>
- {
- };
- typedef detail::OnCompletedEmpty default_type;
-};
-
// types to disambiguate
// subscriber with override for subscription, observer, resumption |
// optional subscriber +
@@ -262,45 +249,48 @@ struct tag_oncompleted_resolution
template<class T>
struct tag_subscriber_set
+ // the first four must be the same as tag_observer_set or the indexing will fail
: public rxu::detail::tag_set<tag_subscription_resolution,
- rxu::detail::tag_set<tag_resumption_resolution,
rxu::detail::tag_set<tag_onnext_resolution<T>,
rxu::detail::tag_set<tag_onerror_resolution,
rxu::detail::tag_set<tag_oncompleted_resolution,
+ rxu::detail::tag_set<tag_resumption_resolution,
rxu::detail::tag_set<tag_observer_resolution<T>,
rxu::detail::tag_set<tag_subscriber_resolution<T>>>>>>>>
{
};
+}
+
template<class T, class Arg0>
auto make_subscriber(Arg0&& a0)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0)));
}
template<class T, class Arg0, class Arg1>
auto make_subscriber(Arg0&& a0, Arg1&& a1)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
}
template<class T, class Arg0, class Arg1, class Arg2>
auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
}
template<class T, class Arg0, class Arg1, class Arg2, class Arg3>
auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
}
template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4>
auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)));
}
template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5>
auto make_subscriber(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5)
- -> decltype(make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) {
- return make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)));
+ -> decltype(detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))) {
+ return detail::make_subscriber_resolved<T>(rxu::detail::resolve_arg_set(detail::tag_subscriber_set<T>(), std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)));
}
}
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index ccb8e06..6adba54 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -57,7 +57,7 @@ public:
template<class Unsubscribe>
class static_subscription : public subscription_base
{
- typedef Unsubscribe unsubscribe_call_type;
+ typedef typename std::decay<Unsubscribe>::type unsubscribe_call_type;
unsubscribe_call_type unsubscribe_call;
static_subscription()
{
@@ -83,7 +83,7 @@ public:
template<class I>
class subscription : public subscription_base
{
- typedef I inner_t;
+ typedef typename std::decay<I>::type inner_t;
inner_t inner;
mutable bool issubscribed;
public:
@@ -120,17 +120,17 @@ inline auto make_subscription()
return subscription<void>();
}
template<class I>
-auto make_subscription(I i)
+auto make_subscription(I&& i)
-> typename std::enable_if<is_subscription<I>::value,
subscription<I>>::type {
- return subscription<I>(std::move(i));
+ return subscription<I>(std::forward<I>(i));
}
template<class Unsubscribe>
-auto make_subscription(Unsubscribe u)
+auto make_subscription(Unsubscribe&& u)
-> typename std::enable_if<!is_subscription<Unsubscribe>::value,
subscription< static_subscription<Unsubscribe>>>::type {
return subscription< static_subscription<Unsubscribe>>(
- static_subscription<Unsubscribe>(std::move(u)));
+ static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
}
class composite_subscription : public subscription_base
@@ -222,24 +222,39 @@ public:
composite_subscription()
: state(std::make_shared<state_t>())
{
+ if (!state) {
+ abort();
+ }
}
composite_subscription(const composite_subscription& o)
: state(o.state)
{
+ if (!state) {
+ abort();
+ }
}
composite_subscription(composite_subscription&& o)
: state(std::move(o.state))
{
+ if (!state) {
+ abort();
+ }
}
composite_subscription& operator=(const composite_subscription& o)
{
state = o.state;
+ if (!state) {
+ abort();
+ }
return *this;
}
composite_subscription& operator=(composite_subscription&& o)
{
state = std::move(o.state);
+ if (!state) {
+ abort();
+ }
return *this;
}
diff --git a/Rx/v2/src/rxcpp/rx-test.hpp b/Rx/v2/src/rxcpp/rx-test.hpp
index 946ec03..e519cf8 100644
--- a/Rx/v2/src/rxcpp/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/rx-test.hpp
@@ -18,7 +18,7 @@ struct test_subject_base
typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
typedef std::shared_ptr<test_subject_base<T>> type;
- virtual void on_subscribe(observer<T>) const =0;
+ virtual void on_subscribe(subscriber<T>) const =0;
virtual std::vector<recorded_type> messages() const =0;
virtual std::vector<rxn::subscription> subscriptions() const =0;
};
@@ -30,29 +30,35 @@ struct test_source
explicit test_source(typename test_subject_base<T>::type ts)
: ts(std::move(ts))
{
+ if (!this->ts) abort();
}
typename test_subject_base<T>::type ts;
- void on_subscribe(observer<T> o) const {
+ void on_subscribe(subscriber<T> o) const {
ts->on_subscribe(std::move(o));
}
- template<class Observer>
- typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type
- on_subscribe(Observer o) const {
- auto so = std::make_shared<Observer>(o);
- ts->on_subscribe(make_observer_dynamic<T>(
- so->get_subscription(),
- // on_next
- [so](T t){
- so->on_next(t);
- },
- // on_error
- [so](std::exception_ptr e){
- so->on_error(e);
- },
- // on_completed
- [so](){
- so->on_completed();
- }));
+ template<class Subscriber>
+ typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, subscriber<T>>::value, void>::type
+ on_subscribe(Subscriber&& o) const {
+
+ static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber.");
+
+ auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
+ ts->on_subscribe(
+ make_subscriber<T>(
+ *so,
+ make_observer_dynamic<T>(
+ // on_next
+ [so](T t){
+ so->on_next(t);
+ },
+ // on_error
+ [so](std::exception_ptr e){
+ so->on_error(e);
+ },
+ // on_completed
+ [so](){
+ so->on_completed();
+ })));
}
};
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 53f1b3c..fa6efe9 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -139,16 +139,15 @@ public:
if (is_set) {
is_set = false;
reinterpret_cast<T*>(&storage)->~T();
+ std::fill_n(reinterpret_cast<char*>(&storage), sizeof(T), 0);
}
}
- void reset(T value) {
- if (is_set) {
- *reinterpret_cast<T*>(&storage) = std::move(value);
- } else {
- new (reinterpret_cast<T*>(&storage)) T(std::move(value));
- is_set = true;
- }
+ template<class U>
+ void reset(U&& value) {
+ reset();
+ new (reinterpret_cast<T*>(&storage)) T(std::forward<U>(value));
+ is_set = true;
}
maybe& operator=(const T& other) {
@@ -212,8 +211,9 @@ struct arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, const Arg4&, result_type a)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, const Arg4&, R&& a)
+ : value(std::forward<R>(a)) {
}
};
@@ -227,8 +227,9 @@ struct arg_resolver_n<4, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, result_type a, const Arg5&)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, const Arg3&, R&& a, const Arg5&)
+ : value(std::forward<R>(a)) {
}
};
@@ -242,8 +243,9 @@ struct arg_resolver_n<3, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, result_type a, const Arg4&, const Arg5&)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(const Arg0&, const Arg1&, const Arg2&, R&& a, const Arg4&, const Arg5&)
+ : value(std::forward<R>(a)) {
}
};
@@ -257,8 +259,9 @@ struct arg_resolver_n<2, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(const Arg0&, const Arg1&, result_type a, const Arg3&, const Arg4&, const Arg5&)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(const Arg0&, const Arg1&, R&& a, const Arg3&, const Arg4&, const Arg5&)
+ : value(std::forward<R>(a)) {
}
};
@@ -272,8 +275,9 @@ struct arg_resolver_n<1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(const Arg0&, result_type a, const Arg2&, const Arg3&, const Arg4&, const Arg5&)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(const Arg0&, R&& a, const Arg2&, const Arg3&, const Arg4&, const Arg5&)
+ : value(std::forward<R>(a)) {
}
};
@@ -287,8 +291,9 @@ struct arg_resolver_n<0, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>
typedef arg_resolver_n<n - 1, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5> prev_type;
typedef typename std::conditional<Predicate<result_type>::value, this_type, typename prev_type::type>::type type;
result_type value;
- arg_resolver_n(result_type a, const Arg1&, const Arg2&, const Arg3&, const Arg4&, const Arg5&)
- : value(std::move(a)) {
+ template<class R>
+ arg_resolver_n(R&& a, const Arg1&, const Arg2&, const Arg3&, const Arg4&, const Arg5&)
+ : value(std::forward<R>(a)) {
}
};
@@ -311,7 +316,7 @@ struct tag_unresolvable {};
template<template<class Arg> class Predicate, class Default, class Arg0 = tag_unresolvable, class Arg1 = tag_unresolvable, class Arg2 = tag_unresolvable, class Arg3 = tag_unresolvable, class Arg4 = tag_unresolvable, class Arg5 = tag_unresolvable>
struct arg_resolver
{
- typedef typename arg_resolver_n<5, Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type type;
+ typedef typename arg_resolver_n<5, Predicate, Default, typename std::decay<Arg0>::type, typename std::decay<Arg1>::type, typename std::decay<Arg2>::type, typename std::decay<Arg3>::type, typename std::decay<Arg4>::type, typename std::decay<Arg5>::type>::type type;
};
@@ -323,54 +328,54 @@ auto resolve_arg()
template<template<class Arg> class Predicate, class Default,
class Arg0>
-auto resolve_arg(Arg0 a0)
--> decltype(typename arg_resolver<Predicate, Default, Arg0>::type(std::move(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
- return typename arg_resolver<Predicate, Default, Arg0>::type(std::move(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
+auto resolve_arg(Arg0&& a0)
+-> decltype(typename arg_resolver<Predicate, Default, Arg0>::type(std::forward<Arg0>(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
+ return typename arg_resolver<Predicate, Default, Arg0>::type(std::forward<Arg0>(a0), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
}
template<template<class Arg> class Predicate, class Default,
class Arg0, class Arg1>
-auto resolve_arg(Arg0 a0, Arg1 a1)
+auto resolve_arg(Arg0&& a0, Arg1&& a1)
-> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1>::type(
- std::move(a0), std::move(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
return typename arg_resolver<Predicate, Default, Arg0, Arg1>::type(
- std::move(a0), std::move(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), tag_unresolvable(), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
}
template<template<class Arg> class Predicate, class Default,
class Arg0, class Arg1, class Arg2>
-auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2)
+auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2)
-> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2>::type(
- std::move(a0), std::move(a1), std::move(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable())) {
return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2>::type(
- std::move(a0), std::move(a1), std::move(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), tag_unresolvable(), tag_unresolvable(), tag_unresolvable());
}
template<template<class Arg> class Predicate, class Default,
class Arg0, class Arg1, class Arg2, class Arg3>
-auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3)
+auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
-> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), tag_unresolvable(), tag_unresolvable())) {
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), tag_unresolvable(), tag_unresolvable())) {
return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), tag_unresolvable(), tag_unresolvable());
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), tag_unresolvable(), tag_unresolvable());
}
template<template<class Arg> class Predicate, class Default,
class Arg0, class Arg1, class Arg2, class Arg3, class Arg4>
-auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3, Arg4 a4)
+auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4)
-> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), tag_unresolvable())) {
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), tag_unresolvable())) {
return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), tag_unresolvable());
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), tag_unresolvable());
}
template<template<class Arg> class Predicate, class Default,
class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5>
-auto resolve_arg(Arg0 a0, Arg1 a1, Arg2 a2, Arg3 a3, Arg4 a4, Arg5 a5)
+auto resolve_arg(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5)
-> decltype(typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), std::move(a5))) {
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))) {
return typename arg_resolver<Predicate, Default, Arg0, Arg1, Arg2, Arg3, Arg4, Arg5>::type(
- std::move(a0), std::move(a1), std::move(a2), std::move(a3), std::move(a4), std::move(a5));
+ std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5));
}
struct arg_resolver_term {};
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
index 250dbf6..cd3b7ff 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
@@ -28,30 +28,37 @@ public:
typedef typename rxn::notification<T> notification_type;
typedef rxn::recorded<typename notification_type::type> recorded_type;
- static
- recorded_type on_next(long ticks, T value)
+ struct on_next_factory
{
- return recorded_type(ticks, notification_type::make_on_next(value));
- }
-
- static
- recorded_type on_completed(long ticks)
+ recorded_type operator()(long ticks, T value) const {
+ return recorded_type(ticks, notification_type::on_next(value));
+ }
+ };
+ struct on_completed_factory
{
- return recorded_type(ticks, notification_type::make_on_completed());
- }
-
- template<class Exception>
- static
- recorded_type on_error(long ticks, Exception e)
+ recorded_type operator()(long ticks) const {
+ return recorded_type(ticks, notification_type::on_completed());
+ }
+ };
+ struct on_error_factory
{
- return recorded_type(ticks, notification_type::make_on_error(e));
- }
+ template<class Exception>
+ recorded_type operator()(long ticks, Exception&& e) const {
+ return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
+ }
+ };
+
+ static const on_next_factory on_next;
+ static const on_completed_factory on_completed;
+ static const on_error_factory on_error;
- static
- rxn::subscription subscribe(long subscribe, long unsubscribe)
+ struct subscribe_factory
{
- return rxn::subscription(subscribe, unsubscribe);
- }
+ rxn::subscription operator()(long subscribe, long unsubscribe) const {
+ return rxn::subscription(subscribe, unsubscribe);
+ }
+ };
+ static const subscribe_factory subscribe;
private:
~messages();
@@ -86,27 +93,29 @@ public:
using base::start;
template<class T, class F>
- auto start(F createSource, long created, long subscribed, long unsubscribed)
- -> rxt::testable_observer<T>
+ auto start(F&& createSource, long created, long subscribed, long unsubscribed)
+ -> subscriber<T, rxt::testable_observer<T>>
{
+ typename std::decay<F>::type createSrc = std::forward<F>(createSource);
+
struct state_type
: public std::enable_shared_from_this<state_type>
{
- typedef decltype(createSource()) source_type;
+ typedef decltype(std::forward<F>(createSrc)()) source_type;
std::unique_ptr<source_type> source;
- rxt::testable_observer<T> o;
+ subscriber<T, rxt::testable_observer<T>> o;
- explicit state_type(rxt::testable_observer<T> o)
+ explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
: source()
, o(o)
{
}
};
- std::shared_ptr<state_type> state(new state_type(this->make_observer<T>()));
+ std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>()));
- schedule_absolute(created, make_action([createSource, state](action, scheduler) {
- state->source.reset(new typename state_type::source_type(createSource()));
+ schedule_absolute(created, make_action([createSrc, state](action, scheduler) {
+ state->source.reset(new typename state_type::source_type(createSrc()));
return make_action_empty();
}));
schedule_absolute(subscribed, make_action([state](action, scheduler) {
@@ -124,17 +133,17 @@ public:
}
template<class T, class F>
- auto start(F createSource, long unsubscribed)
- -> rxt::testable_observer<T>
+ auto start(F&& createSource, long unsubscribed)
+ -> subscriber<T, rxt::testable_observer<T>>
{
- return start<T>(std::move(createSource), created_time, subscribed_time, unsubscribed);
+ return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
}
template<class T, class F>
- auto start(F createSource)
- -> rxt::testable_observer<T>
+ auto start(F&& createSource)
+ -> subscriber<T, rxt::testable_observer<T>>
{
- return start<T>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
+ return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
}
template<class T>
@@ -157,9 +166,28 @@ public:
template<class T>
rxt::testable_observer<T> make_observer();
+
+ template<class T>
+ subscriber<T, rxt::testable_observer<T>> make_subscriber();
};
template<class T>
+//static
+RXCPP_SELECT_ANY const typename test::messages<T>::on_next_factory test::messages<T>::on_next = test::messages<T>::on_next_factory();
+
+template<class T>
+//static
+RXCPP_SELECT_ANY const typename test::messages<T>::on_completed_factory test::messages<T>::on_completed = test::messages<T>::on_completed_factory();
+
+template<class T>
+//static
+RXCPP_SELECT_ANY const typename test::messages<T>::on_error_factory test::messages<T>::on_error = test::messages<T>::on_error_factory();
+
+template<class T>
+//static
+RXCPP_SELECT_ANY const typename test::messages<T>::subscribe_factory test::messages<T>::subscribe = test::messages<T>::subscribe_factory();
+
+template<class T>
class mock_observer
: public rxt::detail::test_subject_base<T>
{
@@ -175,7 +203,7 @@ public:
typename test::shared sc;
std::vector<recorded_type> m;
- virtual void on_subscribe(observer<T>) const {
+ virtual void on_subscribe(subscriber<T>) const {
abort();
}
virtual std::vector<rxn::subscription> subscriptions() const {
@@ -193,31 +221,38 @@ rxt::testable_observer<T> test::make_observer()
typedef typename rxn::notification<T> notification_type;
typedef rxn::recorded<typename notification_type::type> recorded_type;
- auto ts = std::make_shared<mock_observer<T>>(
- std::static_pointer_cast<test>(shared_from_this()));
+ std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(
+ std::static_pointer_cast<test>(shared_from_this())));
return rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
// on_next
[ts](T value)
{
ts->m.push_back(
- recorded_type(ts->sc->clock(), notification_type::make_on_next(value)));
+ recorded_type(ts->sc->clock(), notification_type::on_next(value)));
},
// on_error
[ts](std::exception_ptr e)
{
ts->m.push_back(
- recorded_type(ts->sc->clock(), notification_type::make_on_error(e)));
+ recorded_type(ts->sc->clock(), notification_type::on_error(e)));
},
// on_completed
[ts]()
{
ts->m.push_back(
- recorded_type(ts->sc->clock(), notification_type::make_on_completed()));
+ recorded_type(ts->sc->clock(), notification_type::on_completed()));
}));
}
template<class T>
+subscriber<T, rxt::testable_observer<T>> test::make_subscriber()
+{
+ auto to = this->make_observer<T>();
+ return rxcpp::make_subscriber<T>(to.get_subscription(), to);
+}
+
+template<class T>
class cold_observable
: public rxt::detail::test_subject_base<T>
{
@@ -242,7 +277,7 @@ public:
{
}
- virtual void on_subscribe(observer<T> o) const {
+ virtual void on_subscribe(subscriber<T> o) const {
sv.push_back(rxn::subscription(sc->clock()));
auto index = sv.size() - 1;
@@ -272,8 +307,8 @@ public:
template<class T>
rxt::testable_observable<T> test::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
{
- return rxt::testable_observable<T>(std::make_shared<cold_observable<T>>(
- std::static_pointer_cast<test>(shared_from_this()), std::move(messages)));
+ auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(std::static_pointer_cast<test>(shared_from_this()), std::move(messages)));
+ return rxt::testable_observable<T>(co);
}
template<class T>
@@ -283,7 +318,7 @@ class hot_observable
typedef hot_observable<T> this_type;
typename test::shared sc;
typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
- typedef observer<T> observer_type;
+ typedef subscriber<T> observer_type;
mutable std::vector<recorded_type> mv;
mutable std::vector<rxn::subscription> sv;
mutable std::vector<observer_type> observers;
diff --git a/Rx/v2/src/rxcpp/sources/rx-range.hpp b/Rx/v2/src/rxcpp/sources/rx-range.hpp
index 9a44ba9..418c292 100644
--- a/Rx/v2/src/rxcpp/sources/rx-range.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-range.hpp
@@ -31,8 +31,8 @@ struct range : public source_base<T>
init.step = s;
init.sc = sc;
}
- template<class I>
- void on_subscribe(observer<T, I> o) {
+ template<class Subscriber>
+ void on_subscribe(Subscriber o) {
auto state = std::make_shared<state_type>(init);
state->sc->schedule(o.get_subscription(), [=](rxsc::action that, rxsc::scheduler){
if (state->remaining == 0) {
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index fafdb14..82b05e7 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -18,7 +18,7 @@ class multicast_observer
: public observer_base<T>
{
typedef observer_base<T> base;
- typedef observer<T> observer_type;
+ typedef subscriber<T> observer_type;
typedef std::vector<observer_type> list_type;
struct mode
@@ -66,7 +66,7 @@ class multicast_observer
std::copy_if(
old->observers.begin(), old->observers.end(),
std::inserter(observers, observers.end()),
- [](const observer<T>& o){
+ [](const observer_type& o){
return o.is_subscribed();
});
}
@@ -108,7 +108,7 @@ public:
bool has_observers() const {
return b->state->has_observers;
}
- void add(observer<T> o) const {
+ void add(observer_type o) const {
std::unique_lock<std::mutex> guard(b->state->lock);
switch (b->state->current) {
case mode::Casting:
@@ -148,7 +148,7 @@ public:
c = b->completer;
}
for (auto& o : c->observers) {
- o.on_next(std::move(t));
+ o.on_next(t);
}
}
}
@@ -206,7 +206,7 @@ public:
return observer<T, detail::multicast_observer<T>>(s);
}
observable<T> get_observable() const {
- return make_dynamic_observable<T>([this](observer<T> o){
+ return make_dynamic_observable<T>([this](subscriber<T> o){
this->s.add(std::move(o));
});
}