diff options
author | Igor Murashkin <iam@google.com> | 2019-02-20 13:52:07 -0800 |
---|---|---|
committer | Igor Murashkin <iam@google.com> | 2019-02-20 13:52:07 -0800 |
commit | bf1edc83fbbcb99293c1ef12d1c057dd38577914 (patch) | |
tree | 30696c133fbbe4ac173b40626b1515929dc7ced6 /Rx/v2/src/rxcpp | |
parent | 2f3177dd770845c830419ef07b1b90a5a24d0649 (diff) | |
parent | aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a (diff) | |
download | RxCpp-bf1edc83fbbcb99293c1ef12d1c057dd38577914.tar.gz |
android: Merge branch 'upstream-master' into master
Updates to aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a in upstream.
Change-Id: Ic56ec9d83b5611e734dfdbebec7c3538388f5c82
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 10 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscription.hpp | 109 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 6 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp | 30 |
5 files changed, 134 insertions, 22 deletions
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 1f5c5e4..1eb47db 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -27,6 +27,8 @@ #define RXCPP_USE_EXCEPTIONS 1 #endif +#define RXCPP_NORETURN __declspec(noreturn) + #elif defined(__clang__) #if __has_feature(cxx_rvalue_references) @@ -42,6 +44,12 @@ #define RXCPP_USE_EXCEPTIONS 1 #endif +#if __has_feature(cxx_attributes) +#define RXCPP_NORETURN [[noreturn]] +#else +#define RXCPP_NORETURN __attribute__ ((noreturn)) +#endif + #elif defined(__GNUG__) #define GCC_VERSION (__GNUC__ * 10000 + \ @@ -64,6 +72,8 @@ #define RXCPP_USE_EXCEPTIONS 1 #endif +#define RXCPP_NORETURN __attribute__ ((noreturn)) + #endif // diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index ee4e53e..2d6bcb6 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -117,6 +117,14 @@ private: std::terminate(); } } + + explicit subscription(std::shared_ptr<base_subscription_state> s) + : state(std::move(s)) + { + if (!state) { + std::terminate(); + } + } public: subscription() @@ -178,9 +186,23 @@ public: weak_state_type get_weak() { return state; } + + // Atomically promote weak subscription to strong. + // Calls std::terminate if w has already expired. static subscription lock(weak_state_type w) { return subscription(w); } + + // Atomically try to promote weak subscription to strong. + // Returns an empty maybe<> if w has already expired. + static rxu::maybe<subscription> maybe_lock(weak_state_type w) { + auto strong_subscription = w.lock(); + if (!strong_subscription) { + return rxu::detail::maybe<subscription>{}; + } else { + return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}}; + } + } }; inline bool operator<(const subscription& lhs, const subscription& rhs) { @@ -223,8 +245,14 @@ private: typedef subscription::weak_state_type weak_subscription; struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state> { + // invariant: cannot access this data without the lock held. std::set<subscription> subscriptions; + // double checked locking: + // issubscribed must be loaded again after each lock acquisition. + // invariant: + // never call subscription::unsubscribe with lock held. std::mutex lock; + // invariant: transitions from 'true' to 'false' exactly once, at any time. std::atomic<bool> issubscribed; ~composite_subscription_state() @@ -242,29 +270,78 @@ private: { } + // Atomically add 's' to the set of subscriptions. + // + // If unsubscribe() has already occurred, this immediately + // calls s.unsubscribe(). + // + // cs.unsubscribe() [must] happens-before s.unsubscribe() + // + // Due to the un-atomic nature of calling 's.unsubscribe()', + // it is possible to observe the unintuitive + // add(s)=>s.unsubscribe() prior + // to any of the unsubscribe()=>sN.unsubscribe(). inline weak_subscription add(subscription s) { - if (!issubscribed) { + if (!issubscribed) { // load.acq [seq_cst] s.unsubscribe(); } else if (s.is_subscribed()) { std::unique_lock<decltype(lock)> guard(lock); - subscriptions.insert(s); + if (!issubscribed) { // load.acq [seq_cst] + // unsubscribe was called concurrently. + guard.unlock(); + // invariant: do not call unsubscribe with lock held. + s.unsubscribe(); + } else { + subscriptions.insert(s); + } } return s.get_weak(); } + // Atomically remove 'w' from the set of subscriptions. + // + // This does nothing if 'w' was already previously removed, + // or refers to an expired value. inline void remove(weak_subscription w) { - if (issubscribed && !w.expired()) { - auto s = subscription::lock(w); + if (issubscribed) { // load.acq [seq_cst] + rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w); + + if (maybe_subscription.empty()) { + // Do nothing if the subscription has already expired. + return; + } + std::unique_lock<decltype(lock)> guard(lock); - subscriptions.erase(std::move(s)); + // invariant: subscriptions must be accessed under the lock. + + if (issubscribed) { // load.acq [seq_cst] + subscription& s = maybe_subscription.get(); + subscriptions.erase(std::move(s)); + } // else unsubscribe() was called concurrently; this becomes a no-op. } } + // Atomically clear all subscriptions that were observably added + // (and not subsequently observably removed). + // + // Un-atomically call unsubscribe on those subscriptions. + // + // forall subscriptions in {add(s1),add(s2),...} + // - {remove(s3), remove(s4), ...}: + // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() + // + // cs.unsubscribe() observed-before cs.clear ==> do nothing. inline void clear() { - if (issubscribed) { + if (issubscribed) { // load.acq [seq_cst] std::unique_lock<decltype(lock)> guard(lock); + if (!issubscribed) { // load.acq [seq_cst] + // unsubscribe was called concurrently. + return; + } + std::set<subscription> v(std::move(subscriptions)); + // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { @@ -272,11 +349,29 @@ private: } } + // Atomically clear all subscriptions that were observably added + // (and not subsequently observably removed). + // + // Un-atomically call unsubscribe on those subscriptions. + // + // Switches to an 'unsubscribed' state, all subsequent + // adds are immediately unsubscribed. + // + // cs.unsubscribe() [must] happens-before + // cs.add(s) ==> s.unsubscribe() + // + // forall subscriptions in {add(s1),add(s2),...} + // - {remove(s3), remove(s4), ...}: + // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() inline void unsubscribe() { - if (issubscribed.exchange(false)) { + if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst] std::unique_lock<decltype(lock)> guard(lock); + // is_subscribed can only transition to 'false' once, + // does not need an extra atomic access here. + std::set<subscription> v(std::move(subscriptions)); + // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 9ce455f..e5867e5 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -899,7 +899,7 @@ error_ptr make_error_ptr(E&& e) { } // Replace std::rethrow_exception to be compatible with our error_ptr typedef. -[[noreturn]] inline void rethrow_exception(error_ptr e) { +RXCPP_NORETURN inline void rethrow_exception(error_ptr e) { #if RXCPP_USE_EXCEPTIONS std::rethrow_exception(e); #else @@ -917,7 +917,7 @@ error_ptr make_error_ptr(E&& e) { // A replacement for the "throw" keyword which is illegal when // exceptions are disabled with -fno-exceptions. template <typename E> -[[noreturn]] inline void throw_exception(E&& e) { +RXCPP_NORETURN inline void throw_exception(E&& e) { #if RXCPP_USE_EXCEPTIONS throw std::forward<E>(e); #else @@ -930,7 +930,7 @@ template <typename E> // TODO: Do we really need this? rxu::rethrow_exception(rxu::current_exception()) // would have the same semantics in either case. -[[noreturn]] inline void rethrow_current_exception() { +RXCPP_NORETURN inline void rethrow_current_exception() { #if RXCPP_USE_EXCEPTIONS std::rethrow_exception(std::current_exception()); #else diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index e31ed55..5145e92 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -72,6 +72,7 @@ private: state->lifetime.add([keepAlive](){ std::unique_lock<std::mutex> guard(keepAlive->lock); auto expired = std::move(keepAlive->q); + keepAlive->q = new_worker_state::queue_item_time{}; if (!keepAlive->q.empty()) std::terminate(); keepAlive->wake.notify_one(); diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp index b7f7d68..020b0f8 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp @@ -43,6 +43,7 @@ class replay_observer : public detail::multicast_observer<T> mutable std::list<time_point_type> time_points; mutable count_type count; mutable period_type period; + mutable composite_subscription replayLifetime; public: mutable coordination_type coordination; mutable coordinator_type coordinator; @@ -56,9 +57,13 @@ class replay_observer : public detail::multicast_observer<T> } public: - explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator) + ~replay_observer_state(){ + replayLifetime.unsubscribe(); + } + explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime) : count(_count) , period(_period) + , replayLifetime(_replayLifetime) , coordination(std::move(_coordination)) , coordinator(std::move(_coordinator)) { @@ -66,6 +71,7 @@ class replay_observer : public detail::multicast_observer<T> void add(T v) const { std::unique_lock<std::mutex> guard(lock); + if (!count.empty()) { if (values.size() == count.get()) remove_oldest(); @@ -89,11 +95,12 @@ class replay_observer : public detail::multicast_observer<T> std::shared_ptr<replay_observer_state> state; public: - replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs) - : base_type(cs) + replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime) + : base_type(subscriberLifetime) { - auto coordinator = coordination.create_coordinator(cs); - state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator)); + replayLifetime.add(subscriberLifetime); + auto coordinator = coordination.create_coordinator(replayLifetime); + state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime)); } subscriber<T> get_subscriber() const { @@ -129,22 +136,22 @@ class replay public: explicit replay(Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(), cn, cs) + : s(count_type(), period_type(), cn, cs, composite_subscription{}) { } replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(std::move(count)), period_type(), cn, cs) + : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{}) { } replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(period), cn, cs) + : s(count_type(), period_type(period), cn, cs, composite_subscription{}) { } replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(count), period_type(period), cn, cs) + : s(count_type(count), period_type(period), cn, cs, composite_subscription{}) { } @@ -163,9 +170,8 @@ public: observable<T> get_observable() const { auto keepAlive = s; auto observable = make_observable_dynamic<T>([=](subscriber<T> o){ - if (keepAlive.get_subscription().is_subscribed()) { - for (auto&& value: get_values()) - o.on_next(value); + for (auto&& value: get_values()) { + o.on_next(value); } keepAlive.add(keepAlive.get_subscriber(), std::move(o)); }); |