summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-04-29 23:46:04 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-05-01 14:03:55 -0700
commit09b16daca270ed0ade1f41f76c0910d6442e95de (patch)
treefba873aeed4c42c57dde9c5cccc7b1203ae8443f /Rx
parent15807b31b06a4558b55356a3d69c0287f6177f7d (diff)
downloadRxCpp-09b16daca270ed0ade1f41f76c0910d6442e95de.tar.gz
fix virtual_time and test scheduler
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp26
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-test.hpp332
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp57
-rw-r--r--Rx/v2/test/operators/filter.cpp23
-rw-r--r--Rx/v2/test/operators/flat_map.cpp25
-rw-r--r--Rx/v2/test/operators/map.cpp3
-rw-r--r--Rx/v2/test/operators/publish.cpp23
-rw-r--r--Rx/v2/test/operators/take.cpp66
-rw-r--r--Rx/v2/test/subjects/subject.cpp87
9 files changed, 363 insertions, 279 deletions
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index a57851c..4b01c36 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -462,32 +462,6 @@ public:
: scoped(false)
{
}
- schedulable(const schedulable& o)
- : lifetime(o.lifetime)
- , controller(o.controller)
- , activity(o.activity)
- , scoped(o.scoped)
- , action_scope(o.scoped ? controller.add(lifetime) : weak_subscription())
- {
- }
- schedulable(schedulable&& o)
- : lifetime(std::move(o.lifetime))
- , controller(std::move(o.controller))
- , activity(std::move(o.activity))
- , scoped(o.scoped)
- , action_scope(std::move(o.action_scope))
- {
- o.scoped = false;
- }
- schedulable& operator =(schedulable o) {
- using std::swap;
- swap(lifetime, o.lifetime);
- swap(controller, o.controller);
- swap(activity, o.activity);
- swap(scoped, o.scoped);
- swap(action_scope, o.action_scope);
- return *this;
- }
/// action and worker share lifetime
schedulable(worker q, action a)
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
index d3308d5..411bd43 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
@@ -13,54 +13,141 @@ namespace schedulers {
namespace detail {
-class test_type : public virtual_time<long, long>
+class test_type : public scheduler_interface
{
public:
- typedef virtual_time<long, long> base;
- typedef base::clock_type clock_type;
- typedef std::shared_ptr<test_type> shared;
- using base::schedule_absolute;
- using base::schedule_relative;
+ typedef scheduler_interface::clock_type clock_type;
- virtual worker get_worker() const
+ struct test_type_state : public virtual_time<long, long>
{
- return base::get_worker();
- }
+ typedef virtual_time<long, long> base;
+
+ using base::schedule_absolute;
+ using base::schedule_relative;
+
+ clock_type::time_point now() const {
+ return to_time_point(clock_now);
+ }
+
+ virtual void schedule_absolute(long when, const schedulable& a) const
+ {
+ if (when <= base::clock_now)
+ when = base::clock_now + 1;
- virtual void schedule_absolute(long when, const schedulable& a) const
+ return base::schedule_absolute(when, a);
+ }
+
+ virtual long add(long absolute, long relative) const
+ {
+ return absolute + relative;
+ }
+
+ virtual clock_type::time_point to_time_point(long absolute) const
+ {
+ return clock_type::time_point(clock_type::duration(absolute));
+ }
+
+ virtual long to_relative(clock_type::duration d) const
+ {
+ return static_cast<long>(d.count());
+ }
+ };
+
+private:
+ mutable std::shared_ptr<test_type_state> state;
+
+public:
+ struct test_type_worker : public worker_interface
{
- if (when <= base::clock_now)
- when = base::clock_now + 1;
+ mutable std::shared_ptr<test_type_state> state;
- return base::schedule_absolute(when, a);
- }
+ typedef test_type_state::absolute absolute;
+ typedef test_type_state::relative relative;
+
+ test_type_worker(std::shared_ptr<test_type_state> st)
+ : state(std::move(st))
+ {
+ }
+
+ virtual clock_type::time_point now() const {
+ return state->now();
+ }
+
+ virtual void schedule(const schedulable& scbl) const {
+ state->schedule_absolute(state->clock(), scbl);
+ }
+
+ virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
+ state->schedule_absolute(state->to_relative(when - now()), scbl);
+ }
+
+
+ void schedule_absolute(absolute when, const schedulable& scbl) const {
+ state->schedule_absolute(when, scbl);
+ }
+
+ void schedule_relative(relative when, const schedulable& scbl) const {
+ state->schedule_relative(when, scbl);
+ }
+
+ bool is_enabled() const {return state->is_enabled();}
+ absolute clock() const {return state->clock();}
+
+ void start() const
+ {
+ state->start();
+ }
+
+ void stop() const
+ {
+ state->stop();
+ }
+
+ void advance_to(absolute time) const
+ {
+ state->advance_to(time);
+ }
- virtual long add(long absolute, long relative) const
+ void advance_by(relative time) const
+ {
+ state->advance_by(time);
+ }
+
+ void sleep(relative time) const
+ {
+ state->sleep(time);
+ }
+
+ template<class T>
+ subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
+ };
+
+public:
+ test_type()
+ : state(new test_type_state())
{
- return absolute + relative;
}
- virtual clock_type::time_point to_time_point(long absolute) const
- {
- return clock_type::time_point(clock_type::duration(absolute));
+ virtual clock_type::time_point now() const {
+ return state->now();
}
- virtual long to_relative(clock_type::duration d) const
- {
- return static_cast<long>(d.count());
+ virtual worker create_worker(composite_subscription cs) const {
+ std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
+ return worker(cs, wi);
}
- using base::start;
+ std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
+ std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
+ return wi;
+ }
template<class T>
rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
template<class T>
rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
-
- template<class T>
- subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
};
template<class T>
@@ -71,12 +158,12 @@ class mock_observer
typedef rxn::recorded<typename notification_type::type> recorded_type;
public:
- mock_observer(typename test_type::shared sc)
+ explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
: sc(sc)
{
}
- typename test_type::shared sc;
+ std::shared_ptr<test_type::test_type_state> sc;
std::vector<recorded_type> m;
virtual void on_subscribe(subscriber<T>) const {
@@ -92,12 +179,12 @@ public:
};
template<class T>
-subscriber<T, rxt::testable_observer<T>> test_type::make_subscriber() const
+subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
{
typedef typename rxn::notification<T> notification_type;
typedef rxn::recorded<typename notification_type::type> recorded_type;
- std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this()))));
+ std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(state));
return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
// on_next
@@ -125,30 +212,32 @@ class cold_observable
: public rxt::detail::test_subject_base<T>
{
typedef cold_observable<T> this_type;
- typename test_type::shared sc;
+ std::shared_ptr<test_type::test_type_state> sc;
typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
mutable std::vector<recorded_type> mv;
mutable std::vector<rxn::subscription> sv;
+ mutable worker controller;
public:
- cold_observable(typename test_type::shared sc, std::vector<recorded_type> mv)
+ cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
: sc(sc)
, mv(std::move(mv))
+ , controller(w)
{
}
template<class Iterator>
- cold_observable(typename test_type::shared sc, Iterator begin, Iterator end)
+ cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
: sc(sc)
, mv(begin, end)
+ , controller(w)
{
}
virtual void on_subscribe(subscriber<T> o) const {
sv.push_back(rxn::subscription(sc->clock()));
auto index = sv.size() - 1;
- auto controller = sc->create_worker(composite_subscription());
for (auto& message : mv) {
auto n = message.value();
@@ -179,7 +268,7 @@ public:
template<class T>
rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
{
- auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this())), std::move(messages)));
+ auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(state, create_worker(composite_subscription()), std::move(messages)));
return rxt::testable_observable<T>(co);
}
@@ -188,20 +277,21 @@ class hot_observable
: public rxt::detail::test_subject_base<T>
{
typedef hot_observable<T> this_type;
- typename test_type::shared sc;
+ std::shared_ptr<test_type::test_type_state> sc;
typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
typedef subscriber<T> observer_type;
mutable std::vector<recorded_type> mv;
mutable std::vector<rxn::subscription> sv;
mutable std::vector<observer_type> observers;
+ mutable worker controller;
public:
- hot_observable(typename test_type::shared sc, std::vector<recorded_type> mv)
+ hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
: sc(sc)
, mv(mv)
+ , controller(w)
{
- auto controller = sc->create_worker(composite_subscription());
for (auto& message : mv) {
auto n = message.value();
sc->schedule_absolute(message.time(), make_schedulable(
@@ -240,18 +330,18 @@ public:
template<class T>
rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
{
- return rxt::testable_observable<T>(std::make_shared<hot_observable<T>>(
- std::const_pointer_cast<test_type>(std::static_pointer_cast<const test_type>(shared_from_this())), std::move(messages)));
+ return rxt::testable_observable<T>(
+ std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)));
}
}
class test : public scheduler
{
- detail::test_type::shared tester;
+ std::shared_ptr<detail::test_type> tester;
public:
- explicit test(detail::test_type::shared t)
+ explicit test(std::shared_ptr<detail::test_type> t)
: scheduler(std::static_pointer_cast<scheduler_interface>(t))
, tester(t)
{
@@ -305,85 +395,110 @@ public:
~messages();
};
- void schedule_absolute(long when, const schedulable& a) const {
- tester->schedule_absolute(when, a);
- }
+ class test_worker : public worker
+ {
+ std::shared_ptr<detail::test_type::test_type_worker> tester;
+ public:
- void schedule_relative(long when, const schedulable& a) const {
- tester->schedule_relative(when, a);
- }
+ explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
+ : worker(cs, std::static_pointer_cast<worker_interface>(t))
+ , tester(t)
+ {
+ }
- template<class Arg0, class... ArgN>
- auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type {
- tester->schedule_absolute(when, make_schedulable(tester->get_worker(), std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
- }
+ void schedule_absolute(long when, const schedulable& a) const {
+ tester->schedule_absolute(when, a);
+ }
- template<class Arg0, class... ArgN>
- auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type {
- tester->schedule_relative(when, make_schedulable(tester->get_worker(), std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
- }
+ void schedule_relative(long when, const schedulable& a) const {
+ tester->schedule_relative(when, a);
+ }
- template<class T, class F>
- auto start(F&& createSource, long created, long subscribed, long unsubscribed) const
- -> subscriber<T, rxt::testable_observer<T>>
- {
- typename std::decay<F>::type createSrc = std::forward<F>(createSource);
+ template<class Arg0, class... ArgN>
+ auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
+ -> typename std::enable_if<
+ (detail::is_action_function<Arg0>::value ||
+ is_subscription<Arg0>::value) &&
+ !is_schedulable<Arg0>::value>::type {
+ tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
+ }
- struct state_type
- : public std::enable_shared_from_this<state_type>
- {
- typedef decltype(std::forward<F>(createSrc)()) source_type;
+ template<class Arg0, class... ArgN>
+ auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
+ -> typename std::enable_if<
+ (detail::is_action_function<Arg0>::value ||
+ is_subscription<Arg0>::value) &&
+ !is_schedulable<Arg0>::value>::type {
+ tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
+ }
- std::unique_ptr<source_type> source;
- subscriber<T, rxt::testable_observer<T>> o;
+ template<class T, class F>
+ auto start(F&& createSource, long created, long subscribed, long unsubscribed) const
+ -> subscriber<T, rxt::testable_observer<T>>
+ {
+ typename std::decay<F>::type createSrc = std::forward<F>(createSource);
- explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
- : source()
- , o(o)
+ struct state_type
+ : public std::enable_shared_from_this<state_type>
{
- }
- };
- std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>()));
+ typedef decltype(std::forward<F>(createSrc)()) source_type;
+
+ std::unique_ptr<source_type> source;
+ subscriber<T, rxt::testable_observer<T>> o;
+
+ explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
+ : source()
+ , o(o)
+ {
+ }
+ };
+ std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>()));
+
+ schedule_absolute(created, [createSrc, state](const schedulable& scbl) {
+ state->source.reset(new typename state_type::source_type(createSrc()));
+ });
+ schedule_absolute(subscribed, [state](const schedulable& scbl) {
+ state->source->subscribe(state->o);
+ });
+ schedule_absolute(unsubscribed, [state](const schedulable& scbl) {
+ state->o.unsubscribe();
+ });
+
+ tester->start();
+
+ return state->o;
+ }
- schedule_absolute(created, [createSrc, state](const schedulable& scbl) {
- state->source.reset(new typename state_type::source_type(createSrc()));
- });
- schedule_absolute(subscribed, [state](const schedulable& scbl) {
- state->source->subscribe(state->o);
- });
- schedule_absolute(unsubscribed, [state](const schedulable& scbl) {
- state->o.unsubscribe();
- });
+ template<class T, class F>
+ auto start(F&& createSource, long unsubscribed) const
+ -> subscriber<T, rxt::testable_observer<T>>
+ {
+ return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
+ }
- tester->start();
+ template<class T, class F>
+ auto start(F&& createSource) const
+ -> subscriber<T, rxt::testable_observer<T>>
+ {
+ return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
+ }
- return state->o;
- }
+ void start() const {
+ tester->start();
+ }
- template<class T, class F>
- auto start(F&& createSource, long unsubscribed) const
- -> subscriber<T, rxt::testable_observer<T>>
- {
- return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
- }
+ template<class T>
+ subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
+ return tester->make_subscriber<T>();
+ }
+ };
- template<class T, class F>
- auto start(F&& createSource) const
- -> subscriber<T, rxt::testable_observer<T>>
- {
- return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
+ clock_type::time_point now() const {
+ return tester->now();
}
- void start() const {
- tester->start();
+ test_worker create_worker(composite_subscription cs = composite_subscription()) const {
+ return test_worker(cs, tester->create_test_type_worker_interface());
}
template<class T>
@@ -407,11 +522,6 @@ public:
-> decltype(tester->make_cold_observable(std::vector<T>())) {
return tester->make_cold_observable(rxu::to_vector(arr));
}
-
- template<class T>
- subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
- return tester->make_subscriber<T>();
- }
};
template<class T>
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
index 34fc8a3..bc42e5c 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
@@ -14,7 +14,7 @@ namespace schedulers {
namespace detail {
template<class Absolute, class Relative>
-struct virtual_time_base : public scheduler_interface
+struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>>
{
private:
typedef virtual_time_base<Absolute, Relative> this_type;
@@ -22,30 +22,6 @@ private:
mutable bool isenabled;
- struct virtual_worker : public worker_interface
- {
- std::shared_ptr<virtual_time_base> controller;
-
- virtual_worker(std::shared_ptr<virtual_time_base> vb)
- : controller(std::move(vb))
- {
- }
-
- virtual clock_type::time_point now() const {
- return controller->to_time_point(controller->clock_now);
- }
-
- virtual void schedule(const schedulable& scbl) const {
- controller->schedule_absolute(controller->clock_now, scbl);
- }
-
- virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
- controller->schedule_absolute(controller->to_relative(when - now()), scbl);
- }
- };
-
- std::shared_ptr<>
-
public:
typedef Absolute absolute;
typedef Relative relative;
@@ -80,6 +56,7 @@ protected:
virtual bool empty() const =0;
public:
+
virtual void schedule_absolute(absolute, const schedulable&) const =0;
virtual void schedule_relative(relative when, const schedulable& a) const {
@@ -129,6 +106,7 @@ public:
if (!isenabled) {
isenabled = true;
+ rxsc::recursion r;
while (!empty() && isenabled) {
auto next = top();
pop();
@@ -136,7 +114,7 @@ public:
if (next.when > clock_now) {
clock_now = next.when;
}
- next.what.get_action()(next.what);
+ next.what(r.get_recurse());
}
else {
isenabled = false;
@@ -181,28 +159,13 @@ public:
clock_now = dt;
}
- virtual clock_type::time_point now() const {
- return to_time_point(clock_now);
- }
-
- virtual worker create_worker(composite_subscription cs) const {
- std::shared_ptr<virtual_worker> wi(new virtual_worker(
- std::static_pointer_cast<virtual_time_base>(
- std::const_pointer_cast<scheduler_interface>(shared_from_this()))));
- return worker(cs, wi);
- }
-
};
}
template<class Absolute, class Relative>
-class virtual_time : public detail::virtual_time_base<Absolute, Relative>
+struct virtual_time : public detail::virtual_time_base<Absolute, Relative>
{
-private:
- typedef virtual_time this_type;
- virtual_time(const this_type&);
-
typedef detail::virtual_time_base<Absolute, Relative> base;
typedef typename base::item_type item_type;
@@ -212,8 +175,6 @@ private:
mutable queue_item_time queue;
- worker controller;
-
public:
virtual ~virtual_time()
{
@@ -222,16 +183,12 @@ public:
protected:
virtual_time()
{
- controller = base::create_worker(composite_subscription());
}
explicit virtual_time(typename base::absolute initialClock)
: base(initialClock)
{
- controller = base::create_worker(composite_subscription());
}
- virtual worker get_worker() const {return controller;}
-
virtual item_type top() const {
return queue.top();
}
@@ -249,7 +206,7 @@ protected:
{
// use a separate subscription here so that a's subscription is not affected
auto run = make_schedulable(
- get_worker(),
+ a.get_worker(),
composite_subscription(),
[a](const schedulable& scbl) {
rxsc::recursion r;
@@ -261,10 +218,10 @@ protected:
});
queue.push(item_type(when, run));
}
-
};
+
}
}
diff --git a/Rx/v2/test/operators/filter.cpp b/Rx/v2/test/operators/filter.cpp
index ac101ef..3e9e61d 100644
--- a/Rx/v2/test/operators/filter.cpp
+++ b/Rx/v2/test/operators/filter.cpp
@@ -31,6 +31,7 @@ bool IsPrime(int x)
SCENARIO("filter stops on completion", "[filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -61,7 +62,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){
auto xs = sc.make_hot_observable(messages);
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[&xs, &invoked]() {
#if 0 && RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -119,6 +120,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){
SCENARIO("filter stops on disposal", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -146,7 +148,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[&xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -199,6 +201,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){
SCENARIO("filter stops on error", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -232,7 +235,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -286,6 +289,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){
SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -319,7 +323,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[ex, xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -377,6 +381,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -406,13 +411,13 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]")
};
auto xs = sc.make_hot_observable(rxu::to_vector(messages));
- auto res = sc.make_subscriber<int>();
+ auto res = w.make_subscriber<int>();
rx::observable<int, rx::dynamic_observable<int>> ys;
WHEN("filtered to ints that are primes"){
- sc.schedule_absolute(rxsc::test::created_time,
+ w.schedule_absolute(rxsc::test::created_time,
[&invoked, &res, &ys, &xs](const rxsc::schedulable& scbl) {
#if RXCPP_USE_OBSERVABLE_MEMBERS
ys = xs
@@ -433,15 +438,15 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]")
#endif
});
- sc.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) {
+ w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) {
ys.subscribe(res);
});
- sc.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) {
+ w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) {
res.unsubscribe();
});
- sc.start();
+ w.start();
THEN("the output only contains primes"){
record items[] = {
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 7e6401e..ead0b0e 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -12,7 +12,7 @@ namespace rxt=rxcpp::test;
#include "catch.hpp"
-static const int static_tripletCount = 500;
+static const int static_tripletCount = 2;
SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
@@ -32,6 +32,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
for(int y = x; y <= z; ++y)
{
++c;
+ std::cout << z << "," << y << "," << x << std::endl;
if(x*x + y*y == z*z)
{
//result += (x + y + z);
@@ -60,6 +61,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
using namespace std::chrono;
typedef steady_clock clock;
+ std::vector<std::tuple<int, int, int>> tried;
int c = 0;
int ct = 0;
int n = 1;
@@ -67,10 +69,12 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
auto triples =
rxs::range(1)
.flat_map(
- [&c](int z){ return rxs::range(1, z)
+ [&c, &tried](int z){ return rxs::range(1, z)
.flat_map(
- [&c, z](int x){ return rxs::range(x, z)
- .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
+ [&c, &tried, z](int x){ return rxs::range(x, z)
+ .filter([&c, &tried, z, x](int y){++c;
+ tried.push_back(std::make_tuple(z, y, x));
+ return x*x + y*y == z*z;})
.map([z, x](int y){return std::make_tuple(x, y, z);});},
[](int x, std::tuple<int,int,int> triplet){return triplet;});},
[](int z, std::tuple<int,int,int> triplet){return triplet;});
@@ -81,6 +85,10 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
//int x,y,z; std::tie(x,y,z) = triplet; std::cout << x << "," << y << "," << z << std::endl;
},
[](std::exception_ptr){abort();});
+ std::sort(tried.begin(), tried.end());
+ for (auto& t : tried) {
+ int x,y,z; std::tie(z,y,x) = t; std::cout << z << "," << y << "," << x << std::endl;
+ }
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
duration_cast<milliseconds>(start.time_since_epoch());
@@ -93,6 +101,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
SCENARIO("flat_map completes", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -127,7 +136,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map(
@@ -193,6 +202,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -227,7 +237,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map([&](int){return ys;}, [](int, std::string s){return s;})
@@ -294,6 +304,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -330,7 +341,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map([&](int){return ys;}, [](int, std::string s){return s;})
diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp
index 354fd24..db53387 100644
--- a/Rx/v2/test/operators/map.cpp
+++ b/Rx/v2/test/operators/map.cpp
@@ -16,6 +16,7 @@ namespace rxt=rxcpp::test;
SCENARIO("map stops on completion", "[map][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -41,7 +42,7 @@ SCENARIO("map stops on completion", "[map][operators]"){
WHEN("mapped to ints that are one larger"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, &invoked]() {
return xs
.map([&invoked](int x) {
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index f043d18..02d799b 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -50,6 +50,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){
SCENARIO("publish", "[publish][multicast][operators]"){
GIVEN("a test hot observable of longs"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -77,24 +78,24 @@ SCENARIO("publish", "[publish][multicast][operators]"){
};
auto xs = sc.make_hot_observable(messages);
- auto res = sc.make_subscriber<int>();
+ auto res = w.make_subscriber<int>();
rx::connectable_observable<int> ys;
WHEN("subscribed and then connected"){
- sc.schedule_absolute(rxsc::test::created_time,
+ w.schedule_absolute(rxsc::test::created_time,
[&invoked, &ys, &xs](const rxsc::schedulable& scbl){
ys = xs.publish().as_dynamic();
//ys = xs.publish_last().as_dynamic();
});
- sc.schedule_absolute(rxsc::test::subscribed_time,
+ w.schedule_absolute(rxsc::test::subscribed_time,
[&ys, &res](const rxsc::schedulable& scbl){
ys.subscribe(res);
});
- sc.schedule_absolute(rxsc::test::unsubscribed_time,
+ w.schedule_absolute(rxsc::test::unsubscribed_time,
[&res](const rxsc::schedulable& scbl){
res.unsubscribe();
});
@@ -102,11 +103,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(300,
+ w.schedule_absolute(300,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(400,
+ w.schedule_absolute(400,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
@@ -115,11 +116,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(500,
+ w.schedule_absolute(500,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(550,
+ w.schedule_absolute(550,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
@@ -128,17 +129,17 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(650,
+ w.schedule_absolute(650,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(800,
+ w.schedule_absolute(800,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
}
- sc.start();
+ w.start();
THEN("the output only contains items sent while subscribed"){
record items[] = {
diff --git a/Rx/v2/test/operators/take.cpp b/Rx/v2/test/operators/take.cpp
index 6e8ed00..e44bd5b 100644
--- a/Rx/v2/test/operators/take.cpp
+++ b/Rx/v2/test/operators/take.cpp
@@ -15,6 +15,7 @@ namespace rxt=rxcpp::test;
SCENARIO("take 2", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -35,7 +36,7 @@ SCENARIO("take 2", "[take][operators]"){
WHEN("2 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(2)
@@ -71,6 +72,7 @@ SCENARIO("take 2", "[take][operators]"){
SCENARIO("take, complete after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -104,7 +106,7 @@ SCENARIO("take, complete after", "[take][operators]"){
WHEN("20 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(20)
@@ -155,6 +157,7 @@ SCENARIO("take, complete after", "[take][operators]"){
SCENARIO("take, complete same", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -189,7 +192,7 @@ SCENARIO("take, complete same", "[take][operators]"){
WHEN("17 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(17)
@@ -240,6 +243,7 @@ SCENARIO("take, complete same", "[take][operators]"){
SCENARIO("take, complete before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -274,7 +278,7 @@ SCENARIO("take, complete before", "[take][operators]"){
WHEN("10 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(10)
@@ -318,6 +322,7 @@ SCENARIO("take, complete before", "[take][operators]"){
SCENARIO("take, error after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -354,7 +359,7 @@ SCENARIO("take, error after", "[take][operators]"){
WHEN("20 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(20)
@@ -405,6 +410,7 @@ SCENARIO("take, error after", "[take][operators]"){
SCENARIO("take, error same", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -439,7 +445,7 @@ SCENARIO("take, error same", "[take][operators]"){
WHEN("17 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(17)
@@ -490,6 +496,7 @@ SCENARIO("take, error same", "[take][operators]"){
SCENARIO("take, error before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -524,7 +531,7 @@ SCENARIO("take, error before", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -561,6 +568,7 @@ SCENARIO("take, error before", "[take][operators]"){
SCENARIO("take, dispose before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -594,7 +602,7 @@ SCENARIO("take, dispose before", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -630,6 +638,7 @@ SCENARIO("take, dispose before", "[take][operators]"){
SCENARIO("take, dispose after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -663,7 +672,7 @@ SCENARIO("take, dispose after", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -703,6 +712,7 @@ SCENARIO("take, dispose after", "[take][operators]"){
SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -730,7 +740,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, ys]() {
return xs
.take_until(ys)
@@ -775,6 +785,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -802,7 +813,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -847,6 +858,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -875,7 +887,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]")
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -920,6 +932,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]")
SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -946,7 +959,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -993,6 +1006,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators
SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1018,7 +1032,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1065,6 +1079,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators
SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1087,7 +1102,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1130,6 +1145,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1153,7 +1169,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1196,6 +1212,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1217,7 +1234,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1257,6 +1274,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1277,7 +1295,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1317,6 +1335,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1341,7 +1360,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1384,6 +1403,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat
SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1410,7 +1430,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r, &sourceNotDisposed]() {
return l
.map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v;})
@@ -1442,6 +1462,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un
SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1468,7 +1489,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal"
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r, &signalNotDisposed]() {
return l
.take_until(r
@@ -1501,6 +1522,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal"
SCENARIO("take_until, error some", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1525,7 +1547,7 @@ SCENARIO("take_until, error some", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
index d98bd6b..ca75e59 100644
--- a/Rx/v2/test/subjects/subject.cpp
+++ b/Rx/v2/test/subjects/subject.cpp
@@ -256,6 +256,7 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){
GIVEN("a subject and an infinite source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -279,40 +280,40 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){
rxsub::subject<int> s;
- auto results1 = sc.make_subscriber<int>();
+ auto results1 = w.make_subscriber<int>();
- auto results2 = sc.make_subscriber<int>();
+ auto results2 = w.make_subscriber<int>();
- auto results3 = sc.make_subscriber<int>();
+ auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto o = s.get_subscriber();
- sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
s = rxsub::subject<int>(); o = s.get_subscriber();});
- sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
xs.subscribe(o);});
- sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
o.unsubscribe();});
- sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results1);});
- sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results2);});
- sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results3);});
- sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
results2.unsubscribe();});
- sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
results3.unsubscribe();});
- sc.start();
+ w.start();
THEN("result1 contains expected messages"){
record items[] = {
@@ -353,6 +354,7 @@ SCENARIO("subject - finite source", "[subject][subjects]"){
GIVEN("a subject and an finite source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -377,40 +379,40 @@ SCENARIO("subject - finite source", "[subject][subjects]"){
rxsub::subject<int> s;
- auto results1 = sc.make_subscriber<int>();
+ auto results1 = w.make_subscriber<int>();
- auto results2 = sc.make_subscriber<int>();
+ auto results2 = w.make_subscriber<int>();
- auto results3 = sc.make_subscriber<int>();
+ auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto o = s.get_subscriber();
- sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
s = rxsub::subject<int>(); o = s.get_subscriber();});
- sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
xs.subscribe(o);});
- sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
o.unsubscribe();});
- sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results1);});
- sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results2);});
- sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results3);});
- sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
results2.unsubscribe();});
- sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
results3.unsubscribe();});
- sc.start();
+ w.start();
THEN("result1 contains expected messages"){
record items[] = {
@@ -452,6 +454,7 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){
GIVEN("a subject and a source with an error"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -478,40 +481,40 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){
rxsub::subject<int> s;
- auto results1 = sc.make_subscriber<int>();
+ auto results1 = w.make_subscriber<int>();
- auto results2 = sc.make_subscriber<int>();
+ auto results2 = w.make_subscriber<int>();
- auto results3 = sc.make_subscriber<int>();
+ auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto o = s.get_subscriber();
- sc.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(100, [&s, &o](const rxsc::schedulable& scbl){
s = rxsub::subject<int>(); o = s.get_subscriber();});
- sc.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable& scbl){
xs.subscribe(o);});
- sc.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
+ w.schedule_absolute(1000, [&o](const rxsc::schedulable& scbl){
o.unsubscribe();});
- sc.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results1);});
- sc.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results2);});
- sc.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable& scbl){
s.get_observable().subscribe(results3);});
- sc.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(600, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
+ w.schedule_absolute(700, [&results2](const rxsc::schedulable& scbl){
results2.unsubscribe();});
- sc.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
+ w.schedule_absolute(800, [&results1](const rxsc::schedulable& scbl){
results1.unsubscribe();});
- sc.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
+ w.schedule_absolute(950, [&results3](const rxsc::schedulable& scbl){
results3.unsubscribe();});
- sc.start();
+ w.start();
THEN("result1 contains expected messages"){
record items[] = {