diff options
author | Martin Kodovský <martin.kodovsky@gmail.com> | 2017-12-09 22:39:51 +0100 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-12-09 13:39:51 -0800 |
commit | 815e92158e3e0647b96d1331de1ecc5badcde3f8 (patch) | |
tree | a25f7e575d1d00c6433ab67388b5a0a3ed0b200c /Rx | |
parent | 1b2e0589f19cb34d8cd58803677701dcf2161876 (diff) | |
download | RxCpp-815e92158e3e0647b96d1331de1ecc5badcde3f8.tar.gz |
Add rx-merge-delay-error operator (#417)
* Add rx-merge-delay-error operator
* fix of msvc2013 compilation
* fix #417 comments
Added RXCPP_NOEXCEPT macro; Added doxygen scenarios for composite_exception and merge_delay_error; Fixed composing exception in merge_delay_error operator; Modified test for merge_delay_operator
* #417 fix composite_exception example
* #417 fix merge_delay_error doxygen example
* fix: samples add among others in project doxygen CMakeLists.txt
* fix: composite_exception.cpp example
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/examples/doxygen/composite_exception.cpp | 31 | ||||
-rw-r--r-- | Rx/v2/examples/doxygen/merge_delay_error.cpp | 97 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp | 304 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-composite_exception.hpp | 34 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 7 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 11 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 6 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/merge_delay_error.cpp | 306 |
9 files changed, 797 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/composite_exception.cpp b/Rx/v2/examples/doxygen/composite_exception.cpp new file mode 100644 index 0000000..697b83f --- /dev/null +++ b/Rx/v2/examples/doxygen/composite_exception.cpp @@ -0,0 +1,31 @@ +#include "rxcpp/rx.hpp" +namespace rxu=rxcpp::util; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("composite_exception sample"){ + printf("//! [composite_exception sample]\n"); + auto o1 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source o1\n")); + auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source o2\n")); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.merge_delay_error(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr composite_e) { + printf("OnError %s\n", rxu::what(composite_e).c_str()); + try { std::rethrow_exception(composite_e); } + catch(rxcpp::composite_exception ce) { + for(std::exception_ptr particular_e : ce.exceptions) { + + try{ std::rethrow_exception(particular_e); } + catch(std::runtime_error error) { printf(" *** %s\n", error.what()); } + + } + } + }, + [](){printf("OnCompleted\n");} + ); + printf("//! [composite_exception sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/merge_delay_error.cpp b/Rx/v2/examples/doxygen/merge_delay_error.cpp new file mode 100644 index 0000000..8c28cd8 --- /dev/null +++ b/Rx/v2/examples/doxygen/merge_delay_error.cpp @@ -0,0 +1,97 @@ +#include "rxcpp/rx.hpp" +namespace rxu=rxcpp::util; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("merge_delay_error sample"){ + printf("//! [merge_delay_error sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source\n")); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.merge_delay_error(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr eptr) { printf("OnError %s\n", rxu::what(eptr).c_str()); }, + [](){printf("OnCompleted\n");}); + printf("//! [merge_delay_error sample]\n"); +} + +SCENARIO("implicit merge_delay_error sample"){ + printf("//! [implicit merge_delay_error sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source\n")); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); + auto values = base.merge(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr eptr) { printf("OnError %s\n", rxu::what(eptr).c_str()); }, + [](){printf("OnCompleted\n");}); + printf("//! [implicit merge_delay_error sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded merge_delay_error sample"){ + printf("//! [threaded merge_delay_error sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) { + std::stringstream ss; + ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n"; + printf("%s\n", ss.str().c_str()); + ss.str(std::string()); + ss << "(Error from thread: " << get_pid().c_str() << ")\n"; + return rxcpp::observable<>::error<int>(std::runtime_error(ss.str())); + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](std::exception_ptr eptr) { printf("[thread %s] OnError %s\n", get_pid().c_str(), rxu::what(eptr).c_str()); }, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded merge_delay_error sample]\n"); +} + +SCENARIO("threaded implicit merge_delay_error sample"){ + printf("//! [threaded implicit merge_delay_error sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) { + std::stringstream ss; + ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n"; + printf("%s\n", ss.str().c_str()); + ss.str(std::string()); + ss << "(Error from thread: " << get_pid().c_str() << ")\n"; + return rxcpp::observable<>::error<int>(std::runtime_error(ss.str())); + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); + auto values = base.merge(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](std::exception_ptr eptr) { printf("[thread %s] OnError %s\n", get_pid().c_str(), rxu::what(eptr).c_str()); }, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded implicit merge_delay_error sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp new file mode 100644 index 0000000..43fdee3 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp @@ -0,0 +1,304 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +/*! \file rx-merge_delay_error.hpp + + \brief For each given observable subscribe. + For each item emitted from all of the given observables, deliver from the new observable that is returned. + The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission. + + There are 2 variants of the operator: + - The source observable emits nested observables, nested observables are merged. + - The source observable and the arguments v0...vn are used to provide the observables to merge. + + \tparam Coordination the type of the scheduler (optional). + \tparam Value0 ... (optional). + \tparam ValueN types of source observables (optional). + + \param cn the scheduler to synchronize sources from different contexts (optional). + \param v0 ... (optional). + \param vn source observables (optional). + + \return Observable that emits items that are the result of flattening the observables emitted by the source observable. + + If scheduler is omitted, identity_current_thread is used. + + \sample + \snippet merge_delay_error.cpp threaded implicit merge sample + \snippet output.txt threaded implicit merge sample + + \sample + \snippet merge_delay_error.cpp implicit merge sample + \snippet output.txt implicit merge sample + + \sample + \snippet merge_delay_error.cpp merge sample + \snippet output.txt merge sample + + \sample + \snippet merge_delay_error.cpp threaded merge sample + \snippet output.txt threaded merge sample +*/ + +#if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP) +#define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP + +#include "rx-merge.hpp" + +#include "../rx-composite_exception.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Observable, class Coordination> +struct merge_delay_error + : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> +{ + //static_assert(is_observable<Observable>::value, "merge requires an observable"); + //static_assert(is_observable<T>::value, "merge requires an observable that contains observables"); + + typedef merge_delay_error<T, Observable, Coordination> this_type; + + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Observable> source_type; + + typedef typename source_type::source_operator_type source_operator_type; + typedef typename source_value_type::value_type value_type; + + typedef rxu::decay_t<Coordination> coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + + struct values + { + values(source_operator_type o, coordination_type sf) + : source_operator(std::move(o)) + , coordination(std::move(sf)) + { + } + source_operator_type source_operator; + coordination_type coordination; + }; + values initial; + + merge_delay_error(const source_type& o, coordination_type sf) + : initial(o.source_operator, std::move(sf)) + { + } + + template<class Subscriber> + void on_subscribe(Subscriber scbr) const { + static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); + + typedef Subscriber output_type; + + struct merge_state_type + : public std::enable_shared_from_this<merge_state_type> + , public values + { + merge_state_type(values i, coordinator_type coor, output_type oarg) + : values(i) + , source(i.source_operator) + , pendingCompletions(0) + , coordinator(std::move(coor)) + , out(std::move(oarg)) + { + } + observable<source_value_type, source_operator_type> source; + // on_completed on the output must wait until all the + // subscriptions have received on_completed + int pendingCompletions; + composite_exception exception;; + coordinator_type coordinator; + output_type out; + }; + + auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); + + // take a copy of the values for each subscription + auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr)); + + composite_subscription outercs; + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + state->out.add(outercs); + + auto source = on_exception( + [&](){return state->coordinator.in(state->source);}, + state->out); + if (source.empty()) { + return; + } + + ++state->pendingCompletions; + // this subscribe does not share the observer subscription + // so that when it is unsubscribed the observer can be called + // until the inner subscriptions have finished + auto sink = make_subscriber<source_value_type>( + state->out, + outercs, + // on_next + [state](source_value_type st) { + + composite_subscription innercs; + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + auto innercstoken = state->out.add(innercs); + + innercs.add(make_subscription([state, innercstoken](){ + state->out.remove(innercstoken); + })); + + auto selectedSource = state->coordinator.in(st); + + ++state->pendingCompletions; + // this subscribe does not share the source subscription + // so that when it is unsubscribed the source will continue + auto sinkInner = make_subscriber<value_type>( + state->out, + innercs, + // on_next + [state, st](value_type ct) { + state->out.on_next(std::move(ct)); + }, + // on_error + [state](std::exception_ptr e) { + if(--state->pendingCompletions == 0) { + state->out.on_error( + std::make_exception_ptr(std::move(state->exception.add(e)))); + } else { + state->exception.add(e); + } + }, + //on_completed + [state](){ + if (--state->pendingCompletions == 0) { + if(!state->exception.empty()) { + state->out.on_error( + std::make_exception_ptr(std::move(state->exception))); + } else { + state->out.on_completed(); + } + } + } + ); + + auto selectedSinkInner = state->coordinator.out(sinkInner); + selectedSource.subscribe(std::move(selectedSinkInner)); + }, + // on_error + [state](std::exception_ptr e) { + if(--state->pendingCompletions == 0) { + state->out.on_error( + std::make_exception_ptr(std::move(state->exception.add(e)))); + } else { + state->exception.add(e); + } + }, + // on_completed + [state]() { + if (--state->pendingCompletions == 0) { + if(!state->exception.empty()) { + state->out.on_error( + std::make_exception_ptr(std::move(state->exception))); + } else { + state->out.on_completed(); + } + } + } + ); + auto selectedSink = on_exception( + [&](){return state->coordinator.out(sink);}, + state->out); + if (selectedSink.empty()) { + return; + } + source->subscribe(std::move(selectedSink.get())); + } +}; + +} + +/*! @copydoc rx-merge-delay-error.hpp +*/ +template<class... AN> +auto merge_delay_error(AN&&... an) + -> operator_factory<merge_delay_error_tag, AN...> { + return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + +} + +template<> +struct member_overload<merge_delay_error_tag> +{ + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Merge> + > + static Result member(Observable&& o) { + return Result(Merge(std::forward<Observable>(o), identity_current_thread())); + } + + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Merge> + > + static Result member(Observable&& o, Coordination&& cn) { + return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn))); + } + + template<class Observable, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type, + class Value = rxu::value_type_t<Merge>, + class Result = observable<Value, Merge> + > + static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { + return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); + } + + template<class Observable, class Coordination, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>, + is_coordination<Coordination>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, + class Value = rxu::value_type_t<Merge>, + class Result = observable<Value, Merge> + > + static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { + return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::merge_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)"); + } +}; + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-composite_exception.hpp b/Rx/v2/src/rxcpp/rx-composite_exception.hpp new file mode 100644 index 0000000..333f291 --- /dev/null +++ b/Rx/v2/src/rxcpp/rx-composite_exception.hpp @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_SOURCES_RX_COMPOSITE_EXCEPTION_HPP) +#define RXCPP_SOURCES_RX_COMPOSITE_EXCEPTION_HPP + +#include "rx-includes.hpp" + +namespace rxcpp { + +struct composite_exception : std::exception { + + typedef std::vector<std::exception_ptr> exception_values; + + virtual const char *what() const RXCPP_NOEXCEPT override { + return "rxcpp composite exception"; + } + + virtual bool empty() const { + return exceptions.empty(); + } + + virtual composite_exception add(std::exception_ptr exception_ptr) { + exceptions.push_back(exception_ptr); + return *this; + } + + exception_values exceptions; +}; + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 3527f01..aefa190 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -125,6 +125,12 @@ #define _VARIADIC_MAX 10 #endif +#if defined(_MSC_VER) && (_MSC_VER <= 1800) +#define RXCPP_NOEXCEPT +#else +#define RXCPP_NOEXCEPT noexcept +#endif + #pragma push_macro("min") #pragma push_macro("max") #undef min @@ -203,6 +209,7 @@ #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" +#include "operators/rx-merge_delay_error.hpp" #include "operators/rx-observe_on.hpp" #include "operators/rx-on_error_resume_next.hpp" #include "operators/rx-pairwise.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index ef96d36..3a31240 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1023,6 +1023,17 @@ public: return observable_member(merge_tag{}, *this, std::forward<AN>(an)...); } + /*! @copydoc rx-merge_delay_error.hpp + */ + template<class... AN> + auto merge_delay_error(AN... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond + { + return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...); + } + /*! @copydoc rx-amb.hpp */ template<class... AN> diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 0fa5d58..15e8b54 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -258,6 +258,12 @@ struct merge_tag { static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-merge.hpp>"); }; }; +struct merge_delay_error_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-merge_delay_error.hpp>"); + }; +}; struct multicast_tag { template<class Included> diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 322872c..1064c0f 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -54,6 +54,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/lift.cpp ${TEST_DIR}/operators/map.cpp ${TEST_DIR}/operators/merge.cpp + ${TEST_DIR}/operators/merge_delay_error.cpp ${TEST_DIR}/operators/observe_on.cpp ${TEST_DIR}/operators/on_error_resume_next.cpp ${TEST_DIR}/operators/pairwise.cpp diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp new file mode 100644 index 0000000..d560b45 --- /dev/null +++ b/Rx/v2/test/operators/merge_delay_error.cpp @@ -0,0 +1,306 @@ +#include "../test.h" +#include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-merge_delay_error.hpp> +#include <rxcpp/operators/rx-observe_on.hpp> + +const int static_onnextcalls = 1000000; + +//merge_delay_error must work the very same way as `merge()` except the error handling + +SCENARIO("merge completes", "[merge][join][operators]"){ + GIVEN("1 hot observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(20, 102), + on.next(110, 103), + on.next(120, 104), + on.next(210, 105), + on.next(220, 106), + on.completed(230) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(10, 201), + on.next(20, 202), + on.next(30, 203), + on.next(40, 204), + on.completed(50) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(10, 301), + on.next(20, 302), + on.next(30, 303), + on.next(40, 304), + on.next(120, 305), + on.completed(150) + }); + + auto xs = sc.make_hot_observable({ + o_on.next(300, ys1), + o_on.next(400, ys2), + o_on.next(500, ys3), + o_on.completed(600) + }); + + WHEN("each int is merged"){ + + auto res = w.start( + [&]() { + return xs + | rxo::merge_delay_error() + // forget type to workaround lambda deduction bug on msvc 2013 + | rxo::as_dynamic(); + } + ); + + THEN("the output contains merged ints"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(320, 102), + on.next(410, 103), + on.next(410, 201), + on.next(420, 104), + on.next(420, 202), + on.next(430, 203), + on.next(440, 204), + on.next(510, 105), + on.next(510, 301), + on.next(520, 106), + on.next(520, 302), + on.next(530, 303), + on.next(540, 304), + on.next(620, 305), + on.completed(650) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 600) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 530) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(400, 450) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(500, 650) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ + GIVEN("1 hot observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(20, 102), + on.next(110, 103), + on.next(120, 104), + on.next(210, 105), + on.next(230, 107), + on.completed(240) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(10, 201), + on.next(20, 202), + on.next(30, 203), + on.error(40, std::runtime_error("merge_delay_error on_error from ys2")), + on.next(50, 205), + on.completed(60) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(10, 301), + on.next(20, 302), + on.next(30, 303), + on.next(40, 304), + on.next(120, 305), + on.completed(150) + }); + + WHEN("each int is merged"){ + + auto res = w.start( + [&]() { + return ys1 + .merge_delay_error(ys2, ys3); + } + ); + + rx::composite_exception ex; + THEN("the output contains merged ints"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(210, 201), + on.next(210, 301), + on.next(220, 102), + on.next(220, 202), + on.next(220, 302), + on.next(230, 203), + on.next(230, 303), + on.next(240, 304), + on.next(310, 103), + on.next(320, 104), + on.next(320, 305), + on.next(410, 105), + on.next(430, 107), + on.error(440, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 440) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 240) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 350) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){ + GIVEN("1 hot observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(20, 102), + on.next(110, 103), + on.next(120, 104), + on.next(210, 105), + on.error(220, std::runtime_error("merge_delay_error on_error from ys1")), + on.next(230, 107), + on.completed(240) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(10, 201), + on.next(20, 202), + on.next(30, 203), + on.error(40, std::runtime_error("merge_delay_error on_error from ys2")), + on.next(50, 205), + on.completed(60) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(10, 301), + on.next(20, 302), + on.next(30, 303), + on.next(40, 304), + on.next(120, 305), + on.completed(150) + }); + + WHEN("each int is merged"){ + + auto res = w.start( + [&]() { + return ys1 + .merge_delay_error(ys2, ys3); + } + ); + + rx::composite_exception ex; + THEN("the output contains merged ints"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(210, 201), + on.next(210, 301), + on.next(220, 102), + on.next(220, 202), + on.next(220, 302), + on.next(230, 203), + on.next(230, 303), + on.next(240, 304), + on.next(310, 103), + on.next(320, 104), + on.next(320, 305), + on.next(410, 105), + on.error(420, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 420) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 240) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 350) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} |