summaryrefslogtreecommitdiff
path: root/Rx/v2/examples/doxygen/publish.cpp
blob: d34e99171ecd7140057d23ed02a72d5ff2d68572 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include "rxcpp/rx.hpp"

#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

SCENARIO("publish_synchronized sample"){
    printf("//! [publish_synchronized sample]\n");
    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
        take(5).
        publish_synchronized(rxcpp::observe_on_new_thread());

    // Subscribe from the beginning
    values.subscribe(
        [](long v){printf("[1] OnNext: %ld\n", v);},
        [](){printf("[1] OnCompleted\n");});

    // Another subscription from the beginning
    values.subscribe(
        [](long v){printf("[2] OnNext: %ld\n", v);},
        [](){printf("[2] OnCompleted\n");});

    // Start emitting
    values.connect();

    // Wait before subscribing
    rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
        values.subscribe(
            [](long v){printf("[3] OnNext: %ld\n", v);},
            [](){printf("[3] OnCompleted\n");});
    });

    // Add blocking subscription to see results
    values.as_blocking().subscribe();
    printf("//! [publish_synchronized sample]\n");
}

SCENARIO("publish subject sample"){
    printf("//! [publish subject sample]\n");
    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
        take(5).
        publish();

    // Subscribe from the beginning
    values.subscribe(
        [](long v){printf("[1] OnNext: %ld\n", v);},
        [](){printf("[1] OnCompleted\n");});

    // Another subscription from the beginning
    values.subscribe(
        [](long v){printf("[2] OnNext: %ld\n", v);},
        [](){printf("[2] OnCompleted\n");});

    // Start emitting
    values.connect();

    // Wait before subscribing
    rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
        values.subscribe(
            [](long v){printf("[3] OnNext: %ld\n", v);},
            [](){printf("[3] OnCompleted\n");});
    });

    // Add blocking subscription to see results
    values.as_blocking().subscribe();
    printf("//! [publish subject sample]\n");
}

SCENARIO("publish behavior sample"){
    printf("//! [publish behavior sample]\n");
    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
        take(5).
        publish(0L);

    // Subscribe from the beginning
    values.subscribe(
        [](long v){printf("[1] OnNext: %ld\n", v);},
        [](){printf("[1] OnCompleted\n");});

    // Another subscription from the beginning
    values.subscribe(
        [](long v){printf("[2] OnNext: %ld\n", v);},
        [](){printf("[2] OnCompleted\n");});

    // Start emitting
    values.connect();

    // Wait before subscribing
    rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
        values.subscribe(
            [](long v){printf("[3] OnNext: %ld\n", v);},
            [](){printf("[3] OnCompleted\n");});
    });

    // Add blocking subscription to see results
    values.as_blocking().subscribe();
    printf("//! [publish behavior sample]\n");
}