From aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a Mon Sep 17 00:00:00 2001 From: Igor Murashkin Date: Mon, 11 Feb 2019 15:37:26 -0800 Subject: rxcpp: Fix data race in composite_subscription composite_subscription_inner had missing checks which could lead to add/remove/clear racing against unsubscribe. (See the issue for more details). Fixes: #475 --- Rx/v2/src/rxcpp/rx-subscription.hpp | 109 +++++++++++++++++++++++++++++++++--- 1 file changed, 102 insertions(+), 7 deletions(-) (limited to 'Rx') diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index ee4e53e..2d6bcb6 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -117,6 +117,14 @@ private: std::terminate(); } } + + explicit subscription(std::shared_ptr s) + : state(std::move(s)) + { + if (!state) { + std::terminate(); + } + } public: subscription() @@ -178,9 +186,23 @@ public: weak_state_type get_weak() { return state; } + + // Atomically promote weak subscription to strong. + // Calls std::terminate if w has already expired. static subscription lock(weak_state_type w) { return subscription(w); } + + // Atomically try to promote weak subscription to strong. + // Returns an empty maybe<> if w has already expired. + static rxu::maybe maybe_lock(weak_state_type w) { + auto strong_subscription = w.lock(); + if (!strong_subscription) { + return rxu::detail::maybe{}; + } else { + return rxu::detail::maybe{subscription{std::move(strong_subscription)}}; + } + } }; inline bool operator<(const subscription& lhs, const subscription& rhs) { @@ -223,8 +245,14 @@ private: typedef subscription::weak_state_type weak_subscription; struct composite_subscription_state : public std::enable_shared_from_this { + // invariant: cannot access this data without the lock held. std::set subscriptions; + // double checked locking: + // issubscribed must be loaded again after each lock acquisition. + // invariant: + // never call subscription::unsubscribe with lock held. std::mutex lock; + // invariant: transitions from 'true' to 'false' exactly once, at any time. std::atomic issubscribed; ~composite_subscription_state() @@ -242,29 +270,78 @@ private: { } + // Atomically add 's' to the set of subscriptions. + // + // If unsubscribe() has already occurred, this immediately + // calls s.unsubscribe(). + // + // cs.unsubscribe() [must] happens-before s.unsubscribe() + // + // Due to the un-atomic nature of calling 's.unsubscribe()', + // it is possible to observe the unintuitive + // add(s)=>s.unsubscribe() prior + // to any of the unsubscribe()=>sN.unsubscribe(). inline weak_subscription add(subscription s) { - if (!issubscribed) { + if (!issubscribed) { // load.acq [seq_cst] s.unsubscribe(); } else if (s.is_subscribed()) { std::unique_lock guard(lock); - subscriptions.insert(s); + if (!issubscribed) { // load.acq [seq_cst] + // unsubscribe was called concurrently. + guard.unlock(); + // invariant: do not call unsubscribe with lock held. + s.unsubscribe(); + } else { + subscriptions.insert(s); + } } return s.get_weak(); } + // Atomically remove 'w' from the set of subscriptions. + // + // This does nothing if 'w' was already previously removed, + // or refers to an expired value. inline void remove(weak_subscription w) { - if (issubscribed && !w.expired()) { - auto s = subscription::lock(w); + if (issubscribed) { // load.acq [seq_cst] + rxu::maybe maybe_subscription = subscription::maybe_lock(w); + + if (maybe_subscription.empty()) { + // Do nothing if the subscription has already expired. + return; + } + std::unique_lock guard(lock); - subscriptions.erase(std::move(s)); + // invariant: subscriptions must be accessed under the lock. + + if (issubscribed) { // load.acq [seq_cst] + subscription& s = maybe_subscription.get(); + subscriptions.erase(std::move(s)); + } // else unsubscribe() was called concurrently; this becomes a no-op. } } + // Atomically clear all subscriptions that were observably added + // (and not subsequently observably removed). + // + // Un-atomically call unsubscribe on those subscriptions. + // + // forall subscriptions in {add(s1),add(s2),...} + // - {remove(s3), remove(s4), ...}: + // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() + // + // cs.unsubscribe() observed-before cs.clear ==> do nothing. inline void clear() { - if (issubscribed) { + if (issubscribed) { // load.acq [seq_cst] std::unique_lock guard(lock); + if (!issubscribed) { // load.acq [seq_cst] + // unsubscribe was called concurrently. + return; + } + std::set v(std::move(subscriptions)); + // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { @@ -272,11 +349,29 @@ private: } } + // Atomically clear all subscriptions that were observably added + // (and not subsequently observably removed). + // + // Un-atomically call unsubscribe on those subscriptions. + // + // Switches to an 'unsubscribed' state, all subsequent + // adds are immediately unsubscribed. + // + // cs.unsubscribe() [must] happens-before + // cs.add(s) ==> s.unsubscribe() + // + // forall subscriptions in {add(s1),add(s2),...} + // - {remove(s3), remove(s4), ...}: + // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() inline void unsubscribe() { - if (issubscribed.exchange(false)) { + if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst] std::unique_lock guard(lock); + // is_subscribed can only transition to 'false' once, + // does not need an extra atomic access here. + std::set v(std::move(subscriptions)); + // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { -- cgit v1.2.3