summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-09-01 22:38:02 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-09-01 22:38:02 -0700
commitff573fcd26752d14b5e00b7a4e42f147ae7bc92e (patch)
treec26204dc5014c0323c215055a94fc78e2a230d75 /Rx
parent30a87eb8980b16b8212832866a1ec3d447cefc4c (diff)
downloadRxCpp-ff573fcd26752d14b5e00b7a4e42f147ae7bc92e.tar.gz
allow conversions to dynamic
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp4
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp43
-rw-r--r--Rx/v2/src/rxcpp/rx-grouped_observable.hpp44
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp15
4 files changed, 33 insertions, 73 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index dcba7ed..5ef609b 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -59,6 +59,7 @@ struct group_by
typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
typedef typename traits_type::key_selector_type key_selector_type;
typedef typename traits_type::marble_selector_type marble_selector_type;
+ typedef typename traits_type::marble_type marble_type;
typedef typename traits_type::predicate_type predicate_type;
typedef typename traits_type::subject_type subject_type;
typedef typename traits_type::key_type key_type;
@@ -83,7 +84,7 @@ struct group_by
{
}
- struct group_by_observable
+ struct group_by_observable : public rxs::source_base<marble_type>
{
subject_type subject;
key_type key;
@@ -109,7 +110,6 @@ struct group_by
{
typedef group_by_observer<Subscriber> this_type;
typedef typename traits_type::grouped_observable_type value_type;
- typedef typename traits_type::marble_type marble_type;
typedef typename std::decay<Subscriber>::type dest_type;
typedef observer<T, this_type> observer_type;
dest_type dest;
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 0352605..731d844 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -28,15 +28,13 @@ struct has_on_connect
template<class T>
class dynamic_connectable_observable
- : public rxs::source_base<T>
+ : public dynamic_observable<T>
{
struct state_type
: public std::enable_shared_from_this<state_type>
{
- typedef std::function<void(subscriber<T>)> onsubscribe_type;
typedef std::function<void(composite_subscription)> onconnect_type;
- onsubscribe_type on_subscribe;
onconnect_type on_connect;
};
std::shared_ptr<state_type> state;
@@ -54,9 +52,6 @@ class dynamic_connectable_observable
template<class SO>
void construct(SO&& source, rxs::tag_source&&) {
auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
- state->on_subscribe = [so](subscriber<T> o) mutable {
- so->on_subscribe(std::move(o));
- };
state->on_connect = [so](composite_subscription cs) mutable {
so->on_connect(std::move(cs));
};
@@ -71,45 +66,23 @@ public:
}
template<class SOF>
- explicit dynamic_connectable_observable(SOF&& sof)
- : state(std::make_shared<state_type>())
+ explicit dynamic_connectable_observable(SOF sof)
+ : dynamic_observable<T>(sof)
+ , state(std::make_shared<state_type>())
{
- construct(std::forward<SOF>(sof),
+ construct(std::move(sof),
typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
}
template<class SF, class CF>
dynamic_connectable_observable(SF&& sf, CF&& cf)
- : state(std::make_shared<state_type>())
+ : dynamic_observable<T>(std::forward<SF>(sf))
+ , state(std::make_shared<state_type>())
{
- state->on_subscribe = std::forward<SF>(sf);
state->on_connect = std::forward<CF>(cf);
}
- void on_subscribe(subscriber<T> o) const {
- state->on_subscribe(std::move(o));
- }
-
- template<class Subscriber>
- typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
- on_subscribe(Subscriber&& o) const {
- auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
- state->on_subscribe(make_subscriber<T>(
- *so,
- make_observer_dynamic<T>(
- // on_next
- [so](T t){
- so->on_next(t);
- },
- // on_error
- [so](std::exception_ptr e){
- so->on_error(e);
- },
- // on_completed
- [so](){
- so->on_completed();
- })));
- }
+ using dynamic_observable<T>::on_subscribe;
void on_connect(composite_subscription cs) const {
state->on_connect(std::move(cs));
diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
index 3b248af..84deef9 100644
--- a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
@@ -28,7 +28,7 @@ struct has_on_get_key_for
template<class K, class T>
class dynamic_grouped_observable
- : public rxs::source_base<T>
+ : public dynamic_observable<T>
{
public:
typedef typename std::decay<K>::type key_type;
@@ -38,10 +38,8 @@ private:
struct state_type
: public std::enable_shared_from_this<state_type>
{
- typedef std::function<void(subscriber<T>)> onsubscribe_type;
typedef std::function<key_type()> ongetkey_type;
- onsubscribe_type on_subscribe;
ongetkey_type on_get_key;
};
std::shared_ptr<state_type> state;
@@ -62,9 +60,6 @@ private:
template<class SO>
void construct(SO&& source, const rxs::tag_source&) {
auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
- state->on_subscribe = [so](subscriber<T> o) mutable {
- so->on_subscribe(std::move(o));
- };
state->on_get_key = [so]() mutable {
return so->on_get_key();
};
@@ -77,46 +72,23 @@ public:
}
template<class SOF>
- explicit dynamic_grouped_observable(SOF&& sof)
- : state(std::make_shared<state_type>())
+ explicit dynamic_grouped_observable(SOF sof)
+ : dynamic_observable<T>(sof)
+ , state(std::make_shared<state_type>())
{
- construct(std::forward<SOF>(sof),
+ construct(std::move(sof),
typename std::conditional<is_dynamic_grouped_observable<SOF>::value, tag_dynamic_grouped_observable, rxs::tag_source>::type());
}
template<class SF, class CF>
dynamic_grouped_observable(SF&& sf, CF&& cf)
- : state(std::make_shared<state_type>())
+ : dynamic_observable<T>(std::forward<SF>(sf))
+ , state(std::make_shared<state_type>())
{
- state->on_subscribe = std::forward<SF>(sf);
state->on_connect = std::forward<CF>(cf);
}
- void on_subscribe(subscriber<T> o) const {
- state->on_subscribe(std::move(o));
- }
-
- template<class Subscriber>
- typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
- on_subscribe(Subscriber&& o) const {
- auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
- state->on_subscribe(
- make_subscriber<T>(
- *so,
- // on_next
- [so](T t){
- so->on_next(t);
- },
- // on_error
- [so](std::exception_ptr e){
- so->on_error(e);
- },
- // on_completed
- [so](){
- so->on_completed();
- }).
- as_dynamic());
- }
+ using dynamic_observable<T>::on_subscribe;
key_type on_get_key() const {
return state->on_get_key();
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index dbb3615..39e6bdc 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -104,6 +104,21 @@ public:
{
}
+ template<class U, class O>
+ friend class subscriber;
+
+ template<class O>
+ subscriber(
+ const subscriber<T, O>& o,
+ typename std::enable_if<
+ !std::is_same<O, observer<T>>::value &&
+ std::is_same<Observer, observer<T>>::value, void**>::type select = nullptr)
+ : lifetime(o.lifetime)
+ , destination(o.destination.as_dynamic())
+ , id(o.id)
+ {
+ }
+
template<class U>
subscriber(trace_id id, composite_subscription cs, U&& o)
: lifetime(std::move(cs))