diff options
Diffstat (limited to 'Rx/v2/test/operators')
-rw-r--r-- | Rx/v2/test/operators/concat.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/group_by.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/merge.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/observe_on.cpp | 91 | ||||
-rw-r--r-- | Rx/v2/test/operators/subscribe_on.cpp | 1 |
7 files changed, 97 insertions, 0 deletions
diff --git a/Rx/v2/test/operators/concat.cpp b/Rx/v2/test/operators/concat.cpp index c8ff234..86f4a7e 100644 --- a/Rx/v2/test/operators/concat.cpp +++ b/Rx/v2/test/operators/concat.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-concat.hpp> #include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 1000000; diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp index 761fdff..0a3ee6b 100644 --- a/Rx/v2/test/operators/concat_map.cpp +++ b/Rx/v2/test/operators/concat_map.cpp @@ -4,6 +4,7 @@ #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-concat_map.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 03cb36e..e2e3cf8 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -4,6 +4,7 @@ #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-flat_map.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp index 2704aa7..606d7df 100644 --- a/Rx/v2/test/operators/group_by.cpp +++ b/Rx/v2/test/operators/group_by.cpp @@ -6,6 +6,7 @@ #include <rxcpp/operators/rx-merge.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-start_with.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> #include <locale> diff --git a/Rx/v2/test/operators/merge.cpp b/Rx/v2/test/operators/merge.cpp index 60e51a2..9a7f28c 100644 --- a/Rx/v2/test/operators/merge.cpp +++ b/Rx/v2/test/operators/merge.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-merge.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 1000000; diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp index 5d56448..644ab93 100644 --- a/Rx/v2/test/operators/observe_on.cpp +++ b/Rx/v2/test/operators/observe_on.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include <rxcpp/operators/rx-take.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 100000; @@ -46,3 +47,93 @@ SCENARIO("range observed on new_thread", "[hide][range][observe_on_debug][observ } } } + +SCENARIO("observe_on", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("subscribe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + .observe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("stream observe_on", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("observe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + | rxo::observe_on(so); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(211, 2), + on.next(241, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +}
\ No newline at end of file diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp index ded1ea4..baa66e2 100644 --- a/Rx/v2/test/operators/subscribe_on.cpp +++ b/Rx/v2/test/operators/subscribe_on.cpp @@ -2,6 +2,7 @@ #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-subscribe_on.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> static const int static_subscriptions = 50000; |