summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-12-13 19:58:34 -0800
committerGitHub <noreply@github.com>2016-12-13 19:58:34 -0800
commitcfa49a4d4baa57159319d8498f8f9158b9036e6a (patch)
tree88fe0d4d32c8824aaf80ad9b759421effbc9983b /Rx/v2
parent2ac078e3b0c78785134a195c26e7b55a6758f3c7 (diff)
downloadRxCpp-cfa49a4d4baa57159319d8498f8f9158b9036e6a.tar.gz
add support for C++ coroutines (#286)
* add support for C++ coroutines * add tests and docs * address feedback and fix lifetime issue
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/awaitable/CMakeLists.txt36
-rw-r--r--Rx/v2/examples/awaitable/main.cpp58
-rw-r--r--Rx/v2/src/rxcpp/rx-coroutine.hpp198
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/subscriptions/coroutine.cpp125
5 files changed, 418 insertions, 0 deletions
diff --git a/Rx/v2/examples/awaitable/CMakeLists.txt b/Rx/v2/examples/awaitable/CMakeLists.txt
new file mode 100644
index 0000000..8d919b0
--- /dev/null
+++ b/Rx/v2/examples/awaitable/CMakeLists.txt
@@ -0,0 +1,36 @@
+cmake_minimum_required(VERSION 3.2 FATAL_ERROR)
+
+get_filename_component(SAMPLE_PROJECT "${CMAKE_CURRENT_SOURCE_DIR}" NAME)
+
+project(${SAMPLE_PROJECT} LANGUAGES C CXX)
+
+# define some folders
+get_filename_component(RXCPP_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PATH)
+get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH)
+get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH)
+get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH)
+
+MESSAGE( STATUS "RXCPP_DIR: " ${RXCPP_DIR} )
+
+include(${RXCPP_DIR}/projects/CMake/shared.cmake)
+
+# define the sources
+set(SAMPLE_SOURCES
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
+)
+add_executable(${SAMPLE_PROJECT} ${SAMPLE_SOURCES})
+add_executable(rxcpp::examples::${SAMPLE_PROJECT} ALIAS ${SAMPLE_PROJECT})
+target_compile_options(${SAMPLE_PROJECT} PUBLIC ${RX_COMPILE_OPTIONS} /await)
+target_compile_features(${SAMPLE_PROJECT} PUBLIC ${RX_COMPILE_FEATURES})
+target_include_directories(${SAMPLE_PROJECT} PUBLIC ${RX_SRC_DIR})
+target_link_libraries(${SAMPLE_PROJECT} ${CMAKE_THREAD_LIBS_INIT})
+
+# configure unit tests via CTest
+enable_testing()
+set(CTEST_CONFIGURATION_TYPE "${JOB_BUILD_CONFIGURATION}")
+
+set_target_properties(${SAMPLE_PROJECT} PROPERTIES FOLDER "Examples")
+
+add_test(NAME RunTests
+ WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}"
+ COMMAND ${SAMPLE_PROJECT} ${TEST_ARGS})
diff --git a/Rx/v2/examples/awaitable/main.cpp b/Rx/v2/examples/awaitable/main.cpp
new file mode 100644
index 0000000..475fd23
--- /dev/null
+++ b/Rx/v2/examples/awaitable/main.cpp
@@ -0,0 +1,58 @@
+
+#include <rxcpp/rx-lite.hpp>
+#include <rxcpp/operators/rx-take.hpp>
+
+#include <rxcpp/rx-coroutine.hpp>
+
+using namespace rxcpp;
+using namespace rxcpp::sources;
+using namespace rxcpp::operators;
+using namespace rxcpp::util;
+
+using namespace std;
+using namespace std::chrono;
+
+future<void> intervals(){
+
+ {
+ printf("early exit from interval on thread\n");
+ for co_await (auto c : interval(seconds(1), observe_on_event_loop())) {
+ printf("%d\n", c);
+ break;
+ }
+ }
+
+ {
+ printf("interval on thread\n");
+ for co_await (auto c : interval(seconds(1), observe_on_event_loop()) | take(3)) {
+ printf("%d\n", c);
+ }
+ }
+
+ {
+ printf("current thread\n");
+ int last = 0;
+ for co_await (auto c : range(1, 100000)) {
+ last = c;
+ }
+ printf("reached %d\n", last);
+ }
+
+ try {
+ printf("error in observable\n");
+ for co_await (auto c : error<long>(runtime_error("stopped by error"))) {
+ printf("%d\n", c);
+ }
+ printf("not reachable\n");
+ terminate();
+ }
+ catch(const exception& e) {
+ printf("%s\n", e.what());
+ }
+}
+
+int main()
+{
+ intervals().get();
+ return 0;
+}
diff --git a/Rx/v2/src/rxcpp/rx-coroutine.hpp b/Rx/v2/src/rxcpp/rx-coroutine.hpp
new file mode 100644
index 0000000..317f867
--- /dev/null
+++ b/Rx/v2/src/rxcpp/rx-coroutine.hpp
@@ -0,0 +1,198 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+/*! \file rx-coroutine.hpp
+
+ \brief The proposal to add couroutines to the standard adds `co_await`, `for co_await`, `co_yield` and `co_return`. This file adds `begin(observable<>)` & `end(observable<>)` which enables `for co_await` to work with the `observable<>` type.
+
+ for co_await (auto c : interval(seconds(1), observe_on_event_loop()) | take(3)) {
+ printf("%d\n", c);
+ }
+
+*/
+
+#if !defined(RXCPP_RX_COROUTINE_HPP)
+#define RXCPP_RX_COROUTINE_HPP
+
+#include "rx-includes.hpp"
+
+#ifdef _RESUMABLE_FUNCTIONS_SUPPORTED
+
+#include <rxcpp/operators/rx-finally.hpp>
+
+#include <experimental/resumable>
+
+namespace rxcpp {
+namespace coroutine {
+
+using namespace std;
+using namespace std::chrono;
+using namespace std::experimental;
+
+template<typename Source>
+struct co_observable_iterator;
+
+template<typename Source>
+struct co_observable_iterator_state : std::enable_shared_from_this<co_observable_iterator_state<Source>>
+{
+ using value_type = typename Source::value_type;
+
+ ~co_observable_iterator_state() {
+ lifetime.unsubscribe();
+ }
+ explicit co_observable_iterator_state(const Source& o) : o(o) {}
+
+ coroutine_handle<> caller{};
+ composite_subscription lifetime{};
+ const value_type* value{nullptr};
+ exception_ptr error{nullptr};
+ Source o;
+};
+
+template<typename Source>
+struct co_observable_inc_awaiter
+{
+ bool await_ready() {
+ return false;
+ }
+
+ bool await_suspend(coroutine_handle<> handle) {
+ if (!state->lifetime.is_subscribed()) {return false;}
+ state->caller = handle;
+ return true;
+ }
+
+ co_observable_iterator<Source> await_resume();
+
+ shared_ptr<co_observable_iterator_state<Source>> state;
+};
+
+template<typename Source>
+struct co_observable_iterator : public iterator<input_iterator_tag, typename Source::value_type>
+{
+ using value_type = typename Source::value_type;
+
+ co_observable_iterator() {}
+
+ explicit co_observable_iterator(const Source& o) : state(make_shared<co_observable_iterator_state<Source>>(o)) {}
+ explicit co_observable_iterator(const shared_ptr<co_observable_iterator_state<Source>>& o) : state(o) {}
+
+ co_observable_iterator(co_observable_iterator&&)=default;
+ co_observable_iterator& operator=(co_observable_iterator&&)=default;
+
+ co_observable_inc_awaiter<Source> operator++()
+ {
+ return co_observable_inc_awaiter<Source>{state};
+ }
+
+ co_observable_iterator& operator++(int) = delete;
+ // not implementing postincrement
+
+ bool operator==(co_observable_iterator const &rhs) const
+ {
+ return !!state && !rhs.state && !state->lifetime.is_subscribed();
+ }
+
+ bool operator!=(co_observable_iterator const &rhs) const
+ {
+ return !(*this == rhs);
+ }
+
+ value_type const &operator*() const
+ {
+ return *(state->value);
+ }
+
+ value_type const *operator->() const
+ {
+ return std::addressof(operator*());
+ }
+
+ shared_ptr<co_observable_iterator_state<Source>> state;
+};
+
+template<typename Source>
+co_observable_iterator<Source> co_observable_inc_awaiter<Source>::await_resume() {
+ if (!!state->error) {rethrow_exception(state->error);}
+ return co_observable_iterator<Source>{state};
+}
+
+template<typename Source>
+struct co_observable_iterator_awaiter
+{
+ using iterator=co_observable_iterator<Source>;
+ using value_type=typename iterator::value_type;
+
+ explicit co_observable_iterator_awaiter(const Source& o) : it(o) {
+ }
+
+ bool await_ready() {
+ return false;
+ }
+
+ void await_suspend(coroutine_handle<> handle) {
+ weak_ptr<co_observable_iterator_state<Source>> wst=it.state;
+ it.state->caller = handle;
+ it.state->o |
+ rxo::finally([wst](){
+ auto st = wst.lock();
+ if (st && !!st->caller) {
+ auto caller = st->caller;
+ st->caller = nullptr;
+ caller();
+ }
+ }) |
+ rxo::subscribe<value_type>(
+ it.state->lifetime,
+ // next
+ [wst](const value_type& v){
+ auto st = wst.lock();
+ if (!st || !st->caller) {terminate();}
+ st->value = addressof(v);
+ auto caller = st->caller;
+ st->caller = nullptr;
+ caller();
+ },
+ // error
+ [wst](exception_ptr e){
+ auto st = wst.lock();
+ if (!st || !st->caller) {terminate();}
+ st->error = e;
+ auto caller = st->caller;
+ st->caller = nullptr;
+ caller();
+ });
+ }
+
+ iterator await_resume() {
+ if (!!it.state->error) {rethrow_exception(it.state->error);}
+ return std::move(it);
+ }
+
+ iterator it;
+};
+
+}
+}
+
+namespace std
+{
+
+template<typename T, typename SourceOperator>
+auto begin(const rxcpp::observable<T, SourceOperator>& o)
+ -> rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>> {
+ return rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>>{o};
+}
+
+template<typename T, typename SourceOperator>
+auto end(const rxcpp::observable<T, SourceOperator>&)
+ -> rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>> {
+ return rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>>{};
+}
+
+}
+
+#endif
+
+#endif
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 1a94a1b..88659c1 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -19,6 +19,7 @@ set(TEST_DIR ${RXCPP_DIR}/Rx/v2/test)
# define the sources of the self test
set(TEST_SOURCES
+ ${TEST_DIR}/subscriptions/coroutine.cpp
${TEST_DIR}/subscriptions/observer.cpp
${TEST_DIR}/subscriptions/subscription.cpp
${TEST_DIR}/subjects/subject.cpp
diff --git a/Rx/v2/test/subscriptions/coroutine.cpp b/Rx/v2/test/subscriptions/coroutine.cpp
new file mode 100644
index 0000000..bf67c55
--- /dev/null
+++ b/Rx/v2/test/subscriptions/coroutine.cpp
@@ -0,0 +1,125 @@
+#include "../test.h"
+
+#include <rxcpp/rx-coroutine.hpp>
+
+#ifdef _RESUMABLE_FUNCTIONS_SUPPORTED
+
+SCENARIO("coroutine completes", "[coroutine]"){
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(110, 1),
+ on.next(210, 2),
+ on.next(310, 10),
+ on.completed(350)
+ });
+
+ WHEN("for co_await"){
+
+ std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
+
+ w.advance_to(rxsc::test::subscribed_time);
+
+ auto d = [&]() -> std::future<void> {
+ try {
+ for co_await (auto n : xs | rxo::as_dynamic()) {
+ messages.push_back(on.next(w.clock(), n));
+ }
+ messages.push_back(on.completed(w.clock()));
+ } catch (...) {
+ messages.push_back(on.error(w.clock(), std::current_exception()));
+ }
+ }();
+
+ w.advance_to(rxsc::test::unsubscribed_time);
+
+ THEN("the function completed"){
+ REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
+ }
+
+ THEN("the output only contains true"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(310, 10),
+ on.completed(350)
+ });
+ auto actual = messages;
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 350)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("coroutine errors", "[coroutine]"){
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("error in source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(110, 1),
+ on.next(210, 2),
+ on.error(310, ex),
+ on.next(310, 10),
+ on.completed(350)
+ });
+
+ WHEN("for co_await"){
+
+ std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
+
+ w.advance_to(rxsc::test::subscribed_time);
+
+ auto d = [&]() -> std::future<void> {
+ try {
+ for co_await (auto n : xs | rxo::as_dynamic()) {
+ messages.push_back(on.next(w.clock(), n));
+ }
+ messages.push_back(on.completed(w.clock()));
+ } catch (...) {
+ messages.push_back(on.error(w.clock(), std::current_exception()));
+ }
+ }();
+
+ w.advance_to(rxsc::test::unsubscribed_time);
+
+ THEN("the function completed"){
+ REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
+ }
+
+ THEN("the output only contains true"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.error(310, ex)
+ });
+ auto actual = messages;
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 310)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+#endif