summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/rx-subscriber.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-subscriber.hpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp462
1 files changed, 412 insertions, 50 deletions
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 53fbd92..dbb3615 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -18,32 +18,37 @@ struct subscriber_base : public observer_base<T>, public subscription_base
template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T>
{
+ static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
+ static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
typedef subscriber<T, Observer> this_type;
typedef typename std::decay<Observer>::type observer_type;
composite_subscription lifetime;
observer_type destination;
+ trace_id id;
struct nextdetacher
{
~nextdetacher()
{
trace_activity().on_next_return(*that);
- if (that) {
+ if (do_unsubscribe) {
that->unsubscribe();
}
}
nextdetacher(const this_type* that)
: that(that)
+ , do_unsubscribe(true)
{
}
template<class U>
void operator()(U u) {
trace_activity().on_next_enter(*that, u);
that->destination.on_next(std::move(u));
- that = nullptr;
+ do_unsubscribe = false;
}
const this_type* that;
+ bool do_unsubscribe;
};
struct errordetacher
@@ -89,24 +94,31 @@ public:
subscriber(const this_type& o)
: lifetime(o.lifetime)
, destination(o.destination)
+ , id(o.id)
{
}
subscriber(this_type&& o)
: lifetime(std::move(o.lifetime))
, destination(std::move(o.destination))
+ , id(std::move(o.id))
{
}
template<class U>
- subscriber(composite_subscription cs, U&& o)
+ subscriber(trace_id id, composite_subscription cs, U&& o)
: lifetime(std::move(cs))
, destination(std::forward<U>(o))
+ , id(std::move(id))
{
+ static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
+ static_assert(is_observer<U>::value, "must pass observer to subscriber");
+ trace_activity().create_subscriber(*this);
}
this_type& operator=(this_type o) {
lifetime = std::move(o.lifetime);
destination = std::move(o.destination);
+ id = std::move(o.id);
return *this;
}
@@ -122,9 +134,12 @@ public:
composite_subscription& get_subscription() {
return lifetime;
}
+ trace_id get_id() const {
+ return id;
+ }
subscriber<T> as_dynamic() const {
- return subscriber<T>(lifetime, destination.as_dynamic());
+ return subscriber<T>(id, lifetime, destination.as_dynamic());
}
// observer
@@ -192,7 +207,7 @@ auto make_subscriber()
-> typename std::enable_if<
detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>>::type {
- return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
}
@@ -201,14 +216,14 @@ template<class T, class I>
auto make_subscriber(
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(composite_subscription(), o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class Observer>
auto make_subscriber(const Observer& o)
-> typename std::enable_if<
is_observer<Observer>::value,
subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(composite_subscription(), o);
+ return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class Observer>
auto make_subscriber(const Observer& o)
@@ -218,14 +233,14 @@ auto make_subscriber(const Observer& o)
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(composite_subscription(), o);
+ return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
template<class T, class OnNext>
auto make_subscriber(const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
}
@@ -235,7 +250,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe)
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
}
@@ -245,7 +260,7 @@ auto make_subscriber(const OnNext& on, const OnCompleted& oc)
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
}
@@ -256,7 +271,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(composite_subscription(),
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
}
@@ -267,7 +282,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
template<class T>
auto make_subscriber(const composite_subscription& cs)
-> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
- return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
}
@@ -276,14 +291,21 @@ template<class T, class I>
auto make_subscriber(const composite_subscription& cs,
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(cs, o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
+}
+template<class T, class I>
+auto make_subscriber(const composite_subscription& cs,
+ const subscriber<T, I>& s)
+ -> subscriber<T, I> {
+ return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
}
template<class T, class Observer>
auto make_subscriber(const composite_subscription& cs, const Observer& o)
-> typename std::enable_if<
+ !is_subscriber<Observer>::value &&
is_observer<Observer>::value,
subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(cs, o);
+ return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
}
template<class T, class Observer>
auto make_subscriber(const composite_subscription& cs, const Observer& o)
@@ -293,14 +315,14 @@ auto make_subscriber(const composite_subscription& cs, const Observer& o)
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(cs, o);
+ return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o));
}
template<class T, class OnNext>
auto make_subscriber(const composite_subscription& cs, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
}
@@ -310,7 +332,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
}
@@ -320,7 +342,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
}
@@ -331,7 +353,153 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs,
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+}
+
+// explicit id
+//
+
+template<class T>
+auto make_subscriber(trace_id id)
+ -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
+ static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
+}
+
+template<class T>
+auto make_subscriber(trace_id id, const composite_subscription& cs)
+ -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> {
+ return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), cs,
+ observer<T, static_observer<T, detail::OnNextEmpty<T>>>(
+ static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())));
+}
+
+template<class T, class I>
+auto make_subscriber(trace_id id,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o);
+}
+template<class T, class I>
+auto make_subscriber(trace_id id, const composite_subscription& cs,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), cs, o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ return subscriber<T, Observer>(std::move(id), composite_subscription(), o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ return subscriber<T, Observer>(std::move(id), cs, o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o);
+}
+template<class T, class Observer>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ return subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
+}
+template<class T, class OnNext>
+auto make_subscriber(trace_id id, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+}
+template<class T, class OnNext>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+}
+template<class T, class OnNext, class OnError>
+auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+}
+template<class T, class OnNext, class OnError>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+}
+template<class T, class OnNext, class OnCompleted>
+auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+}
+template<class T, class OnNext, class OnCompleted>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs,
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+}
+template<class T, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), composite_subscription(),
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+}
+template<class T, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
}
@@ -342,15 +510,37 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O
template<class T, class OtherT, class OtherObserver, class I>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr,
const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(scbr.get_subscription(), o);
+ -> subscriber<T, observer<T, I>> {
+ auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class I>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
-> typename std::enable_if<
+ !is_subscription<Observer>::value &&
is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(scbr.get_subscription(), o);
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
@@ -359,37 +549,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observ
!is_subscriber<Observer>::value &&
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(scbr.get_subscription(), o);
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
@@ -397,24 +642,56 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(scbr.get_subscription(),
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), scbr.get_subscription(),
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class I>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs,
const observer<T, I>& o)
-> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(cs, o);
+ return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
+}
+template<class T, class OtherT, class OtherObserver, class I>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs,
+ const observer<T, I>& o)
+ -> subscriber<T, observer<T, I>> {
+ return subscriber<T, observer<T, I>>(std::move(id), cs, o);
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
-> typename std::enable_if<
is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(cs, o);
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ is_observer<Observer>::value,
+ subscriber<T, Observer>>::type {
+ auto r = subscriber<T, Observer>(std::move(id), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class Observer>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
@@ -423,37 +700,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos
!is_subscriber<Observer>::value &&
!is_subscription<Observer>::value &&
!is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(cs, o);
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class Observer>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
+ -> typename std::enable_if<
+ !detail::is_on_next_of<T, Observer>::value &&
+ !is_subscriber<Observer>::value &&
+ !is_subscription<Observer>::value &&
+ !is_observer<Observer>::value,
+ subscriber<T, observer<T, Observer>>>::type {
+ auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext>>(
+ static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext>>(
static_observer<T, OnNext>(on)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError>>(
+ static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError>>(
static_observer<T, OnNext, OnError>(on, oe)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
+ static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
@@ -461,18 +793,48 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
- return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs,
+ observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
+ static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
+auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
+ -> typename std::enable_if<
+ detail::is_on_next_of<T, OnNext>::value &&
+ detail::is_on_error<OnError>::value &&
+ detail::is_on_completed<OnCompleted>::value,
+ subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type {
+ auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs,
observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc)));
+ trace_activity().connect(r, scbr);
+ return r;
}
-// override lifetime
-//
template<class T, class Observer>
auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs)
-> subscriber<T, Observer> {
- return subscriber<T, Observer>(cs, scbr.get_observer());
+ auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
+}
+template<class T, class Observer>
+auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs)
+ -> subscriber<T, Observer> {
+ auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
+}
+
+template<class T, class Observer>
+auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id)
+ -> subscriber<T, Observer> {
+ auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
+ trace_activity().connect(r, scbr);
+ return r;
}
}