// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP) #define RXCPP_RX_SCHEDULER_TEST_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace schedulers { namespace detail { class test_type : public scheduler_interface { public: typedef scheduler_interface::clock_type clock_type; struct test_type_state : public virtual_time { typedef virtual_time base; using base::schedule_absolute; using base::schedule_relative; clock_type::time_point now() const { return to_time_point(clock_now); } virtual void schedule_absolute(long when, const schedulable& a) const { if (when <= base::clock_now) when = base::clock_now + 1; return base::schedule_absolute(when, a); } virtual long add(long absolute, long relative) const { return absolute + relative; } virtual clock_type::time_point to_time_point(long absolute) const { return clock_type::time_point(std::chrono::milliseconds(absolute)); } virtual long to_relative(clock_type::duration d) const { return static_cast(std::chrono::duration_cast(d).count()); } }; private: mutable std::shared_ptr state; public: struct test_type_worker : public worker_interface { mutable std::shared_ptr state; typedef test_type_state::absolute absolute; typedef test_type_state::relative relative; test_type_worker(std::shared_ptr st) : state(std::move(st)) { } virtual clock_type::time_point now() const { return state->now(); } virtual void schedule(const schedulable& scbl) const { state->schedule_absolute(state->clock(), scbl); } virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { state->schedule_relative(state->to_relative(when - now()), scbl); } void schedule_absolute(absolute when, const schedulable& scbl) const { state->schedule_absolute(when, scbl); } void schedule_relative(relative when, const schedulable& scbl) const { state->schedule_relative(when, scbl); } bool is_enabled() const {return state->is_enabled();} absolute clock() const {return state->clock();} void start() const { state->start(); } void stop() const { state->stop(); } void advance_to(absolute time) const { state->advance_to(time); } void advance_by(relative time) const { state->advance_by(time); } void sleep(relative time) const { state->sleep(time); } template subscriber> make_subscriber() const; }; public: test_type() : state(std::make_shared()) { } virtual clock_type::time_point now() const { return state->now(); } virtual worker create_worker(composite_subscription cs) const { return worker(cs, std::make_shared(state)); } bool is_enabled() const {return state->is_enabled();} long clock() { return state->clock(); } clock_type::time_point to_time_point(long absolute) const { return state->to_time_point(absolute); } std::shared_ptr create_test_type_worker_interface() const { return std::make_shared(state); } template rxt::testable_observable make_hot_observable(std::vector>>> messages) const; template rxt::testable_observable make_cold_observable(std::vector>>> messages) const; }; template class mock_observer : public rxt::detail::test_subject_base { typedef typename rxn::notification notification_type; typedef rxn::recorded recorded_type; public: explicit mock_observer(std::shared_ptr sc) : sc(sc) { } std::shared_ptr sc; std::vector m; virtual void on_subscribe(subscriber) const { std::terminate(); } virtual std::vector subscriptions() const { std::terminate(); } virtual std::vector messages() const { return m; } }; template subscriber> test_type::test_type_worker::make_subscriber() const { typedef typename rxn::notification notification_type; typedef rxn::recorded recorded_type; auto ts = std::make_shared>(state); return rxcpp::make_subscriber(rxt::testable_observer(ts, make_observer_dynamic( // on_next [ts](T value) { ts->m.push_back( recorded_type(ts->sc->clock(), notification_type::on_next(value))); }, // on_error [ts](rxu::error_ptr e) { ts->m.push_back( recorded_type(ts->sc->clock(), notification_type::on_error(e))); }, // on_completed [ts]() { ts->m.push_back( recorded_type(ts->sc->clock(), notification_type::on_completed())); }))); } template class cold_observable : public rxt::detail::test_subject_base { typedef cold_observable this_type; std::shared_ptr sc; typedef rxn::recorded::type> recorded_type; mutable std::vector mv; mutable std::vector sv; mutable worker controller; public: cold_observable(std::shared_ptr sc, worker w, std::vector mv) : sc(sc) , mv(std::move(mv)) , controller(w) { } template cold_observable(std::shared_ptr sc, worker w, Iterator begin, Iterator end) : sc(sc) , mv(begin, end) , controller(w) { } virtual void on_subscribe(subscriber o) const { sv.push_back(rxn::subscription(sc->clock())); auto index = sv.size() - 1; for (auto& message : mv) { auto n = message.value(); sc->schedule_relative(message.time(), make_schedulable( controller, [n, o](const schedulable&) { if (o.is_subscribed()) { n->accept(o); } })); } auto sharedThis = std::static_pointer_cast(this->shared_from_this()); o.add([sharedThis, index]() { sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); }); } virtual std::vector subscriptions() const { return sv; } virtual std::vector messages() const { return mv; } }; template rxt::testable_observable test_type::make_cold_observable(std::vector>>> messages) const { auto co = std::make_shared>(state, create_worker(composite_subscription()), std::move(messages)); return rxt::testable_observable(co); } template class hot_observable : public rxt::detail::test_subject_base { typedef hot_observable this_type; std::shared_ptr sc; typedef rxn::recorded::type> recorded_type; typedef subscriber observer_type; mutable std::vector mv; mutable std::vector sv; mutable std::list observers; mutable worker controller; public: hot_observable(std::shared_ptr sc, worker w, std::vector mv) : sc(sc) , mv(mv) , controller(w) { for (auto& message : mv) { auto n = message.value(); sc->schedule_absolute(message.time(), make_schedulable( controller, [this, n](const schedulable&) { auto local = this->observers; for (auto& o : local) { if (o.is_subscribed()) { n->accept(o); } } })); } } virtual ~hot_observable() {} virtual void on_subscribe(observer_type o) const { auto olocation = observers.insert(observers.end(), o); sv.push_back(rxn::subscription(sc->clock())); auto index = sv.size() - 1; auto sharedThis = std::static_pointer_cast(this->shared_from_this()); o.add([sharedThis, index, olocation]() { sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); sharedThis->observers.erase(olocation); }); } virtual std::vector subscriptions() const { return sv; } virtual std::vector messages() const { return mv; } }; template rxt::testable_observable test_type::make_hot_observable(std::vector>>> messages) const { auto worker = create_worker(composite_subscription()); auto shared = std::make_shared>(state, worker, std::move(messages)); return rxt::testable_observable(shared); } template struct is_create_source_function { struct not_void {}; template static auto check(int) -> decltype((*(CF*)nullptr)()); template static not_void check(...); static const bool value = is_observable>(0))>::value; }; } class test : public scheduler { std::shared_ptr tester; public: explicit test(std::shared_ptr t) : scheduler(std::static_pointer_cast(t)) , tester(t) { } typedef detail::test_type::clock_type clock_type; static const long created_time = 100; static const long subscribed_time = 200; static const long unsubscribed_time = 1000; template struct messages { typedef typename rxn::notification notification_type; typedef rxn::recorded recorded_type; typedef rxn::subscription subscription_type; messages() {} template static recorded_type next(long ticks, U value) { return recorded_type(ticks, notification_type::on_next(std::move(value))); } static recorded_type completed(long ticks) { return recorded_type(ticks, notification_type::on_completed()); } template static recorded_type error(long ticks, Exception&& e) { return recorded_type(ticks, notification_type::on_error(std::forward(e))); } static rxn::subscription subscribe(long subscribe, long unsubscribe) { return rxn::subscription(subscribe, unsubscribe); } }; class test_worker : public worker { std::shared_ptr tester; public: ~test_worker() { } explicit test_worker(composite_subscription cs, std::shared_ptr t) : worker(cs, std::static_pointer_cast(t)) , tester(t) { } bool is_enabled() const {return tester->is_enabled();} long clock() const {return tester->clock();} void schedule_absolute(long when, const schedulable& a) const { tester->schedule_absolute(when, a); } void schedule_relative(long when, const schedulable& a) const { tester->schedule_relative(when, a); } template auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const -> typename std::enable_if< (detail::is_action_function::value || is_subscription::value) && !is_schedulable::value>::type { tester->schedule_absolute(when, make_schedulable(*this, std::forward(a0), std::forward(an)...)); } template auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const -> typename std::enable_if< (detail::is_action_function::value || is_subscription::value) && !is_schedulable::value>::type { tester->schedule_relative(when, make_schedulable(*this, std::forward(a0), std::forward(an)...)); } void advance_to(long time) const { tester->advance_to(time); } void advance_by(long time) const { tester->advance_by(time); } void sleep(long time) const { tester->sleep(time); } template auto start(F createSource, long created, long subscribed, long unsubscribed) const -> subscriber> { struct state_type : public std::enable_shared_from_this { typedef decltype(createSource()) source_type; std::unique_ptr source; subscriber> o; explicit state_type(subscriber> o) : source() , o(o) { } }; auto state = std::make_shared(this->make_subscriber()); schedule_absolute(created, [createSource, state](const schedulable&) { state->source.reset(new typename state_type::source_type(createSource())); }); schedule_absolute(subscribed, [state](const schedulable&) { state->source->subscribe(state->o); }); schedule_absolute(unsubscribed, [state](const schedulable&) { state->o.unsubscribe(); }); tester->start(); return state->o; } template auto start(F&& createSource, long unsubscribed) const -> subscriber> { return start(std::forward(createSource), created_time, subscribed_time, unsubscribed); } template auto start(F&& createSource) const -> subscriber> { return start(std::forward(createSource), created_time, subscribed_time, unsubscribed_time); } template struct start_traits { typedef decltype((*(F*)nullptr)()) source_type; typedef typename source_type::value_type value_type; typedef subscriber> subscriber_type; }; template auto start(F createSource, long created, long subscribed, long unsubscribed) const -> typename std::enable_if::value, start_traits>::type::subscriber_type { return start>>(std::move(createSource), created, subscribed, unsubscribed); } template auto start(F createSource, long unsubscribed) const -> typename std::enable_if::value, start_traits>::type::subscriber_type { return start>>(std::move(createSource), created_time, subscribed_time, unsubscribed); } template auto start(F createSource) const -> typename std::enable_if::value, start_traits>::type::subscriber_type { return start>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time); } void start() const { tester->start(); } template subscriber> make_subscriber() const { return tester->make_subscriber(); } }; clock_type::time_point now() const { return tester->now(); } test_worker create_worker(composite_subscription cs = composite_subscription()) const { return test_worker(cs, tester->create_test_type_worker_interface()); } bool is_enabled() const {return tester->is_enabled();} long clock() const {return tester->clock();} clock_type::time_point to_time_point(long absolute) const { return tester->to_time_point(absolute); } template rxt::testable_observable make_hot_observable(std::vector>>> messages) const{ return tester->make_hot_observable(std::move(messages)); } template auto make_hot_observable(const T (&arr) [size]) const -> decltype(tester->make_hot_observable(std::vector())) { return tester->make_hot_observable(rxu::to_vector(arr)); } template auto make_hot_observable(std::initializer_list il) const -> decltype(tester->make_hot_observable(std::vector())) { return tester->make_hot_observable(std::vector(il)); } template rxt::testable_observable make_cold_observable(std::vector>>> messages) const { return tester->make_cold_observable(std::move(messages)); } template auto make_cold_observable(const T (&arr) [size]) const -> decltype(tester->make_cold_observable(std::vector())) { return tester->make_cold_observable(rxu::to_vector(arr)); } template auto make_cold_observable(std::initializer_list il) const -> decltype(tester->make_cold_observable(std::vector())) { return tester->make_cold_observable(std::vector(il)); } }; inline test make_test() { return test(std::make_shared()); } } inline identity_one_worker identity_test() { static identity_one_worker r(rxsc::make_test()); return r; } } #endif