diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-15 13:59:34 -0600 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-05-15 13:59:34 -0600 |
commit | f1e239323c3e18b935ba67e8ce15992d23e5107a (patch) | |
tree | 6bb1389eda05f5b8e83ecb024a493b4730c721e3 /Rx/v2/test/operators/publish.cpp | |
parent | bb2b3a00606e19d88546a4db654dc30111e1da5a (diff) | |
download | RxCpp-f1e239323c3e18b935ba67e8ce15992d23e5107a.tar.gz |
add publish test
Diffstat (limited to 'Rx/v2/test/operators/publish.cpp')
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 114 |
1 files changed, 113 insertions, 1 deletions
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 729464d..464f7d2 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -47,7 +47,7 @@ SCENARIO("publish range", "[hide][range][subject][publish][operators]"){ } } -SCENARIO("publish", "[publish][multicast][operators]"){ +SCENARIO("publish basic", "[publish][multicast][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -169,3 +169,115 @@ SCENARIO("publish", "[publish][multicast][operators]"){ } } + +SCENARIO("publish error", "[publish][error][multicast][operators]"){ + GIVEN("a test hot observable of longs"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + typedef rxsc::test::messages<int> m; + typedef rxn::subscription life; + typedef m::recorded_type record; + auto on_next = m::on_next; + auto on_error = m::on_error; + auto on_completed = m::on_completed; + auto subscribe = m::subscribe; + + long invoked = 0; + + std::runtime_error ex("publish on_error"); + + record messages[] = { + on_next(110, 7), + on_next(220, 3), + on_next(280, 4), + on_next(290, 1), + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(410, 13), + on_next(430, 2), + on_next(450, 9), + on_next(520, 11), + on_next(560, 20), + on_error(600, ex) + }; + auto xs = sc.make_hot_observable(messages); + + auto res = w.make_subscriber<int>(); + + rx::connectable_observable<int> ys; + + WHEN("subscribed and then connected"){ + + w.schedule_absolute(rxsc::test::created_time, + [&invoked, &ys, &xs](const rxsc::schedulable& scbl){ + ys = xs.publish().as_dynamic(); + }); + + w.schedule_absolute(rxsc::test::subscribed_time, + [&ys, &res](const rxsc::schedulable& scbl){ + ys.subscribe(res); + }); + + w.schedule_absolute(rxsc::test::unsubscribed_time, + [&res](const rxsc::schedulable& scbl){ + res.unsubscribe(); + }); + + { + rx::composite_subscription connection; + + w.schedule_absolute(300, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + w.schedule_absolute(400, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + { + rx::composite_subscription connection; + + w.schedule_absolute(500, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + w.schedule_absolute(800, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + w.start(); + + THEN("the output only contains items sent while subscribed"){ + record items[] = { + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(520, 11), + on_next(560, 20), + on_error(600, ex) + }; + auto required = rxu::to_vector(items); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there were 3 subscription/unsubscription"){ + life items[] = { + subscribe(300, 400), + subscribe(500, 600) + }; + auto required = rxu::to_vector(items); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |