summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/subscribe_on.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test/operators/subscribe_on.cpp')
-rw-r--r--Rx/v2/test/operators/subscribe_on.cpp91
1 files changed, 91 insertions, 0 deletions
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
index 80e9b2c..ded1ea4 100644
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ b/Rx/v2/test/operators/subscribe_on.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>
+#include <rxcpp/operators/rx-subscribe_on.hpp>
static const int static_subscriptions = 50000;
@@ -84,3 +85,93 @@ SCENARIO("for loop subscribes to map with subscribe_on", "[hide][subscribe_on_on
}
}
}
+
+SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("subscribe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ .subscribe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(201, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("subscribe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::subscribe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(201, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}