diff options
Diffstat (limited to 'Rx/v2/examples/doxygen/publish.cpp')
-rw-r--r-- | Rx/v2/examples/doxygen/publish.cpp | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp new file mode 100644 index 0000000..d34e991 --- /dev/null +++ b/Rx/v2/examples/doxygen/publish.cpp @@ -0,0 +1,97 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("publish_synchronized sample"){ + printf("//! [publish_synchronized sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + publish_synchronized(rxcpp::observe_on_new_thread()); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %ld\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish_synchronized sample]\n"); +} + +SCENARIO("publish subject sample"){ + printf("//! [publish subject sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %ld\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish subject sample]\n"); +} + +SCENARIO("publish behavior sample"){ + printf("//! [publish behavior sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(0L); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %ld\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %ld\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %ld\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish behavior sample]\n"); +} |