diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-23 18:41:35 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-23 18:41:35 -0700 |
commit | e477c5a3f2613a244bae4963498b7ddb2443f605 (patch) | |
tree | 97b1919dcb35f613dba934651e287adb2eab63b3 /Rx | |
parent | 01a89e0d1e8fa8c547b8e180d4c8d32c4c14e2dd (diff) | |
download | RxCpp-e477c5a3f2613a244bae4963498b7ddb2443f605.tar.gz |
lifetime improvements
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 82 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp | 28 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-scheduler.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-subject.hpp | 17 |
4 files changed, 91 insertions, 44 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index 3d25e78..0efeb14 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -68,6 +68,22 @@ struct observe_on , destination(std::move(d)) { } + + void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const { + if (!guard.owns_lock()) { + abort(); + } + if (current == mode::Errored || current == mode::Disposed) {return;} + current = end; + queue_type fill_expired; + swap(fill_expired, fill_queue); + queue_type drain_expired; + swap(drain_expired, drain_queue); + RXCPP_UNWIND_AUTO([&](){guard.lock();}); + guard.unlock(); + lifetime.unsubscribe(); + destination.unsubscribe(); + } void ensure_processing(std::unique_lock<std::mutex>& guard) const { if (!guard.owns_lock()) { @@ -75,42 +91,43 @@ struct observe_on } if (current == mode::Empty) { current = mode::Processing; + + if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) { + finish(guard, mode::Disposed); + } auto keepAlive = this->shared_from_this(); - + auto drain = [keepAlive, this](const rxsc::schedulable& self){ using std::swap; try { - if (drain_queue.empty() || !destination.is_subscribed()) { - std::unique_lock<std::mutex> guard(lock); - if (!destination.is_subscribed() || - (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { - current = mode::Disposed; - queue_type expired; - swap(expired, fill_queue); - guard.unlock(); - lifetime.unsubscribe(); - destination.unsubscribe(); - return; - } - if (drain_queue.empty()) { - if (fill_queue.empty()) { - current = mode::Empty; + for (;;) { + if (drain_queue.empty() || !destination.is_subscribed()) { + std::unique_lock<std::mutex> guard(lock); + if (!destination.is_subscribed() || + (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { + finish(guard, mode::Disposed); return; } - swap(fill_queue, drain_queue); + if (drain_queue.empty()) { + if (fill_queue.empty()) { + current = mode::Empty; + return; + } + swap(fill_queue, drain_queue); + } } + auto notification = std::move(drain_queue.front()); + drain_queue.pop_front(); + notification->accept(destination); + std::unique_lock<std::mutex> guard(lock); + self(); + if (lifetime.is_subscribed()) break; } - auto notification = std::move(drain_queue.front()); - drain_queue.pop_front(); - notification->accept(destination); - self(); } catch(...) { destination.on_error(std::current_exception()); std::unique_lock<std::mutex> guard(lock); - current = mode::Errored; - queue_type expired; - swap(expired, fill_queue); + finish(guard, mode::Errored); } }; @@ -118,15 +135,12 @@ struct observe_on [&](){return coordinator.act(drain);}, destination); if (selectedDrain.empty()) { - current = mode::Errored; - using std::swap; - queue_type expired; - swap(expired, fill_queue); + finish(guard, mode::Errored); return; } auto processor = coordinator.get_worker(); - + RXCPP_UNWIND_AUTO([&](){guard.lock();}); guard.unlock(); @@ -143,16 +157,19 @@ struct observe_on void on_next(source_value_type v) const { std::unique_lock<std::mutex> guard(state->lock); + if (state->current == mode::Errored || state->current == mode::Disposed) { return; } state->fill_queue.push_back(notification_type::on_next(std::move(v))); state->ensure_processing(guard); } void on_error(std::exception_ptr e) const { std::unique_lock<std::mutex> guard(state->lock); + if (state->current == mode::Errored || state->current == mode::Disposed) { return; } state->fill_queue.push_back(notification_type::on_error(e)); state->ensure_processing(guard); } void on_completed() const { std::unique_lock<std::mutex> guard(state->lock); + if (state->current == mode::Errored || state->current == mode::Disposed) { return; } state->fill_queue.push_back(notification_type::on_completed()); state->ensure_processing(guard); } @@ -163,7 +180,7 @@ struct observe_on this_type o(d, std::move(coor), cs); auto keepAlive = o.state; - cs.add([keepAlive](){ + cs.add([=](){ std::unique_lock<std::mutex> guard(keepAlive->lock); keepAlive->ensure_processing(guard); }); @@ -262,6 +279,11 @@ public: } }; +inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { + static observe_on_one_worker r(rxsc::make_run_loop(rl)); + return r; +} + inline observe_on_one_worker observe_on_event_loop() { static observe_on_one_worker r(rxsc::make_event_loop()); return r; diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp index 17fe35a..fb38902 100644 --- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp @@ -52,14 +52,12 @@ struct subscribe_on : public operator_base<T> : public std::enable_shared_from_this<subscribe_on_state_type> , public subscribe_on_values { - subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg) + subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg) : subscribe_on_values(i) - , coordinator(std::move(coor)) , out(oarg) { } composite_subscription source_lifetime; - coordinator_type coordinator; output_type out; private: subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE; @@ -72,33 +70,39 @@ struct subscribe_on : public operator_base<T> auto controller = coordinator.get_worker(); // take a copy of the values for each subscription - auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(coordinator), std::move(s)); + auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s)); + + auto sl = state->source_lifetime; + auto ol = state->out.get_subscription(); auto disposer = [=](const rxsc::schedulable&){ - state->source_lifetime.unsubscribe(); - state->out.unsubscribe(); + sl.unsubscribe(); + ol.unsubscribe(); coordinator_lifetime.unsubscribe(); }; auto selectedDisposer = on_exception( - [&](){return state->coordinator.act(disposer);}, + [&](){return coordinator.act(disposer);}, state->out); if (selectedDisposer.empty()) { return; } - - state->out.add([=](){ - controller.schedule(selectedDisposer.get()); - }); + state->source_lifetime.add([=](){ controller.schedule(selectedDisposer.get()); }); + state->out.add([=](){ + sl.unsubscribe(); + ol.unsubscribe(); + coordinator_lifetime.unsubscribe(); + }); + auto producer = [=](const rxsc::schedulable&){ state->source.subscribe(state->source_lifetime, state->out); }; auto selectedProducer = on_exception( - [&](){return state->coordinator.act(producer);}, + [&](){return coordinator.act(producer);}, state->out); if (selectedProducer.empty()) { return; diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index b842775..a9e029f 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -329,7 +329,7 @@ inline bool operator==(const worker& lhs, const worker& rhs) { inline bool operator!=(const worker& lhs, const worker& rhs) { return !(lhs == rhs); } - + class weak_worker { detail::worker_interface_weak_ptr inner; @@ -344,7 +344,7 @@ public: , lifetime(owner.lifetime) { } - + worker lock() const { return worker(lifetime, inner.lock()); } @@ -419,6 +419,9 @@ inline scheduler make_scheduler(ArgN&&... an) { return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...))); } +inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) { + return scheduler(si); +} class schedulable : public schedulable_base { @@ -912,6 +915,7 @@ namespace rxsc=schedulers; } #include "schedulers/rx-currentthread.hpp" +#include "schedulers/rx-runloop.hpp" #include "schedulers/rx-newthread.hpp" #include "schedulers/rx-eventloop.hpp" #include "schedulers/rx-immediate.hpp" diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index 9c4339c..96b7213 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -26,6 +26,7 @@ class multicast_observer enum type { Invalid = 0, Casting, + Disposed, Completed, Errored }; @@ -102,6 +103,15 @@ public: explicit multicast_observer(composite_subscription cs) : b(std::make_shared<binder_type>(cs)) { + auto keepAlive = b; + b->state->lifetime.add([keepAlive](){ + if (keepAlive->state->current == mode::Casting){ + keepAlive->state->current = mode::Disposed; + keepAlive->current_completer.reset(); + keepAlive->completer.reset(); + ++keepAlive->state->generation; + } + }); } trace_id get_id() const { return b->id; @@ -144,6 +154,13 @@ public: return; } break; + case mode::Disposed: + { + guard.unlock(); + o.unsubscribe(); + return; + } + break; default: abort(); } |