summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp
diff options
context:
space:
mode:
authorIgor Murashkin <iam@google.com>2019-02-20 13:52:07 -0800
committerIgor Murashkin <iam@google.com>2019-02-20 13:52:07 -0800
commitbf1edc83fbbcb99293c1ef12d1c057dd38577914 (patch)
tree30696c133fbbe4ac173b40626b1515929dc7ced6 /Rx/v2/src/rxcpp
parent2f3177dd770845c830419ef07b1b90a5a24d0649 (diff)
parentaac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a (diff)
downloadRxCpp-bf1edc83fbbcb99293c1ef12d1c057dd38577914.tar.gz
android: Merge branch 'upstream-master' into master
Updates to aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a in upstream. Change-Id: Ic56ec9d83b5611e734dfdbebec7c3538388f5c82
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp10
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp109
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp6
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp1
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp30
5 files changed, 134 insertions, 22 deletions
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 1f5c5e4..1eb47db 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -27,6 +27,8 @@
#define RXCPP_USE_EXCEPTIONS 1
#endif
+#define RXCPP_NORETURN __declspec(noreturn)
+
#elif defined(__clang__)
#if __has_feature(cxx_rvalue_references)
@@ -42,6 +44,12 @@
#define RXCPP_USE_EXCEPTIONS 1
#endif
+#if __has_feature(cxx_attributes)
+#define RXCPP_NORETURN [[noreturn]]
+#else
+#define RXCPP_NORETURN __attribute__ ((noreturn))
+#endif
+
#elif defined(__GNUG__)
#define GCC_VERSION (__GNUC__ * 10000 + \
@@ -64,6 +72,8 @@
#define RXCPP_USE_EXCEPTIONS 1
#endif
+#define RXCPP_NORETURN __attribute__ ((noreturn))
+
#endif
//
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index ee4e53e..2d6bcb6 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -117,6 +117,14 @@ private:
std::terminate();
}
}
+
+ explicit subscription(std::shared_ptr<base_subscription_state> s)
+ : state(std::move(s))
+ {
+ if (!state) {
+ std::terminate();
+ }
+ }
public:
subscription()
@@ -178,9 +186,23 @@ public:
weak_state_type get_weak() {
return state;
}
+
+ // Atomically promote weak subscription to strong.
+ // Calls std::terminate if w has already expired.
static subscription lock(weak_state_type w) {
return subscription(w);
}
+
+ // Atomically try to promote weak subscription to strong.
+ // Returns an empty maybe<> if w has already expired.
+ static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
+ auto strong_subscription = w.lock();
+ if (!strong_subscription) {
+ return rxu::detail::maybe<subscription>{};
+ } else {
+ return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
+ }
+ }
};
inline bool operator<(const subscription& lhs, const subscription& rhs) {
@@ -223,8 +245,14 @@ private:
typedef subscription::weak_state_type weak_subscription;
struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
{
+ // invariant: cannot access this data without the lock held.
std::set<subscription> subscriptions;
+ // double checked locking:
+ // issubscribed must be loaded again after each lock acquisition.
+ // invariant:
+ // never call subscription::unsubscribe with lock held.
std::mutex lock;
+ // invariant: transitions from 'true' to 'false' exactly once, at any time.
std::atomic<bool> issubscribed;
~composite_subscription_state()
@@ -242,29 +270,78 @@ private:
{
}
+ // Atomically add 's' to the set of subscriptions.
+ //
+ // If unsubscribe() has already occurred, this immediately
+ // calls s.unsubscribe().
+ //
+ // cs.unsubscribe() [must] happens-before s.unsubscribe()
+ //
+ // Due to the un-atomic nature of calling 's.unsubscribe()',
+ // it is possible to observe the unintuitive
+ // add(s)=>s.unsubscribe() prior
+ // to any of the unsubscribe()=>sN.unsubscribe().
inline weak_subscription add(subscription s) {
- if (!issubscribed) {
+ if (!issubscribed) { // load.acq [seq_cst]
s.unsubscribe();
} else if (s.is_subscribed()) {
std::unique_lock<decltype(lock)> guard(lock);
- subscriptions.insert(s);
+ if (!issubscribed) { // load.acq [seq_cst]
+ // unsubscribe was called concurrently.
+ guard.unlock();
+ // invariant: do not call unsubscribe with lock held.
+ s.unsubscribe();
+ } else {
+ subscriptions.insert(s);
+ }
}
return s.get_weak();
}
+ // Atomically remove 'w' from the set of subscriptions.
+ //
+ // This does nothing if 'w' was already previously removed,
+ // or refers to an expired value.
inline void remove(weak_subscription w) {
- if (issubscribed && !w.expired()) {
- auto s = subscription::lock(w);
+ if (issubscribed) { // load.acq [seq_cst]
+ rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
+
+ if (maybe_subscription.empty()) {
+ // Do nothing if the subscription has already expired.
+ return;
+ }
+
std::unique_lock<decltype(lock)> guard(lock);
- subscriptions.erase(std::move(s));
+ // invariant: subscriptions must be accessed under the lock.
+
+ if (issubscribed) { // load.acq [seq_cst]
+ subscription& s = maybe_subscription.get();
+ subscriptions.erase(std::move(s));
+ } // else unsubscribe() was called concurrently; this becomes a no-op.
}
}
+ // Atomically clear all subscriptions that were observably added
+ // (and not subsequently observably removed).
+ //
+ // Un-atomically call unsubscribe on those subscriptions.
+ //
+ // forall subscriptions in {add(s1),add(s2),...}
+ // - {remove(s3), remove(s4), ...}:
+ // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
+ //
+ // cs.unsubscribe() observed-before cs.clear ==> do nothing.
inline void clear() {
- if (issubscribed) {
+ if (issubscribed) { // load.acq [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);
+ if (!issubscribed) { // load.acq [seq_cst]
+ // unsubscribe was called concurrently.
+ return;
+ }
+
std::set<subscription> v(std::move(subscriptions));
+ // invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
@@ -272,11 +349,29 @@ private:
}
}
+ // Atomically clear all subscriptions that were observably added
+ // (and not subsequently observably removed).
+ //
+ // Un-atomically call unsubscribe on those subscriptions.
+ //
+ // Switches to an 'unsubscribed' state, all subsequent
+ // adds are immediately unsubscribed.
+ //
+ // cs.unsubscribe() [must] happens-before
+ // cs.add(s) ==> s.unsubscribe()
+ //
+ // forall subscriptions in {add(s1),add(s2),...}
+ // - {remove(s3), remove(s4), ...}:
+ // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
inline void unsubscribe() {
- if (issubscribed.exchange(false)) {
+ if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);
+ // is_subscribed can only transition to 'false' once,
+ // does not need an extra atomic access here.
+
std::set<subscription> v(std::move(subscriptions));
+ // invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 9ce455f..e5867e5 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -899,7 +899,7 @@ error_ptr make_error_ptr(E&& e) {
}
// Replace std::rethrow_exception to be compatible with our error_ptr typedef.
-[[noreturn]] inline void rethrow_exception(error_ptr e) {
+RXCPP_NORETURN inline void rethrow_exception(error_ptr e) {
#if RXCPP_USE_EXCEPTIONS
std::rethrow_exception(e);
#else
@@ -917,7 +917,7 @@ error_ptr make_error_ptr(E&& e) {
// A replacement for the "throw" keyword which is illegal when
// exceptions are disabled with -fno-exceptions.
template <typename E>
-[[noreturn]] inline void throw_exception(E&& e) {
+RXCPP_NORETURN inline void throw_exception(E&& e) {
#if RXCPP_USE_EXCEPTIONS
throw std::forward<E>(e);
#else
@@ -930,7 +930,7 @@ template <typename E>
// TODO: Do we really need this? rxu::rethrow_exception(rxu::current_exception())
// would have the same semantics in either case.
-[[noreturn]] inline void rethrow_current_exception() {
+RXCPP_NORETURN inline void rethrow_current_exception() {
#if RXCPP_USE_EXCEPTIONS
std::rethrow_exception(std::current_exception());
#else
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index e31ed55..5145e92 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -72,6 +72,7 @@ private:
state->lifetime.add([keepAlive](){
std::unique_lock<std::mutex> guard(keepAlive->lock);
auto expired = std::move(keepAlive->q);
+ keepAlive->q = new_worker_state::queue_item_time{};
if (!keepAlive->q.empty()) std::terminate();
keepAlive->wake.notify_one();
diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
index b7f7d68..020b0f8 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
@@ -43,6 +43,7 @@ class replay_observer : public detail::multicast_observer<T>
mutable std::list<time_point_type> time_points;
mutable count_type count;
mutable period_type period;
+ mutable composite_subscription replayLifetime;
public:
mutable coordination_type coordination;
mutable coordinator_type coordinator;
@@ -56,9 +57,13 @@ class replay_observer : public detail::multicast_observer<T>
}
public:
- explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator)
+ ~replay_observer_state(){
+ replayLifetime.unsubscribe();
+ }
+ explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
: count(_count)
, period(_period)
+ , replayLifetime(_replayLifetime)
, coordination(std::move(_coordination))
, coordinator(std::move(_coordinator))
{
@@ -66,6 +71,7 @@ class replay_observer : public detail::multicast_observer<T>
void add(T v) const {
std::unique_lock<std::mutex> guard(lock);
+
if (!count.empty()) {
if (values.size() == count.get())
remove_oldest();
@@ -89,11 +95,12 @@ class replay_observer : public detail::multicast_observer<T>
std::shared_ptr<replay_observer_state> state;
public:
- replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs)
- : base_type(cs)
+ replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
+ : base_type(subscriberLifetime)
{
- auto coordinator = coordination.create_coordinator(cs);
- state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator));
+ replayLifetime.add(subscriberLifetime);
+ auto coordinator = coordination.create_coordinator(replayLifetime);
+ state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
}
subscriber<T> get_subscriber() const {
@@ -129,22 +136,22 @@ class replay
public:
explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(), period_type(), cn, cs)
+ : s(count_type(), period_type(), cn, cs, composite_subscription{})
{
}
replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(std::move(count)), period_type(), cn, cs)
+ : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
{
}
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(), period_type(period), cn, cs)
+ : s(count_type(), period_type(period), cn, cs, composite_subscription{})
{
}
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(count), period_type(period), cn, cs)
+ : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
{
}
@@ -163,9 +170,8 @@ public:
observable<T> get_observable() const {
auto keepAlive = s;
auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
- if (keepAlive.get_subscription().is_subscribed()) {
- for (auto&& value: get_values())
- o.on_next(value);
+ for (auto&& value: get_values()) {
+ o.on_next(value);
}
keepAlive.add(keepAlive.get_subscriber(), std::move(o));
});