diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-09-01 22:38:02 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-09-01 22:38:02 -0700 |
commit | ff573fcd26752d14b5e00b7a4e42f147ae7bc92e (patch) | |
tree | c26204dc5014c0323c215055a94fc78e2a230d75 /Rx | |
parent | 30a87eb8980b16b8212832866a1ec3d447cefc4c (diff) | |
download | RxCpp-ff573fcd26752d14b5e00b7a4e42f147ae7bc92e.tar.gz |
allow conversions to dynamic
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 4 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 43 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-grouped_observable.hpp | 44 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 15 |
4 files changed, 33 insertions, 73 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index dcba7ed..5ef609b 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -59,6 +59,7 @@ struct group_by typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; typedef typename traits_type::key_selector_type key_selector_type; typedef typename traits_type::marble_selector_type marble_selector_type; + typedef typename traits_type::marble_type marble_type; typedef typename traits_type::predicate_type predicate_type; typedef typename traits_type::subject_type subject_type; typedef typename traits_type::key_type key_type; @@ -83,7 +84,7 @@ struct group_by { } - struct group_by_observable + struct group_by_observable : public rxs::source_base<marble_type> { subject_type subject; key_type key; @@ -109,7 +110,6 @@ struct group_by { typedef group_by_observer<Subscriber> this_type; typedef typename traits_type::grouped_observable_type value_type; - typedef typename traits_type::marble_type marble_type; typedef typename std::decay<Subscriber>::type dest_type; typedef observer<T, this_type> observer_type; dest_type dest; diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 0352605..731d844 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -28,15 +28,13 @@ struct has_on_connect template<class T> class dynamic_connectable_observable - : public rxs::source_base<T> + : public dynamic_observable<T> { struct state_type : public std::enable_shared_from_this<state_type> { - typedef std::function<void(subscriber<T>)> onsubscribe_type; typedef std::function<void(composite_subscription)> onconnect_type; - onsubscribe_type on_subscribe; onconnect_type on_connect; }; std::shared_ptr<state_type> state; @@ -54,9 +52,6 @@ class dynamic_connectable_observable template<class SO> void construct(SO&& source, rxs::tag_source&&) { auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source)); - state->on_subscribe = [so](subscriber<T> o) mutable { - so->on_subscribe(std::move(o)); - }; state->on_connect = [so](composite_subscription cs) mutable { so->on_connect(std::move(cs)); }; @@ -71,45 +66,23 @@ public: } template<class SOF> - explicit dynamic_connectable_observable(SOF&& sof) - : state(std::make_shared<state_type>()) + explicit dynamic_connectable_observable(SOF sof) + : dynamic_observable<T>(sof) + , state(std::make_shared<state_type>()) { - construct(std::forward<SOF>(sof), + construct(std::move(sof), typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); } template<class SF, class CF> dynamic_connectable_observable(SF&& sf, CF&& cf) - : state(std::make_shared<state_type>()) + : dynamic_observable<T>(std::forward<SF>(sf)) + , state(std::make_shared<state_type>()) { - state->on_subscribe = std::forward<SF>(sf); state->on_connect = std::forward<CF>(cf); } - void on_subscribe(subscriber<T> o) const { - state->on_subscribe(std::move(o)); - } - - template<class Subscriber> - typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type - on_subscribe(Subscriber&& o) const { - auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); - state->on_subscribe(make_subscriber<T>( - *so, - make_observer_dynamic<T>( - // on_next - [so](T t){ - so->on_next(t); - }, - // on_error - [so](std::exception_ptr e){ - so->on_error(e); - }, - // on_completed - [so](){ - so->on_completed(); - }))); - } + using dynamic_observable<T>::on_subscribe; void on_connect(composite_subscription cs) const { state->on_connect(std::move(cs)); diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp index 3b248af..84deef9 100644 --- a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp @@ -28,7 +28,7 @@ struct has_on_get_key_for template<class K, class T> class dynamic_grouped_observable - : public rxs::source_base<T> + : public dynamic_observable<T> { public: typedef typename std::decay<K>::type key_type; @@ -38,10 +38,8 @@ private: struct state_type : public std::enable_shared_from_this<state_type> { - typedef std::function<void(subscriber<T>)> onsubscribe_type; typedef std::function<key_type()> ongetkey_type; - onsubscribe_type on_subscribe; ongetkey_type on_get_key; }; std::shared_ptr<state_type> state; @@ -62,9 +60,6 @@ private: template<class SO> void construct(SO&& source, const rxs::tag_source&) { auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source)); - state->on_subscribe = [so](subscriber<T> o) mutable { - so->on_subscribe(std::move(o)); - }; state->on_get_key = [so]() mutable { return so->on_get_key(); }; @@ -77,46 +72,23 @@ public: } template<class SOF> - explicit dynamic_grouped_observable(SOF&& sof) - : state(std::make_shared<state_type>()) + explicit dynamic_grouped_observable(SOF sof) + : dynamic_observable<T>(sof) + , state(std::make_shared<state_type>()) { - construct(std::forward<SOF>(sof), + construct(std::move(sof), typename std::conditional<is_dynamic_grouped_observable<SOF>::value, tag_dynamic_grouped_observable, rxs::tag_source>::type()); } template<class SF, class CF> dynamic_grouped_observable(SF&& sf, CF&& cf) - : state(std::make_shared<state_type>()) + : dynamic_observable<T>(std::forward<SF>(sf)) + , state(std::make_shared<state_type>()) { - state->on_subscribe = std::forward<SF>(sf); state->on_connect = std::forward<CF>(cf); } - void on_subscribe(subscriber<T> o) const { - state->on_subscribe(std::move(o)); - } - - template<class Subscriber> - typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type - on_subscribe(Subscriber&& o) const { - auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); - state->on_subscribe( - make_subscriber<T>( - *so, - // on_next - [so](T t){ - so->on_next(t); - }, - // on_error - [so](std::exception_ptr e){ - so->on_error(e); - }, - // on_completed - [so](){ - so->on_completed(); - }). - as_dynamic()); - } + using dynamic_observable<T>::on_subscribe; key_type on_get_key() const { return state->on_get_key(); diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index dbb3615..39e6bdc 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -104,6 +104,21 @@ public: { } + template<class U, class O> + friend class subscriber; + + template<class O> + subscriber( + const subscriber<T, O>& o, + typename std::enable_if< + !std::is_same<O, observer<T>>::value && + std::is_same<Observer, observer<T>>::value, void**>::type select = nullptr) + : lifetime(o.lifetime) + , destination(o.destination.as_dynamic()) + , id(o.id) + { + } + template<class U> subscriber(trace_id id, composite_subscription cs, U&& o) : lifetime(std::move(cs)) |