summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2017-01-12 12:34:24 -0800
committerGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-13 02:44:52 +0300
commitbbd51e36354e9f159e4408996014e0577f656431 (patch)
tree13fab15fb277aab359a1f1ef3c2304fd217f603b /Rx/v2
parente1360ef0de064ef55c119791fe55fa6674bada9e (diff)
downloadRxCpp-bbd51e36354e9f159e4408996014e0577f656431.tar.gz
print async during subject perf test
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/test/subjects/subject.cpp37
1 files changed, 28 insertions, 9 deletions
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
index f13a12a..d069624 100644
--- a/Rx/v2/test/subjects/subject.cpp
+++ b/Rx/v2/test/subjects/subject.cpp
@@ -2,6 +2,8 @@
#include "../test.h"
+#include <rxcpp/operators/rx-finally.hpp>
+
#include <future>
@@ -309,6 +311,7 @@ SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]")
#if RXCPP_SUBJECT_TEST_ASYNC
std::vector<std::future<int>> f(n);
+ std::atomic<int> asyncUnsubscriptions{0};
#endif
auto o = sub.get_subscriber();
@@ -320,12 +323,15 @@ SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]")
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub, o]() {
+ f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
auto source = sub.get_observable();
while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
- source.subscribe(
+ source
+ .finally([&asyncUnsubscriptions](){
+ ++asyncUnsubscriptions;})
+ .subscribe(
rx::make_subscriber<int>(
cs,
[cs](int){
@@ -354,7 +360,11 @@ SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]")
o.on_completed();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
+ std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
+#if RXCPP_SUBJECT_TEST_ASYNC
+ std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
+#endif
+ std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
@@ -374,6 +384,7 @@ SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){
#if RXCPP_SUBJECT_TEST_ASYNC
std::vector<std::future<int>> f(n);
+ std::atomic<int> asyncUnsubscriptions{0};
#endif
auto o = sub.get_subscriber();
@@ -385,14 +396,18 @@ SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub, o]() {
+ f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
- sub.get_observable().subscribe(cs, [cs](int){
- cs.unsubscribe();
- },
- [](std::exception_ptr){abort();});
+ sub.get_observable()
+ .finally([&asyncUnsubscriptions](){
+ ++asyncUnsubscriptions;})
+ .subscribe(cs,
+ [cs](int){
+ cs.unsubscribe();
+ },
+ [](std::exception_ptr){abort();});
}
return 0;
});
@@ -418,7 +433,11 @@ SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){
.subscribe(o);
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "range -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
+ std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
+#if RXCPP_SUBJECT_TEST_ASYNC
+ std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
+#endif
+ std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
}
}
}