summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/publish.cpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-05-15 13:59:34 -0600
committerKirk Shoop <kirk.shoop@microsoft.com>2014-05-15 13:59:34 -0600
commitf1e239323c3e18b935ba67e8ce15992d23e5107a (patch)
tree6bb1389eda05f5b8e83ecb024a493b4730c721e3 /Rx/v2/test/operators/publish.cpp
parentbb2b3a00606e19d88546a4db654dc30111e1da5a (diff)
downloadRxCpp-f1e239323c3e18b935ba67e8ce15992d23e5107a.tar.gz
add publish test
Diffstat (limited to 'Rx/v2/test/operators/publish.cpp')
-rw-r--r--Rx/v2/test/operators/publish.cpp114
1 files changed, 113 insertions, 1 deletions
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 729464d..464f7d2 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -47,7 +47,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){
}
}
-SCENARIO("publish", "[publish][multicast][operators]"){
+SCENARIO("publish basic", "[publish][multicast][operators]"){
GIVEN("a test hot observable of longs"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -169,3 +169,115 @@ SCENARIO("publish", "[publish][multicast][operators]"){
}
}
+
+SCENARIO("publish error", "[publish][error][multicast][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ typedef rxsc::test::messages<int> m;
+ typedef rxn::subscription life;
+ typedef m::recorded_type record;
+ auto on_next = m::on_next;
+ auto on_error = m::on_error;
+ auto on_completed = m::on_completed;
+ auto subscribe = m::subscribe;
+
+ long invoked = 0;
+
+ std::runtime_error ex("publish on_error");
+
+ record messages[] = {
+ on_next(110, 7),
+ on_next(220, 3),
+ on_next(280, 4),
+ on_next(290, 1),
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(410, 13),
+ on_next(430, 2),
+ on_next(450, 9),
+ on_next(520, 11),
+ on_next(560, 20),
+ on_error(600, ex)
+ };
+ auto xs = sc.make_hot_observable(messages);
+
+ auto res = w.make_subscriber<int>();
+
+ rx::connectable_observable<int> ys;
+
+ WHEN("subscribed and then connected"){
+
+ w.schedule_absolute(rxsc::test::created_time,
+ [&invoked, &ys, &xs](const rxsc::schedulable& scbl){
+ ys = xs.publish().as_dynamic();
+ });
+
+ w.schedule_absolute(rxsc::test::subscribed_time,
+ [&ys, &res](const rxsc::schedulable& scbl){
+ ys.subscribe(res);
+ });
+
+ w.schedule_absolute(rxsc::test::unsubscribed_time,
+ [&res](const rxsc::schedulable& scbl){
+ res.unsubscribe();
+ });
+
+ {
+ rx::composite_subscription connection;
+
+ w.schedule_absolute(300,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ w.schedule_absolute(400,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ {
+ rx::composite_subscription connection;
+
+ w.schedule_absolute(500,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ w.schedule_absolute(800,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ w.start();
+
+ THEN("the output only contains items sent while subscribed"){
+ record items[] = {
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(520, 11),
+ on_next(560, 20),
+ on_error(600, ex)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were 3 subscription/unsubscription"){
+ life items[] = {
+ subscribe(300, 400),
+ subscribe(500, 600)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}