diff options
author | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-18 15:23:03 +0300 |
---|---|---|
committer | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-18 15:23:03 +0300 |
commit | 80da6100e87fb964f3b9e30d31b490ba3eb1477e (patch) | |
tree | 37dc5a8d7ffd84c280852a0316c98f0bfeea0952 | |
parent | 978623554af936aba1be75353acff52ce573eff3 (diff) | |
download | RxCpp-80da6100e87fb964f3b9e30d31b490ba3eb1477e.tar.gz |
Add doxy examples for replay operator
-rw-r--r-- | Rx/v2/examples/doxygen/replay.cpp | 234 | ||||
-rw-r--r-- | projects/doxygen/CMakeLists.txt | 1 |
2 files changed, 235 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/replay.cpp b/Rx/v2/examples/doxygen/replay.cpp new file mode 100644 index 0000000..d6f08ed --- /dev/null +++ b/Rx/v2/examples/doxygen/replay.cpp @@ -0,0 +1,234 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +std::string get_pid(); + +SCENARIO("replay sample"){ + printf("//! [replay sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + replay(); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){ + values.as_blocking().subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + }); + printf("//! [replay sample]\n"); +} + +SCENARIO("threaded replay sample"){ + printf("//! [threaded replay sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto coordination = rxcpp::serialize_new_thread(); + auto worker = coordination.create_coordinator().get_worker(); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + replay(coordination); + + // Subscribe from the beginning + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); + }); + + // Wait before subscribing + worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); + }); + + // Start emitting + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.connect(); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded replay sample]\n"); +} + +SCENARIO("replay count sample"){ + printf("//! [replay count sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + replay(2); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){ + values.as_blocking().subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + }); + printf("//! [replay count sample]\n"); +} + +SCENARIO("threaded replay count sample"){ + printf("//! [threaded replay count sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto coordination = rxcpp::serialize_new_thread(); + auto worker = coordination.create_coordinator().get_worker(); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + replay(2, coordination); + + // Subscribe from the beginning + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); + }); + + // Wait before subscribing + worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); + }); + + // Start emitting + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.connect(); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded replay count sample]\n"); +} + +SCENARIO("replay period sample"){ + printf("//! [replay period sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + replay(std::chrono::milliseconds(125)); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){ + values.as_blocking().subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + }); + printf("//! [replay period sample]\n"); +} + +SCENARIO("threaded replay period sample"){ + printf("//! [threaded replay period sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto coordination = rxcpp::serialize_new_thread(); + auto worker = coordination.create_coordinator().get_worker(); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + replay(std::chrono::milliseconds(125), coordination); + + // Subscribe from the beginning + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); + }); + + // Wait before subscribing + worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); + }); + + // Start emitting + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.connect(); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded replay period sample]\n"); +} + +SCENARIO("replay count+period sample"){ + printf("//! [replay count+period sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + replay(2, std::chrono::milliseconds(125)); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){ + values.as_blocking().subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + }); + printf("//! [replay count+period sample]\n"); +} + +SCENARIO("threaded replay count+period sample"){ + printf("//! [threaded replay count+period sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto coordination = rxcpp::serialize_new_thread(); + auto worker = coordination.create_coordinator().get_worker(); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + replay(2, std::chrono::milliseconds(125), coordination); + + // Subscribe from the beginning + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); + }); + + // Wait before subscribing + worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){ + values.subscribe( + [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, + [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); + }); + + // Start emitting + worker.schedule([&](const rxcpp::schedulers::schedulable&){ + values.connect(); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded replay count+period sample]\n"); +} diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index 4a7fe13..4abbb70 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -72,6 +72,7 @@ if(DOXYGEN_FOUND) ${DOXY_EXAMPLES_SRC_DIR}/range.cpp ${DOXY_EXAMPLES_SRC_DIR}/reduce.cpp ${DOXY_EXAMPLES_SRC_DIR}/repeat.cpp + ${DOXY_EXAMPLES_SRC_DIR}/replay.cpp ${DOXY_EXAMPLES_SRC_DIR}/retry.cpp ${DOXY_EXAMPLES_SRC_DIR}/scan.cpp ${DOXY_EXAMPLES_SRC_DIR}/scope.cpp |