diff options
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp index 2429ee3..7e522f6 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp @@ -35,6 +35,7 @@ struct run_loop_state : public std::enable_shared_from_this<run_loop_state> mutable std::mutex lock; mutable queue_item_time q; recursion r; + std::function<void(clock_type::time_point)> notify_earlier_wakeup; }; } @@ -77,8 +78,12 @@ private: if (scbl.is_subscribed()) { auto st = state.lock(); std::unique_lock<std::mutex> guard(st->lock); + const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup && + (st->q.empty() || when < st->q.top().when); st->q.push(detail::run_loop_state::item_type(when, scbl)); st->r.reset(false); + if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when); + guard.unlock(); // So we can't get attempt to recursively lock the state } } }; @@ -119,7 +124,6 @@ private: run_loop(const this_type&); run_loop(this_type&&); - typedef scheduler::clock_type clock_type; typedef detail::action_queue queue_type; typedef detail::run_loop_state::item_type item_type; @@ -129,6 +133,7 @@ private: std::shared_ptr<run_loop_scheduler> sc; public: + typedef scheduler::clock_type clock_type; run_loop() : state(std::make_shared<detail::run_loop_state>()) , sc(std::make_shared<run_loop_scheduler>(state)) @@ -189,6 +194,11 @@ public: scheduler get_scheduler() const { return make_scheduler(sc); } + + void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) { + std::unique_lock<std::mutex> guard(state->lock); + state->notify_earlier_wakeup = f; + } }; inline scheduler make_run_loop(const run_loop& r) { |