// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP) #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace schedulers { namespace detail { struct action_queue { typedef action_queue this_type; typedef scheduler_base::clock_type clock; typedef time_schedulable item_type; private: typedef schedulable_queue queue_item_time; public: struct current_thread_queue_type { std::shared_ptr w; recursion r; queue_item_time q; }; private: #if defined(RXCPP_THREAD_LOCAL) static current_thread_queue_type*& current_thread_queue() { static RXCPP_THREAD_LOCAL current_thread_queue_type* q; return q; } #else static rxu::thread_local_storage& current_thread_queue() { static rxu::thread_local_storage q; return q; } #endif public: static bool owned() { return !!current_thread_queue(); } static const std::shared_ptr& get_worker_interface() { return current_thread_queue()->w; } static recursion& get_recursion() { return current_thread_queue()->r; } static bool empty() { if (!current_thread_queue()) { std::terminate(); } return current_thread_queue()->q.empty(); } static queue_item_time::const_reference top() { if (!current_thread_queue()) { std::terminate(); } return current_thread_queue()->q.top(); } static void pop() { auto& state = current_thread_queue(); if (!state) { std::terminate(); } state->q.pop(); if (state->q.empty()) { // allow recursion state->r.reset(true); } } static void push(item_type item) { auto& state = current_thread_queue(); if (!state) { std::terminate(); } if (!item.what.is_subscribed()) { return; } state->q.push(std::move(item)); // disallow recursion state->r.reset(false); } static std::shared_ptr ensure(std::shared_ptr w) { if (!!current_thread_queue()) { std::terminate(); } // create and publish new queue current_thread_queue() = new current_thread_queue_type(); current_thread_queue()->w = w; return w; } static std::unique_ptr create(std::shared_ptr w) { std::unique_ptr result(new current_thread_queue_type()); result->w = std::move(w); return result; } static void set(current_thread_queue_type* q) { if (!!current_thread_queue()) { std::terminate(); } // publish new queue current_thread_queue() = q; } static void destroy(current_thread_queue_type* q) { delete q; } static void destroy() { if (!current_thread_queue()) { std::terminate(); } #if defined(RXCPP_THREAD_LOCAL) destroy(current_thread_queue()); #else destroy(current_thread_queue().get()); #endif current_thread_queue() = nullptr; } }; } struct current_thread : public scheduler_interface { private: typedef current_thread this_type; current_thread(const this_type&); typedef detail::action_queue queue_type; struct derecurser : public worker_interface { private: typedef current_thread this_type; derecurser(const this_type&); public: derecurser() { } virtual ~derecurser() { } virtual clock_type::time_point now() const { return clock_type::now(); } virtual void schedule(const schedulable& scbl) const { queue_type::push(queue_type::item_type(now(), scbl)); } virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { queue_type::push(queue_type::item_type(when, scbl)); } }; struct current_worker : public worker_interface { private: typedef current_thread this_type; current_worker(const this_type&); public: current_worker() { } virtual ~current_worker() { } virtual clock_type::time_point now() const { return clock_type::now(); } virtual void schedule(const schedulable& scbl) const { schedule(now(), scbl); } virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (!scbl.is_subscribed()) { return; } { // check ownership if (queue_type::owned()) { // already has an owner - delegate queue_type::get_worker_interface()->schedule(when, scbl); return; } // take ownership queue_type::ensure(std::make_shared()); } // release ownership RXCPP_UNWIND_AUTO([]{ queue_type::destroy(); }); const auto& recursor = queue_type::get_recursion().get_recurse(); std::this_thread::sleep_until(when); if (scbl.is_subscribed()) { scbl(recursor); } if (queue_type::empty()) { return; } // loop until queue is empty for ( auto next = queue_type::top().when; (std::this_thread::sleep_until(next), true); next = queue_type::top().when ) { auto what = queue_type::top().what; queue_type::pop(); if (what.is_subscribed()) { what(recursor); } if (queue_type::empty()) { break; } } } }; std::shared_ptr wi; public: current_thread() : wi(std::make_shared()) { } virtual ~current_thread() { } static bool is_schedule_required() { return !queue_type::owned(); } inline bool is_tail_recursion_allowed() const { return queue_type::empty(); } virtual clock_type::time_point now() const { return clock_type::now(); } virtual worker create_worker(composite_subscription cs) const { return worker(std::move(cs), wi); } }; inline const scheduler& make_current_thread() { static scheduler instance = make_scheduler(); return instance; } } } #endif