summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2015-06-23 18:41:35 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2015-06-23 18:41:35 -0700
commite477c5a3f2613a244bae4963498b7ddb2443f605 (patch)
tree97b1919dcb35f613dba934651e287adb2eab63b3 /Rx
parent01a89e0d1e8fa8c547b8e180d4c8d32c4c14e2dd (diff)
downloadRxCpp-e477c5a3f2613a244bae4963498b7ddb2443f605.tar.gz
lifetime improvements
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp82
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp28
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp8
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp17
4 files changed, 91 insertions, 44 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index 3d25e78..0efeb14 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -68,6 +68,22 @@ struct observe_on
, destination(std::move(d))
{
}
+
+ void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
+ if (!guard.owns_lock()) {
+ abort();
+ }
+ if (current == mode::Errored || current == mode::Disposed) {return;}
+ current = end;
+ queue_type fill_expired;
+ swap(fill_expired, fill_queue);
+ queue_type drain_expired;
+ swap(drain_expired, drain_queue);
+ RXCPP_UNWIND_AUTO([&](){guard.lock();});
+ guard.unlock();
+ lifetime.unsubscribe();
+ destination.unsubscribe();
+ }
void ensure_processing(std::unique_lock<std::mutex>& guard) const {
if (!guard.owns_lock()) {
@@ -75,42 +91,43 @@ struct observe_on
}
if (current == mode::Empty) {
current = mode::Processing;
+
+ if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
+ finish(guard, mode::Disposed);
+ }
auto keepAlive = this->shared_from_this();
-
+
auto drain = [keepAlive, this](const rxsc::schedulable& self){
using std::swap;
try {
- if (drain_queue.empty() || !destination.is_subscribed()) {
- std::unique_lock<std::mutex> guard(lock);
- if (!destination.is_subscribed() ||
- (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
- current = mode::Disposed;
- queue_type expired;
- swap(expired, fill_queue);
- guard.unlock();
- lifetime.unsubscribe();
- destination.unsubscribe();
- return;
- }
- if (drain_queue.empty()) {
- if (fill_queue.empty()) {
- current = mode::Empty;
+ for (;;) {
+ if (drain_queue.empty() || !destination.is_subscribed()) {
+ std::unique_lock<std::mutex> guard(lock);
+ if (!destination.is_subscribed() ||
+ (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
+ finish(guard, mode::Disposed);
return;
}
- swap(fill_queue, drain_queue);
+ if (drain_queue.empty()) {
+ if (fill_queue.empty()) {
+ current = mode::Empty;
+ return;
+ }
+ swap(fill_queue, drain_queue);
+ }
}
+ auto notification = std::move(drain_queue.front());
+ drain_queue.pop_front();
+ notification->accept(destination);
+ std::unique_lock<std::mutex> guard(lock);
+ self();
+ if (lifetime.is_subscribed()) break;
}
- auto notification = std::move(drain_queue.front());
- drain_queue.pop_front();
- notification->accept(destination);
- self();
} catch(...) {
destination.on_error(std::current_exception());
std::unique_lock<std::mutex> guard(lock);
- current = mode::Errored;
- queue_type expired;
- swap(expired, fill_queue);
+ finish(guard, mode::Errored);
}
};
@@ -118,15 +135,12 @@ struct observe_on
[&](){return coordinator.act(drain);},
destination);
if (selectedDrain.empty()) {
- current = mode::Errored;
- using std::swap;
- queue_type expired;
- swap(expired, fill_queue);
+ finish(guard, mode::Errored);
return;
}
auto processor = coordinator.get_worker();
-
+
RXCPP_UNWIND_AUTO([&](){guard.lock();});
guard.unlock();
@@ -143,16 +157,19 @@ struct observe_on
void on_next(source_value_type v) const {
std::unique_lock<std::mutex> guard(state->lock);
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
state->fill_queue.push_back(notification_type::on_next(std::move(v)));
state->ensure_processing(guard);
}
void on_error(std::exception_ptr e) const {
std::unique_lock<std::mutex> guard(state->lock);
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
state->fill_queue.push_back(notification_type::on_error(e));
state->ensure_processing(guard);
}
void on_completed() const {
std::unique_lock<std::mutex> guard(state->lock);
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
state->fill_queue.push_back(notification_type::on_completed());
state->ensure_processing(guard);
}
@@ -163,7 +180,7 @@ struct observe_on
this_type o(d, std::move(coor), cs);
auto keepAlive = o.state;
- cs.add([keepAlive](){
+ cs.add([=](){
std::unique_lock<std::mutex> guard(keepAlive->lock);
keepAlive->ensure_processing(guard);
});
@@ -262,6 +279,11 @@ public:
}
};
+inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
+ static observe_on_one_worker r(rxsc::make_run_loop(rl));
+ return r;
+}
+
inline observe_on_one_worker observe_on_event_loop() {
static observe_on_one_worker r(rxsc::make_event_loop());
return r;
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
index 17fe35a..fb38902 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
@@ -52,14 +52,12 @@ struct subscribe_on : public operator_base<T>
: public std::enable_shared_from_this<subscribe_on_state_type>
, public subscribe_on_values
{
- subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
+ subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
: subscribe_on_values(i)
- , coordinator(std::move(coor))
, out(oarg)
{
}
composite_subscription source_lifetime;
- coordinator_type coordinator;
output_type out;
private:
subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
@@ -72,33 +70,39 @@ struct subscribe_on : public operator_base<T>
auto controller = coordinator.get_worker();
// take a copy of the values for each subscription
- auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(coordinator), std::move(s));
+ auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
+
+ auto sl = state->source_lifetime;
+ auto ol = state->out.get_subscription();
auto disposer = [=](const rxsc::schedulable&){
- state->source_lifetime.unsubscribe();
- state->out.unsubscribe();
+ sl.unsubscribe();
+ ol.unsubscribe();
coordinator_lifetime.unsubscribe();
};
auto selectedDisposer = on_exception(
- [&](){return state->coordinator.act(disposer);},
+ [&](){return coordinator.act(disposer);},
state->out);
if (selectedDisposer.empty()) {
return;
}
-
- state->out.add([=](){
- controller.schedule(selectedDisposer.get());
- });
+
state->source_lifetime.add([=](){
controller.schedule(selectedDisposer.get());
});
+ state->out.add([=](){
+ sl.unsubscribe();
+ ol.unsubscribe();
+ coordinator_lifetime.unsubscribe();
+ });
+
auto producer = [=](const rxsc::schedulable&){
state->source.subscribe(state->source_lifetime, state->out);
};
auto selectedProducer = on_exception(
- [&](){return state->coordinator.act(producer);},
+ [&](){return coordinator.act(producer);},
state->out);
if (selectedProducer.empty()) {
return;
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index b842775..a9e029f 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -329,7 +329,7 @@ inline bool operator==(const worker& lhs, const worker& rhs) {
inline bool operator!=(const worker& lhs, const worker& rhs) {
return !(lhs == rhs);
}
-
+
class weak_worker
{
detail::worker_interface_weak_ptr inner;
@@ -344,7 +344,7 @@ public:
, lifetime(owner.lifetime)
{
}
-
+
worker lock() const {
return worker(lifetime, inner.lock());
}
@@ -419,6 +419,9 @@ inline scheduler make_scheduler(ArgN&&... an) {
return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
}
+inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
+ return scheduler(si);
+}
class schedulable : public schedulable_base
{
@@ -912,6 +915,7 @@ namespace rxsc=schedulers;
}
#include "schedulers/rx-currentthread.hpp"
+#include "schedulers/rx-runloop.hpp"
#include "schedulers/rx-newthread.hpp"
#include "schedulers/rx-eventloop.hpp"
#include "schedulers/rx-immediate.hpp"
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index 9c4339c..96b7213 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -26,6 +26,7 @@ class multicast_observer
enum type {
Invalid = 0,
Casting,
+ Disposed,
Completed,
Errored
};
@@ -102,6 +103,15 @@ public:
explicit multicast_observer(composite_subscription cs)
: b(std::make_shared<binder_type>(cs))
{
+ auto keepAlive = b;
+ b->state->lifetime.add([keepAlive](){
+ if (keepAlive->state->current == mode::Casting){
+ keepAlive->state->current = mode::Disposed;
+ keepAlive->current_completer.reset();
+ keepAlive->completer.reset();
+ ++keepAlive->state->generation;
+ }
+ });
}
trace_id get_id() const {
return b->id;
@@ -144,6 +154,13 @@ public:
return;
}
break;
+ case mode::Disposed:
+ {
+ guard.unlock();
+ o.unsubscribe();
+ return;
+ }
+ break;
default:
abort();
}