summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-24 19:39:47 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-24 08:39:47 -0800
commit6f4ca1198079ed2a439f53865d494729b975bbb7 (patch)
tree37da80a3e6fd7e8ffdeb96d33faf5ac986c84b06 /Rx/v2/test/operators
parentff18d4640e5f2f96eba2fe17fd5d91132f44dc0f (diff)
downloadRxCpp-6f4ca1198079ed2a439f53865d494729b975bbb7.tar.gz
decouple observe_on from observable (#335)
* decouple observe_on from observable * decouple observe_on from observable - fix msvc
Diffstat (limited to 'Rx/v2/test/operators')
-rw-r--r--Rx/v2/test/operators/concat.cpp1
-rw-r--r--Rx/v2/test/operators/concat_map.cpp1
-rw-r--r--Rx/v2/test/operators/flat_map.cpp1
-rw-r--r--Rx/v2/test/operators/group_by.cpp1
-rw-r--r--Rx/v2/test/operators/merge.cpp1
-rw-r--r--Rx/v2/test/operators/observe_on.cpp91
-rw-r--r--Rx/v2/test/operators/subscribe_on.cpp1
7 files changed, 97 insertions, 0 deletions
diff --git a/Rx/v2/test/operators/concat.cpp b/Rx/v2/test/operators/concat.cpp
index c8ff234..86f4a7e 100644
--- a/Rx/v2/test/operators/concat.cpp
+++ b/Rx/v2/test/operators/concat.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-concat.hpp>
#include <rxcpp/operators/rx-reduce.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 1000000;
diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp
index 761fdff..0a3ee6b 100644
--- a/Rx/v2/test/operators/concat_map.cpp
+++ b/Rx/v2/test/operators/concat_map.cpp
@@ -4,6 +4,7 @@
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-concat_map.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
static const int static_tripletCount = 100;
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 03cb36e..e2e3cf8 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -4,6 +4,7 @@
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-flat_map.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
static const int static_tripletCount = 100;
diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp
index 2704aa7..606d7df 100644
--- a/Rx/v2/test/operators/group_by.cpp
+++ b/Rx/v2/test/operators/group_by.cpp
@@ -6,6 +6,7 @@
#include <rxcpp/operators/rx-merge.hpp>
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-start_with.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
#include <locale>
diff --git a/Rx/v2/test/operators/merge.cpp b/Rx/v2/test/operators/merge.cpp
index 60e51a2..9a7f28c 100644
--- a/Rx/v2/test/operators/merge.cpp
+++ b/Rx/v2/test/operators/merge.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-merge.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 1000000;
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index 5d56448..644ab93 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 100000;
@@ -46,3 +47,93 @@ SCENARIO("range observed on new_thread", "[hide][range][observe_on_debug][observ
}
}
}
+
+SCENARIO("observe_on", "[observe][observe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("subscribe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ .observe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(211, 2),
+ on.next(241, 3),
+ on.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("stream observe_on", "[observe][observe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("observe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::observe_on(so);
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(211, 2),
+ on.next(241, 3),
+ on.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+} \ No newline at end of file
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
index ded1ea4..baa66e2 100644
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ b/Rx/v2/test/operators/subscribe_on.cpp
@@ -2,6 +2,7 @@
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-subscribe_on.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
static const int static_subscriptions = 50000;