// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_COORDINATION_HPP) #define RXCPP_RX_COORDINATION_HPP #include "rx-includes.hpp" namespace rxcpp { struct tag_coordinator {}; struct coordinator_base {typedef tag_coordinator coordinator_tag;}; template struct is_coordinator : public std::false_type {}; template struct is_coordinator::type> : public std::is_convertible {}; struct tag_coordination {}; struct coordination_base {typedef tag_coordination coordination_tag;}; namespace detail { template struct is_coordination : public std::false_type {}; template struct is_coordination::type> : public std::is_convertible {}; } template> struct is_coordination : detail::is_coordination { }; template> using coordination_tag_t = typename DecayedCoordination::coordination_tag; template class coordinator : public coordinator_base { public: typedef Input input_type; private: struct not_supported {typedef not_supported type;}; template struct get_observable { typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type; }; template struct get_subscriber { typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type; }; template struct get_action_function { typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type; }; public: input_type input; template struct get { typedef typename std::conditional< rxsc::detail::is_action_function::value, get_action_function, typename std::conditional< is_observable::value, get_observable, typename std::conditional< is_subscriber::value, get_subscriber, not_supported>::type>::type>::type::type type; }; coordinator(Input i) : input(i) {} rxsc::worker get_worker() const { return input.get_worker(); } rxsc::scheduler get_scheduler() const { return input.get_scheduler(); } template auto in(Observable o) const -> typename get_observable::type { return input.in(std::move(o)); static_assert(is_observable::value, "can only synchronize observables"); } template auto out(Subscriber s) const -> typename get_subscriber::type { return input.out(std::move(s)); static_assert(is_subscriber::value, "can only synchronize subscribers"); } template auto act(F f) const -> typename get_action_function::type { return input.act(std::move(f)); static_assert(rxsc::detail::is_action_function::value, "can only synchronize action functions"); } }; class identity_one_worker : public coordination_base { rxsc::scheduler factory; class input_type { rxsc::worker controller; rxsc::scheduler factory; public: explicit input_type(rxsc::worker w) : controller(w) , factory(rxsc::make_same_worker(w)) { } inline rxsc::worker get_worker() const { return controller; } inline rxsc::scheduler get_scheduler() const { return factory; } inline rxsc::scheduler::clock_type::time_point now() const { return factory.now(); } template auto in(Observable o) const -> Observable { return o; } template auto out(Subscriber s) const -> Subscriber { return s; } template auto act(F f) const -> F { return f; } }; public: explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {} typedef coordinator coordinator_type; inline rxsc::scheduler::clock_type::time_point now() const { return factory.now(); } inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const { auto w = factory.create_worker(std::move(cs)); return coordinator_type(input_type(std::move(w))); } }; inline identity_one_worker identity_immediate() { static identity_one_worker r(rxsc::make_immediate()); return r; } inline identity_one_worker identity_current_thread() { static identity_one_worker r(rxsc::make_current_thread()); return r; } inline identity_one_worker identity_same_worker(rxsc::worker w) { return identity_one_worker(rxsc::make_same_worker(w)); } class serialize_one_worker : public coordination_base { rxsc::scheduler factory; template struct serialize_action { F dest; std::shared_ptr lock; serialize_action(F d, std::shared_ptr m) : dest(std::move(d)) , lock(std::move(m)) { if (!lock) { std::terminate(); } } auto operator()(const rxsc::schedulable& scbl) const -> decltype(dest(scbl)) { std::unique_lock guard(*lock); return dest(scbl); } }; template struct serialize_observer { typedef serialize_observer this_type; typedef rxu::decay_t dest_type; typedef typename dest_type::value_type value_type; typedef observer observer_type; dest_type dest; std::shared_ptr lock; serialize_observer(dest_type d, std::shared_ptr m) : dest(std::move(d)) , lock(std::move(m)) { if (!lock) { std::terminate(); } } void on_next(value_type v) const { std::unique_lock guard(*lock); dest.on_next(v); } void on_error(rxu::error_ptr e) const { std::unique_lock guard(*lock); dest.on_error(e); } void on_completed() const { std::unique_lock guard(*lock); dest.on_completed(); } template static subscriber make(const Subscriber& s, std::shared_ptr m) { return make_subscriber(s, observer_type(this_type(s.get_observer(), std::move(m)))); } }; class input_type { rxsc::worker controller; rxsc::scheduler factory; std::shared_ptr lock; public: explicit input_type(rxsc::worker w, std::shared_ptr m) : controller(w) , factory(rxsc::make_same_worker(w)) , lock(std::move(m)) { } inline rxsc::worker get_worker() const { return controller; } inline rxsc::scheduler get_scheduler() const { return factory; } inline rxsc::scheduler::clock_type::time_point now() const { return factory.now(); } template auto in(Observable o) const -> Observable { return o; } template auto out(const Subscriber& s) const -> decltype(serialize_observer::make(s, lock)) { return serialize_observer::make(s, lock); } template auto act(F f) const -> serialize_action { return serialize_action(std::move(f), lock); } }; public: explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {} typedef coordinator coordinator_type; inline rxsc::scheduler::clock_type::time_point now() const { return factory.now(); } inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const { auto w = factory.create_worker(std::move(cs)); std::shared_ptr lock = std::make_shared(); return coordinator_type(input_type(std::move(w), std::move(lock))); } }; inline serialize_one_worker serialize_event_loop() { static serialize_one_worker r(rxsc::make_event_loop()); return r; } inline serialize_one_worker serialize_new_thread() { static serialize_one_worker r(rxsc::make_new_thread()); return r; } inline serialize_one_worker serialize_same_worker(rxsc::worker w) { return serialize_one_worker(rxsc::make_same_worker(w)); } } #endif