diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-13 19:58:34 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-12-13 19:58:34 -0800 |
commit | cfa49a4d4baa57159319d8498f8f9158b9036e6a (patch) | |
tree | 88fe0d4d32c8824aaf80ad9b759421effbc9983b /Rx/v2 | |
parent | 2ac078e3b0c78785134a195c26e7b55a6758f3c7 (diff) | |
download | RxCpp-cfa49a4d4baa57159319d8498f8f9158b9036e6a.tar.gz |
add support for C++ coroutines (#286)
* add support for C++ coroutines
* add tests and docs
* address feedback and fix lifetime issue
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/examples/awaitable/CMakeLists.txt | 36 | ||||
-rw-r--r-- | Rx/v2/examples/awaitable/main.cpp | 58 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-coroutine.hpp | 198 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/coroutine.cpp | 125 |
5 files changed, 418 insertions, 0 deletions
diff --git a/Rx/v2/examples/awaitable/CMakeLists.txt b/Rx/v2/examples/awaitable/CMakeLists.txt new file mode 100644 index 0000000..8d919b0 --- /dev/null +++ b/Rx/v2/examples/awaitable/CMakeLists.txt @@ -0,0 +1,36 @@ +cmake_minimum_required(VERSION 3.2 FATAL_ERROR) + +get_filename_component(SAMPLE_PROJECT "${CMAKE_CURRENT_SOURCE_DIR}" NAME) + +project(${SAMPLE_PROJECT} LANGUAGES C CXX) + +# define some folders +get_filename_component(RXCPP_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PATH) +get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH) +get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH) +get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH) + +MESSAGE( STATUS "RXCPP_DIR: " ${RXCPP_DIR} ) + +include(${RXCPP_DIR}/projects/CMake/shared.cmake) + +# define the sources +set(SAMPLE_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp +) +add_executable(${SAMPLE_PROJECT} ${SAMPLE_SOURCES}) +add_executable(rxcpp::examples::${SAMPLE_PROJECT} ALIAS ${SAMPLE_PROJECT}) +target_compile_options(${SAMPLE_PROJECT} PUBLIC ${RX_COMPILE_OPTIONS} /await) +target_compile_features(${SAMPLE_PROJECT} PUBLIC ${RX_COMPILE_FEATURES}) +target_include_directories(${SAMPLE_PROJECT} PUBLIC ${RX_SRC_DIR}) +target_link_libraries(${SAMPLE_PROJECT} ${CMAKE_THREAD_LIBS_INIT}) + +# configure unit tests via CTest +enable_testing() +set(CTEST_CONFIGURATION_TYPE "${JOB_BUILD_CONFIGURATION}") + +set_target_properties(${SAMPLE_PROJECT} PROPERTIES FOLDER "Examples") + +add_test(NAME RunTests + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}" + COMMAND ${SAMPLE_PROJECT} ${TEST_ARGS}) diff --git a/Rx/v2/examples/awaitable/main.cpp b/Rx/v2/examples/awaitable/main.cpp new file mode 100644 index 0000000..475fd23 --- /dev/null +++ b/Rx/v2/examples/awaitable/main.cpp @@ -0,0 +1,58 @@ + +#include <rxcpp/rx-lite.hpp> +#include <rxcpp/operators/rx-take.hpp> + +#include <rxcpp/rx-coroutine.hpp> + +using namespace rxcpp; +using namespace rxcpp::sources; +using namespace rxcpp::operators; +using namespace rxcpp::util; + +using namespace std; +using namespace std::chrono; + +future<void> intervals(){ + + { + printf("early exit from interval on thread\n"); + for co_await (auto c : interval(seconds(1), observe_on_event_loop())) { + printf("%d\n", c); + break; + } + } + + { + printf("interval on thread\n"); + for co_await (auto c : interval(seconds(1), observe_on_event_loop()) | take(3)) { + printf("%d\n", c); + } + } + + { + printf("current thread\n"); + int last = 0; + for co_await (auto c : range(1, 100000)) { + last = c; + } + printf("reached %d\n", last); + } + + try { + printf("error in observable\n"); + for co_await (auto c : error<long>(runtime_error("stopped by error"))) { + printf("%d\n", c); + } + printf("not reachable\n"); + terminate(); + } + catch(const exception& e) { + printf("%s\n", e.what()); + } +} + +int main() +{ + intervals().get(); + return 0; +} diff --git a/Rx/v2/src/rxcpp/rx-coroutine.hpp b/Rx/v2/src/rxcpp/rx-coroutine.hpp new file mode 100644 index 0000000..317f867 --- /dev/null +++ b/Rx/v2/src/rxcpp/rx-coroutine.hpp @@ -0,0 +1,198 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +/*! \file rx-coroutine.hpp + + \brief The proposal to add couroutines to the standard adds `co_await`, `for co_await`, `co_yield` and `co_return`. This file adds `begin(observable<>)` & `end(observable<>)` which enables `for co_await` to work with the `observable<>` type. + + for co_await (auto c : interval(seconds(1), observe_on_event_loop()) | take(3)) { + printf("%d\n", c); + } + +*/ + +#if !defined(RXCPP_RX_COROUTINE_HPP) +#define RXCPP_RX_COROUTINE_HPP + +#include "rx-includes.hpp" + +#ifdef _RESUMABLE_FUNCTIONS_SUPPORTED + +#include <rxcpp/operators/rx-finally.hpp> + +#include <experimental/resumable> + +namespace rxcpp { +namespace coroutine { + +using namespace std; +using namespace std::chrono; +using namespace std::experimental; + +template<typename Source> +struct co_observable_iterator; + +template<typename Source> +struct co_observable_iterator_state : std::enable_shared_from_this<co_observable_iterator_state<Source>> +{ + using value_type = typename Source::value_type; + + ~co_observable_iterator_state() { + lifetime.unsubscribe(); + } + explicit co_observable_iterator_state(const Source& o) : o(o) {} + + coroutine_handle<> caller{}; + composite_subscription lifetime{}; + const value_type* value{nullptr}; + exception_ptr error{nullptr}; + Source o; +}; + +template<typename Source> +struct co_observable_inc_awaiter +{ + bool await_ready() { + return false; + } + + bool await_suspend(coroutine_handle<> handle) { + if (!state->lifetime.is_subscribed()) {return false;} + state->caller = handle; + return true; + } + + co_observable_iterator<Source> await_resume(); + + shared_ptr<co_observable_iterator_state<Source>> state; +}; + +template<typename Source> +struct co_observable_iterator : public iterator<input_iterator_tag, typename Source::value_type> +{ + using value_type = typename Source::value_type; + + co_observable_iterator() {} + + explicit co_observable_iterator(const Source& o) : state(make_shared<co_observable_iterator_state<Source>>(o)) {} + explicit co_observable_iterator(const shared_ptr<co_observable_iterator_state<Source>>& o) : state(o) {} + + co_observable_iterator(co_observable_iterator&&)=default; + co_observable_iterator& operator=(co_observable_iterator&&)=default; + + co_observable_inc_awaiter<Source> operator++() + { + return co_observable_inc_awaiter<Source>{state}; + } + + co_observable_iterator& operator++(int) = delete; + // not implementing postincrement + + bool operator==(co_observable_iterator const &rhs) const + { + return !!state && !rhs.state && !state->lifetime.is_subscribed(); + } + + bool operator!=(co_observable_iterator const &rhs) const + { + return !(*this == rhs); + } + + value_type const &operator*() const + { + return *(state->value); + } + + value_type const *operator->() const + { + return std::addressof(operator*()); + } + + shared_ptr<co_observable_iterator_state<Source>> state; +}; + +template<typename Source> +co_observable_iterator<Source> co_observable_inc_awaiter<Source>::await_resume() { + if (!!state->error) {rethrow_exception(state->error);} + return co_observable_iterator<Source>{state}; +} + +template<typename Source> +struct co_observable_iterator_awaiter +{ + using iterator=co_observable_iterator<Source>; + using value_type=typename iterator::value_type; + + explicit co_observable_iterator_awaiter(const Source& o) : it(o) { + } + + bool await_ready() { + return false; + } + + void await_suspend(coroutine_handle<> handle) { + weak_ptr<co_observable_iterator_state<Source>> wst=it.state; + it.state->caller = handle; + it.state->o | + rxo::finally([wst](){ + auto st = wst.lock(); + if (st && !!st->caller) { + auto caller = st->caller; + st->caller = nullptr; + caller(); + } + }) | + rxo::subscribe<value_type>( + it.state->lifetime, + // next + [wst](const value_type& v){ + auto st = wst.lock(); + if (!st || !st->caller) {terminate();} + st->value = addressof(v); + auto caller = st->caller; + st->caller = nullptr; + caller(); + }, + // error + [wst](exception_ptr e){ + auto st = wst.lock(); + if (!st || !st->caller) {terminate();} + st->error = e; + auto caller = st->caller; + st->caller = nullptr; + caller(); + }); + } + + iterator await_resume() { + if (!!it.state->error) {rethrow_exception(it.state->error);} + return std::move(it); + } + + iterator it; +}; + +} +} + +namespace std +{ + +template<typename T, typename SourceOperator> +auto begin(const rxcpp::observable<T, SourceOperator>& o) + -> rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>> { + return rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>>{o}; +} + +template<typename T, typename SourceOperator> +auto end(const rxcpp::observable<T, SourceOperator>&) + -> rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>> { + return rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>>{}; +} + +} + +#endif + +#endif diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 1a94a1b..88659c1 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -19,6 +19,7 @@ set(TEST_DIR ${RXCPP_DIR}/Rx/v2/test) # define the sources of the self test set(TEST_SOURCES + ${TEST_DIR}/subscriptions/coroutine.cpp ${TEST_DIR}/subscriptions/observer.cpp ${TEST_DIR}/subscriptions/subscription.cpp ${TEST_DIR}/subjects/subject.cpp diff --git a/Rx/v2/test/subscriptions/coroutine.cpp b/Rx/v2/test/subscriptions/coroutine.cpp new file mode 100644 index 0000000..bf67c55 --- /dev/null +++ b/Rx/v2/test/subscriptions/coroutine.cpp @@ -0,0 +1,125 @@ +#include "../test.h" + +#include <rxcpp/rx-coroutine.hpp> + +#ifdef _RESUMABLE_FUNCTIONS_SUPPORTED + +SCENARIO("coroutine completes", "[coroutine]"){ + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(110, 1), + on.next(210, 2), + on.next(310, 10), + on.completed(350) + }); + + WHEN("for co_await"){ + + std::vector<typename rxsc::test::messages<int>::recorded_type> messages; + + w.advance_to(rxsc::test::subscribed_time); + + auto d = [&]() -> std::future<void> { + try { + for co_await (auto n : xs | rxo::as_dynamic()) { + messages.push_back(on.next(w.clock(), n)); + } + messages.push_back(on.completed(w.clock())); + } catch (...) { + messages.push_back(on.error(w.clock(), std::current_exception())); + } + }(); + + w.advance_to(rxsc::test::unsubscribed_time); + + THEN("the function completed"){ + REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready); + } + + THEN("the output only contains true"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(310, 10), + on.completed(350) + }); + auto actual = messages; + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 350) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("coroutine errors", "[coroutine]"){ + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("error in source"); + + auto xs = sc.make_hot_observable({ + on.next(110, 1), + on.next(210, 2), + on.error(310, ex), + on.next(310, 10), + on.completed(350) + }); + + WHEN("for co_await"){ + + std::vector<typename rxsc::test::messages<int>::recorded_type> messages; + + w.advance_to(rxsc::test::subscribed_time); + + auto d = [&]() -> std::future<void> { + try { + for co_await (auto n : xs | rxo::as_dynamic()) { + messages.push_back(on.next(w.clock(), n)); + } + messages.push_back(on.completed(w.clock())); + } catch (...) { + messages.push_back(on.error(w.clock(), std::current_exception())); + } + }(); + + w.advance_to(rxsc::test::unsubscribed_time); + + THEN("the function completed"){ + REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready); + } + + THEN("the output only contains true"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.error(310, ex) + }); + auto actual = messages; + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 310) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +#endif |