summaryrefslogtreecommitdiff
path: root/Rx/v2/examples
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-06-01 18:06:04 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-06-01 18:10:12 +0300
commit4992c73dbc8bbe3a7335b186f9b3e94da20ea127 (patch)
tree4f5b386b3cc364aa65c0b85216559abc266529e7 /Rx/v2/examples
parent1fe0081ab9866c2882bd0c24183bfa4b2de38d10 (diff)
downloadRxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.gz
Add description and examples for observable<T> members
Diffstat (limited to 'Rx/v2/examples')
-rw-r--r--Rx/v2/examples/doxygen/amb.cpp84
-rw-r--r--Rx/v2/examples/doxygen/as_dynamic.cpp21
-rw-r--r--Rx/v2/examples/doxygen/buffer.cpp204
-rw-r--r--Rx/v2/examples/doxygen/combine_latest.cpp85
-rw-r--r--Rx/v2/examples/doxygen/concat.cpp60
-rw-r--r--Rx/v2/examples/doxygen/concat_map.cpp50
-rw-r--r--Rx/v2/examples/doxygen/distinct_until_changed.cpp15
-rw-r--r--Rx/v2/examples/doxygen/filter.cpp17
-rw-r--r--Rx/v2/examples/doxygen/finally.cpp37
-rw-r--r--Rx/v2/examples/doxygen/flat_map.cpp50
-rw-r--r--Rx/v2/examples/doxygen/from.cpp13
-rw-r--r--Rx/v2/examples/doxygen/group_by.cpp54
-rw-r--r--Rx/v2/examples/doxygen/map.cpp17
-rw-r--r--Rx/v2/examples/doxygen/math.cpp89
-rw-r--r--Rx/v2/examples/doxygen/merge.cpp84
-rw-r--r--Rx/v2/examples/doxygen/observe_on.cpp24
-rw-r--r--Rx/v2/examples/doxygen/pairwise.cpp51
-rw-r--r--Rx/v2/examples/doxygen/publish.cpp97
-rw-r--r--Rx/v2/examples/doxygen/reduce.cpp24
-rw-r--r--Rx/v2/examples/doxygen/repeat.cpp44
-rw-r--r--Rx/v2/examples/doxygen/retry.cpp84
-rw-r--r--Rx/v2/examples/doxygen/scan.cpp19
-rw-r--r--Rx/v2/examples/doxygen/skip.cpp14
-rw-r--r--Rx/v2/examples/doxygen/skip_until.cpp39
-rw-r--r--Rx/v2/examples/doxygen/subscribe.cpp101
-rw-r--r--Rx/v2/examples/doxygen/subscribe_on.cpp24
-rw-r--r--Rx/v2/examples/doxygen/switch_on_next.cpp35
-rw-r--r--Rx/v2/examples/doxygen/take.cpp15
-rw-r--r--Rx/v2/examples/doxygen/take_until.cpp68
-rw-r--r--Rx/v2/examples/doxygen/window.cpp196
-rw-r--r--Rx/v2/examples/doxygen/zip.cpp85
31 files changed, 1797 insertions, 3 deletions
diff --git a/Rx/v2/examples/doxygen/amb.cpp b/Rx/v2/examples/doxygen/amb.cpp
new file mode 100644
index 0000000..8f2ac7d
--- /dev/null
+++ b/Rx/v2/examples/doxygen/amb.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("amb sample"){
+ printf("//! [amb sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.amb(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [amb sample]\n");
+}
+
+SCENARIO("implicit amb sample"){
+ printf("//! [implicit amb sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.amb();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit amb sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded amb sample"){
+ printf("//! [threaded amb sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3);
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded amb sample]\n");
+}
+
+SCENARIO("threaded implicit amb sample"){
+ printf("//! [threaded implicit amb sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ }).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ }).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ }).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.amb(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded implicit amb sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/as_dynamic.cpp b/Rx/v2/examples/doxygen/as_dynamic.cpp
new file mode 100644
index 0000000..363ae57
--- /dev/null
+++ b/Rx/v2/examples/doxygen/as_dynamic.cpp
@@ -0,0 +1,21 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("as_dynamic sample"){
+ printf("//! [as_dynamic sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::empty<int>();
+ auto values = o1.concat(o2, o3);
+ printf("type of o1: %s\n", typeid(o1).name());
+ printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
+ printf("type of o2: %s\n", typeid(o2).name());
+ printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
+ printf("type of o3: %s\n", typeid(o3).name());
+ printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
+ printf("type of values: %s\n", typeid(values).name());
+ printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
+ printf("//! [as_dynamic sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
new file mode 100644
index 0000000..9a5d0b0
--- /dev/null
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -0,0 +1,204 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("buffer count sample"){
+ printf("//! [buffer count sample]\n");
+ auto values = rxcpp::observable<>::range(1, 5).buffer(2);
+ values.
+ subscribe(
+ [](std::vector<int> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](int a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer count sample]\n");
+}
+
+SCENARIO("buffer count+skip sample"){
+ printf("//! [buffer count+skip sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
+ values.
+ subscribe(
+ [](std::vector<int> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](int a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer count+skip sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("buffer period+skip+coordination sample"){
+ printf("//! [buffer period+skip+coordination sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ map([](long v){
+ printf("[thread %s] Interval OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ }).
+ take(7).
+ buffer_with_time(period, skip, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::vector<long> v){
+ printf("[thread %s] OnNext:", get_pid().c_str());
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [buffer period+skip+coordination sample]\n");
+}
+
+SCENARIO("buffer period+skip sample"){
+ printf("//! [buffer period+skip sample]\n");
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip sample]\n");
+}
+
+SCENARIO("buffer period+skip overlapping sample"){
+ printf("//! [buffer period+skip overlapping sample]\n");
+ auto period = std::chrono::milliseconds(6);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip overlapping sample]\n");
+}
+
+SCENARIO("buffer period+skip empty sample"){
+ printf("//! [buffer period+skip empty sample]\n");
+ auto period = std::chrono::milliseconds(2);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip empty sample]\n");
+}
+
+SCENARIO("buffer period+coordination sample"){
+ printf("//! [buffer period+coordination sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+coordination sample]\n");
+}
+
+SCENARIO("buffer period sample"){
+ printf("//! [buffer period sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(std::chrono::milliseconds(4));
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period sample]\n");
+}
+
+SCENARIO("buffer period+count+coordination sample"){
+ printf("//! [buffer period+count+coordination sample]\n");
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
+ values.
+ as_blocking().
+ subscribe(
+ [start](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+count+coordination sample]\n");
+}
+
+SCENARIO("buffer period+count sample"){
+ printf("//! [buffer period+count sample]\n");
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
+ values.
+ subscribe(
+ [start](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+count sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/combine_latest.cpp b/Rx/v2/examples/doxygen/combine_latest.cpp
new file mode 100644
index 0000000..b220da4
--- /dev/null
+++ b/Rx/v2/examples/doxygen/combine_latest.cpp
@@ -0,0 +1,85 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("combine_latest sample"){
+ printf("//! [combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(o2, o3);
+ values.
+ take(5).
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [combine_latest sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("Coordination combine_latest sample"){
+ printf("//! [Coordination combine_latest sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto thr = rxcpp::synchronize_event_loop();
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
+ printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
+ printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
+ printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = o1.combine_latest(thr, o2, o3);
+ values.
+ take(5).
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [Coordination combine_latest sample]\n");
+}
+
+SCENARIO("Selector combine_latest sample"){
+ printf("//! [Selector combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(5).
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Selector combine_latest sample]\n");
+}
+
+SCENARIO("Coordination+Selector combine_latest sample"){
+ printf("//! [Coordination+Selector combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(
+ rxcpp::observe_on_new_thread(),
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(5).
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Coordination+Selector combine_latest sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/concat.cpp b/Rx/v2/examples/doxygen/concat.cpp
new file mode 100644
index 0000000..bb86033
--- /dev/null
+++ b/Rx/v2/examples/doxygen/concat.cpp
@@ -0,0 +1,60 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("concat sample"){
+ printf("//! [concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto values = o1.concat(o2.as_dynamic(), o3.as_dynamic());
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [concat sample]\n");
+}
+
+SCENARIO("implicit concat sample"){
+ printf("//! [implicit concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic());
+ auto values = base.concat();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit concat sample]\n");
+}
+
+SCENARIO("threaded concat sample"){
+ printf("//! [threaded concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto values = o1.concat(rxcpp::observe_on_new_thread(), o2.as_dynamic(), o3.as_dynamic());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded concat sample]\n");
+}
+
+SCENARIO("threaded implicit concat sample"){
+ printf("//! [threaded implicit concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic());
+ auto values = base.concat(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded implicit concat sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/concat_map.cpp b/Rx/v2/examples/doxygen/concat_map.cpp
new file mode 100644
index 0000000..70664e0
--- /dev/null
+++ b/Rx/v2/examples/doxygen/concat_map.cpp
@@ -0,0 +1,50 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("concat_map sample"){
+ printf("//! [concat_map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat_map(
+ [](int v){
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ return std::make_tuple(v_main, v_sub);
+ });
+ values.
+ subscribe(
+ [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [concat_map sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded concat_map sample"){
+ printf("//! [threaded concat_map sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat_map(
+ [](int v){
+ printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+ return std::make_tuple(v_main, v_sub);
+ },
+ rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded concat_map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/distinct_until_changed.cpp b/Rx/v2/examples/doxygen/distinct_until_changed.cpp
new file mode 100644
index 0000000..9422b21
--- /dev/null
+++ b/Rx/v2/examples/doxygen/distinct_until_changed.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("distinct_until_changed sample"){
+ printf("//! [distinct_until_changed sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [distinct_until_changed sample]\n");
+}
+
diff --git a/Rx/v2/examples/doxygen/filter.cpp b/Rx/v2/examples/doxygen/filter.cpp
new file mode 100644
index 0000000..36a1f49
--- /dev/null
+++ b/Rx/v2/examples/doxygen/filter.cpp
@@ -0,0 +1,17 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("filter sample"){
+ printf("//! [filter sample]\n");
+ auto values = rxcpp::observable<>::range(1, 6).
+ filter([](int v){
+ return v % 2;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [filter sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/finally.cpp b/Rx/v2/examples/doxygen/finally.cpp
new file mode 100644
index 0000000..3f25196
--- /dev/null
+++ b/Rx/v2/examples/doxygen/finally.cpp
@@ -0,0 +1,37 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("finally sample"){
+ printf("//! [finally sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ finally([](){
+ printf("The final action\n");
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [finally sample]\n");
+}
+
+SCENARIO("error finally sample"){
+ printf("//! [error finally sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ finally([](){
+ printf("The final action\n");
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [error finally sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/flat_map.cpp b/Rx/v2/examples/doxygen/flat_map.cpp
new file mode 100644
index 0000000..21db657
--- /dev/null
+++ b/Rx/v2/examples/doxygen/flat_map.cpp
@@ -0,0 +1,50 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("flat_map sample"){
+ printf("//! [flat_map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ flat_map(
+ [](int v){
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ return std::make_tuple(v_main, v_sub);
+ });
+ values.
+ subscribe(
+ [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [flat_map sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded flat_map sample"){
+ printf("//! [threaded flat_map sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ flat_map(
+ [](int v){
+ printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, int v_sub){
+ printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+ return std::make_tuple(v_main, v_sub);
+ },
+ rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded flat_map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/from.cpp b/Rx/v2/examples/doxygen/from.cpp
index 03288eb..15186ba 100644
--- a/Rx/v2/examples/doxygen/from.cpp
+++ b/Rx/v2/examples/doxygen/from.cpp
@@ -13,14 +13,21 @@ SCENARIO("from sample"){
printf("//! [from sample]\n");
}
+std::string get_pid();
+
SCENARIO("threaded from sample"){
printf("//! [threaded from sample]\n");
- auto values = rxcpp::observable<>::from(rxcpp::observe_on_event_loop(), 1, 2, 3);
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
+ printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
+ return v;
+ });
values.
as_blocking().
subscribe(
- [](int v){printf("OnNext: %d\n", v);},
- [](){printf("OnCompleted\n");});
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
printf("//! [threaded from sample]\n");
}
diff --git a/Rx/v2/examples/doxygen/group_by.cpp b/Rx/v2/examples/doxygen/group_by.cpp
new file mode 100644
index 0000000..d30f334
--- /dev/null
+++ b/Rx/v2/examples/doxygen/group_by.cpp
@@ -0,0 +1,54 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("group_by sample"){
+ printf("//! [group_by sample]\n");
+ auto values = rxcpp::observable<>::range(0, 8).
+ group_by(
+ [](int v){return v % 3;},
+ [](int v){return 10 * v;});
+ values.
+ subscribe(
+ [](rxcpp::grouped_observable<int, int> g){
+ auto key = g.get_key();
+ printf("OnNext: key = %d\n", key);
+ g.subscribe(
+ [key](int v){printf("[key %d] OnNext: %d\n", key, v);},
+ [key](){printf("[key %d] OnCompleted\n", key);});
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [group_by sample]\n");
+}
+
+//! [group_by full intro]
+bool less(int v1, int v2){
+ return v1 < v2;
+}
+//! [group_by full intro]
+
+SCENARIO("group_by full sample"){
+ printf("//! [group_by full sample]\n");
+ auto data = rxcpp::observable<>::range(0, 8).
+ map([](int v){
+ std::stringstream s;
+ s << "Value " << v;
+ return std::make_pair(v % 3, s.str());
+ });
+ auto values = data.group_by(
+ [](std::pair<int, std::string> v){return v.first;},
+ [](std::pair<int, std::string> v){return v.second;},
+ less);
+ values.
+ subscribe(
+ [](rxcpp::grouped_observable<int, std::string> g){
+ auto key = g.get_key();
+ printf("OnNext: key = %d\n", key);
+ g.subscribe(
+ [key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());},
+ [key](){printf("[key %d] OnCompleted\n", key);});
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [group_by full sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/map.cpp b/Rx/v2/examples/doxygen/map.cpp
new file mode 100644
index 0000000..8db5030
--- /dev/null
+++ b/Rx/v2/examples/doxygen/map.cpp
@@ -0,0 +1,17 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("map sample"){
+ printf("//! [map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ return 2 * v;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp
new file mode 100644
index 0000000..dcf3a90
--- /dev/null
+++ b/Rx/v2/examples/doxygen/math.cpp
@@ -0,0 +1,89 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("first sample"){
+ printf("//! [first sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).first();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [first sample]\n");
+}
+
+//SCENARIO("first empty sample"){
+// printf("//! [first empty sample]\n");
+// auto values = rxcpp::observable<>::empty<int>().first();
+// values.
+// subscribe(
+// [](int v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// catch (...) {
+// printf("OnError:\n");
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [first empty sample]\n");
+//}
+
+SCENARIO("last sample"){
+ printf("//! [last sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).last();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [last sample]\n");
+}
+
+//SCENARIO("last empty sample"){
+// printf("//! [last empty sample]\n");
+// auto values = rxcpp::observable<>::empty<int>().last();
+// values.
+// subscribe(
+// [](int v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [last empty sample]\n");
+//}
+
+SCENARIO("count sample"){
+ printf("//! [count sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).count();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [count sample]\n");
+}
+
+SCENARIO("sum sample"){
+ printf("//! [sum sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).sum();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [sum sample]\n");
+}
+
+SCENARIO("average sample"){
+ printf("//! [average sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).average();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [average sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/merge.cpp b/Rx/v2/examples/doxygen/merge.cpp
new file mode 100644
index 0000000..4b5ea9a
--- /dev/null
+++ b/Rx/v2/examples/doxygen/merge.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("merge sample"){
+ printf("//! [merge sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.merge(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [merge sample]\n");
+}
+
+SCENARIO("implicit merge sample"){
+ printf("//! [implicit merge sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.merge();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit merge sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded merge sample"){
+ printf("//! [threaded merge sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded merge sample]\n");
+}
+
+SCENARIO("threaded implicit merge sample"){
+ printf("//! [threaded implicit merge sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ }).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ }).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ }).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.merge(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded implicit merge sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/observe_on.cpp b/Rx/v2/examples/doxygen/observe_on.cpp
new file mode 100644
index 0000000..927c339
--- /dev/null
+++ b/Rx/v2/examples/doxygen/observe_on.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+std::string get_pid();
+
+SCENARIO("observe_on sample"){
+ printf("//! [observe_on sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ values.
+ observe_on(rxcpp::synchronize_new_thread()).
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [observe_on sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/pairwise.cpp b/Rx/v2/examples/doxygen/pairwise.cpp
new file mode 100644
index 0000000..3dd8d34
--- /dev/null
+++ b/Rx/v2/examples/doxygen/pairwise.cpp
@@ -0,0 +1,51 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("pairwise sample"){
+ printf("//! [pairwise sample]\n");
+ auto values = rxcpp::observable<>::range(1, 5).pairwise();
+ values.
+ subscribe(
+ [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [pairwise sample]\n");
+}
+
+SCENARIO("pairwise short sample"){
+ printf("//! [pairwise short sample]\n");
+ auto values = rxcpp::observable<>::just(1).pairwise();
+ values.
+ subscribe(
+ [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [pairwise short sample]\n");
+}
+
+//std::string get_pid();
+//
+//SCENARIO("threaded flat_map sample"){
+// printf("//! [threaded flat_map sample]\n");
+// printf("[thread %s] Start task\n", get_pid().c_str());
+// auto values = rxcpp::observable<>::range(1, 3).
+// flat_map(
+// [](int v){
+// printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+// return
+// rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+// take(3);
+// },
+// [](int v_main, int v_sub){
+// printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+// return std::make_tuple(v_main, v_sub);
+// },
+// rxcpp::observe_on_new_thread());
+// values.
+// as_blocking().
+// subscribe(
+// [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+// [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+// printf("[thread %s] Finish task\n", get_pid().c_str());
+// printf("//! [threaded flat_map sample]\n");
+//}
diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp
new file mode 100644
index 0000000..700fa89
--- /dev/null
+++ b/Rx/v2/examples/doxygen/publish.cpp
@@ -0,0 +1,97 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("publish_synchronized sample"){
+ printf("//! [publish_synchronized sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
+ take(5).
+ publish_synchronized(rxcpp::observe_on_new_thread());
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish_synchronized sample]\n");
+}
+
+SCENARIO("publish subject sample"){
+ printf("//! [publish subject sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+ take(5).
+ publish();
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish subject sample]\n");
+}
+
+SCENARIO("publish behavior sample"){
+ printf("//! [publish behavior sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+ take(5).
+ publish(0L);
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish behavior sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/reduce.cpp b/Rx/v2/examples/doxygen/reduce.cpp
new file mode 100644
index 0000000..0005bf6
--- /dev/null
+++ b/Rx/v2/examples/doxygen/reduce.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("reduce sample"){
+ printf("//! [reduce sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).
+ reduce(
+ std::make_pair(0, 1.0),
+ [](std::pair<int, double> seed, int v){
+ seed.first += 1;
+ seed.second *= v;
+ return seed;
+ },
+ [](std::pair<int, double> res){
+ return std::pow(res.second, 1.0 / res.first);
+ });
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [reduce sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/repeat.cpp b/Rx/v2/examples/doxygen/repeat.cpp
new file mode 100644
index 0000000..9cd8d43
--- /dev/null
+++ b/Rx/v2/examples/doxygen/repeat.cpp
@@ -0,0 +1,44 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("repeat sample"){
+ printf("//! [repeat sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ repeat().
+ take(5);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat sample]\n");
+}
+
+SCENARIO("repeat count sample"){
+ printf("//! [repeat count sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).repeat(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat count sample]\n");
+}
+
+SCENARIO("repeat error sample"){
+ printf("//! [repeat error sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ repeat();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat error sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/retry.cpp b/Rx/v2/examples/doxygen/retry.cpp
new file mode 100644
index 0000000..efcfb23
--- /dev/null
+++ b/Rx/v2/examples/doxygen/retry.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("retry sample"){
+ printf("//! [retry sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ retry().
+ take(5);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [retry sample]\n");
+}
+
+SCENARIO("retry count sample"){
+ printf("//! [retry count sample]\n");
+ auto source = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ auto values = source.retry(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [retry count sample]\n");
+}
+
+//SCENARIO("retry hot sample"){
+// printf("//! [retry hot sample]\n");
+// auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error3 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error4 from source"))).
+// retry(3);
+// values.
+// subscribe(
+// [](long v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [retry hot sample]\n");
+//}
+//
+//SCENARIO("retry completed sample"){
+// printf("//! [retry completed sample <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<]\n");
+// auto source = rxcpp::observable<>::from(1, 2).
+// concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+// publish();
+// auto values = source.retry();
+// //auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))).
+// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))).
+// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// // retry(3);
+// values.
+// subscribe(
+// [](long v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [retry completed sample]\n");
+//}
diff --git a/Rx/v2/examples/doxygen/scan.cpp b/Rx/v2/examples/doxygen/scan.cpp
new file mode 100644
index 0000000..2a49efd
--- /dev/null
+++ b/Rx/v2/examples/doxygen/scan.cpp
@@ -0,0 +1,19 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("scan sample"){
+ printf("//! [scan sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).
+ scan(
+ 0,
+ [](int seed, int v){
+ return seed + v;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [scan sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/skip.cpp b/Rx/v2/examples/doxygen/skip.cpp
new file mode 100644
index 0000000..e5b5979
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip.cpp
@@ -0,0 +1,14 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip sample"){
+ printf("//! [skip sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).skip(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/skip_until.cpp b/Rx/v2/examples/doxygen/skip_until.cpp
new file mode 100644
index 0000000..3e0202f
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip_until.cpp
@@ -0,0 +1,39 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip_until sample"){
+ printf("//! [skip_until sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
+ auto values = source.skip_until(trigger);
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip_until sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded skip_until sample"){
+ printf("//! [threaded skip_until sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
+ printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded skip_until sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/subscribe.cpp b/Rx/v2/examples/doxygen/subscribe.cpp
new file mode 100644
index 0000000..e7c3435
--- /dev/null
+++ b/Rx/v2/examples/doxygen/subscribe.cpp
@@ -0,0 +1,101 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("subscribe by subscriber"){
+ printf("//! [subscribe by subscriber]\n");
+ auto subscriber = rxcpp::make_subscriber<int>(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(subscriber);
+ printf("//! [subscribe by subscriber]\n");
+}
+
+SCENARIO("subscribe by observer"){
+ printf("//! [subscribe by observer]\n");
+ auto subscriber = rxcpp::make_subscriber<int>(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ auto values1 = rxcpp::observable<>::range(1, 3);
+ auto values2 = rxcpp::observable<>::range(4, 6);
+ values1.subscribe(subscriber.get_observer());
+ values2.subscribe(subscriber.get_observer());
+ printf("//! [subscribe by observer]\n");
+}
+
+SCENARIO("subscribe by on_next"){
+ printf("//! [subscribe by on_next]\n");
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);});
+ printf("//! [subscribe by on_next]\n");
+}
+
+SCENARIO("subscribe by on_next and on_error"){
+ printf("//! [subscribe by on_next and on_error]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ });
+ printf("//! [subscribe by on_next and on_error]\n");
+}
+
+SCENARIO("subscribe by on_next and on_completed"){
+ printf("//! [subscribe by on_next and on_completed]\n");
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by on_next and on_completed]\n");
+}
+
+SCENARIO("subscribe by subscription, on_next, and on_completed"){
+ printf("//! [subscribe by subscription, on_next, and on_completed]\n");
+ auto subscription = rxcpp::composite_subscription();
+ auto values = rxcpp::observable<>::range(1, 5);
+ values.subscribe(
+ subscription,
+ [&subscription](int v){
+ printf("OnNext: %d\n", v);
+ if (v == 3)
+ subscription.unsubscribe();
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by subscription, on_next, and on_completed]\n");
+}
+
+SCENARIO("subscribe by on_next, on_error, and on_completed"){
+ printf("//! [subscribe by on_next, on_error, and on_completed]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by on_next, on_error, and on_completed]\n");
+}
+
+SCENARIO("subscribe unsubscribe"){
+ printf("//! [subscribe unsubscribe]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::never<int>()).
+ finally([](){printf("The final action\n");});
+ auto subscription = values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ subscription.unsubscribe();
+ printf("//! [subscribe unsubscribe]\n");
+}
diff --git a/Rx/v2/examples/doxygen/subscribe_on.cpp b/Rx/v2/examples/doxygen/subscribe_on.cpp
new file mode 100644
index 0000000..e2614bc
--- /dev/null
+++ b/Rx/v2/examples/doxygen/subscribe_on.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+std::string get_pid();
+
+SCENARIO("subscribe_on sample"){
+ printf("//! [subscribe_on sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ values.
+ subscribe_on(rxcpp::synchronize_new_thread()).
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [subscribe_on sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/switch_on_next.cpp b/Rx/v2/examples/doxygen/switch_on_next.cpp
new file mode 100644
index 0000000..6e2da70
--- /dev/null
+++ b/Rx/v2/examples/doxygen/switch_on_next.cpp
@@ -0,0 +1,35 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("switch_on_next sample"){
+ printf("//! [switch_on_next sample]\n");
+ auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
+ take(3).
+ map([](int){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic();
+ });
+ auto values = base.switch_on_next().take(10);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [switch_on_next sample]\n");
+}
+
+SCENARIO("threaded switch_on_next sample"){
+ printf("//! [threaded switch_on_next sample]\n");
+ auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
+ take(3).
+ map([](long){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(10), rxcpp::observe_on_event_loop()).as_dynamic();
+ });
+ auto values = base.switch_on_next(rxcpp::observe_on_new_thread()).take(10);
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded switch_on_next sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/take.cpp b/Rx/v2/examples/doxygen/take.cpp
new file mode 100644
index 0000000..be3be24
--- /dev/null
+++ b/Rx/v2/examples/doxygen/take.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+
+SCENARIO("take sample"){
+ printf("//! [take sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).take(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/take_until.cpp b/Rx/v2/examples/doxygen/take_until.cpp
new file mode 100644
index 0000000..098e7c1
--- /dev/null
+++ b/Rx/v2/examples/doxygen/take_until.cpp
@@ -0,0 +1,68 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("take_until sample"){
+ printf("//! [take_until sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
+ auto values = source.take_until(trigger);
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take_until sample]\n");
+}
+
+SCENARIO("take_until time sample"){
+ printf("//! [take_until time sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take_until time sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded take_until sample"){
+ printf("//! [threaded take_until sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
+ printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded take_until sample]\n");
+}
+
+SCENARIO("threaded take_until time sample"){
+ printf("//! [threaded take_until time sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ }).as_dynamic();
+ auto scheduler = rxcpp::observe_on_new_thread();
+ auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded take_until time sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp
new file mode 100644
index 0000000..0a0ba63
--- /dev/null
+++ b/Rx/v2/examples/doxygen/window.cpp
@@ -0,0 +1,196 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("window count sample"){
+ printf("//! [window count sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::range(1, 5).window(2);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<int> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](int v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window count sample]\n");
+}
+
+SCENARIO("window count+skip sample"){
+ printf("//! [window count+skip sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::range(1, 7).window(2, 3);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<int> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](int v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window count+skip sample]\n");
+}
+
+SCENARIO("window period+skip+coordination sample"){
+ printf("//! [window period+skip+coordination sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip+coordination sample]\n");
+}
+
+SCENARIO("window period+skip sample"){
+ printf("//! [window period+skip sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip sample]\n");
+}
+
+SCENARIO("window period+skip overlapping sample"){
+ printf("//! [window period+skip overlapping sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(6);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip overlapping sample]\n");
+}
+
+SCENARIO("window period+skip empty sample"){
+ printf("//! [window period+skip empty sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(2);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip empty sample]\n");
+}
+
+SCENARIO("window period+coordination sample"){
+ printf("//! [window period+coordination sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+coordination sample]\n");
+}
+
+SCENARIO("window period sample"){
+ printf("//! [window period sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(std::chrono::milliseconds(4));
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period sample]\n");
+}
+
+SCENARIO("window period+count+coordination sample"){
+ printf("//! [window period+count+coordination sample]\n");
+ int counter = 0;
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+count+coordination sample]\n");
+}
+
+SCENARIO("window period+count sample"){
+ printf("//! [window period+count sample]\n");
+ int counter = 0;
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ window_with_time_or_count(std::chrono::milliseconds(20), 2);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+count sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/zip.cpp b/Rx/v2/examples/doxygen/zip.cpp
new file mode 100644
index 0000000..9085110
--- /dev/null
+++ b/Rx/v2/examples/doxygen/zip.cpp
@@ -0,0 +1,85 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("zip sample"){
+ printf("//! [zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(o2, o3);
+ values.
+ take(3).
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [zip sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("Coordination zip sample"){
+ printf("//! [Coordination zip sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto thr = rxcpp::synchronize_event_loop();
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) {
+ printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
+ printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
+ printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = o1.zip(thr, o2, o3);
+ values.
+ take(3).
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [Coordination zip sample]\n");
+}
+
+SCENARIO("Selector zip sample"){
+ printf("//! [Selector zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(3).
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Selector zip sample]\n");
+}
+
+SCENARIO("Coordination+Selector zip sample"){
+ printf("//! [Coordination+Selector zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(
+ rxcpp::observe_on_new_thread(),
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(3).
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Coordination+Selector zip sample]\n");
+}