summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp12
1 files changed, 11 insertions, 1 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
index 2429ee3..7e522f6 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
@@ -35,6 +35,7 @@ struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
mutable std::mutex lock;
mutable queue_item_time q;
recursion r;
+ std::function<void(clock_type::time_point)> notify_earlier_wakeup;
};
}
@@ -77,8 +78,12 @@ private:
if (scbl.is_subscribed()) {
auto st = state.lock();
std::unique_lock<std::mutex> guard(st->lock);
+ const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
+ (st->q.empty() || when < st->q.top().when);
st->q.push(detail::run_loop_state::item_type(when, scbl));
st->r.reset(false);
+ if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
+ guard.unlock(); // So we can't get attempt to recursively lock the state
}
}
};
@@ -119,7 +124,6 @@ private:
run_loop(const this_type&);
run_loop(this_type&&);
- typedef scheduler::clock_type clock_type;
typedef detail::action_queue queue_type;
typedef detail::run_loop_state::item_type item_type;
@@ -129,6 +133,7 @@ private:
std::shared_ptr<run_loop_scheduler> sc;
public:
+ typedef scheduler::clock_type clock_type;
run_loop()
: state(std::make_shared<detail::run_loop_state>())
, sc(std::make_shared<run_loop_scheduler>(state))
@@ -189,6 +194,11 @@ public:
scheduler get_scheduler() const {
return make_scheduler(sc);
}
+
+ void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) {
+ std::unique_lock<std::mutex> guard(state->lock);
+ state->notify_earlier_wakeup = f;
+ }
};
inline scheduler make_run_loop(const run_loop& r) {