1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include "catch.hpp"
SCENARIO("publish_synchronized sample"){
printf("//! [publish_synchronized sample]\n");
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
publish_synchronized(rxcpp::observe_on_new_thread());
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("//! [publish_synchronized sample]\n");
}
SCENARIO("publish subject sample"){
printf("//! [publish subject sample]\n");
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
publish();
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("//! [publish subject sample]\n");
}
SCENARIO("publish behavior sample"){
printf("//! [publish behavior sample]\n");
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
publish(0L);
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("//! [publish behavior sample]\n");
}
|