// path: root/Rx/v2/test/operators/subscribe_on.cpp
// blob: 80e9b2c70a77e5e8cc6ae410ede5e15bc7cc0c23
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>

// Number of pipelines each perf scenario in this file subscribes to per run.
static const int static_subscriptions{50000};

// Perf scenario (tagged [hide][long][perf]): for each of 10 runs, builds
// `subscriptions` separate just(1) -> map -> map pipelines, applies both
// subscribe_on and observe_on with rx::observe_on_event_loop(), sums the
// result of count() over all of them, and prints timing for the run.
SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 50K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            int runs = 10;

            for (;runs > 0; --runs) {

                int c = 0; // sum of count() over every pipeline in this run
                int n = 0; // subscriptions actually made in this run (reported below)
                auto start = clock::now();
                for (int i = 0; i < subscriptions; ++i) {
                    c += rx::observable<>::just(1)
                        // round-trip the value through a string so each map does real work
                        .map([](int value) {
                            std::stringstream serializer;
                            serializer << value;
                            return serializer.str();
                        })
                        .map([](const std::string& s) {
                            // initialized so a failed parse never yields an indeterminate value
                            int parsed = 0;
                            std::stringstream(s) >> parsed;
                            return parsed;
                        })
                        .subscribe_on(rx::observe_on_event_loop())
                        .observe_on(rx::observe_on_event_loop())
                        .as_blocking()
                        .count();
                    ++n;
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                // every pipeline is expected to contribute exactly one counted item
                REQUIRE(subscriptions == c);
                std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
            }
        }
    }
}

// Perf scenario (tagged [hide][long][perf]): for each of 10 runs, builds
// `subscriptions` separate just(1) -> map -> map pipelines, applies
// subscribe_on with rx::observe_on_event_loop() (no observe_on), sums the
// result of count() over all of them, and prints timing for the run.
SCENARIO("for loop subscribes to map with subscribe_on", "[hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 50K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            int runs = 10;

            for (;runs > 0; --runs) {

                int c = 0; // sum of count() over every pipeline in this run
                int n = 0; // subscriptions actually made in this run (reported below)
                auto start = clock::now();

                for (int i = 0; i < subscriptions; ++i) {
                    c += rx::observable<>::
                        just(1).
                        // round-trip the value through a string so each map does real work
                        map([](int value) {
                            std::stringstream serializer;
                            serializer << value;
                            return serializer.str();
                        }).
                        map([](const std::string& s) {
                            // initialized so a failed parse never yields an indeterminate value
                            int parsed = 0;
                            std::stringstream(s) >> parsed;
                            return parsed;
                        }).
                        subscribe_on(rx::observe_on_event_loop()).
                        as_blocking().
                        count();
                    ++n;
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                // every pipeline is expected to contribute exactly one counted item
                REQUIRE(subscriptions == c);
                std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
            }
        }
    }
}