summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorStuart Dootson <stuart.dootson@gmail.com>2017-01-26 15:07:56 +0000
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-26 07:07:56 -0800
commitad101cf093028f874a5d882967bdb4ccb8032eb8 (patch)
tree66dae720dbb02124a4830e3aa079e16aff3308a0 /Rx
parent40432165a831a8e7f23197a5559916676747610d (diff)
downloadRxCpp-ad101cf093028f874a5d882967bdb4ccb8032eb8.tar.gz
Add callback for when a task is added to a run-loop schedule (#337)
* Add callback for when a task is added to a run-loop schedule * Provide wakeup time in 'new task' callback Modified callback to provide new wakeup time as a callback parameter, so that it can schedule an event to wake the thread owning the run loop, rather than (as was the case) having to iterate through the dispatch loop.
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp12
1 files changed, 11 insertions, 1 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
index 2429ee3..7e522f6 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
@@ -35,6 +35,7 @@ struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
mutable std::mutex lock;
mutable queue_item_time q;
recursion r;
+ std::function<void(clock_type::time_point)> notify_earlier_wakeup;
};
}
@@ -77,8 +78,12 @@ private:
if (scbl.is_subscribed()) {
auto st = state.lock();
std::unique_lock<std::mutex> guard(st->lock);
+ const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
+ (st->q.empty() || when < st->q.top().when);
st->q.push(detail::run_loop_state::item_type(when, scbl));
st->r.reset(false);
+ if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
+ guard.unlock(); // So we can't get attempt to recursively lock the state
}
}
};
@@ -119,7 +124,6 @@ private:
run_loop(const this_type&);
run_loop(this_type&&);
- typedef scheduler::clock_type clock_type;
typedef detail::action_queue queue_type;
typedef detail::run_loop_state::item_type item_type;
@@ -129,6 +133,7 @@ private:
std::shared_ptr<run_loop_scheduler> sc;
public:
+ typedef scheduler::clock_type clock_type;
run_loop()
: state(std::make_shared<detail::run_loop_state>())
, sc(std::make_shared<run_loop_scheduler>(state))
@@ -189,6 +194,11 @@ public:
scheduler get_scheduler() const {
return make_scheduler(sc);
}
+
+ void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) {
+ std::unique_lock<std::mutex> guard(state->lock);
+ state->notify_earlier_wakeup = f;
+ }
};
inline scheduler make_run_loop(const run_loop& r) {