summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2015-06-12 07:35:12 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2015-06-18 19:55:10 -0700
commit83351c7ba1379bdf1978e3aecde5c604bfec0ffe (patch)
tree6a0d2a0fd22dd80f53492dbb6804438a079c8b72 /Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
parent6a72451c421947eedbe7feaf1f523af193320a43 (diff)
downloadRxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.gz
fixes worker leaks (#101 & #132)
uses weak pointer to hold the worker in schedulable, empties queue when new_thread worker lifetime ends and remove observer when hot_observable subscription ends.
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp')
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp25
1 files changed, 14 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index 2e24c0f..bed6ef9 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -24,7 +24,7 @@ private:
private:
typedef new_worker this_type;
- typedef detail::action_queue queue;
+ typedef detail::action_queue queue_type;
new_worker(const this_type&);
@@ -57,7 +57,7 @@ private:
composite_subscription lifetime;
mutable std::mutex lock;
mutable std::condition_variable wake;
- mutable queue_item_time queue;
+ mutable queue_item_time q;
std::thread worker;
recursion r;
};
@@ -80,31 +80,34 @@ private:
auto keepAlive = state;
state->lifetime.add([keepAlive](){
+ std::unique_lock<std::mutex> guard(keepAlive->lock);
+ auto expired = std::move(keepAlive->q);
+ if (!keepAlive->q.empty()) abort();
keepAlive->wake.notify_one();
});
state->worker = tf([keepAlive](){
// take ownership
- queue::ensure(std::make_shared<new_worker>(keepAlive));
+ queue_type::ensure(std::make_shared<new_worker>(keepAlive));
// release ownership
RXCPP_UNWIND_AUTO([]{
- queue::destroy();
+ queue_type::destroy();
});
for(;;) {
std::unique_lock<std::mutex> guard(keepAlive->lock);
- if (keepAlive->queue.empty()) {
+ if (keepAlive->q.empty()) {
keepAlive->wake.wait(guard, [keepAlive](){
- return !keepAlive->lifetime.is_subscribed() || !keepAlive->queue.empty();
+ return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
});
}
if (!keepAlive->lifetime.is_subscribed()) {
break;
}
- auto& peek = keepAlive->queue.top();
+ auto& peek = keepAlive->q.top();
if (!peek.what.is_subscribed()) {
- keepAlive->queue.pop();
+ keepAlive->q.pop();
continue;
}
if (clock_type::now() < peek.when) {
@@ -112,8 +115,8 @@ private:
continue;
}
auto what = peek.what;
- keepAlive->queue.pop();
- keepAlive->r.reset(keepAlive->queue.empty());
+ keepAlive->q.pop();
+ keepAlive->r.reset(keepAlive->q.empty());
guard.unlock();
what(keepAlive->r.get_recurse());
}
@@ -131,7 +134,7 @@ private:
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
if (scbl.is_subscribed()) {
std::unique_lock<std::mutex> guard(state->lock);
- state->queue.push(new_worker_state::item_type(when, scbl));
+ state->q.push(new_worker_state::item_type(when, scbl));
state->r.reset(false);
}
state->wake.notify_one();