diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-20 08:35:26 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-20 08:35:26 -0700 |
commit | fba0f2d04f8d6ac53994a994353f6ae3cb6a2219 (patch) | |
tree | 4be73631eb77c8c53e260a92b5642ff074a2ee00 /Rx/v2/src/rxcpp | |
parent | 1f67a383c47b24ffdf4e97771cebbb89f9c167dd (diff) | |
download | RxCpp-fba0f2d04f8d6ac53994a994353f6ae3cb6a2219.tar.gz |
add synchronize subject and fix lifetime issues
This finally forced a rewrite of subscription. pretty happy with the
result.
flat map removed all sync and takes a new parameter that is applied to
every source and can be used to synchronize them
also added multicast
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-flat_map.hpp | 92 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-multicast.hpp | 97 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-publish.hpp | 45 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-ref_count.hpp | 4 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-notification.hpp | 22 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 62 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observer.hpp | 25 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-scheduler.hpp | 15 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subjects.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 9 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscription.hpp | 442 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 4 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-test.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-subject.hpp | 17 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp | 214 |
17 files changed, 764 insertions, 298 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index 38dd15f..96e7ea6 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -13,11 +13,12 @@ namespace operators { namespace detail { -template<class Observable, class CollectionSelector, class ResultSelector> +template<class Observable, class CollectionSelector, class ResultSelector, class SourceFilter> struct flat_map_traits { typedef typename std::decay<Observable>::type source_type; typedef typename std::decay<CollectionSelector>::type collection_selector_type; typedef typename std::decay<ResultSelector>::type result_selector_type; + typedef typename std::decay<SourceFilter>::type source_filter_type; typedef typename source_type::value_type source_value_type; @@ -47,12 +48,12 @@ struct flat_map_traits { typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; }; -template<class Observable, class CollectionSelector, class ResultSelector> +template<class Observable, class CollectionSelector, class ResultSelector, class SourceFilter> struct flat_map - : public operator_base<typename flat_map_traits<Observable, CollectionSelector, ResultSelector>::value_type> + : public operator_base<typename flat_map_traits<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type> { - typedef flat_map<Observable, CollectionSelector, ResultSelector> this_type; - typedef flat_map_traits<Observable, CollectionSelector, ResultSelector> traits; + typedef flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter> this_type; + typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, SourceFilter> traits; typedef typename traits::source_type source_type; typedef typename traits::collection_selector_type collection_selector_type; @@ -62,22 +63,26 @@ struct flat_map typedef typename traits::collection_type collection_type; typedef typename traits::collection_value_type collection_value_type; + typedef typename traits::source_filter_type source_filter_type; + struct values { - values(source_type o, collection_selector_type s, result_selector_type rs) + values(source_type o, collection_selector_type s, result_selector_type rs, source_filter_type sf) : source(std::move(o)) , selectCollection(std::move(s)) , selectResult(std::move(rs)) + , sourceFilter(std::move(sf)) { } source_type source; collection_selector_type selectCollection; result_selector_type selectResult; + source_filter_type sourceFilter; }; values initial; - flat_map(source_type o, collection_selector_type s, result_selector_type rs) - : initial(std::move(o), std::move(s), std::move(rs)) + flat_map(source_type o, collection_selector_type s, result_selector_type rs, source_filter_type sf) + : initial(std::move(o), std::move(s), std::move(rs), std::move(sf)) { } @@ -99,12 +104,7 @@ struct flat_map } // on_completed on the output must wait until all the // subscriptions have received on_completed - std::atomic<int> pendingCompletions; - // because multiple sources are subscribed to by flat_map, - // calls to the output must be serialized by lock. - // the on_error/on_complete and unsubscribe calls can - // cause lock recursion. - std::recursive_mutex lock; + int pendingCompletions; output_type out; }; // take a copy of the values for each subscription @@ -116,21 +116,26 @@ struct flat_map // inner subscriptions are unsubscribed as well state->out.add(outercs); + auto source = on_exception( + [&](){return state->sourceFilter(state->source);}, + state->out); + if (source.empty()) { + return; + } + ++state->pendingCompletions; // this subscribe does not share the observer subscription // so that when it is unsubscribed the observer can be called // until the inner subscriptions have finished - state->source.subscribe( + source->subscribe( state->out, outercs, // on_next [state](source_value_type st) { - util::detail::maybe<collection_type> selectedCollection; - try { - selectedCollection.reset(state->selectCollection(st)); - } catch(...) { - std::unique_lock<std::recursive_mutex> guard(state->lock); - state->out.on_error(std::current_exception()); + auto selectedCollection = on_exception( + [&](){return state->selectCollection(st);}, + state->out); + if (selectedCollection.empty()) { return; } @@ -144,34 +149,36 @@ struct flat_map state->out.remove(innercstoken); })); + auto selectedSource = on_exception( + [&](){return state->sourceFilter(selectedCollection.get());}, + state->out); + if (selectedSource.empty()) { + return; + } + ++state->pendingCompletions; // this subscribe does not share the source subscription // so that when it is unsubscribed the source will continue - selectedCollection->subscribe( + selectedSource->subscribe( state->out, innercs, // on_next [state, st](collection_value_type ct) { - util::detail::maybe<typename this_type::value_type> selectedResult; - try { - selectedResult.reset(state->selectResult(st, std::move(ct))); - } catch(...) { - std::unique_lock<std::recursive_mutex> guard(state->lock); - state->out.on_error(std::current_exception()); + auto selectedResult = on_exception( + [&](){return state->selectResult(st, std::move(ct));}, + state->out); + if (selectedResult.empty()) { return; } - std::unique_lock<std::recursive_mutex> guard(state->lock); state->out.on_next(std::move(*selectedResult)); }, // on_error [state](std::exception_ptr e) { - std::unique_lock<std::recursive_mutex> guard(state->lock); state->out.on_error(e); }, //on_completed [state](){ if (--state->pendingCompletions == 0) { - std::unique_lock<std::recursive_mutex> guard(state->lock); state->out.on_completed(); } } @@ -179,13 +186,11 @@ struct flat_map }, // on_error [state](std::exception_ptr e) { - std::unique_lock<std::recursive_mutex> guard(state->lock); state->out.on_error(e); }, // on_completed [state]() { if (--state->pendingCompletions == 0) { - std::unique_lock<std::recursive_mutex> guard(state->lock); state->out.on_completed(); } } @@ -193,35 +198,38 @@ struct flat_map } }; -template<class CollectionSelector, class ResultSelector> +template<class CollectionSelector, class ResultSelector, class SourceFilter> class flat_map_factory { typedef typename std::decay<CollectionSelector>::type collection_selector_type; typedef typename std::decay<ResultSelector>::type result_selector_type; + typedef typename std::decay<SourceFilter>::type source_filter_type; collection_selector_type selectorCollection; result_selector_type selectorResult; + source_filter_type sourceFilter; public: - flat_map_factory(collection_selector_type s, result_selector_type rs) + flat_map_factory(collection_selector_type s, result_selector_type rs, source_filter_type sf) : selectorCollection(std::move(rs)) , selectorResult(std::move(s)) + , sourceFilter(std::move(sf)) { } template<class Observable> auto operator()(Observable&& source) - -> observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>> { - return observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>>( - flat_map<Observable, CollectionSelector, ResultSelector>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult))); + -> observable<typename flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type, flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>> { + return observable<typename flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type, flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>>( + flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult), std::move(sourceFilter))); } }; } -template<class CollectionSelector, class ResultSelector> -auto flat_map(CollectionSelector&& s, ResultSelector&& rs) - -> detail::flat_map_factory<CollectionSelector, ResultSelector> { - return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs)); +template<class CollectionSelector, class ResultSelector, class SourceFilter> +auto flat_map(CollectionSelector&& s, ResultSelector&& rs, SourceFilter&& sf) + -> detail::flat_map_factory<CollectionSelector, ResultSelector, SourceFilter> { + return detail::flat_map_factory<CollectionSelector, ResultSelector, SourceFilter>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<SourceFilter>(sf)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp new file mode 100644 index 0000000..7f25071 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP) +#define RXCPP_OPERATORS_RX_MULTICAST_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Observable, class Subject> +struct multicast : public operator_base<T> +{ + typedef typename std::decay<Observable>::type source_type; + typedef typename std::decay<Subject>::type subject_type; + + struct multicast_state : public std::enable_shared_from_this<multicast_state> + { + multicast_state(source_type o, subject_type sub) + : source(std::move(o)) + , subject_value(std::move(sub)) + { + } + source_type source; + subject_type subject_value; + rxu::detail::maybe<typename composite_subscription::weak_subscription> connection; + }; + + std::shared_ptr<multicast_state> state; + + multicast(source_type o, subject_type sub) + : state(std::make_shared<multicast_state>(std::move(o), std::move(sub))) + { + } + template<class Subscriber> + void on_subscribe(Subscriber&& o) { + state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o)); + } + void on_connect(composite_subscription cs) { + if (state->connection.empty()) { + auto destination = state->subject_value.get_subscriber(); + + // the lifetime of each connect is nested in the subject lifetime + state->connection.reset(destination.add(cs)); + + auto localState = state; + + // when the connection is finished it should shutdown the connection + cs.add( + [destination, localState](){ + if (!localState->connection.empty()) { + destination.remove(localState->connection.get()); + localState->connection.reset(); + } + }); + + // use cs not destination for lifetime of subscribe. + state->source.subscribe(cs, destination); + } + } +}; + +template<class Subject> +class multicast_factory +{ + Subject caster; +public: + multicast_factory(Subject sub) + : caster(std::move(sub)) + { + } + template<class Observable> + auto operator()(Observable&& source) + -> connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>> { + return connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>>( + multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>(std::forward<Observable>(source), caster)); + } +}; + +} + +template<class Subject> +inline auto multicast(Subject sub) + -> detail::multicast_factory<Subject> { + return detail::multicast_factory<Subject>(std::move(sub)); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp index fcb89f3..34de51a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp @@ -13,44 +13,6 @@ namespace operators { namespace detail { -template<class T, class Observable, class Subject> -struct publish : public operator_base<T> -{ - typedef typename std::decay<Observable>::type source_type; - typedef typename std::decay<Subject>::type subject_type; - source_type source; - subject_type subject_value; - rxu::detail::maybe<typename composite_subscription::weak_subscription> connection; - - explicit publish(source_type o) - : source(std::move(o)) - { - } - template<class Subscriber> - void on_subscribe(Subscriber&& o) { - subject_value.get_observable().subscribe(std::forward<Subscriber>(o)); - } - void on_connect(composite_subscription cs) { - if (connection.empty()) { - // the lifetime of each connect is independent - auto destination = subject_value.get_subscriber(); - - // when the paramter is unsubscribed it should - // unsubscribe the most recent connection - connection.reset(cs.add(destination.get_subscription())); - - // when the connection is finished it should shutdown the connection - destination.add(make_subscription( - [cs, this](){ - cs.remove(this->connection.get()); - this->connection.reset(); - })); - - source.subscribe(destination); - } - } -}; - template<template<class T> class Subject> class publish_factory { @@ -58,9 +20,10 @@ public: publish_factory() {} template<class Observable> auto operator()(Observable&& source) - -> connectable_observable<typename std::decay<Observable>::type::value_type, publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>> { - return connectable_observable<typename std::decay<Observable>::type::value_type, publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>>( - publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>(std::forward<Observable>(source))); + -> connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>> { + return connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>>( + multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>( + std::forward<Observable>(source), Subject<typename std::decay<Observable>::type::value_type>())); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp index 8d8b1a7..52a3d30 100644 --- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp @@ -31,12 +31,12 @@ struct ref_count : public operator_base<T> template<class Subscriber> void on_subscribe(Subscriber&& o) { auto needConnect = ++subscribers == 1; - o.add(make_subscription( + o.add( [this](){ if (--this->subscribers == 0) { this->connection.unsubscribe(); } - })); + }); source.subscribe(std::forward<Subscriber>(o)); if (needConnect) { connection = source.connect(); diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index b15a3d1..75518ab 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -85,6 +85,7 @@ #include <algorithm> #include <atomic> #include <map> +#include <set> #include <mutex> #include <deque> #include <thread> @@ -94,6 +95,7 @@ #include <queue> #include <chrono> #include <condition_variable> +#include <typeinfo> #include "rx-util.hpp" #include "rx-predef.hpp" diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp index dd1ff78..8e4d27a 100644 --- a/Rx/v2/src/rxcpp/rx-notification.hpp +++ b/Rx/v2/src/rxcpp/rx-notification.hpp @@ -56,6 +56,24 @@ struct notification_base virtual void accept(const observer_type& o) const =0; }; +template<class T> +auto to_stream(std::ostream& os, const T& t, int, int) + -> decltype(os << t) { + return os << t; +} + +template<class T> +auto to_stream(std::ostream& os, const T&, int, ...) + -> decltype(os << typeid(T).name() << "") { + return os << "< " << typeid(T).name() << " does not support ostream>"; +} + +template<class T> +auto to_stream(std::ostream& os, const T&, ...) + -> decltype(os << "") { + return os << "<the value does not support ostream>"; +} + } template<typename T> @@ -71,7 +89,9 @@ private: on_next_notification(T value) : value(std::move(value)) { } virtual void out(std::ostream& os) const { - os << "on_next( " << value << ")"; + os << "on_next( "; + detail::to_stream(os, value, 0, 0); + os << ")"; } virtual bool equals(const typename base::type& other) const { bool result = false; diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 016bba3..5c197a3 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -74,6 +74,20 @@ struct select_chain } +// +// this type is the default used by operators that subscribe to +// multiple sources. It assumes that the sources are already synchronized +// +struct identity_observable +{ + template<class Observable> + auto operator()(Observable o) + -> Observable { + return std::move(o); + static_assert(is_observable<Observable>::value, "only support observables"); + } +}; + template<class T> class dynamic_observable : public rxs::source_base<T> @@ -150,7 +164,7 @@ private: template<class Subscriber> auto detail_subscribe(Subscriber&& scrbr) const - -> decltype(make_subscription(*(typename std::decay<Subscriber>::type*)nullptr)) { + -> composite_subscription { typedef typename std::decay<Subscriber>::type subscriber_type; subscriber_type o = std::forward<Subscriber>(scrbr); @@ -160,7 +174,7 @@ private: static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); if (!o.is_subscribed()) { - return make_subscription(o); + return o.get_subscription(); } auto safe_subscribe = [=]() { @@ -189,7 +203,7 @@ private: safe_subscribe(); } - return make_subscription(o); + return o.get_subscription(); } public: @@ -302,18 +316,46 @@ public: /// template<class CollectionSelector, class ResultSelector> auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const - -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>> { - return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>>( - rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs))); + -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>> { + return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>>( + rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_observable())); + } + + /// flat_map (AKA SelectMany) -> + /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. + /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned. + /// + template<class CollectionSelector, class ResultSelector, class SourceFilter> + auto flat_map(CollectionSelector&& s, ResultSelector&& rs, SourceFilter&& sf) const + -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>> { + return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>>( + rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<SourceFilter>(sf))); + } + + /// multicast -> + /// allows connections to the source to be independent of subscriptions + /// + template<class Subject> + auto multicast(Subject sub) const + -> connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>> { + return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>( + rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub))); + } + + /// synchronize -> + /// turns a cold observable hot and allows connections to the source to be independent of subscriptions + /// + auto synchronize(rxsc::worker w, composite_subscription cs = composite_subscription()) const + -> decltype(multicast(rxsub::synchronize<T>(w, cs))) { + return multicast(rxsub::synchronize<T>(w, cs)); } /// publish -> /// turns a cold observable hot and allows connections to the source to be independent of subscriptions /// - auto publish() const - -> connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>> { - return connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>>( - rxo::detail::publish<T, this_type, rxsub::subject<T>>(*this)); + auto publish(composite_subscription cs = composite_subscription()) const + -> decltype(multicast(rxsub::subject<T>(cs))) { + return multicast(rxsub::subject<T>(cs)); } /// take -> diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp index 6b573ae..b6bad50 100644 --- a/Rx/v2/src/rxcpp/rx-observer.hpp +++ b/Rx/v2/src/rxcpp/rx-observer.hpp @@ -366,6 +366,31 @@ auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc) dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)))); } + +template<class F, class OnError> +auto on_exception(const F& f, const OnError& c) + -> typename std::enable_if<detail::is_on_error<OnError>::value, rxu::detail::maybe<decltype(f())>>::type { + rxu::detail::maybe<decltype(f())> r; + try { + r.reset(f()); + } catch (...) { + c(std::current_exception()); + } + return r; +} + +template<class F, class Subscriber> +auto on_exception(const F& f, const Subscriber& s) + -> typename std::enable_if<is_subscriber<Subscriber>::value, rxu::detail::maybe<decltype(f())>>::type { + rxu::detail::maybe<decltype(f())> r; + try { + r.reset(f()); + } catch (...) { + s.on_error(std::current_exception()); + } + return r; +} + } #endif diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 34c44bf..48c84d5 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -39,6 +39,7 @@ namespace rxo=operators; #include "operators/rx-filter.hpp" #include "operators/rx-map.hpp" #include "operators/rx-flat_map.hpp" +#include "operators/rx-multicast.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" #include "operators/rx-connect_forever.hpp" diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index 754a7b0..5713b58 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -193,7 +193,6 @@ class worker : public worker_base friend bool operator==(const worker&, const worker&); public: typedef scheduler_base::clock_type clock_type; - typedef composite_subscription::shared_subscription shared_subscription; typedef composite_subscription::weak_subscription weak_subscription; worker() @@ -217,10 +216,7 @@ public: inline bool is_subscribed() const { return lifetime.is_subscribed(); } - inline weak_subscription add(shared_subscription s) const { - return lifetime.add(std::move(s)); - } - inline weak_subscription add(dynamic_subscription s) const { + inline weak_subscription add(subscription s) const { return lifetime.add(std::move(s)); } inline void remove(weak_subscription w) const { @@ -452,7 +448,6 @@ class schedulable : public schedulable_base public: typedef composite_subscription::weak_subscription weak_subscription; - typedef composite_subscription::shared_subscription shared_subscription; typedef scheduler_base::clock_type clock_type; ~schedulable() @@ -543,11 +538,13 @@ public: inline bool is_subscribed() const { return lifetime.is_subscribed(); } - inline weak_subscription add(shared_subscription s) const { + inline weak_subscription add(subscription s) const { return lifetime.add(std::move(s)); } - inline weak_subscription add(dynamic_subscription s) const { - return lifetime.add(std::move(s)); + template<class F> + auto add(F f) const + -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type { + return lifetime.add(make_subscription(std::move(f))); } inline void remove(weak_subscription w) const { return lifetime.remove(std::move(w)); diff --git a/Rx/v2/src/rxcpp/rx-subjects.hpp b/Rx/v2/src/rxcpp/rx-subjects.hpp index 1289835..3c3fb32 100644 --- a/Rx/v2/src/rxcpp/rx-subjects.hpp +++ b/Rx/v2/src/rxcpp/rx-subjects.hpp @@ -17,5 +17,6 @@ namespace rxsub=subjects; } #include "subjects/rx-subject.hpp" +#include "subjects/rx-synchronize.hpp" -#endif
\ No newline at end of file +#endif diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 7c3207d..939d76c 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -43,7 +43,6 @@ class subscriber : public subscriber_base<T> subscriber(); public: typedef typename composite_subscription::weak_subscription weak_subscription; - typedef typename composite_subscription::shared_subscription shared_subscription; subscriber(const this_type& o) : lifetime(o.lifetime) @@ -132,11 +131,13 @@ public: bool is_subscribed() const { return lifetime.is_subscribed(); } - weak_subscription add(shared_subscription s) const { + weak_subscription add(subscription s) const { return lifetime.add(std::move(s)); } - weak_subscription add(dynamic_subscription s) const { - return lifetime.add(std::move(s)); + template<class F> + auto add(F f) const + -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { + return lifetime.add(make_subscription(std::move(f))); } void remove(weak_subscription w) const { return lifetime.remove(std::move(w)); diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index 40700e8..527d52b 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -9,6 +9,21 @@ namespace rxcpp { +namespace detail { + +template<class F> +struct is_unsubscribe_function +{ + struct not_void {}; + template<class CF> + static auto check(int) -> decltype((*(CF*)nullptr)()); + template<class CF> + static not_void check(...); + + static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value; +}; + +} struct tag_subscription {}; struct subscription_base {typedef tag_subscription subscription_tag;}; @@ -23,39 +38,8 @@ public: static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_subscription*>::value; }; -class dynamic_subscription : public subscription_base -{ - typedef std::function<void()> unsubscribe_call_type; - unsubscribe_call_type unsubscribe_call; - dynamic_subscription() - { - } -public: - dynamic_subscription(const dynamic_subscription& o) - : unsubscribe_call(o.unsubscribe_call) - { - } - dynamic_subscription(dynamic_subscription&& o) - : unsubscribe_call(std::move(o.unsubscribe_call)) - { - } - template<class I> - dynamic_subscription(I i, typename std::enable_if<is_subscription<I>::value && !std::is_same<I, dynamic_subscription>::value, void**>::type selector = nullptr) - : unsubscribe_call([i](){ - i.unsubscribe();}) - { - } - dynamic_subscription(unsubscribe_call_type s) - : unsubscribe_call(std::move(s)) - { - } - void unsubscribe() const { - unsubscribe_call(); - } -}; - template<class Unsubscribe> -class static_subscription : public subscription_base +class static_subscription { typedef typename std::decay<Unsubscribe>::type unsubscribe_call_type; unsubscribe_call_type unsubscribe_call; @@ -80,178 +64,243 @@ public: } }; -template<class I> class subscription : public subscription_base { - typedef typename std::decay<I>::type inner_t; - inner_t inner; - mutable bool issubscribed; + class base_subscription_state : public std::enable_shared_from_this<base_subscription_state> + { + base_subscription_state(); + public: + + explicit base_subscription_state(bool initial) + : issubscribed(initial) + { + } + virtual void unsubscribe() { + issubscribed = false; + } + std::atomic<bool> issubscribed; + }; public: - subscription(inner_t inner) - : inner(std::move(inner)) - , issubscribed(true) + typedef std::weak_ptr<base_subscription_state> weak_state_type; + +private: + template<class I> + struct subscription_state : public base_subscription_state { - } - bool is_subscribed() const { - return issubscribed; - } - void unsubscribe() const { - if (issubscribed) { - inner.unsubscribe(); + typedef typename std::decay<I>::type inner_t; + subscription_state(inner_t i) + : base_subscription_state(true) + , inner(std::move(i)) + { + } + virtual void unsubscribe() { + if (issubscribed.exchange(false)) { + inner.unsubscribe(); + } + } + inner_t inner; + }; + std::shared_ptr<base_subscription_state> state; + + friend bool operator<(const subscription&, const subscription&); + friend bool operator==(const subscription&, const subscription&); + + subscription(weak_state_type w) + : state(w.lock()) + { + if (!state) { + abort(); } - issubscribed = false; } -}; -template<> -class subscription<void> : public subscription_base -{ public: + subscription() + : state(std::make_shared<base_subscription_state>(false)) + { + if (!state) { + abort(); + } + } + template<class U> + explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr) + : state(std::make_shared<subscription_state<U>>(std::move(u))) + { + if (!state) { + abort(); + } + } + template<class U> + explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr) + // intentionally slice + : state(std::move(static_cast<subscription&>(u).state)) + { + if (!state) { + abort(); + } + } + subscription(const subscription& o) + : state(o.state) + { + if (!state) { + abort(); + } + } + subscription(subscription&& o) + : state(std::move(o.state)) { + if (!state) { + abort(); + } + } + subscription& operator=(subscription o) { + state = std::move(o.state); + return *this; } bool is_subscribed() const { - return false; + if (!state) { + abort(); + } + return state->issubscribed; } void unsubscribe() const { + if (!state) { + abort(); + } + auto keepAlive = state; + state->unsubscribe(); + } + + weak_state_type get_weak() { + return state; + } + static subscription lock(weak_state_type w) { + return subscription(w); } }; + +inline bool operator<(const subscription& lhs, const subscription& rhs) { + return lhs.state < rhs.state; +} +inline bool operator==(const subscription& lhs, const subscription& rhs) { + return lhs.state == rhs.state; +} +inline bool operator!=(const subscription& lhs, const subscription& rhs) { + return !(lhs == rhs); +} + + inline auto make_subscription() - -> subscription<void> { - return subscription<void>(); + -> subscription { + return subscription(); } template<class I> auto make_subscription(I&& i) - -> typename std::enable_if<is_subscription<I>::value, - subscription<I>>::type { - return subscription<I>(std::forward<I>(i)); + -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value, + subscription>::type { + return subscription(std::forward<I>(i)); } template<class Unsubscribe> auto make_subscription(Unsubscribe&& u) - -> typename std::enable_if<!is_subscription<Unsubscribe>::value, - subscription< static_subscription<Unsubscribe>>>::type { - return subscription< static_subscription<Unsubscribe>>( - static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u))); + -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value, + subscription>::type { + return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u))); } -class composite_subscription : public subscription_base +namespace detail { + +struct tag_composite_subscription_empty {}; + +class composite_subscription_inner { -public: - typedef std::shared_ptr<dynamic_subscription> shared_subscription; - typedef std::weak_ptr<dynamic_subscription> weak_subscription; private: - struct tag_empty {}; - struct state_t : public std::enable_shared_from_this<state_t> + typedef subscription::weak_state_type weak_subscription; + struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state> { - std::vector<shared_subscription> subscriptions; - std::recursive_mutex lock; - bool issubscribed; + std::set<subscription> subscriptions; + std::mutex lock; + std::atomic<bool> issubscribed; - state_t() - : issubscribed(true) + ~composite_subscription_state() { + std::unique_lock<decltype(lock)> guard(lock); + subscriptions.clear(); } - state_t(tag_empty&&) - : issubscribed(false) + composite_subscription_state() + : issubscribed(true) { } - - inline bool is_subscribed() { - return issubscribed; - } - - inline weak_subscription add(dynamic_subscription s) { - return add(std::make_shared<dynamic_subscription>(std::move(s))); + composite_subscription_state(tag_composite_subscription_empty) + : issubscribed(false) + { } - inline weak_subscription add(shared_subscription s) { - std::unique_lock<decltype(lock)> guard(lock); - + inline weak_subscription add(subscription s) { if (!issubscribed) { - s->unsubscribe(); - } else { - auto end = std::end(subscriptions); - auto it = std::find(std::begin(subscriptions), end, s); - if (it == end) - { - subscriptions.emplace_back(s); - } + s.unsubscribe(); + } else if (s.is_subscribed()) { + std::unique_lock<decltype(lock)> guard(lock); + subscriptions.insert(s); } - return s; + return s.get_weak(); } inline void remove(weak_subscription w) { - std::unique_lock<decltype(lock)> guard(lock); - if (issubscribed && !w.expired()) { - auto s = w.lock(); - if (s) - { - auto end = std::end(subscriptions); - auto it = std::find(std::begin(subscriptions), end, s); - if (it != end) - { - subscriptions.erase(it); - } - } + std::unique_lock<decltype(lock)> guard(lock); + subscriptions.erase(subscription::lock(w)); } } inline void clear() { - std::unique_lock<decltype(lock)> guard(lock); - if (issubscribed) { - std::vector<shared_subscription> v(std::move(subscriptions)); + std::unique_lock<decltype(lock)> guard(lock); + + std::set<subscription> v(std::move(subscriptions)); + guard.unlock(); std::for_each(v.begin(), v.end(), - [](shared_subscription& s) { - s->unsubscribe(); }); + [](const subscription& s) { + s.unsubscribe(); }); } } inline void unsubscribe() { - std::unique_lock<decltype(lock)> guard(lock); + if (issubscribed.exchange(false)) { + std::unique_lock<decltype(lock)> guard(lock); - if (issubscribed) { - issubscribed = false; - std::vector<shared_subscription> v(std::move(subscriptions)); + std::set<subscription> v(std::move(subscriptions)); + guard.unlock(); std::for_each(v.begin(), v.end(), - [](shared_subscription& s) { - s->unsubscribe(); }); + [](const subscription& s) { + s.unsubscribe(); }); } } }; - mutable std::shared_ptr<state_t> state; +public: + typedef std::shared_ptr<composite_subscription_state> shared_state_type; - static std::shared_ptr<state_t> shared_empty; +protected: + mutable shared_state_type state; - composite_subscription(std::shared_ptr<state_t> s) - : state(std::move(s)) +public: + composite_subscription_inner() + : state(std::make_shared<composite_subscription_state>()) { - if (!state) { - abort(); - } } - - friend bool operator==(const composite_subscription&, const composite_subscription&); - -public: - - composite_subscription() - : state(std::make_shared<state_t>()) + composite_subscription_inner(tag_composite_subscription_empty et) + : state(std::make_shared<composite_subscription_state>(et)) { - if (!state) { - abort(); - } } - composite_subscription(const composite_subscription& o) + + composite_subscription_inner(const composite_subscription_inner& o) : state(o.state) { if (!state) { abort(); } } - composite_subscription(composite_subscription&& o) + composite_subscription_inner(composite_subscription_inner&& o) : state(std::move(o.state)) { if (!state) { @@ -259,75 +308,120 @@ public: } } - composite_subscription& operator=(const composite_subscription& o) + composite_subscription_inner& operator=(composite_subscription_inner o) { - state = o.state; + state = std::move(o.state); if (!state) { abort(); } return *this; } - composite_subscription& operator=(composite_subscription&& o) - { - state = std::move(o.state); + + inline weak_subscription add(subscription s) const { if (!state) { abort(); } - return *this; - } - - static inline composite_subscription empty() { - return composite_subscription(shared_empty); - } - - inline bool is_subscribed() const { - return state->is_subscribed(); - } - inline weak_subscription add(shared_subscription s) const { - return state->add(std::move(s)); - } - inline weak_subscription add(dynamic_subscription s) const { + if (s == static_cast<const subscription&>(*this)) { + // do not nest the same subscription + abort(); + //return s.get_weak(); + } return state->add(std::move(s)); } inline void remove(weak_subscription w) const { + if (!state) { + abort(); + } state->remove(std::move(w)); } inline void clear() const { + if (!state) { + abort(); + } state->clear(); } - inline void unsubscribe() const { + inline void unsubscribe() { + if (!state) { + abort(); + } state->unsubscribe(); } }; -inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) { - return lhs.state == rhs.state; -} -inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) { - return !(lhs == rhs); } -//static -RXCPP_SELECT_ANY std::shared_ptr<composite_subscription::state_t> composite_subscription::shared_empty = std::make_shared<composite_subscription::state_t>(composite_subscription::tag_empty()); +class composite_subscription + : protected detail::composite_subscription_inner + , public subscription +{ + typedef detail::composite_subscription_inner inner_type; +public: + typedef subscription::weak_state_type weak_subscription; + static composite_subscription shared_empty; -namespace detail { + composite_subscription(detail::tag_composite_subscription_empty et) + : inner_type(et) + , subscription() // use empty base + { + } -struct tag_subscription_resolution -{ - template<class LHS> - struct predicate +public: + + composite_subscription() + : inner_type() + , subscription(static_cast<const inner_type&>(*this)) { - static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value; - }; - struct default_type { - inline operator composite_subscription() const { - return composite_subscription(); - } - }; + } + + composite_subscription(const composite_subscription& o) + : inner_type(o) + , subscription(static_cast<const subscription&>(o)) + { + } + composite_subscription(composite_subscription&& o) + : inner_type(std::move(o)) + , subscription(std::move(static_cast<subscription&>(o))) + { + } + + composite_subscription& operator=(composite_subscription o) + { + inner_type::operator=(std::move(o)); + subscription::operator=(std::move(static_cast<subscription&>(o))); + return *this; + } + + static inline composite_subscription empty() { + return shared_empty; + } + + using subscription::is_subscribed; + using subscription::unsubscribe; + + using inner_type::add; + using inner_type::remove; + using inner_type::clear; + + template<class F> + auto add(F f) const + -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { + return inner_type::add(make_subscription(std::move(f))); + } }; +inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) { + return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs); } +inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) { + return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs); +} +inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) { + return !(lhs == rhs); +} + +//static +RXCPP_SELECT_ANY composite_subscription composite_subscription::shared_empty = composite_subscription(detail::tag_composite_subscription_empty()); } diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 6b37ca3..76b7f9e 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -227,12 +227,12 @@ public: } maybe& operator=(const T& other) { - set(other); + reset(other); return *this; } maybe& operator=(const maybe& other) { if (const T* pother = other.get()) { - set(*pother); + reset(*pother); } else { reset(); } diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp index 411bd43..6d3cf6f 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp @@ -251,9 +251,9 @@ public: } auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this()); - o.add(dynamic_subscription([sharedThis, index]() { + o.add([sharedThis, index]() { sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); - })); + }); } virtual std::vector<rxn::subscription> subscriptions() const { @@ -313,9 +313,9 @@ public: auto index = sv.size() - 1; auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this()); - o.add(dynamic_subscription([sharedThis, index]() { + o.add([sharedThis, index]() { sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); - })); + }); } virtual std::vector<rxn::subscription> subscriptions() const { diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index dffa37c..e892828 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -2,8 +2,8 @@ #pragma once -#if !defined(RXCPP_RX_SCHEDULER_SUBJECT_HPP) -#define RXCPP_RX_SCHEDULER_SUBJECT_HPP +#if !defined(RXCPP_RX_SUBJECT_HPP) +#define RXCPP_RX_SUBJECT_HPP #include "../rx-includes.hpp" @@ -200,7 +200,11 @@ class subject detail::multicast_observer<T> s; public: - explicit subject(composite_subscription cs = composite_subscription()) + subject() + : s(lifetime) + { + } + explicit subject(composite_subscription cs) : lifetime(cs) , s(cs) { @@ -210,11 +214,8 @@ public: return s.has_observers(); } - subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber(composite_subscription cs = composite_subscription()) const { - auto lt = lifetime; - auto token = lt.add(cs); - cs.add(make_subscription([token, lt](){lt.remove(token);})); - return make_subscriber<T>(cs, observer<T, detail::multicast_observer<T>>(s)); + subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const { + return make_subscriber<T>(lifetime, observer<T, detail::multicast_observer<T>>(s)); } observable<T> get_observable() const { diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp new file mode 100644 index 0000000..60ce46a --- /dev/null +++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp @@ -0,0 +1,214 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_RX_SYNCHRONIZE_HPP) +#define RXCPP_RX_SYNCHRONIZE_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace subjects { + +namespace detail { + +template<class T> +class synchronize_observer : public detail::multicast_observer<T> +{ + typedef synchronize_observer<T> this_type; + typedef detail::multicast_observer<T> base_type; + + struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state> + { + typedef rxn::notification<T> notification_type; + typedef typename notification_type::type base_notification_type; + typedef std::deque<base_notification_type> queue_type; + + struct mode + { + enum type { + Invalid = 0, + Processing, + Empty, + Disposed + }; + }; + + mutable std::mutex lock; + mutable std::condition_variable wake; + mutable queue_type queue; + composite_subscription lifetime; + rxsc::worker processor; + mutable typename mode::type current; + subscriber<T> destination; + + void ensure_processing(std::unique_lock<std::mutex>& guard) const { + if (!guard.owns_lock()) { + abort(); + } + if (current == mode::Empty) { + current = mode::Processing; + auto keepAlive = this->shared_from_this(); + processor.schedule(lifetime, [keepAlive, this](const rxsc::schedulable& self){ + try { + std::unique_lock<std::mutex> guard(lock); + if (!lifetime.is_subscribed() || !destination.is_subscribed()) { + current = mode::Disposed; + queue.clear(); + guard.unlock(); + lifetime.unsubscribe(); + destination.unsubscribe(); + return; + } + if (queue.empty()) { + current = mode::Empty; + return; + } + auto notification = std::move(queue.front()); + queue.pop_front(); + guard.unlock(); + notification->accept(destination); + self(); + } catch(...) { + destination.on_error(std::current_exception()); + std::unique_lock<std::mutex> guard(lock); + current = mode::Empty; + } + }); + } + } + + synchronize_observer_state(rxsc::worker w, composite_subscription cs, subscriber<T> scbr) + : lifetime(cs) + , processor(w) + , current(mode::Empty) + , destination(scbr) + { + } + + template<class V> + void on_next(V v) const { + if (lifetime.is_subscribed()) { + std::unique_lock<std::mutex> guard(lock); + queue.push_back(notification_type::on_next(std::move(v))); + wake.notify_one(); + ensure_processing(guard); + } + } + void on_error(std::exception_ptr e) const { + if (lifetime.is_subscribed()) { + std::unique_lock<std::mutex> guard(lock); + queue.push_back(notification_type::on_error(e)); + wake.notify_one(); + ensure_processing(guard); + } + } + void on_completed() const { + if (lifetime.is_subscribed()) { + std::unique_lock<std::mutex> guard(lock); + queue.push_back(notification_type::on_completed()); + wake.notify_one(); + ensure_processing(guard); + } + } + }; + + std::shared_ptr<synchronize_observer_state> state; + +public: + synchronize_observer(rxsc::worker w, composite_subscription cs) + : base_type(cs) + , state(std::make_shared<synchronize_observer_state>( + w, cs, make_subscriber<T>(cs, make_observer_dynamic<T>( *static_cast<base_type*>(this) )))) + {} + + template<class V> + void on_next(V v) const { + state->on_next(std::move(v)); + } + void on_error(std::exception_ptr e) const { + state->on_error(e); + } + void on_completed() const { + state->on_completed(); + } +}; + +} + +template<class T> +class synchronize +{ + rxsc::worker controller; + composite_subscription lifetime; + detail::synchronize_observer<T> s; + +public: + explicit synchronize(rxsc::worker w, composite_subscription cs = composite_subscription()) + : controller(w) + , lifetime(cs) + , s(w, cs) + { + } + + bool has_observers() const { + return s.has_observers(); + } + + subscriber<T> get_subscriber() const { + return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::synchronize_observer<T>>(s))); + } + + observable<T> get_observable() const { + return make_observable_dynamic<T>([this](subscriber<T> o){ + this->s.add(std::move(o)); + }); + } +}; + +// +// this type is used by operators that subscribe to +// multiple sources to ensure that the notifications are serialized +// +class synchronize_observable +{ + rxsc::scheduler factory; + rxsc::worker controller; +public: + synchronize_observable(rxsc::scheduler sc) + : factory(sc) + , controller(sc.create_worker()) + { + } + synchronize_observable(const synchronize_observable& o) + : factory(o.factory) + // new worker for each copy. this spreads work across threads + // but keeps each use serialized + , controller(factory.create_worker()) + { + } + synchronize_observable(synchronize_observable&& o) + : factory(std::move(o.factory)) + , controller(std::move(o.controller)) + { + } + synchronize_observable& operator=(synchronize_observable o) + { + factory = std::move(o.factory); + controller = std::move(o.controller); + return *this; + } + template<class Observable> + auto operator()(Observable o) + -> decltype(o.synchronize(controller).ref_count()) { + return o.synchronize(controller).ref_count(); + static_assert(is_observable<Observable>::value, "can only synchronize observables"); + } +}; + +} + +} + +#endif |