summaryrefslogtreecommitdiff
path: root/Rx/v2/test
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test')
-rw-r--r--Rx/v2/test/operators/merge_delay_error.cpp6
-rw-r--r--Rx/v2/test/operators/observe_on.cpp56
2 files changed, 58 insertions, 4 deletions
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index 7c7a58d..b53b884 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000;
//merge_delay_error must work the very same way as `merge()` except the error handling
-SCENARIO("merge delay error completes", "[merge][join][operators]"){
+SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -117,7 +117,7 @@ SCENARIO("merge delay error completes", "[merge][join][operators]"){
}
}
-SCENARIO("variadic merge delay error completes with error", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -211,7 +211,7 @@ SCENARIO("variadic merge delay error completes with error", "[merge][join][opera
}
}
-SCENARIO("variadic merge delay error completes with 2 errors", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index 644ab93..ffa85aa 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 100000;
@@ -136,4 +137,57 @@ SCENARIO("stream observe_on", "[observe][observe_on]"){
}
}
-} \ No newline at end of file
+}
+
+class nocompare {
+public:
+ int v;
+};
+
+SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::observe_on_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<nocompare> in;
+ const rxsc::test::messages<int> out;
+
+ auto xs = sc.make_hot_observable({
+ in.next(150, nocompare{1}),
+ in.next(210, nocompare{2}),
+ in.next(240, nocompare{3}),
+ in.completed(300)
+ });
+
+ WHEN("observe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::observe_on(so)
+ | rxo::map([](nocompare v){ return v.v; })
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ out.next(211, 2),
+ out.next(241, 3),
+ out.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ out.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}