diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-subscriber.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 462 |
1 files changed, 412 insertions, 50 deletions
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 53fbd92..dbb3615 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -18,32 +18,37 @@ struct subscriber_base : public observer_base<T>, public subscription_base template<class T, class Observer = observer<T>> class subscriber : public subscriber_base<T> { + static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers"); + static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>"); typedef subscriber<T, Observer> this_type; typedef typename std::decay<Observer>::type observer_type; composite_subscription lifetime; observer_type destination; + trace_id id; struct nextdetacher { ~nextdetacher() { trace_activity().on_next_return(*that); - if (that) { + if (do_unsubscribe) { that->unsubscribe(); } } nextdetacher(const this_type* that) : that(that) + , do_unsubscribe(true) { } template<class U> void operator()(U u) { trace_activity().on_next_enter(*that, u); that->destination.on_next(std::move(u)); - that = nullptr; + do_unsubscribe = false; } const this_type* that; + bool do_unsubscribe; }; struct errordetacher @@ -89,24 +94,31 @@ public: subscriber(const this_type& o) : lifetime(o.lifetime) , destination(o.destination) + , id(o.id) { } subscriber(this_type&& o) : lifetime(std::move(o.lifetime)) , destination(std::move(o.destination)) + , id(std::move(o.id)) { } template<class U> - subscriber(composite_subscription cs, U&& o) + subscriber(trace_id id, composite_subscription cs, U&& o) : lifetime(std::move(cs)) , destination(std::forward<U>(o)) + , id(std::move(id)) { + static_assert(!is_subscriber<U>::value, "cannot nest subscribers"); + static_assert(is_observer<U>::value, "must pass observer to subscriber"); + trace_activity().create_subscriber(*this); } this_type& operator=(this_type o) { lifetime = std::move(o.lifetime); destination = std::move(o.destination); + id = std::move(o.id); return *this; } @@ -122,9 +134,12 @@ public: composite_subscription& get_subscription() { return lifetime; } + trace_id get_id() const { + return id; + } subscriber<T> as_dynamic() const { - return subscriber<T>(lifetime, destination.as_dynamic()); + return subscriber<T>(id, lifetime, destination.as_dynamic()); } // observer @@ -192,7 +207,7 @@ auto make_subscriber() -> typename std::enable_if< detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value, subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>>::type { - return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, detail::OnNextEmpty<T>>>( static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); } @@ -201,14 +216,14 @@ template<class T, class I> auto make_subscriber( const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(composite_subscription(), o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class Observer> auto make_subscriber(const Observer& o) -> typename std::enable_if< is_observer<Observer>::value, subscriber<T, Observer>>::type { - return subscriber<T, Observer>(composite_subscription(), o); + return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class Observer> auto make_subscriber(const Observer& o) @@ -218,14 +233,14 @@ auto make_subscriber(const Observer& o) !is_subscription<Observer>::value && !is_observer<Observer>::value, subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(composite_subscription(), o); + return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } template<class T, class OnNext> auto make_subscriber(const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); } @@ -235,7 +250,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe) detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); } @@ -245,7 +260,7 @@ auto make_subscriber(const OnNext& on, const OnCompleted& oc) detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); } @@ -256,7 +271,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(composite_subscription(), + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), composite_subscription(), observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); } @@ -267,7 +282,7 @@ auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) template<class T> auto make_subscriber(const composite_subscription& cs) -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { - return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(cs, + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, detail::OnNextEmpty<T>>>( static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); } @@ -276,14 +291,21 @@ template<class T, class I> auto make_subscriber(const composite_subscription& cs, const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(cs, o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); +} +template<class T, class I> +auto make_subscriber(const composite_subscription& cs, + const subscriber<T, I>& s) + -> subscriber<T, I> { + return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer()); } template<class T, class Observer> auto make_subscriber(const composite_subscription& cs, const Observer& o) -> typename std::enable_if< + !is_subscriber<Observer>::value && is_observer<Observer>::value, subscriber<T, Observer>>::type { - return subscriber<T, Observer>(cs, o); + return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); } template<class T, class Observer> auto make_subscriber(const composite_subscription& cs, const Observer& o) @@ -293,14 +315,14 @@ auto make_subscriber(const composite_subscription& cs, const Observer& o) !is_subscription<Observer>::value && !is_observer<Observer>::value, subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(cs, o); + return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o)); } template<class T, class OnNext> auto make_subscriber(const composite_subscription& cs, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); } @@ -310,7 +332,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); } @@ -320,7 +342,7 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); } @@ -331,7 +353,153 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs, + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); +} + +// explicit id +// + +template<class T> +auto make_subscriber(trace_id id) + -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, detail::OnNextEmpty<T>>>( + static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); +} + +template<class T> +auto make_subscriber(trace_id id, const composite_subscription& cs) + -> subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>> { + return subscriber<T, observer<T, static_observer<T, detail::OnNextEmpty<T>>>>(std::move(id), cs, + observer<T, static_observer<T, detail::OnNextEmpty<T>>>( + static_observer<T, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()))); +} + +template<class T, class I> +auto make_subscriber(trace_id id, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o); +} +template<class T, class I> +auto make_subscriber(trace_id id, const composite_subscription& cs, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), cs, o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + return subscriber<T, Observer>(std::move(id), composite_subscription(), o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + return subscriber<T, Observer>(std::move(id), cs, o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o); +} +template<class T, class Observer> +auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + return subscriber<T, observer<T, Observer>>(std::move(id), cs, o); +} +template<class T, class OnNext> +auto make_subscriber(trace_id id, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); +} +template<class T, class OnNext> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); +} +template<class T, class OnNext, class OnError> +auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); +} +template<class T, class OnNext, class OnError> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); +} +template<class T, class OnNext, class OnCompleted> +auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); +} +template<class T, class OnNext, class OnCompleted> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs, + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); +} +template<class T, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), composite_subscription(), + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); +} +template<class T, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); } @@ -342,15 +510,37 @@ auto make_subscriber(const composite_subscription& cs, const OnNext& on, const O template<class T, class OtherT, class OtherObserver, class I> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(scbr.get_subscription(), o); + -> subscriber<T, observer<T, I>> { + auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class I> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) -> typename std::enable_if< + !is_subscription<Observer>::value && is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(scbr.get_subscription(), o); + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) @@ -359,37 +549,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observ !is_subscriber<Observer>::value && !is_subscription<Observer>::value && !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(scbr.get_subscription(), o); + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o)); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc) @@ -397,24 +642,56 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(scbr.get_subscription(), + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), scbr.get_subscription(), observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class I> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const observer<T, I>& o) -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(cs, o); + return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); +} +template<class T, class OtherT, class OtherObserver, class I> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, + const observer<T, I>& o) + -> subscriber<T, observer<T, I>> { + return subscriber<T, observer<T, I>>(std::move(id), cs, o); } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) -> typename std::enable_if< is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(cs, o); + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + is_observer<Observer>::value, + subscriber<T, Observer>>::type { + auto r = subscriber<T, Observer>(std::move(id), cs, o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class Observer> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) @@ -423,37 +700,92 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos !is_subscriber<Observer>::value && !is_subscription<Observer>::value && !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(cs, o); + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class Observer> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) + -> typename std::enable_if< + !detail::is_on_next_of<T, Observer>::value && + !is_subscriber<Observer>::value && + !is_subscription<Observer>::value && + !is_observer<Observer>::value, + subscriber<T, observer<T, Observer>>>::type { + auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext>>( + static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value, + subscriber<T, observer<T, static_observer<T, OnNext>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext>>>(std::move(id), cs, observer<T, static_observer<T, OnNext>>( static_observer<T, OnNext>(on))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError>>( + static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError>>( static_observer<T, OnNext, OnError>(on, oe))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) -> typename std::enable_if< detail::is_on_next_of<T, OnNext>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( + static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>( static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc))); + trace_activity().connect(r, scbr); + return r; } template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) @@ -461,18 +793,48 @@ auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const compos detail::is_on_next_of<T, OnNext>::value && detail::is_on_error<OnError>::value && detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { - return subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(cs, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(trace_id::make_next_id_subscriber(), cs, + observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( + static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> +auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) + -> typename std::enable_if< + detail::is_on_next_of<T, OnNext>::value && + detail::is_on_error<OnError>::value && + detail::is_on_completed<OnCompleted>::value, + subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>>::type { + auto r = subscriber<T, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>(std::move(id), cs, observer<T, static_observer<T, OnNext, OnError, OnCompleted>>( static_observer<T, OnNext, OnError, OnCompleted>(on, oe, oc))); + trace_activity().connect(r, scbr); + return r; } -// override lifetime -// template<class T, class Observer> auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs) -> subscriber<T, Observer> { - return subscriber<T, Observer>(cs, scbr.get_observer()); + auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; +} +template<class T, class Observer> +auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs) + -> subscriber<T, Observer> { + auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; +} + +template<class T, class Observer> +auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id) + -> subscriber<T, Observer> { + auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer()); + trace_activity().connect(r, scbr); + return r; } } |