summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/publish.cpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-04-04 19:48:45 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-04-04 19:48:45 -0700
commit0a583f0cb92926aa916302f66c8e5a13f7e43ccd (patch)
tree737b348246448cb8887326da2cc3a357ed3c200d /Rx/v2/test/operators/publish.cpp
parent0f12d12602aef386e1c3cdbea4f899144fc50129 (diff)
downloadRxCpp-0a583f0cb92926aa916302f66c8e5a13f7e43ccd.tar.gz
add dynamic_connectable_observable
Diffstat (limited to 'Rx/v2/test/operators/publish.cpp')
-rw-r--r--Rx/v2/test/operators/publish.cpp145
1 files changed, 144 insertions, 1 deletions
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 5a0fd2b..9739646 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -16,8 +16,28 @@ namespace rxt=rxcpp::test;
SCENARIO("publish range", "[range][subject][publish][operators]"){
GIVEN("a range"){
WHEN("published"){
- auto published = rxs::range<int>(0, 1000).publish().connect_now();
+ auto published = rxs::range<int>(0, 10).publish();
+ std::cout << "subscribe to published" << std::endl;
+ published.subscribe(
+ // on_next
+ [](int v){std::cout << v << ", ";},
+ // on_completed
+ [](){std::cout << " done." << std::endl;});
std::cout << "connect to published" << std::endl;
+ published.connect();
+ }
+ WHEN("ref_count is used"){
+ auto published = rxs::range<int>(0, 10).publish().ref_count();
+ std::cout << "subscribe to ref_count" << std::endl;
+ published.subscribe(
+ // on_next
+ [](int v){std::cout << v << ", ";},
+ // on_completed
+ [](){std::cout << " done." << std::endl;});
+ }
+ WHEN("connect_now is used"){
+ auto published = rxs::range<int>(0, 10).publish().connect_now();
+ std::cout << "subscribe to connect_now" << std::endl;
published.subscribe(
// on_next
[](int v){std::cout << v << ", ";},
@@ -26,3 +46,126 @@ SCENARIO("publish range", "[range][subject][publish][operators]"){
}
}
}
+
+SCENARIO("publish", "[publish][multicast][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto sc = rxsc::make_test();
+ typedef rxsc::test::messages<int> m;
+ typedef rxn::subscription life;
+ typedef m::recorded_type record;
+ auto on_next = m::on_next;
+ auto on_error = m::on_error;
+ auto on_completed = m::on_completed;
+ auto subscribe = m::subscribe;
+
+ long invoked = 0;
+
+ record messages[] = {
+ on_next(110, 7),
+ on_next(220, 3),
+ on_next(280, 4),
+ on_next(290, 1),
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(410, 13),
+ on_next(430, 2),
+ on_next(450, 9),
+ on_next(520, 11),
+ on_next(560, 20),
+ on_completed(600)
+ };
+ auto xs = sc.make_hot_observable(messages);
+
+ auto res = sc.make_subscriber<int>();
+
+ rx::connectable_observable<int> ys;
+
+ WHEN("subscribed and then connected"){
+
+ sc.schedule_absolute(rxsc::test::created_time,
+ [&invoked, &ys, &xs](const rxsc::schedulable& scbl){
+ ys = xs.publish().as_dynamic();
+ //ys = xs.publish_last().as_dynamic();
+ });
+
+ sc.schedule_absolute(rxsc::test::subscribed_time,
+ [&ys, &res](const rxsc::schedulable& scbl){
+ ys.subscribe(res);
+ });
+
+ sc.schedule_absolute(rxsc::test::unsubscribed_time,
+ [&res](const rxsc::schedulable& scbl){
+ res.unsubscribe();
+ });
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(300,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(400,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(500,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(550,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(650,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(800,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ sc.start();
+
+ THEN("the output only contains items sent while subscribed"){
+ record items[] = {
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(520, 11)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were 3 subscription/unsubscription"){
+ life items[] = {
+ subscribe(300, 400),
+ subscribe(500, 550),
+ subscribe(650, 800)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+