diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-12 07:35:12 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-18 19:55:10 -0700 |
commit | 83351c7ba1379bdf1978e3aecde5c604bfec0ffe (patch) | |
tree | 6a0d2a0fd22dd80f53492dbb6804438a079c8b72 /Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | |
parent | 6a72451c421947eedbe7feaf1f523af193320a43 (diff) | |
download | RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.gz |
fixes worker leaks (#101 & #132)
uses weak pointer to hold the worker in schedulable, empties queue
when new_thread worker lifetime ends and remove observer when
hot_observable subscription ends.
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index 2e24c0f..bed6ef9 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -24,7 +24,7 @@ private: private: typedef new_worker this_type; - typedef detail::action_queue queue; + typedef detail::action_queue queue_type; new_worker(const this_type&); @@ -57,7 +57,7 @@ private: composite_subscription lifetime; mutable std::mutex lock; mutable std::condition_variable wake; - mutable queue_item_time queue; + mutable queue_item_time q; std::thread worker; recursion r; }; @@ -80,31 +80,34 @@ private: auto keepAlive = state; state->lifetime.add([keepAlive](){ + std::unique_lock<std::mutex> guard(keepAlive->lock); + auto expired = std::move(keepAlive->q); + if (!keepAlive->q.empty()) abort(); keepAlive->wake.notify_one(); }); state->worker = tf([keepAlive](){ // take ownership - queue::ensure(std::make_shared<new_worker>(keepAlive)); + queue_type::ensure(std::make_shared<new_worker>(keepAlive)); // release ownership RXCPP_UNWIND_AUTO([]{ - queue::destroy(); + queue_type::destroy(); }); for(;;) { std::unique_lock<std::mutex> guard(keepAlive->lock); - if (keepAlive->queue.empty()) { + if (keepAlive->q.empty()) { keepAlive->wake.wait(guard, [keepAlive](){ - return !keepAlive->lifetime.is_subscribed() || !keepAlive->queue.empty(); + return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty(); }); } if (!keepAlive->lifetime.is_subscribed()) { break; } - auto& peek = keepAlive->queue.top(); + auto& peek = keepAlive->q.top(); if (!peek.what.is_subscribed()) { - keepAlive->queue.pop(); + keepAlive->q.pop(); continue; } if (clock_type::now() < peek.when) { @@ -112,8 +115,8 @@ private: continue; } auto what = peek.what; - keepAlive->queue.pop(); - keepAlive->r.reset(keepAlive->queue.empty()); + keepAlive->q.pop(); + keepAlive->r.reset(keepAlive->q.empty()); guard.unlock(); what(keepAlive->r.get_recurse()); } @@ -131,7 +134,7 @@ private: virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (scbl.is_subscribed()) { std::unique_lock<std::mutex> guard(state->lock); - state->queue.push(new_worker_state::item_type(when, scbl)); + state->q.push(new_worker_state::item_type(when, scbl)); state->r.reset(false); } state->wake.notify_one(); |