diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-29 23:46:04 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-01 14:03:55 -0700 |
commit | 09b16daca270ed0ade1f41f76c0910d6442e95de (patch) | |
tree | fba873aeed4c42c57dde9c5cccc7b1203ae8443f /Rx/v2/test/operators | |
parent | 15807b31b06a4558b55356a3d69c0287f6177f7d (diff) | |
download | RxCpp-09b16daca270ed0ade1f41f76c0910d6442e95de.tar.gz |
fix virtual_time and test scheduler
Diffstat (limited to 'Rx/v2/test/operators')
-rw-r--r-- | Rx/v2/test/operators/filter.cpp | 23 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 25 | ||||
-rw-r--r-- | Rx/v2/test/operators/map.cpp | 3 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 23 | ||||
-rw-r--r-- | Rx/v2/test/operators/take.cpp | 66 |
5 files changed, 90 insertions, 50 deletions
diff --git a/Rx/v2/test/operators/filter.cpp b/Rx/v2/test/operators/filter.cpp index ac101ef..3e9e61d 100644 --- a/Rx/v2/test/operators/filter.cpp +++ b/Rx/v2/test/operators/filter.cpp @@ -31,6 +31,7 @@ bool IsPrime(int x) SCENARIO("filter stops on completion", "[filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -61,7 +62,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){ auto xs = sc.make_hot_observable(messages); WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [&xs, &invoked]() { #if 0 && RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -119,6 +120,7 @@ SCENARIO("filter stops on completion", "[filter][operators]"){ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -146,7 +148,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [&xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -199,6 +201,7 @@ SCENARIO("filter stops on disposal", "[where][filter][operators]"){ SCENARIO("filter stops on error", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -232,7 +235,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -286,6 +289,7 @@ SCENARIO("filter stops on error", "[where][filter][operators]"){ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -319,7 +323,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ WHEN("filtered to ints that are primes"){ - auto res = sc.start<int>( + auto res = w.start<int>( [ex, xs, &invoked]() { #if RXCPP_USE_OBSERVABLE_MEMBERS return xs @@ -377,6 +381,7 @@ SCENARIO("filter stops on throw from predicate", "[where][filter][operators]"){ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -406,13 +411,13 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]") }; auto xs = sc.make_hot_observable(rxu::to_vector(messages)); - auto res = sc.make_subscriber<int>(); + auto res = w.make_subscriber<int>(); rx::observable<int, rx::dynamic_observable<int>> ys; WHEN("filtered to ints that are primes"){ - sc.schedule_absolute(rxsc::test::created_time, + w.schedule_absolute(rxsc::test::created_time, [&invoked, &res, &ys, &xs](const rxsc::schedulable& scbl) { #if RXCPP_USE_OBSERVABLE_MEMBERS ys = xs @@ -433,15 +438,15 @@ SCENARIO("filter stops on dispose from predicate", "[where][filter][operators]") #endif }); - sc.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) { + w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl) { ys.subscribe(res); }); - sc.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) { + w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl) { res.unsubscribe(); }); - sc.start(); + w.start(); THEN("the output only contains primes"){ record items[] = { diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 7e6401e..ead0b0e 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -12,7 +12,7 @@ namespace rxt=rxcpp::test; #include "catch.hpp" -static const int static_tripletCount = 500; +static const int static_tripletCount = 2; SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; @@ -32,6 +32,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){ for(int y = x; y <= z; ++y) { ++c; + std::cout << z << "," << y << "," << x << std::endl; if(x*x + y*y == z*z) { //result += (x + y + z); @@ -60,6 +61,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ using namespace std::chrono; typedef steady_clock clock; + std::vector<std::tuple<int, int, int>> tried; int c = 0; int ct = 0; int n = 1; @@ -67,10 +69,12 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ auto triples = rxs::range(1) .flat_map( - [&c](int z){ return rxs::range(1, z) + [&c, &tried](int z){ return rxs::range(1, z) .flat_map( - [&c, z](int x){ return rxs::range(x, z) - .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) + [&c, &tried, z](int x){ return rxs::range(x, z) + .filter([&c, &tried, z, x](int y){++c; + tried.push_back(std::make_tuple(z, y, x)); + return x*x + y*y == z*z;}) .map([z, x](int y){return std::make_tuple(x, y, z);});}, [](int x, std::tuple<int,int,int> triplet){return triplet;});}, [](int z, std::tuple<int,int,int> triplet){return triplet;}); @@ -81,6 +85,10 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ //int x,y,z; std::tie(x,y,z) = triplet; std::cout << x << "," << y << "," << z << std::endl; }, [](std::exception_ptr){abort();}); + std::sort(tried.begin(), tried.end()); + for (auto& t : tried) { + int x,y,z; std::tie(z,y,x) = t; std::cout << z << "," << y << "," << x << std::endl; + } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - duration_cast<milliseconds>(start.time_since_epoch()); @@ -93,6 +101,7 @@ SCENARIO("pythagorian ranges", "[hide][for][pythagorian][perf]"){ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -127,7 +136,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map( @@ -193,6 +202,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -227,7 +237,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) @@ -294,6 +304,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxsc::test::messages<std::string> ms; typedef rxn::subscription life; @@ -330,7 +341,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ WHEN("each int is mapped to the strings"){ - auto res = sc.start<std::string>( + auto res = w.start<std::string>( [&]() { return xs .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp index 354fd24..db53387 100644 --- a/Rx/v2/test/operators/map.cpp +++ b/Rx/v2/test/operators/map.cpp @@ -16,6 +16,7 @@ namespace rxt=rxcpp::test; SCENARIO("map stops on completion", "[map][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -41,7 +42,7 @@ SCENARIO("map stops on completion", "[map][operators]"){ WHEN("mapped to ints that are one larger"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, &invoked]() { return xs .map([&invoked](int x) { diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index f043d18..02d799b 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -50,6 +50,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){ SCENARIO("publish", "[publish][multicast][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -77,24 +78,24 @@ SCENARIO("publish", "[publish][multicast][operators]"){ }; auto xs = sc.make_hot_observable(messages); - auto res = sc.make_subscriber<int>(); + auto res = w.make_subscriber<int>(); rx::connectable_observable<int> ys; WHEN("subscribed and then connected"){ - sc.schedule_absolute(rxsc::test::created_time, + w.schedule_absolute(rxsc::test::created_time, [&invoked, &ys, &xs](const rxsc::schedulable& scbl){ ys = xs.publish().as_dynamic(); //ys = xs.publish_last().as_dynamic(); }); - sc.schedule_absolute(rxsc::test::subscribed_time, + w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable& scbl){ ys.subscribe(res); }); - sc.schedule_absolute(rxsc::test::unsubscribed_time, + w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable& scbl){ res.unsubscribe(); }); @@ -102,11 +103,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(300, + w.schedule_absolute(300, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(400, + w.schedule_absolute(400, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); @@ -115,11 +116,11 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(500, + w.schedule_absolute(500, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(550, + w.schedule_absolute(550, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); @@ -128,17 +129,17 @@ SCENARIO("publish", "[publish][multicast][operators]"){ { rx::composite_subscription connection; - sc.schedule_absolute(650, + w.schedule_absolute(650, [connection, &ys](const rxsc::schedulable& scbl){ ys.connect(connection); }); - sc.schedule_absolute(800, + w.schedule_absolute(800, [connection](const rxsc::schedulable& scbl){ connection.unsubscribe(); }); } - sc.start(); + w.start(); THEN("the output only contains items sent while subscribed"){ record items[] = { diff --git a/Rx/v2/test/operators/take.cpp b/Rx/v2/test/operators/take.cpp index 6e8ed00..e44bd5b 100644 --- a/Rx/v2/test/operators/take.cpp +++ b/Rx/v2/test/operators/take.cpp @@ -15,6 +15,7 @@ namespace rxt=rxcpp::test; SCENARIO("take 2", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -35,7 +36,7 @@ SCENARIO("take 2", "[take][operators]"){ WHEN("2 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(2) @@ -71,6 +72,7 @@ SCENARIO("take 2", "[take][operators]"){ SCENARIO("take, complete after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -104,7 +106,7 @@ SCENARIO("take, complete after", "[take][operators]"){ WHEN("20 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(20) @@ -155,6 +157,7 @@ SCENARIO("take, complete after", "[take][operators]"){ SCENARIO("take, complete same", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -189,7 +192,7 @@ SCENARIO("take, complete same", "[take][operators]"){ WHEN("17 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(17) @@ -240,6 +243,7 @@ SCENARIO("take, complete same", "[take][operators]"){ SCENARIO("take, complete before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -274,7 +278,7 @@ SCENARIO("take, complete before", "[take][operators]"){ WHEN("10 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(10) @@ -318,6 +322,7 @@ SCENARIO("take, complete before", "[take][operators]"){ SCENARIO("take, error after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -354,7 +359,7 @@ SCENARIO("take, error after", "[take][operators]"){ WHEN("20 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(20) @@ -405,6 +410,7 @@ SCENARIO("take, error after", "[take][operators]"){ SCENARIO("take, error same", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -439,7 +445,7 @@ SCENARIO("take, error same", "[take][operators]"){ WHEN("17 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(17) @@ -490,6 +496,7 @@ SCENARIO("take, error same", "[take][operators]"){ SCENARIO("take, error before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -524,7 +531,7 @@ SCENARIO("take, error before", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -561,6 +568,7 @@ SCENARIO("take, error before", "[take][operators]"){ SCENARIO("take, dispose before", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -594,7 +602,7 @@ SCENARIO("take, dispose before", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -630,6 +638,7 @@ SCENARIO("take, dispose before", "[take][operators]"){ SCENARIO("take, dispose after", "[take][operators]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -663,7 +672,7 @@ SCENARIO("take, dispose after", "[take][operators]"){ WHEN("3 values are taken"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs]() { return xs .take(3) @@ -703,6 +712,7 @@ SCENARIO("take, dispose after", "[take][operators]"){ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -730,7 +740,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [xs, ys]() { return xs .take_until(ys) @@ -775,6 +785,7 @@ SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -802,7 +813,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -847,6 +858,7 @@ SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -875,7 +887,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]") WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -920,6 +932,7 @@ SCENARIO("take_until, preempt some data error", "[take_until][take][operators]") SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -946,7 +959,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -993,6 +1006,7 @@ SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1018,7 +1032,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1065,6 +1079,7 @@ SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1087,7 +1102,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1130,6 +1145,7 @@ SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1153,7 +1169,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1196,6 +1212,7 @@ SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1217,7 +1234,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1257,6 +1274,7 @@ SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1277,7 +1295,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1317,6 +1335,7 @@ SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1341,7 +1360,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) @@ -1384,6 +1403,7 @@ SCENARIO("take_until, preempt before first produced", "[take_until][take][operat SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1410,7 +1430,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r, &sourceNotDisposed]() { return l .map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v;}) @@ -1442,6 +1462,7 @@ SCENARIO("take_until, preempt before first produced, remain silent and proper un SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1468,7 +1489,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal" WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r, &signalNotDisposed]() { return l .take_until(r @@ -1501,6 +1522,7 @@ SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal" SCENARIO("take_until, error some", "[take_until][take][operators]"){ GIVEN("2 sources"){ auto sc = rxsc::make_test(); + auto w = sc.create_worker(); typedef rxsc::test::messages<int> m; typedef rxn::subscription life; typedef m::recorded_type record; @@ -1525,7 +1547,7 @@ SCENARIO("take_until, error some", "[take_until][take][operators]"){ WHEN("one is taken until the other emits a marble"){ - auto res = sc.start<int>( + auto res = w.start<int>( [l, r]() { return l .take_until(r) |