diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-04 19:48:45 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-04 19:48:45 -0700 |
commit | 0a583f0cb92926aa916302f66c8e5a13f7e43ccd (patch) | |
tree | 737b348246448cb8887326da2cc3a357ed3c200d /Rx/v2/test/operators/publish.cpp | |
parent | 0f12d12602aef386e1c3cdbea4f899144fc50129 (diff) | |
download | RxCpp-0a583f0cb92926aa916302f66c8e5a13f7e43ccd.tar.gz |
add dynamic_connectable_observable
Diffstat (limited to 'Rx/v2/test/operators/publish.cpp')
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 145 |
1 files changed, 144 insertions, 1 deletions
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 5a0fd2b..9739646 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -16,8 +16,28 @@ namespace rxt=rxcpp::test; SCENARIO("publish range", "[range][subject][publish][operators]"){ GIVEN("a range"){ WHEN("published"){ - auto published = rxs::range<int>(0, 1000).publish().connect_now(); + auto published = rxs::range<int>(0, 10).publish(); + std::cout << "subscribe to published" << std::endl; + published.subscribe( + // on_next + [](int v){std::cout << v << ", ";}, + // on_completed + [](){std::cout << " done." << std::endl;}); std::cout << "connect to published" << std::endl; + published.connect(); + } + WHEN("ref_count is used"){ + auto published = rxs::range<int>(0, 10).publish().ref_count(); + std::cout << "subscribe to ref_count" << std::endl; + published.subscribe( + // on_next + [](int v){std::cout << v << ", ";}, + // on_completed + [](){std::cout << " done." << std::endl;}); + } + WHEN("connect_now is used"){ + auto published = rxs::range<int>(0, 10).publish().connect_now(); + std::cout << "subscribe to connect_now" << std::endl; published.subscribe( // on_next [](int v){std::cout << v << ", ";}, @@ -26,3 +46,126 @@ SCENARIO("publish range", "[range][subject][publish][operators]"){ } } } + +SCENARIO("publish", "[publish][multicast][operators]"){ + GIVEN("a test hot observable of longs"){ + auto sc = rxsc::make_test(); + typedef rxsc::test::messages<int> m; + typedef rxn::subscription life; + typedef m::recorded_type record; + auto on_next = m::on_next; + auto on_error = m::on_error; + auto on_completed = m::on_completed; + auto subscribe = m::subscribe; + + long invoked = 0; + + record messages[] = { + on_next(110, 7), + on_next(220, 3), + on_next(280, 4), + on_next(290, 1), + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(410, 13), + on_next(430, 2), + on_next(450, 9), + on_next(520, 11), + on_next(560, 20), + on_completed(600) + }; + auto xs = sc.make_hot_observable(messages); + + auto res = sc.make_subscriber<int>(); + + rx::connectable_observable<int> ys; + + WHEN("subscribed and then connected"){ + + sc.schedule_absolute(rxsc::test::created_time, + [&invoked, &ys, &xs](const rxsc::schedulable& scbl){ + ys = xs.publish().as_dynamic(); + //ys = xs.publish_last().as_dynamic(); + }); + + sc.schedule_absolute(rxsc::test::subscribed_time, + [&ys, &res](const rxsc::schedulable& scbl){ + ys.subscribe(res); + }); + + sc.schedule_absolute(rxsc::test::unsubscribed_time, + [&res](const rxsc::schedulable& scbl){ + res.unsubscribe(); + }); + + { + rx::composite_subscription connection; + + sc.schedule_absolute(300, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(400, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + { + rx::composite_subscription connection; + + sc.schedule_absolute(500, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(550, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + { + rx::composite_subscription connection; + + sc.schedule_absolute(650, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(800, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + sc.start(); + + THEN("the output only contains items sent while subscribed"){ + record items[] = { + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(520, 11) + }; + auto required = rxu::to_vector(items); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there were 3 subscription/unsubscription"){ + life items[] = { + subscribe(300, 400), + subscribe(500, 550), + subscribe(650, 800) + }; + auto required = rxu::to_vector(items); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + |