diff options
author | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-01 18:06:04 +0300 |
---|---|---|
committer | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-01 18:10:12 +0300 |
commit | 4992c73dbc8bbe3a7335b186f9b3e94da20ea127 (patch) | |
tree | 4f5b386b3cc364aa65c0b85216559abc266529e7 /Rx/v2/examples | |
parent | 1fe0081ab9866c2882bd0c24183bfa4b2de38d10 (diff) | |
download | RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.gz |
Add description and examples for observable<T> members
Diffstat (limited to 'Rx/v2/examples')
31 files changed, 1797 insertions, 3 deletions
diff --git a/Rx/v2/examples/doxygen/amb.cpp b/Rx/v2/examples/doxygen/amb.cpp new file mode 100644 index 0000000..8f2ac7d --- /dev/null +++ b/Rx/v2/examples/doxygen/amb.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("amb sample"){ + printf("//! [amb sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.amb(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [amb sample]\n"); +} + +SCENARIO("implicit amb sample"){ + printf("//! [implicit amb sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.amb(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit amb sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded amb sample"){ + printf("//! [threaded amb sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded amb sample]\n"); +} + +SCENARIO("threaded implicit amb sample"){ + printf("//! [threaded implicit amb sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.amb(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded implicit amb sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/as_dynamic.cpp b/Rx/v2/examples/doxygen/as_dynamic.cpp new file mode 100644 index 0000000..363ae57 --- /dev/null +++ b/Rx/v2/examples/doxygen/as_dynamic.cpp @@ -0,0 +1,21 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("as_dynamic sample"){ + printf("//! [as_dynamic sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::empty<int>(); + auto values = o1.concat(o2, o3); + printf("type of o1: %s\n", typeid(o1).name()); + printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name()); + printf("type of o2: %s\n", typeid(o2).name()); + printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name()); + printf("type of o3: %s\n", typeid(o3).name()); + printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name()); + printf("type of values: %s\n", typeid(values).name()); + printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name()); + printf("//! [as_dynamic sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp new file mode 100644 index 0000000..9a5d0b0 --- /dev/null +++ b/Rx/v2/examples/doxygen/buffer.cpp @@ -0,0 +1,204 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("buffer count sample"){ + printf("//! [buffer count sample]\n"); + auto values = rxcpp::observable<>::range(1, 5).buffer(2); + values. + subscribe( + [](std::vector<int> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](int a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer count sample]\n"); +} + +SCENARIO("buffer count+skip sample"){ + printf("//! [buffer count+skip sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3); + values. + subscribe( + [](std::vector<int> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](int a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer count+skip sample]\n"); +} + +std::string get_pid(); + +SCENARIO("buffer period+skip+coordination sample"){ + printf("//! [buffer period+skip+coordination sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + map([](long v){ + printf("[thread %s] Interval OnNext: %d\n", get_pid().c_str(), v); + return v; + }). + take(7). + buffer_with_time(period, skip, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::vector<long> v){ + printf("[thread %s] OnNext:", get_pid().c_str()); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [buffer period+skip+coordination sample]\n"); +} + +SCENARIO("buffer period+skip sample"){ + printf("//! [buffer period+skip sample]\n"); + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip sample]\n"); +} + +SCENARIO("buffer period+skip overlapping sample"){ + printf("//! [buffer period+skip overlapping sample]\n"); + auto period = std::chrono::milliseconds(6); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip overlapping sample]\n"); +} + +SCENARIO("buffer period+skip empty sample"){ + printf("//! [buffer period+skip empty sample]\n"); + auto period = std::chrono::milliseconds(2); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip empty sample]\n"); +} + +SCENARIO("buffer period+coordination sample"){ + printf("//! [buffer period+coordination sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+coordination sample]\n"); +} + +SCENARIO("buffer period sample"){ + printf("//! [buffer period sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(std::chrono::milliseconds(4)); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period sample]\n"); +} + +SCENARIO("buffer period+count+coordination sample"){ + printf("//! [buffer period+count+coordination sample]\n"); + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop()); + values. + as_blocking(). + subscribe( + [start](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+count+coordination sample]\n"); +} + +SCENARIO("buffer period+count sample"){ + printf("//! [buffer period+count sample]\n"); + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + buffer_with_time_or_count(std::chrono::milliseconds(20), 2); + values. + subscribe( + [start](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+count sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/combine_latest.cpp b/Rx/v2/examples/doxygen/combine_latest.cpp new file mode 100644 index 0000000..b220da4 --- /dev/null +++ b/Rx/v2/examples/doxygen/combine_latest.cpp @@ -0,0 +1,85 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("combine_latest sample"){ + printf("//! [combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest(o2, o3); + values. + take(5). + subscribe( + [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [combine_latest sample]\n"); +} + +std::string get_pid(); + +SCENARIO("Coordination combine_latest sample"){ + printf("//! [Coordination combine_latest sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto thr = rxcpp::synchronize_event_loop(); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) { + printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) { + printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) { + printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto values = o1.combine_latest(thr, o2, o3); + values. + take(5). + as_blocking(). + subscribe( + [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [Coordination combine_latest sample]\n"); +} + +SCENARIO("Selector combine_latest sample"){ + printf("//! [Selector combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest( + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(5). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Selector combine_latest sample]\n"); +} + +SCENARIO("Coordination+Selector combine_latest sample"){ + printf("//! [Coordination+Selector combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest( + rxcpp::observe_on_new_thread(), + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(5). + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Coordination+Selector combine_latest sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/concat.cpp b/Rx/v2/examples/doxygen/concat.cpp new file mode 100644 index 0000000..bb86033 --- /dev/null +++ b/Rx/v2/examples/doxygen/concat.cpp @@ -0,0 +1,60 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("concat sample"){ + printf("//! [concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto values = o1.concat(o2.as_dynamic(), o3.as_dynamic()); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [concat sample]\n"); +} + +SCENARIO("implicit concat sample"){ + printf("//! [implicit concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic()); + auto values = base.concat(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit concat sample]\n"); +} + +SCENARIO("threaded concat sample"){ + printf("//! [threaded concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto values = o1.concat(rxcpp::observe_on_new_thread(), o2.as_dynamic(), o3.as_dynamic()); + values. + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded concat sample]\n"); +} + +SCENARIO("threaded implicit concat sample"){ + printf("//! [threaded implicit concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic()); + auto values = base.concat(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded implicit concat sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/concat_map.cpp b/Rx/v2/examples/doxygen/concat_map.cpp new file mode 100644 index 0000000..70664e0 --- /dev/null +++ b/Rx/v2/examples/doxygen/concat_map.cpp @@ -0,0 +1,50 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("concat_map sample"){ + printf("//! [concat_map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat_map( + [](int v){ + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + return std::make_tuple(v_main, v_sub); + }); + values. + subscribe( + [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [concat_map sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded concat_map sample"){ + printf("//! [threaded concat_map sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + concat_map( + [](int v){ + printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); + return std::make_tuple(v_main, v_sub); + }, + rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded concat_map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/distinct_until_changed.cpp b/Rx/v2/examples/doxygen/distinct_until_changed.cpp new file mode 100644 index 0000000..9422b21 --- /dev/null +++ b/Rx/v2/examples/doxygen/distinct_until_changed.cpp @@ -0,0 +1,15 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("distinct_until_changed sample"){ + printf("//! [distinct_until_changed sample]\n"); + auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [distinct_until_changed sample]\n"); +} + diff --git a/Rx/v2/examples/doxygen/filter.cpp b/Rx/v2/examples/doxygen/filter.cpp new file mode 100644 index 0000000..36a1f49 --- /dev/null +++ b/Rx/v2/examples/doxygen/filter.cpp @@ -0,0 +1,17 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("filter sample"){ + printf("//! [filter sample]\n"); + auto values = rxcpp::observable<>::range(1, 6). + filter([](int v){ + return v % 2; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [filter sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/finally.cpp b/Rx/v2/examples/doxygen/finally.cpp new file mode 100644 index 0000000..3f25196 --- /dev/null +++ b/Rx/v2/examples/doxygen/finally.cpp @@ -0,0 +1,37 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("finally sample"){ + printf("//! [finally sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + finally([](){ + printf("The final action\n"); + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [finally sample]\n"); +} + +SCENARIO("error finally sample"){ + printf("//! [error finally sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + finally([](){ + printf("The final action\n"); + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [error finally sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/flat_map.cpp b/Rx/v2/examples/doxygen/flat_map.cpp new file mode 100644 index 0000000..21db657 --- /dev/null +++ b/Rx/v2/examples/doxygen/flat_map.cpp @@ -0,0 +1,50 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("flat_map sample"){ + printf("//! [flat_map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + flat_map( + [](int v){ + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + return std::make_tuple(v_main, v_sub); + }); + values. + subscribe( + [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [flat_map sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded flat_map sample"){ + printf("//! [threaded flat_map sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + flat_map( + [](int v){ + printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, int v_sub){ + printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); + return std::make_tuple(v_main, v_sub); + }, + rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded flat_map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/from.cpp b/Rx/v2/examples/doxygen/from.cpp index 03288eb..15186ba 100644 --- a/Rx/v2/examples/doxygen/from.cpp +++ b/Rx/v2/examples/doxygen/from.cpp @@ -13,14 +13,21 @@ SCENARIO("from sample"){ printf("//! [from sample]\n"); } +std::string get_pid(); + SCENARIO("threaded from sample"){ printf("//! [threaded from sample]\n"); - auto values = rxcpp::observable<>::from(rxcpp::observe_on_event_loop(), 1, 2, 3); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){ + printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v); + return v; + }); values. as_blocking(). subscribe( - [](int v){printf("OnNext: %d\n", v);}, - [](){printf("OnCompleted\n");}); + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); printf("//! [threaded from sample]\n"); } diff --git a/Rx/v2/examples/doxygen/group_by.cpp b/Rx/v2/examples/doxygen/group_by.cpp new file mode 100644 index 0000000..d30f334 --- /dev/null +++ b/Rx/v2/examples/doxygen/group_by.cpp @@ -0,0 +1,54 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("group_by sample"){ + printf("//! [group_by sample]\n"); + auto values = rxcpp::observable<>::range(0, 8). + group_by( + [](int v){return v % 3;}, + [](int v){return 10 * v;}); + values. + subscribe( + [](rxcpp::grouped_observable<int, int> g){ + auto key = g.get_key(); + printf("OnNext: key = %d\n", key); + g.subscribe( + [key](int v){printf("[key %d] OnNext: %d\n", key, v);}, + [key](){printf("[key %d] OnCompleted\n", key);}); + }, + [](){printf("OnCompleted\n");}); + printf("//! [group_by sample]\n"); +} + +//! [group_by full intro] +bool less(int v1, int v2){ + return v1 < v2; +} +//! [group_by full intro] + +SCENARIO("group_by full sample"){ + printf("//! [group_by full sample]\n"); + auto data = rxcpp::observable<>::range(0, 8). + map([](int v){ + std::stringstream s; + s << "Value " << v; + return std::make_pair(v % 3, s.str()); + }); + auto values = data.group_by( + [](std::pair<int, std::string> v){return v.first;}, + [](std::pair<int, std::string> v){return v.second;}, + less); + values. + subscribe( + [](rxcpp::grouped_observable<int, std::string> g){ + auto key = g.get_key(); + printf("OnNext: key = %d\n", key); + g.subscribe( + [key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());}, + [key](){printf("[key %d] OnCompleted\n", key);}); + }, + [](){printf("OnCompleted\n");}); + printf("//! [group_by full sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/map.cpp b/Rx/v2/examples/doxygen/map.cpp new file mode 100644 index 0000000..8db5030 --- /dev/null +++ b/Rx/v2/examples/doxygen/map.cpp @@ -0,0 +1,17 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("map sample"){ + printf("//! [map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + return 2 * v; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp new file mode 100644 index 0000000..dcf3a90 --- /dev/null +++ b/Rx/v2/examples/doxygen/math.cpp @@ -0,0 +1,89 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("first sample"){ + printf("//! [first sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).first(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [first sample]\n"); +} + +//SCENARIO("first empty sample"){ +// printf("//! [first empty sample]\n"); +// auto values = rxcpp::observable<>::empty<int>().first(); +// values. +// subscribe( +// [](int v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// catch (...) { +// printf("OnError:\n"); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [first empty sample]\n"); +//} + +SCENARIO("last sample"){ + printf("//! [last sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).last(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [last sample]\n"); +} + +//SCENARIO("last empty sample"){ +// printf("//! [last empty sample]\n"); +// auto values = rxcpp::observable<>::empty<int>().last(); +// values. +// subscribe( +// [](int v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [last empty sample]\n"); +//} + +SCENARIO("count sample"){ + printf("//! [count sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).count(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [count sample]\n"); +} + +SCENARIO("sum sample"){ + printf("//! [sum sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).sum(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [sum sample]\n"); +} + +SCENARIO("average sample"){ + printf("//! [average sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).average(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [average sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/merge.cpp b/Rx/v2/examples/doxygen/merge.cpp new file mode 100644 index 0000000..4b5ea9a --- /dev/null +++ b/Rx/v2/examples/doxygen/merge.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("merge sample"){ + printf("//! [merge sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.merge(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [merge sample]\n"); +} + +SCENARIO("implicit merge sample"){ + printf("//! [implicit merge sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.merge(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit merge sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded merge sample"){ + printf("//! [threaded merge sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded merge sample]\n"); +} + +SCENARIO("threaded implicit merge sample"){ + printf("//! [threaded implicit merge sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.merge(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded implicit merge sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/observe_on.cpp b/Rx/v2/examples/doxygen/observe_on.cpp new file mode 100644 index 0000000..927c339 --- /dev/null +++ b/Rx/v2/examples/doxygen/observe_on.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +std::string get_pid(); + +SCENARIO("observe_on sample"){ + printf("//! [observe_on sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + printf("[thread %s] Emit value %d\n", get_pid().c_str(), v); + return v; + }); + values. + observe_on(rxcpp::synchronize_new_thread()). + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [observe_on sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/pairwise.cpp b/Rx/v2/examples/doxygen/pairwise.cpp new file mode 100644 index 0000000..3dd8d34 --- /dev/null +++ b/Rx/v2/examples/doxygen/pairwise.cpp @@ -0,0 +1,51 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("pairwise sample"){ + printf("//! [pairwise sample]\n"); + auto values = rxcpp::observable<>::range(1, 5).pairwise(); + values. + subscribe( + [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [pairwise sample]\n"); +} + +SCENARIO("pairwise short sample"){ + printf("//! [pairwise short sample]\n"); + auto values = rxcpp::observable<>::just(1).pairwise(); + values. + subscribe( + [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [pairwise short sample]\n"); +} + +//std::string get_pid(); +// +//SCENARIO("threaded flat_map sample"){ +// printf("//! [threaded flat_map sample]\n"); +// printf("[thread %s] Start task\n", get_pid().c_str()); +// auto values = rxcpp::observable<>::range(1, 3). +// flat_map( +// [](int v){ +// printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); +// return +// rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). +// take(3); +// }, +// [](int v_main, int v_sub){ +// printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); +// return std::make_tuple(v_main, v_sub); +// }, +// rxcpp::observe_on_new_thread()); +// values. +// as_blocking(). +// subscribe( +// [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, +// [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); +// printf("[thread %s] Finish task\n", get_pid().c_str()); +// printf("//! [threaded flat_map sample]\n"); +//} diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp new file mode 100644 index 0000000..700fa89 --- /dev/null +++ b/Rx/v2/examples/doxygen/publish.cpp @@ -0,0 +1,97 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("publish_synchronized sample"){ + printf("//! [publish_synchronized sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + publish_synchronized(rxcpp::observe_on_new_thread()); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish_synchronized sample]\n"); +} + +SCENARIO("publish subject sample"){ + printf("//! [publish subject sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish subject sample]\n"); +} + +SCENARIO("publish behavior sample"){ + printf("//! [publish behavior sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(0L); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish behavior sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/reduce.cpp b/Rx/v2/examples/doxygen/reduce.cpp new file mode 100644 index 0000000..0005bf6 --- /dev/null +++ b/Rx/v2/examples/doxygen/reduce.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("reduce sample"){ + printf("//! [reduce sample]\n"); + auto values = rxcpp::observable<>::range(1, 7). + reduce( + std::make_pair(0, 1.0), + [](std::pair<int, double> seed, int v){ + seed.first += 1; + seed.second *= v; + return seed; + }, + [](std::pair<int, double> res){ + return std::pow(res.second, 1.0 / res.first); + }); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [reduce sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/repeat.cpp b/Rx/v2/examples/doxygen/repeat.cpp new file mode 100644 index 0000000..9cd8d43 --- /dev/null +++ b/Rx/v2/examples/doxygen/repeat.cpp @@ -0,0 +1,44 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("repeat sample"){ + printf("//! [repeat sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + repeat(). + take(5); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [repeat sample]\n"); +} + +SCENARIO("repeat count sample"){ + printf("//! [repeat count sample]\n"); + auto values = rxcpp::observable<>::from(1, 2).repeat(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [repeat count sample]\n"); +} + +SCENARIO("repeat error sample"){ + printf("//! [repeat error sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + repeat(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [repeat error sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/retry.cpp b/Rx/v2/examples/doxygen/retry.cpp new file mode 100644 index 0000000..efcfb23 --- /dev/null +++ b/Rx/v2/examples/doxygen/retry.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("retry sample"){ + printf("//! [retry sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + retry(). + take(5); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [retry sample]\n"); +} + +SCENARIO("retry count sample"){ + printf("//! [retry count sample]\n"); + auto source = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + auto values = source.retry(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [retry count sample]\n"); +} + +//SCENARIO("retry hot sample"){ +// printf("//! [retry hot sample]\n"); +// auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error3 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error4 from source"))). +// retry(3); +// values. +// subscribe( +// [](long v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [retry hot sample]\n"); +//} +// +//SCENARIO("retry completed sample"){ +// printf("//! [retry completed sample <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<]\n"); +// auto source = rxcpp::observable<>::from(1, 2). +// concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). +// publish(); +// auto values = source.retry(); +// //auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). +// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))). +// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))). +// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// // retry(3); +// values. +// subscribe( +// [](long v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [retry completed sample]\n"); +//} diff --git a/Rx/v2/examples/doxygen/scan.cpp b/Rx/v2/examples/doxygen/scan.cpp new file mode 100644 index 0000000..2a49efd --- /dev/null +++ b/Rx/v2/examples/doxygen/scan.cpp @@ -0,0 +1,19 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("scan sample"){ + printf("//! [scan sample]\n"); + auto values = rxcpp::observable<>::range(1, 7). + scan( + 0, + [](int seed, int v){ + return seed + v; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [scan sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/skip.cpp b/Rx/v2/examples/doxygen/skip.cpp new file mode 100644 index 0000000..e5b5979 --- /dev/null +++ b/Rx/v2/examples/doxygen/skip.cpp @@ -0,0 +1,14 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("skip sample"){ + printf("//! [skip sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).skip(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [skip sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/skip_until.cpp b/Rx/v2/examples/doxygen/skip_until.cpp new file mode 100644 index 0000000..3e0202f --- /dev/null +++ b/Rx/v2/examples/doxygen/skip_until.cpp @@ -0,0 +1,39 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("skip_until sample"){ + printf("//! [skip_until sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)); + auto values = source.skip_until(trigger); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [skip_until sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded skip_until sample"){ + printf("//! [threaded skip_until sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){ + printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded skip_until sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/subscribe.cpp b/Rx/v2/examples/doxygen/subscribe.cpp new file mode 100644 index 0000000..e7c3435 --- /dev/null +++ b/Rx/v2/examples/doxygen/subscribe.cpp @@ -0,0 +1,101 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("subscribe by subscriber"){ + printf("//! [subscribe by subscriber]\n"); + auto subscriber = rxcpp::make_subscriber<int>( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe(subscriber); + printf("//! [subscribe by subscriber]\n"); +} + +SCENARIO("subscribe by observer"){ + printf("//! [subscribe by observer]\n"); + auto subscriber = rxcpp::make_subscriber<int>( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + auto values1 = rxcpp::observable<>::range(1, 3); + auto values2 = rxcpp::observable<>::range(4, 6); + values1.subscribe(subscriber.get_observer()); + values2.subscribe(subscriber.get_observer()); + printf("//! [subscribe by observer]\n"); +} + +SCENARIO("subscribe by on_next"){ + printf("//! [subscribe by on_next]\n"); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}); + printf("//! [subscribe by on_next]\n"); +} + +SCENARIO("subscribe by on_next and on_error"){ + printf("//! [subscribe by on_next and on_error]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }); + printf("//! [subscribe by on_next and on_error]\n"); +} + +SCENARIO("subscribe by on_next and on_completed"){ + printf("//! [subscribe by on_next and on_completed]\n"); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by on_next and on_completed]\n"); +} + +SCENARIO("subscribe by subscription, on_next, and on_completed"){ + printf("//! [subscribe by subscription, on_next, and on_completed]\n"); + auto subscription = rxcpp::composite_subscription(); + auto values = rxcpp::observable<>::range(1, 5); + values.subscribe( + subscription, + [&subscription](int v){ + printf("OnNext: %d\n", v); + if (v == 3) + subscription.unsubscribe(); + }, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by subscription, on_next, and on_completed]\n"); +} + +SCENARIO("subscribe by on_next, on_error, and on_completed"){ + printf("//! [subscribe by on_next, on_error, and on_completed]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by on_next, on_error, and on_completed]\n"); +} + +SCENARIO("subscribe unsubscribe"){ + printf("//! [subscribe unsubscribe]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::never<int>()). + finally([](){printf("The final action\n");}); + auto subscription = values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + subscription.unsubscribe(); + printf("//! [subscribe unsubscribe]\n"); +} diff --git a/Rx/v2/examples/doxygen/subscribe_on.cpp b/Rx/v2/examples/doxygen/subscribe_on.cpp new file mode 100644 index 0000000..e2614bc --- /dev/null +++ b/Rx/v2/examples/doxygen/subscribe_on.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +std::string get_pid(); + +SCENARIO("subscribe_on sample"){ + printf("//! [subscribe_on sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + printf("[thread %s] Emit value %d\n", get_pid().c_str(), v); + return v; + }); + values. + subscribe_on(rxcpp::synchronize_new_thread()). + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [subscribe_on sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/switch_on_next.cpp b/Rx/v2/examples/doxygen/switch_on_next.cpp new file mode 100644 index 0000000..6e2da70 --- /dev/null +++ b/Rx/v2/examples/doxygen/switch_on_next.cpp @@ -0,0 +1,35 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("switch_on_next sample"){ + printf("//! [switch_on_next sample]\n"); + auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)). + take(3). + map([](int){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic(); + }); + auto values = base.switch_on_next().take(10); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [switch_on_next sample]\n"); +} + +SCENARIO("threaded switch_on_next sample"){ + printf("//! [threaded switch_on_next sample]\n"); + auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)). + take(3). + map([](long){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(10), rxcpp::observe_on_event_loop()).as_dynamic(); + }); + auto values = base.switch_on_next(rxcpp::observe_on_new_thread()).take(10); + values. + as_blocking(). + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded switch_on_next sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/take.cpp b/Rx/v2/examples/doxygen/take.cpp new file mode 100644 index 0000000..be3be24 --- /dev/null +++ b/Rx/v2/examples/doxygen/take.cpp @@ -0,0 +1,15 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + + +SCENARIO("take sample"){ + printf("//! [take sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).take(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/take_until.cpp b/Rx/v2/examples/doxygen/take_until.cpp new file mode 100644 index 0000000..098e7c1 --- /dev/null +++ b/Rx/v2/examples/doxygen/take_until.cpp @@ -0,0 +1,68 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("take_until sample"){ + printf("//! [take_until sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)); + auto values = source.take_until(trigger); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take_until sample]\n"); +} + +SCENARIO("take_until time sample"){ + printf("//! [take_until time sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25)); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take_until time sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded take_until sample"){ + printf("//! [threaded take_until sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){ + printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto values = source.take_until(trigger, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded take_until sample]\n"); +} + +SCENARIO("threaded take_until time sample"){ + printf("//! [threaded take_until time sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }).as_dynamic(); + auto scheduler = rxcpp::observe_on_new_thread(); + auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded take_until time sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp new file mode 100644 index 0000000..0a0ba63 --- /dev/null +++ b/Rx/v2/examples/doxygen/window.cpp @@ -0,0 +1,196 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("window count sample"){ + printf("//! [window count sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::range(1, 5).window(2); + values. + subscribe( + [&counter](rxcpp::observable<int> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](int v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window count sample]\n"); +} + +SCENARIO("window count+skip sample"){ + printf("//! [window count+skip sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::range(1, 7).window(2, 3); + values. + subscribe( + [&counter](rxcpp::observable<int> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](int v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window count+skip sample]\n"); +} + +SCENARIO("window period+skip+coordination sample"){ + printf("//! [window period+skip+coordination sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip+coordination sample]\n"); +} + +SCENARIO("window period+skip sample"){ + printf("//! [window period+skip sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip sample]\n"); +} + +SCENARIO("window period+skip overlapping sample"){ + printf("//! [window period+skip overlapping sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(6); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip overlapping sample]\n"); +} + +SCENARIO("window period+skip empty sample"){ + printf("//! [window period+skip empty sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(2); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip empty sample]\n"); +} + +SCENARIO("window period+coordination sample"){ + printf("//! [window period+coordination sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+coordination sample]\n"); +} + +SCENARIO("window period sample"){ + printf("//! [window period sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(std::chrono::milliseconds(4)); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period sample]\n"); +} + +SCENARIO("window period+count+coordination sample"){ + printf("//! [window period+count+coordination sample]\n"); + int counter = 0; + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+count+coordination sample]\n"); +} + +SCENARIO("window period+count sample"){ + printf("//! [window period+count sample]\n"); + int counter = 0; + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + window_with_time_or_count(std::chrono::milliseconds(20), 2); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+count sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/zip.cpp b/Rx/v2/examples/doxygen/zip.cpp new file mode 100644 index 0000000..9085110 --- /dev/null +++ b/Rx/v2/examples/doxygen/zip.cpp @@ -0,0 +1,85 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("zip sample"){ + printf("//! [zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip(o2, o3); + values. + take(3). + subscribe( + [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [zip sample]\n"); +} + +std::string get_pid(); + +SCENARIO("Coordination zip sample"){ + printf("//! [Coordination zip sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto thr = rxcpp::synchronize_event_loop(); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) { + printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) { + printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) { + printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto values = o1.zip(thr, o2, o3); + values. + take(3). + as_blocking(). + subscribe( + [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [Coordination zip sample]\n"); +} + +SCENARIO("Selector zip sample"){ + printf("//! [Selector zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip( + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(3). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Selector zip sample]\n"); +} + +SCENARIO("Coordination+Selector zip sample"){ + printf("//! [Coordination+Selector zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip( + rxcpp::observe_on_new_thread(), + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(3). + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Coordination+Selector zip sample]\n"); +} |