diff options
author | Elliott Hughes <enh@google.com> | 2022-09-01 14:59:27 +0000 |
---|---|---|
committer | Elliott Hughes <enh@google.com> | 2022-09-01 15:00:13 +0000 |
commit | a6661d2cf339b71de3e25f80fb927e029468c603 (patch) | |
tree | c0b1aac3621dd301e9fb7add9170d8879cc97cbe /Rx/v2/test/operators/subscribe_on.cpp | |
parent | de986f59e99bd585280a061f545150ee1268f012 (diff) | |
download | RxCpp-a6661d2cf339b71de3e25f80fb927e029468c603.tar.gz |
Do not necromance this without going through the go/android-3p process
again.
Test: treehugger
Change-Id: Id813d74d7d2597e0636b925430117c94bea3fa5a
Diffstat (limited to 'Rx/v2/test/operators/subscribe_on.cpp')
-rw-r--r-- | Rx/v2/test/operators/subscribe_on.cpp | 180 |
1 files changed, 0 insertions, 180 deletions
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp deleted file mode 100644 index ef8a8c7..0000000 --- a/Rx/v2/test/operators/subscribe_on.cpp +++ /dev/null @@ -1,180 +0,0 @@ -#include "../test.h" -#include <rxcpp/operators/rx-reduce.hpp> -#include <rxcpp/operators/rx-map.hpp> -#include <rxcpp/operators/rx-subscribe_on.hpp> -#include <rxcpp/operators/rx-observe_on.hpp> - -#include <sstream> - -static const int static_subscriptions = 50000; - -SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){ - const int& subscriptions = static_subscriptions; - GIVEN("a for loop"){ - WHEN("subscribe 50K times"){ - using namespace std::chrono; - typedef steady_clock clock; - - int runs = 10; - - for (;runs > 0; --runs) { - - int c = 0; - int n = 1; - auto start = clock::now(); - for (int i = 0; i < subscriptions; ++i) { - c += rx::observable<>::just(1) - .map([](int i) { - std::stringstream serializer; - serializer << i; - return serializer.str(); - }) - .map([](const std::string& s) { - int i; - std::stringstream(s) >> i; - return i; - }) - .subscribe_on(rx::observe_on_event_loop()) - .observe_on(rx::observe_on_event_loop()) - .as_blocking() - .count(); - } - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - REQUIRE(subscriptions == c); - std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - } - } - } -} - -SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){ - const int& subscriptions = static_subscriptions; - GIVEN("a for loop"){ - WHEN("subscribe 50K times"){ - using namespace std::chrono; - typedef steady_clock clock; - - int runs = 10; - - for (;runs > 0; --runs) { - - int c = 0; - int n = 1; - auto start = clock::now(); - - for (int i = 0; i < subscriptions; ++i) { - c += rx::observable<>:: - just(1). - map([](int i) { - std::stringstream serializer; - serializer << i; - return serializer.str(); - }). - map([](const std::string& s) { - int i; - std::stringstream(s) >> i; - return i; - }). - subscribe_on(rx::observe_on_event_loop()). - as_blocking(). - count(); - } - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - REQUIRE(subscriptions == c); - std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - } - } - } -} - -SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){ - GIVEN("a source"){ - auto sc = rxsc::make_test(); - auto so = rx::synchronize_in_one_worker(sc); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(150, 1), - on.next(210, 2), - on.next(240, 3), - on.completed(300) - }); - - WHEN("subscribe_on is specified"){ - - auto res = w.start( - [so, xs]() { - return xs - .subscribe_on(so); - } - ); - - THEN("the output contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(210, 2), - on.next(240, 3), - on.completed(300) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription to the source"){ - auto required = rxu::to_vector({ - on.subscribe(201, 300) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){ - GIVEN("a source"){ - auto sc = rxsc::make_test(); - auto so = rx::synchronize_in_one_worker(sc); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(150, 1), - on.next(210, 2), - on.next(240, 3), - on.completed(300) - }); - - WHEN("subscribe_on is specified"){ - - auto res = w.start( - [so, xs]() { - return xs - | rxo::subscribe_on(so); - } - ); - - THEN("the output contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(210, 2), - on.next(240, 3), - on.completed(300) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription to the source"){ - auto required = rxu::to_vector({ - on.subscribe(201, 300) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} |