diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-29 23:46:04 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-01 14:03:55 -0700 |
commit | 09b16daca270ed0ade1f41f76c0910d6442e95de (patch) | |
tree | fba873aeed4c42c57dde9c5cccc7b1203ae8443f /Rx/v2 | |
parent | 15807b31b06a4558b55356a3d69c0287f6177f7d (diff) | |
download | RxCpp-09b16daca270ed0ade1f41f76c0910d6442e95de.tar.gz |
fix virtual_time and test scheduler
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-scheduler.hpp | 26 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-test.hpp | 332 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp | 57 | ||||
-rw-r--r-- | Rx/v2/test/operators/filter.cpp | 23 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 25 | ||||
-rw-r--r-- | Rx/v2/test/operators/map.cpp | 3 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 23 | ||||
-rw-r--r-- | Rx/v2/test/operators/take.cpp | 66 | ||||
-rw-r--r-- | Rx/v2/test/subjects/subject.cpp | 87 |
9 files changed, 363 insertions, 279 deletions
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index a57851c..4b01c36 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -462,32 +462,6 @@ public: : scoped(false) { } - schedulable(const schedulable& o) - : lifetime(o.lifetime) - , controller(o.controller) - , activity(o.activity) - , scoped(o.scoped) - , action_scope(o.scoped ? controller.add(lifetime) : weak_subscription()) - { - } - schedulable(schedulable&& o) - : lifetime(std::move(o.lifetime)) - , controller(std::move(o.controller)) - , activity(std::move(o.activity)) - , scoped(o.scoped) - , action_scope(std::move(o.action_scope)) - { - o.scoped = false; - } - schedulable& operator =(schedulable o) { - using std::swap; - swap(lifetime, o.lifetime); - swap(controller, o.controller); - swap(activity, o.activity); - swap(scoped, o.scoped); - swap(action_scope, o.action_scope); - return *this; - } /// action and worker share lifetime schedulable(worker q, action a) diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp index d3308d5..411bd43 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp @@ -13,54 +13,141 @@ namespace schedulers { namespace detail { -class test_type : public virtual_time<long, long> +class test_type : public scheduler_interface { public: - typedef virtual_time<long, long> base; - typedef base::clock_type clock_type; - typedef std::shared_ptr<test_type> shared; - using base::schedule_absolute; - using base::schedule_relative; + typedef scheduler_interface::clock_type clock_type; - virtual worker get_worker() const + struct test_type_state : public virtual_time<long, long> { - return base::get_worker(); - } + typedef virtual_time<long, long> base; + + using base::schedule_absolute; + using base::schedule_relative; + + clock_type::time_point now() const { + return to_time_point(clock_now); + } + + virtual void schedule_absolute(long when, const schedulable& a) const + { + if (when <= base::clock_now) + when = base::clock_now + 1; - virtual void schedule_absolute(long when, const schedulable& a) const + return base::schedule_absolute(when, a); + } + + virtual long add(long absolute, long relative) const + { + return absolute + relative; + } + + virtual clock_type::time_point to_time_point(long absolute) const + { + return clock_type::time_point(clock_type::duration(absolute)); + } + + virtual long to_relative(clock_type::duration d) const + { + return static_cast<long>(d.count()); + } + }; + +private: + mutable std::shared_ptr<test_type_state> state; + +public: + struct test_type_worker : public worker_interface { - if (when <= base::clock_now) - when = base::clock_now + 1; + mutable std::shared_ptr<test_type_state> state; - return base::schedule_absolute(when, a); - } + typedef test_type_state::absolute absolute; + typedef test_type_state::relative relative; + + test_type_worker(std::shared_ptr<test_type_state> st) + : state(std::move(st)) + { + } + + virtual clock_type::time_point now() const { + return state->now(); + } + + virtual void schedule(const schedulable& scbl) const { + state->schedule_absolute(state->clock(), scbl); + } + + virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { + state->schedule_absolute(state->to_relative(when - now()), scbl); + } + + + void schedule_absolute(absolute when, const schedulable& scbl) const { + state->schedule_absolute(when, scbl); + } + + void schedule_relative(relative when, const schedulable& scbl) const { + state->schedule_relative(when, scbl); + } + + bool is_enabled() const {return state->is_enabled();} + absolute clock() const {return state->clock();} + + void start() const + { + state->start(); + } + + void stop() const + { + state->stop(); + } + + void advance_to(absolute time) const + { + state->advance_to(time); + } - virtual long add(long absolute, long relative) const + void advance_by(relative time) const + { + state->advance_by(time); + } + + void sleep(relative time) const + { + state->sleep(time); + } + + template<class T> + subscriber<T, rxt::testable_observer<T>> make_subscriber() const; + }; + +public: + test_type() + : state(new test_type_state()) { - return absolute + relative; } - virtual clock_type::time_point to_time_point(long absolute) const - { - return clock_type::time_point(clock_type::duration(absolute)); + virtual clock_type::time_point now() const { + return state->now(); } - virtual long to_relative(clock_type::duration d) const - { - return static_cast<long>(d.count()); + virtual worker create_worker(composite_subscription cs) const { + std::shared_ptr<test_type_worker> wi(new test_type_worker(state)); + return worker(cs, wi); } - using base::start; + std::shared_ptr<test_type_worker> create_test_type_worker_interface() const { + std::shared_ptr<test_type_worker> wi(new test_type_worker(state)); + return wi; + } template<class T> rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const; template<class T> rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const; - - template<class T> - subscriber<T, rxt::testable_observer<T>> make_subscriber() const; }; template<class T> @@ -71,12 +158,12 @@ class mock_observer typedef rxn::recorded<typename notification_type::type> recorded_type; public: - mock_observer(typename test_type::shared sc) + explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc) : sc(sc) { } - typename test_type::shared sc; + std::shared_ptr<test_type::test_type_state> sc; std::vector<recorded_type> m; virtual void on_subscribe(subscriber<T>) const { @@ -92,12 +179,12 @@ public: }; template<class T> -subscriber<T, rxt::testable_observer<T>> test_type::make_subscriber() const +subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const { typedef typename rxn::notification<T> notification_type; typedef rxn::recorded<typename notification_type::type> recorded_type; - std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this())))); + std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(state)); return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>( // on_next @@ -125,30 +212,32 @@ class cold_observable : public rxt::detail::test_subject_base<T> { typedef cold_observable<T> this_type; - typename test_type::shared sc; + std::shared_ptr<test_type::test_type_state> sc; typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; mutable std::vector<recorded_type> mv; mutable std::vector<rxn::subscription> sv; + mutable worker controller; public: - cold_observable(typename test_type::shared sc, std::vector<recorded_type> mv) + cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv) : sc(sc) , mv(std::move(mv)) + , controller(w) { } template<class Iterator> - cold_observable(typename test_type::shared sc, Iterator begin, Iterator end) + cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end) : sc(sc) , mv(begin, end) + , controller(w) { } virtual void on_subscribe(subscriber<T> o) const { sv.push_back(rxn::subscription(sc->clock())); auto index = sv.size() - 1; - auto controller = sc->create_worker(composite_subscription()); for (auto& message : mv) { auto n = message.value(); @@ -179,7 +268,7 @@ public: template<class T> rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const { - auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this())), std::move(messages))); + auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(state, create_worker(composite_subscription()), std::move(messages))); return rxt::testable_observable<T>(co); } @@ -188,20 +277,21 @@ class hot_observable : public rxt::detail::test_subject_base<T> { typedef hot_observable<T> this_type; - typename test_type::shared sc; + std::shared_ptr<test_type::test_type_state> sc; typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; typedef subscriber<T> observer_type; mutable std::vector<recorded_type> mv; mutable std::vector<rxn::subscription> sv; mutable std::vector<observer_type> observers; + mutable worker controller; public: - hot_observable(typename test_type::shared sc, std::vector<recorded_type> mv) + hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv) : sc(sc) , mv(mv) + , controller(w) { - auto controller = sc->create_worker(composite_subscription()); for (auto& message : mv) { auto n = message.value(); sc->schedule_absolute(message.time(), make_schedulable( @@ -240,18 +330,18 @@ public: template<class T> rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const { - return rxt::testable_observable<T>(std::make_shared<hot_observable<T>>( - std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this())), std::move(messages))); + return rxt::testable_observable<T>( + std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages))); } } class test : public scheduler { - detail::test_type::shared tester; + std::shared_ptr<detail::test_type> tester; public: - explicit test(detail::test_type::shared t) + explicit test(std::shared_ptr<detail::test_type> t) : scheduler(std::static_pointer_cast<scheduler_interface>(t)) , tester(t) { @@ -305,85 +395,110 @@ public: ~messages(); }; - void schedule_absolute(long when, const schedulable& a) const { - tester->schedule_absolute(when, a); - } + class test_worker : public worker + { + std::shared_ptr<detail::test_type::test_type_worker> tester; + public: - void schedule_relative(long when, const schedulable& a) const { - tester->schedule_relative(when, a); - } + explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t) + : worker(cs, std::static_pointer_cast<worker_interface>(t)) + , tester(t) + { + } - template<class Arg0, class... ArgN> - auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const - -> typename std::enable_if< - (detail::is_action_function<Arg0>::value || - is_subscription<Arg0>::value) && - !is_schedulable<Arg0>::value>::type { - tester->schedule_absolute(when, make_schedulable(tester->get_worker(), std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); - } + void schedule_absolute(long when, const schedulable& a) const { + tester->schedule_absolute(when, a); + } - template<class Arg0, class... ArgN> - auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const - -> typename std::enable_if< - (detail::is_action_function<Arg0>::value || - is_subscription<Arg0>::value) && - !is_schedulable<Arg0>::value>::type { - tester->schedule_relative(when, make_schedulable(tester->get_worker(), std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); - } + void schedule_relative(long when, const schedulable& a) const { + tester->schedule_relative(when, a); + } - template<class T, class F> - auto start(F&& createSource, long created, long subscribed, long unsubscribed) const - -> subscriber<T, rxt::testable_observer<T>> - { - typename std::decay<F>::type createSrc = std::forward<F>(createSource); + template<class Arg0, class... ArgN> + auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const + -> typename std::enable_if< + (detail::is_action_function<Arg0>::value || + is_subscription<Arg0>::value) && + !is_schedulable<Arg0>::value>::type { + tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); + } - struct state_type - : public std::enable_shared_from_this<state_type> - { - typedef decltype(std::forward<F>(createSrc)()) source_type; + template<class Arg0, class... ArgN> + auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const + -> typename std::enable_if< + (detail::is_action_function<Arg0>::value || + is_subscription<Arg0>::value) && + !is_schedulable<Arg0>::value>::type { + tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); + } - std::unique_ptr<source_type> source; - subscriber<T, rxt::testable_observer<T>> o; + template<class T, class F> + auto start(F&& createSource, long created, long subscribed, long unsubscribed) const + -> subscriber<T, rxt::testable_observer<T>> + { + typename std::decay<F>::type createSrc = std::forward<F>(createSource); - explicit state_type(subscriber<T, rxt::testable_observer<T>> o) - : source() - , o(o) + struct state_type + : public std::enable_shared_from_this<state_type> { - } - }; - std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>())); + typedef decltype(std::forward<F>(createSrc)()) source_type; + + std::unique_ptr<source_type> source; + subscriber<T, rxt::testable_observer<T>> o; + + explicit state_type(subscriber<T, rxt::testable_observer<T>> o) + : source() + , o(o) + { + } + }; + std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>())); + + schedule_absolute(created, [createSrc, state](const schedulable& scbl) { + state->source.reset(new typename state_type::source_type(createSrc())); + }); + schedule_absolute(subscribed, [state](const schedulable& scbl) { + state->source->subscribe(state->o); + }); + schedule_absolute(unsubscribed, [state](const schedulable& scbl) { + state->o.unsubscribe(); + }); + + tester->start(); + + return state->o; + } - schedule_absolute(created, [createSrc, state](const schedulable& scbl) { - state->source.reset(new typename state_type::source_type(createSrc())); - }); - schedule_absolute(subscribed, [state](const schedulable& scbl) { - state->source->subscribe(state->o); - }); - schedule_absolute(unsubscribed, [state](const schedulable& scbl) { - state->o.unsubscribe(); - }); + template<class T, class F> + auto start(F&& createSource, long unsubscribed) const + -> subscriber<T, rxt::testable_observer<T>> + { + return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed); + } - tester->start(); + template<class T, class F> + auto start(F&& createSource) const + -> subscriber<T, rxt::testable_observer<T>> + { + return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time); + } - return state->o; - } + void start() const { + tester->start(); + } - template<class T, class F> - auto start(F&& createSource, long unsubscribed) const - -> subscriber<T, rxt::testable_observer<T>> - { - return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed); - } + template<class T> + subscriber<T, rxt::testable_observer<T>> make_subscriber() const { + return tester->make_subscriber<T>(); + } + }; - template<class T, class F> - auto start(F&& createSource) const - -> subscriber<T, rxt::testable_observer<T>> - { - return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time); + clock_type::time_point now() const { + return tester->now(); } - void start() const { - tester->start(); + test_worker create_worker(composite_subscription cs = composite_subscription()) const { + return test_worker(cs, tester->create_test_type_worker_interface()); } template<class T> @@ -407,11 +522,6 @@ public: -> decltype(tester->make_cold_observable(std::vector<T>())) { return tester->make_cold_observable(rxu::to_vector(arr)); } - - template<class T> - subscriber<T, rxt::testable_observer<T>> make_subscriber() const { - return tester->make_subscriber<T>(); - } }; template<class T> diff --git a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp index 34fc8a3..bc42e5c 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp @@ -14,7 +14,7 @@ namespace schedulers { namespace detail { template<class Absolute, class Relative> -struct virtual_time_base : public scheduler_interface +struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>> { private: typedef virtual_time_base<Absolute, Relative> this_type; @@ -22,30 +22,6 @@ private: mutable bool isenabled; - struct virtual_worker : public worker_interface - { - std::shared_ptr<virtual_time_base> controller; - - virtual_worker(std::shared_ptr<virtual_time_base> vb) - : controller(std::move(vb)) - { - } - - virtual clock_type::time_point now() const { - return controller->to_time_point(controller->clock_now); - } - - virtual void schedule(const schedulable& scbl) const { - controller->schedule_absolute(controller->clock_now, scbl); - } - - virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { - controller->schedule_absolute(controller->to_relative(when - now()), scbl); - } - }; - - std::shared_ptr<> - public: typedef Absolute absolute; typedef Relative relative; @@ -80,6 +56,7 @@ protected: virtual bool empty() const =0; public: + virtual void schedule_absolute(absolute, const schedulable&) const =0; virtual void schedule_relative(relative when, const schedulable& a) const { @@ -129,6 +106,7 @@ public: if (!isenabled) { isenabled = true; + rxsc::recursion r; while (!empty() && isenabled) { auto next = top(); pop(); @@ -136,7 +114,7 @@ public: if (next.when > clock_now) { clock_now = next.when; } - next.what.get_action()(next.what); + next.what(r.get_recurse()); } else { isenabled = false; @@ -181,28 +159,13 @@ public: clock_now = dt; } - virtual clock_type::time_point now() const { - return to_time_point(clock_now); - } - - virtual worker create_worker(composite_subscription cs) const { - std::shared_ptr<virtual_worker> wi(new virtual_worker( - std::static_pointer_cast<virtual_time_base>( - std::const_pointer_cast<scheduler_interface>(shared_from_this())))); - return worker(cs, wi); - } - }; } template<class Absolute, class Relative> -class virtual_time : public detail::virtual_time_base<Absolute, Relative> +struct virtual_time : public detail::virtual_time_base<Absolute, Relative> { -private: - typedef virtual_time this_type; - virtual_time(const this_type&); - typedef detail::virtual_time_base<Absolute, Relative> base; typedef typename base::item_type item_type; @@ -212,8 +175,6 @@ private: mutable queue_item_time queue; - worker controller; - public: virtual ~virtual_time() { @@ -222,16 +183,12 @@ public: protected: virtual_time() { - controller = base::create_worker(composite_subscription()); } explicit virtual_time(typename base::absolute initialClock) : base(initialClock) { - controller = base::create_worker(composite_subscription()); } - virtual worker get_worker() const {return controller;} - virtual item_type top() const { return queue.top(); } @@ -249,7 +206,7 @@ protected: { // use a separate subscription here so that a's subscription is not affected auto run = make_schedulable( - get_worker(), + a.get_worker(), composite_subscription(), [a](const schedulable& scbl) { rxsc::recursion r; @@ -261,10 +218,10 @@ protected: }); queue.push(item_type(when, run)); } - }; + } } diff --git a/Rx/v2/test/operators/filter.cpp b/Rx/v2/test/operators/filter.cpp index ac101ef..3e9e61d 100644 --- a/Rx/v2/test/operators/filter.cpp +++ b/Rx/v2/test/operators/filter.cpp @@ -31,6 +31,7 @@ bool IsPrime(int x) SCENARIO("filter stops on completion", "[filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -61,7 +62,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){ auto xs = sc.make_hot_observable(messages); WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [&xs, &invoked]() { #if 0 && RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -119,6 +120,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -146,7 +148,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [&xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -199,6 +201,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ SCENARIO("filter stops on error", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -232,7 +235,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -286,6 +289,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -319,7 +323,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [ex, xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -377,6 +381,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -406,13 +411,13 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]") }; auto xs = sc.make_hot_observable(rxu::to_vector(messages)); - auto res = sc.make_subscriber<int>(); + auto res = w.make_subscriber<int>(); rx::observable<int, rx::dynamic_observable<int>> ys; WHEN("filtered to ints that are primes"){ - sc.schedule_absolute(rxsc::test::created_time, + w.schedule_absolute(rxsc::test::created_time, [&invoked, &res, &ys, &xs](const rxsc::schedulable& scbl) { #if RXCPP_USE_OBSERVABLE_MEMBERS ys = xs @@ -433,15 +438,15 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]") #endif }); - sc.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) { + w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) { ys.subscribe(res); }); - sc.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) { + w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) { res.unsubscribe(); }); - sc.start(); + w.start(); THEN("the output only contains primes"){ record items[] = { diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 7e6401e..ead0b0e 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -12,7 +12,7 @@ namespace rxt=rxcpp::test; #include "catch.hpp" -static const int static_tripletCount = 500; +static const int static_tripletCount = 2; SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; @@ -32,6 +32,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){ for(int y = x; y <= z; ++y) { ++c; + std::cout << z << "," << y << "," << x << std::endl; if(x*x + y*y == z*z) { //result += (x + y + z); @@ -60,6 +61,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ using namespace std::chrono; typedef steady_clock clock; + std::vector<std::tuple<int, int, int>> tried; int c = 0; int ct = 0; int n = 1; @@ -67,10 +69,12 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ auto triples = rxs::range(1) .flat_map( - [&c](int z){ return rxs::range(1, z) + [&c, &tried](int z){ return rxs::range(1, z) .flat_map( - [&c, z](int x){ return rxs::range(x, z) - .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) + [&c, &tried, z](int x){ return rxs::range(x, z) + .filter([&c, &tried, z, x](int y){++c; + tried.push_back(std::make_tuple(z, y, x)); + return x*x + y*y == z*z;}) .map([z, x](int y){return std::make_tuple(x, y, z);});}, [](int x, std::tuple<int,int,int> triplet){return triplet;});}, [](int z, std::tuple<int,int,int> triplet){return triplet;}); @@ -81,6 +85,10 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ //int x,y,z; std::tie(x,y,z) = triplet; std::cout << x << "," << y << "," << z << std::endl; }, [](std::exception_ptr){abort();}); + std::sort(tried.begin(), tried.end()); + for (auto& t : tried) { + int x,y,z; std::tie(z,y,x) = t; std::cout << z << "," << y << "," << x << std::endl; + } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - duration_cast<milliseconds>(start.time_since_epoch()); @@ -93,6 +101,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -127,7 +136,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map( @@ -193,6 +202,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -227,7 +237,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) @@ -294,6 +304,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -330,7 +341,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp index 354fd24..db53387 100644 --- a/Rx/v2/test/operators/map.cpp +++ b/Rx/v2/test/operators/map.cpp @@ -16,6 +16,7 @@ namespace rxt=rxcpp::test; SCENARIO("map stops on completion", "[map][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -41,7 +42,7 @@ SCENARIO("map stops on completion", "[map][operators]"){ WHEN("mapped to ints that are one larger"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, &invoked]() { return xs .map([&invoked](int x) { diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index f043d18..02d799b 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -50,6 +50,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){ SCENARIO("publish", "[publish][multicast][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -77,24 +78,24 @@ SCENARIO("publish", "[publish][multicast][operators]"){ }; auto xs = sc.make_hot_observable(messages); - auto res = sc.make_subscriber<int>(); + auto res = w.make_subscriber<int>(); rx::connectable_observable<int> ys; WHEN("subscribed and then connected"){ - sc.schedule_absolute(rxsc::test::created_time, + w.schedule_absolute(rxsc::test::created_time, [&invoked, &ys, &xs](const rxsc::schedulable& scbl){ ys = xs.publish().as_dynamic(); //ys = xs.publish_last().as_dynamic(); }); - sc.schedule_absolute(rxsc::test::subscribed_time, + w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl){ ys.subscribe(res); }); - sc.schedule_absolute(rxsc::test::unsubscribed_time, + w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl){ res.unsubscribe(); }); @@ -102,11 +103,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(300, + w.schedule_absolute(300, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(400, + w.schedule_absolute(400, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); @@ -115,11 +116,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(500, + w.schedule_absolute(500, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(550, + w.schedule_absolute(550, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); @@ -128,17 +129,17 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(650, + w.schedule_absolute(650, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(800, + w.schedule_absolute(800, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); } - sc.start(); + w.start(); THEN("the output only contains items sent while subscribed"){ record items[] = { diff --git a/Rx/v2/test/operators/take.cpp b/Rx/v2/test/operators/take.cpp index 6e8ed00..e44bd5b 100644 --- a/Rx/v2/test/operators/take.cpp +++ b/Rx/v2/test/operators/take.cpp @@ -15,6 +15,7 @@ namespace rxt=rxcpp::test; SCENARIO("take 2", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -35,7 +36,7 @@ SCENARIO("take 2", "[take][operators]"){ WHEN("2 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(2) @@ -71,6 +72,7 @@ SCENARIO("take 2", "[take][operators]"){ SCENARIO("take, complete after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -104,7 +106,7 @@ SCENARIO("take, complete after", "[take][operators]"){ WHEN("20 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(20) @@ -155,6 +157,7 @@ SCENARIO("take, complete after", "[take][operators]"){ SCENARIO("take, complete same", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -189,7 +192,7 @@ SCENARIO("take, complete same", "[take][operators]"){ WHEN("17 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(17) @@ -240,6 +243,7 @@ SCENARIO("take, complete same", "[take][operators]"){ SCENARIO("take, complete before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -274,7 +278,7 @@ SCENARIO("take, complete before", "[take][operators]"){ WHEN("10 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(10) @@ -318,6 +322,7 @@ SCENARIO("take, complete before", "[take][operators]"){ SCENARIO("take, error after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -354,7 +359,7 @@ SCENARIO("take, error after", "[take][operators]"){ WHEN("20 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(20) @@ -405,6 +410,7 @@ SCENARIO("take, error after", "[take][operators]"){ SCENARIO("take, error same", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -439,7 +445,7 @@ SCENARIO("take, error same", "[take][operators]"){ WHEN("17 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(17) @@ -490,6 +496,7 @@ SCENARIO("take, error same", "[take][operators]"){ SCENARIO("take, error before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -524,7 +531,7 @@ SCENARIO("take, error before", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -561,6 +568,7 @@ SCENARIO("take, error before", "[take][operators]"){ SCENARIO("take, dispose before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -594,7 +602,7 @@ SCENARIO("take, dispose before", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -630,6 +638,7 @@ SCENARIO("take, dispose before", "[take][operators]"){ SCENARIO("take, dispose after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -663,7 +672,7 @@ SCENARIO("take, dispose after", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -703,6 +712,7 @@ SCENARIO("take, dispose after", "[take][operators]"){ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -730,7 +740,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, ys]() { return xs .take_until(ys) @@ -775,6 +785,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -802,7 +813,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -847,6 +858,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -875,7 +887,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]") WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -920,6 +932,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]") SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -946,7 +959,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -993,6 +1006,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1018,7 +1032,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1065,6 +1079,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1087,7 +1102,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1130,6 +1145,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1153,7 +1169,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1196,6 +1212,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1217,7 +1234,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1257,6 +1274,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1277,7 +1295,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1317,6 +1335,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1341,7 +1360,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1384,6 +1403,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1410,7 +1430,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r, &sourceNotDisposed]() { return l .map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v;}) @@ -1442,6 +1462,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1468,7 +1489,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal" WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r, &signalNotDisposed]() { return l .take_until(r @@ -1501,6 +1522,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal" SCENARIO("take_until, error some", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1525,7 +1547,7 @@ SCENARIO("take_until, error some", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp index d98bd6b..ca75e59 100644 --- a/Rx/v2/test/subjects/subject.cpp +++ b/Rx/v2/test/subjects/subject.cpp @@ -256,6 +256,7 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){ GIVEN("a subject and an infinite source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -279,40 +280,40 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){ rxsub::subject<int> s; - auto results1 = sc.make_subscriber<int>(); + auto results1 = w.make_subscriber<int>(); - auto results2 = sc.make_subscriber<int>(); + auto results2 = w.make_subscriber<int>(); - auto results3 = sc.make_subscriber<int>(); + auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto o = s.get_subscriber(); - sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ s = rxsub::subject<int>(); o = s.get_subscriber();}); - sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ xs.subscribe(o);}); - sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ + w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ o.unsubscribe();}); - sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results1);}); - sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results2);}); - sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results3);}); - sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ results2.unsubscribe();}); - sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ results3.unsubscribe();}); - sc.start(); + w.start(); THEN("result1 contains expected messages"){ record items[] = { @@ -353,6 +354,7 @@ SCENARIO("subject - finite source", "[subject][subjects]"){ GIVEN("a subject and an finite source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -377,40 +379,40 @@ SCENARIO("subject - finite source", "[subject][subjects]"){ rxsub::subject<int> s; - auto results1 = sc.make_subscriber<int>(); + auto results1 = w.make_subscriber<int>(); - auto results2 = sc.make_subscriber<int>(); + auto results2 = w.make_subscriber<int>(); - auto results3 = sc.make_subscriber<int>(); + auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto o = s.get_subscriber(); - sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ s = rxsub::subject<int>(); o = s.get_subscriber();}); - sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ xs.subscribe(o);}); - sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ + w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ o.unsubscribe();}); - sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results1);}); - sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results2);}); - sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results3);}); - sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ results2.unsubscribe();}); - sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ results3.unsubscribe();}); - sc.start(); + w.start(); THEN("result1 contains expected messages"){ record items[] = { @@ -452,6 +454,7 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){ GIVEN("a subject and a source with an error"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -478,40 +481,40 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){ rxsub::subject<int> s; - auto results1 = sc.make_subscriber<int>(); + auto results1 = w.make_subscriber<int>(); - auto results2 = sc.make_subscriber<int>(); + auto results2 = w.make_subscriber<int>(); - auto results3 = sc.make_subscriber<int>(); + auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto o = s.get_subscriber(); - sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){ s = rxsub::subject<int>(); o = s.get_subscriber();}); - sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ + w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){ xs.subscribe(o);}); - sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ + w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){ o.unsubscribe();}); - sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results1);}); - sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results2);}); - sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){ s.get_observable().subscribe(results3);}); - sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ + w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){ results2.unsubscribe();}); - sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ + w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){ results1.unsubscribe();}); - sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ + w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){ results3.unsubscribe();}); - sc.start(); + w.start(); THEN("result1 contains expected messages"){ record items[] = { |