summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-04-29 23:46:04 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-05-01 14:03:55 -0700
commit09b16daca270ed0ade1f41f76c0910d6442e95de (patch)
treefba873aeed4c42c57dde9c5cccc7b1203ae8443f /Rx/v2/test/operators
parent15807b31b06a4558b55356a3d69c0287f6177f7d (diff)
downloadRxCpp-09b16daca270ed0ade1f41f76c0910d6442e95de.tar.gz
fix virtual_time and test scheduler
Diffstat (limited to 'Rx/v2/test/operators')
-rw-r--r--Rx/v2/test/operators/filter.cpp23
-rw-r--r--Rx/v2/test/operators/flat_map.cpp25
-rw-r--r--Rx/v2/test/operators/map.cpp3
-rw-r--r--Rx/v2/test/operators/publish.cpp23
-rw-r--r--Rx/v2/test/operators/take.cpp66
5 files changed, 90 insertions, 50 deletions
diff --git a/Rx/v2/test/operators/filter.cpp b/Rx/v2/test/operators/filter.cpp
index ac101ef..3e9e61d 100644
--- a/Rx/v2/test/operators/filter.cpp
+++ b/Rx/v2/test/operators/filter.cpp
@@ -31,6 +31,7 @@ bool IsPrime(int x)
SCENARIO("filter stops on completion", "[filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -61,7 +62,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){
auto xs = sc.make_hot_observable(messages);
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[&xs, &invoked]() {
#if 0 && RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -119,6 +120,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){
SCENARIO("filter stops on disposal", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -146,7 +148,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[&xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -199,6 +201,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){
SCENARIO("filter stops on error", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -232,7 +235,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -286,6 +289,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){
SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -319,7 +323,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
WHEN("filtered to ints that are primes"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[ex, xs, &invoked]() {
#if RXCPP_USE_OBSERVABLE_MEMBERS
return xs
@@ -377,6 +381,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){
SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -406,13 +411,13 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]")
};
auto xs = sc.make_hot_observable(rxu::to_vector(messages));
- auto res = sc.make_subscriber<int>();
+ auto res = w.make_subscriber<int>();
rx::observable<int, rx::dynamic_observable<int>> ys;
WHEN("filtered to ints that are primes"){
- sc.schedule_absolute(rxsc::test::created_time,
+ w.schedule_absolute(rxsc::test::created_time,
[&invoked, &res, &ys, &xs](const rxsc::schedulable& scbl) {
#if RXCPP_USE_OBSERVABLE_MEMBERS
ys = xs
@@ -433,15 +438,15 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]")
#endif
});
- sc.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) {
+ w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) {
ys.subscribe(res);
});
- sc.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) {
+ w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) {
res.unsubscribe();
});
- sc.start();
+ w.start();
THEN("the output only contains primes"){
record items[] = {
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 7e6401e..ead0b0e 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -12,7 +12,7 @@ namespace rxt=rxcpp::test;
#include "catch.hpp"
-static const int static_tripletCount = 500;
+static const int static_tripletCount = 2;
SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
@@ -32,6 +32,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
for(int y = x; y <= z; ++y)
{
++c;
+ std::cout << z << "," << y << "," << x << std::endl;
if(x*x + y*y == z*z)
{
//result += (x + y + z);
@@ -60,6 +61,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
using namespace std::chrono;
typedef steady_clock clock;
+ std::vector<std::tuple<int, int, int>> tried;
int c = 0;
int ct = 0;
int n = 1;
@@ -67,10 +69,12 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
auto triples =
rxs::range(1)
.flat_map(
- [&c](int z){ return rxs::range(1, z)
+ [&c, &tried](int z){ return rxs::range(1, z)
.flat_map(
- [&c, z](int x){ return rxs::range(x, z)
- .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
+ [&c, &tried, z](int x){ return rxs::range(x, z)
+ .filter([&c, &tried, z, x](int y){++c;
+ tried.push_back(std::make_tuple(z, y, x));
+ return x*x + y*y == z*z;})
.map([z, x](int y){return std::make_tuple(x, y, z);});},
[](int x, std::tuple<int,int,int> triplet){return triplet;});},
[](int z, std::tuple<int,int,int> triplet){return triplet;});
@@ -81,6 +85,10 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
//int x,y,z; std::tie(x,y,z) = triplet; std::cout << x << "," << y << "," << z << std::endl;
},
[](std::exception_ptr){abort();});
+ std::sort(tried.begin(), tried.end());
+ for (auto& t : tried) {
+ int x,y,z; std::tie(z,y,x) = t; std::cout << z << "," << y << "," << x << std::endl;
+ }
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
duration_cast<milliseconds>(start.time_since_epoch());
@@ -93,6 +101,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){
SCENARIO("flat_map completes", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -127,7 +136,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map(
@@ -193,6 +202,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -227,7 +237,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map([&](int){return ys;}, [](int, std::string s){return s;})
@@ -294,6 +304,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxsc::test::messages<std::string> ms;
typedef rxn::subscription life;
@@ -330,7 +341,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
WHEN("each int is mapped to the strings"){
- auto res = sc.start<std::string>(
+ auto res = w.start<std::string>(
[&]() {
return xs
.flat_map([&](int){return ys;}, [](int, std::string s){return s;})
diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp
index 354fd24..db53387 100644
--- a/Rx/v2/test/operators/map.cpp
+++ b/Rx/v2/test/operators/map.cpp
@@ -16,6 +16,7 @@ namespace rxt=rxcpp::test;
SCENARIO("map stops on completion", "[map][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -41,7 +42,7 @@ SCENARIO("map stops on completion", "[map][operators]"){
WHEN("mapped to ints that are one larger"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, &invoked]() {
return xs
.map([&invoked](int x) {
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index f043d18..02d799b 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -50,6 +50,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){
SCENARIO("publish", "[publish][multicast][operators]"){
GIVEN("a test hot observable of longs"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -77,24 +78,24 @@ SCENARIO("publish", "[publish][multicast][operators]"){
};
auto xs = sc.make_hot_observable(messages);
- auto res = sc.make_subscriber<int>();
+ auto res = w.make_subscriber<int>();
rx::connectable_observable<int> ys;
WHEN("subscribed and then connected"){
- sc.schedule_absolute(rxsc::test::created_time,
+ w.schedule_absolute(rxsc::test::created_time,
[&invoked, &ys, &xs](const rxsc::schedulable& scbl){
ys = xs.publish().as_dynamic();
//ys = xs.publish_last().as_dynamic();
});
- sc.schedule_absolute(rxsc::test::subscribed_time,
+ w.schedule_absolute(rxsc::test::subscribed_time,
[&ys, &res](const rxsc::schedulable& scbl){
ys.subscribe(res);
});
- sc.schedule_absolute(rxsc::test::unsubscribed_time,
+ w.schedule_absolute(rxsc::test::unsubscribed_time,
[&res](const rxsc::schedulable& scbl){
res.unsubscribe();
});
@@ -102,11 +103,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(300,
+ w.schedule_absolute(300,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(400,
+ w.schedule_absolute(400,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
@@ -115,11 +116,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(500,
+ w.schedule_absolute(500,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(550,
+ w.schedule_absolute(550,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
@@ -128,17 +129,17 @@ SCENARIO("publish", "[publish][multicast][operators]"){
{
rx::composite_subscription connection;
- sc.schedule_absolute(650,
+ w.schedule_absolute(650,
[connection, &ys](const rxsc::schedulable& scbl){
ys.connect(connection);
});
- sc.schedule_absolute(800,
+ w.schedule_absolute(800,
[connection](const rxsc::schedulable& scbl){
connection.unsubscribe();
});
}
- sc.start();
+ w.start();
THEN("the output only contains items sent while subscribed"){
record items[] = {
diff --git a/Rx/v2/test/operators/take.cpp b/Rx/v2/test/operators/take.cpp
index 6e8ed00..e44bd5b 100644
--- a/Rx/v2/test/operators/take.cpp
+++ b/Rx/v2/test/operators/take.cpp
@@ -15,6 +15,7 @@ namespace rxt=rxcpp::test;
SCENARIO("take 2", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -35,7 +36,7 @@ SCENARIO("take 2", "[take][operators]"){
WHEN("2 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(2)
@@ -71,6 +72,7 @@ SCENARIO("take 2", "[take][operators]"){
SCENARIO("take, complete after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -104,7 +106,7 @@ SCENARIO("take, complete after", "[take][operators]"){
WHEN("20 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(20)
@@ -155,6 +157,7 @@ SCENARIO("take, complete after", "[take][operators]"){
SCENARIO("take, complete same", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -189,7 +192,7 @@ SCENARIO("take, complete same", "[take][operators]"){
WHEN("17 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(17)
@@ -240,6 +243,7 @@ SCENARIO("take, complete same", "[take][operators]"){
SCENARIO("take, complete before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -274,7 +278,7 @@ SCENARIO("take, complete before", "[take][operators]"){
WHEN("10 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(10)
@@ -318,6 +322,7 @@ SCENARIO("take, complete before", "[take][operators]"){
SCENARIO("take, error after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -354,7 +359,7 @@ SCENARIO("take, error after", "[take][operators]"){
WHEN("20 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(20)
@@ -405,6 +410,7 @@ SCENARIO("take, error after", "[take][operators]"){
SCENARIO("take, error same", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -439,7 +445,7 @@ SCENARIO("take, error same", "[take][operators]"){
WHEN("17 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(17)
@@ -490,6 +496,7 @@ SCENARIO("take, error same", "[take][operators]"){
SCENARIO("take, error before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -524,7 +531,7 @@ SCENARIO("take, error before", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -561,6 +568,7 @@ SCENARIO("take, error before", "[take][operators]"){
SCENARIO("take, dispose before", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -594,7 +602,7 @@ SCENARIO("take, dispose before", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -630,6 +638,7 @@ SCENARIO("take, dispose before", "[take][operators]"){
SCENARIO("take, dispose after", "[take][operators]"){
GIVEN("a source"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -663,7 +672,7 @@ SCENARIO("take, dispose after", "[take][operators]"){
WHEN("3 values are taken"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs]() {
return xs
.take(3)
@@ -703,6 +712,7 @@ SCENARIO("take, dispose after", "[take][operators]"){
SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -730,7 +740,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[xs, ys]() {
return xs
.take_until(ys)
@@ -775,6 +785,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -802,7 +813,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -847,6 +858,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -875,7 +887,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]")
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -920,6 +932,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]")
SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -946,7 +959,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -993,6 +1006,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators
SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1018,7 +1032,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1065,6 +1079,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators
SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1087,7 +1102,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1130,6 +1145,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1153,7 +1169,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1196,6 +1212,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1217,7 +1234,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1257,6 +1274,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1277,7 +1295,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1317,6 +1335,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1341,7 +1360,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)
@@ -1384,6 +1403,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat
SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1410,7 +1430,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r, &sourceNotDisposed]() {
return l
.map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v;})
@@ -1442,6 +1462,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un
SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1468,7 +1489,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal"
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r, &signalNotDisposed]() {
return l
.take_until(r
@@ -1501,6 +1522,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal"
SCENARIO("take_until, error some", "[take_until][take][operators]"){
GIVEN("2 sources"){
auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
typedef rxsc::test::messages<int> m;
typedef rxn::subscription life;
typedef m::recorded_type record;
@@ -1525,7 +1547,7 @@ SCENARIO("take_until, error some", "[take_until][take][operators]"){
WHEN("one is taken until the other emits a marble"){
- auto res = sc.start<int>(
+ auto res = w.start<int>(
[l, r]() {
return l
.take_until(r)