summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorMartin Kodovský <martin.kodovsky@gmail.com>2017-12-09 22:39:51 +0100
committerKirk Shoop <kirk.shoop@microsoft.com>2017-12-09 13:39:51 -0800
commit815e92158e3e0647b96d1331de1ecc5badcde3f8 (patch)
treea25f7e575d1d00c6433ab67388b5a0a3ed0b200c /Rx
parent1b2e0589f19cb34d8cd58803677701dcf2161876 (diff)
downloadRxCpp-815e92158e3e0647b96d1331de1ecc5badcde3f8.tar.gz
Add rx-merge-delay-error operator (#417)
* Add rx-merge-delay-error operator * fix of msvc2013 compilation * fix #417 comments Added RXCPP_NOEXCEPT macro; Added doxygen scenarios for composite_exception and merge_delay_error; Fixed composing exception in merge_delay_error operator; Modified test for merge_delay_operator * #417 fix composite_exception example * #417 fix merge_delay_error doxygen example * fix: samples add among others in project doxygen CMakeLists.txt * fix: composite_exception.cpp example
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/composite_exception.cpp31
-rw-r--r--Rx/v2/examples/doxygen/merge_delay_error.cpp97
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp304
-rw-r--r--Rx/v2/src/rxcpp/rx-composite_exception.hpp34
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp7
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp11
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp6
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/merge_delay_error.cpp306
9 files changed, 797 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/composite_exception.cpp b/Rx/v2/examples/doxygen/composite_exception.cpp
new file mode 100644
index 0000000..697b83f
--- /dev/null
+++ b/Rx/v2/examples/doxygen/composite_exception.cpp
@@ -0,0 +1,31 @@
+#include "rxcpp/rx.hpp"
+namespace rxu=rxcpp::util;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("composite_exception sample"){
+ printf("//! [composite_exception sample]\n");
+ auto o1 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source o1\n"));
+ auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source o2\n"));
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.merge_delay_error(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr composite_e) {
+ printf("OnError %s\n", rxu::what(composite_e).c_str());
+ try { std::rethrow_exception(composite_e); }
+ catch(rxcpp::composite_exception ce) {
+ for(std::exception_ptr particular_e : ce.exceptions) {
+
+ try{ std::rethrow_exception(particular_e); }
+ catch(std::runtime_error error) { printf(" *** %s\n", error.what()); }
+
+ }
+ }
+ },
+ [](){printf("OnCompleted\n");}
+ );
+ printf("//! [composite_exception sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/merge_delay_error.cpp b/Rx/v2/examples/doxygen/merge_delay_error.cpp
new file mode 100644
index 0000000..8c28cd8
--- /dev/null
+++ b/Rx/v2/examples/doxygen/merge_delay_error.cpp
@@ -0,0 +1,97 @@
+#include "rxcpp/rx.hpp"
+namespace rxu=rxcpp::util;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("merge_delay_error sample"){
+ printf("//! [merge_delay_error sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source\n"));
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.merge_delay_error(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr eptr) { printf("OnError %s\n", rxu::what(eptr).c_str()); },
+ [](){printf("OnCompleted\n");});
+ printf("//! [merge_delay_error sample]\n");
+}
+
+SCENARIO("implicit merge_delay_error sample"){
+ printf("//! [implicit merge_delay_error sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::error<int>(std::runtime_error("Error from source\n"));
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
+ auto values = base.merge();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr eptr) { printf("OnError %s\n", rxu::what(eptr).c_str()); },
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit merge_delay_error sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded merge_delay_error sample"){
+ printf("//! [threaded merge_delay_error sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) {
+ std::stringstream ss;
+ ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n";
+ printf("%s\n", ss.str().c_str());
+ ss.str(std::string());
+ ss << "(Error from thread: " << get_pid().c_str() << ")\n";
+ return rxcpp::observable<>::error<int>(std::runtime_error(ss.str()));
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](std::exception_ptr eptr) { printf("[thread %s] OnError %s\n", get_pid().c_str(), rxu::what(eptr).c_str()); },
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded merge_delay_error sample]\n");
+}
+
+SCENARIO("threaded implicit merge_delay_error sample"){
+ printf("//! [threaded implicit merge_delay_error sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) {
+ std::stringstream ss;
+ ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n";
+ printf("%s\n", ss.str().c_str());
+ ss.str(std::string());
+ ss << "(Error from thread: " << get_pid().c_str() << ")\n";
+ return rxcpp::observable<>::error<int>(std::runtime_error(ss.str()));
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
+ auto values = base.merge(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](std::exception_ptr eptr) { printf("[thread %s] OnError %s\n", get_pid().c_str(), rxu::what(eptr).c_str()); },
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded implicit merge_delay_error sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp
new file mode 100644
index 0000000..43fdee3
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp
@@ -0,0 +1,304 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+/*! \file rx-merge_delay_error.hpp
+
+ \brief For each given observable subscribe.
+ For each item emitted from all of the given observables, deliver from the new observable that is returned.
+ The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
+
+ There are 2 variants of the operator:
+ - The source observable emits nested observables, nested observables are merged.
+ - The source observable and the arguments v0...vn are used to provide the observables to merge.
+
+ \tparam Coordination the type of the scheduler (optional).
+ \tparam Value0 ... (optional).
+ \tparam ValueN types of source observables (optional).
+
+ \param cn the scheduler to synchronize sources from different contexts (optional).
+ \param v0 ... (optional).
+ \param vn source observables (optional).
+
+ \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
+
+ If scheduler is omitted, identity_current_thread is used.
+
+ \sample
+ \snippet merge_delay_error.cpp threaded implicit merge sample
+ \snippet output.txt threaded implicit merge sample
+
+ \sample
+ \snippet merge_delay_error.cpp implicit merge sample
+ \snippet output.txt implicit merge sample
+
+ \sample
+ \snippet merge_delay_error.cpp merge sample
+ \snippet output.txt merge sample
+
+ \sample
+ \snippet merge_delay_error.cpp threaded merge sample
+ \snippet output.txt threaded merge sample
+*/
+
+#if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP)
+#define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP
+
+#include "rx-merge.hpp"
+
+#include "../rx-composite_exception.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Observable, class Coordination>
+struct merge_delay_error
+ : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
+{
+ //static_assert(is_observable<Observable>::value, "merge requires an observable");
+ //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
+
+ typedef merge_delay_error<T, Observable, Coordination> this_type;
+
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Observable> source_type;
+
+ typedef typename source_type::source_operator_type source_operator_type;
+ typedef typename source_value_type::value_type value_type;
+
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+
+ struct values
+ {
+ values(source_operator_type o, coordination_type sf)
+ : source_operator(std::move(o))
+ , coordination(std::move(sf))
+ {
+ }
+ source_operator_type source_operator;
+ coordination_type coordination;
+ };
+ values initial;
+
+ merge_delay_error(const source_type& o, coordination_type sf)
+ : initial(o.source_operator, std::move(sf))
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(Subscriber scbr) const {
+ static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
+
+ typedef Subscriber output_type;
+
+ struct merge_state_type
+ : public std::enable_shared_from_this<merge_state_type>
+ , public values
+ {
+ merge_state_type(values i, coordinator_type coor, output_type oarg)
+ : values(i)
+ , source(i.source_operator)
+ , pendingCompletions(0)
+ , coordinator(std::move(coor))
+ , out(std::move(oarg))
+ {
+ }
+ observable<source_value_type, source_operator_type> source;
+ // on_completed on the output must wait until all the
+ // subscriptions have received on_completed
+ int pendingCompletions;
+ composite_exception exception;;
+ coordinator_type coordinator;
+ output_type out;
+ };
+
+ auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
+
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
+
+ composite_subscription outercs;
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ state->out.add(outercs);
+
+ auto source = on_exception(
+ [&](){return state->coordinator.in(state->source);},
+ state->out);
+ if (source.empty()) {
+ return;
+ }
+
+ ++state->pendingCompletions;
+ // this subscribe does not share the observer subscription
+ // so that when it is unsubscribed the observer can be called
+ // until the inner subscriptions have finished
+ auto sink = make_subscriber<source_value_type>(
+ state->out,
+ outercs,
+ // on_next
+ [state](source_value_type st) {
+
+ composite_subscription innercs;
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ auto innercstoken = state->out.add(innercs);
+
+ innercs.add(make_subscription([state, innercstoken](){
+ state->out.remove(innercstoken);
+ }));
+
+ auto selectedSource = state->coordinator.in(st);
+
+ ++state->pendingCompletions;
+ // this subscribe does not share the source subscription
+ // so that when it is unsubscribed the source will continue
+ auto sinkInner = make_subscriber<value_type>(
+ state->out,
+ innercs,
+ // on_next
+ [state, st](value_type ct) {
+ state->out.on_next(std::move(ct));
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ if(--state->pendingCompletions == 0) {
+ state->out.on_error(
+ std::make_exception_ptr(std::move(state->exception.add(e))));
+ } else {
+ state->exception.add(e);
+ }
+ },
+ //on_completed
+ [state](){
+ if (--state->pendingCompletions == 0) {
+ if(!state->exception.empty()) {
+ state->out.on_error(
+ std::make_exception_ptr(std::move(state->exception)));
+ } else {
+ state->out.on_completed();
+ }
+ }
+ }
+ );
+
+ auto selectedSinkInner = state->coordinator.out(sinkInner);
+ selectedSource.subscribe(std::move(selectedSinkInner));
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ if(--state->pendingCompletions == 0) {
+ state->out.on_error(
+ std::make_exception_ptr(std::move(state->exception.add(e))));
+ } else {
+ state->exception.add(e);
+ }
+ },
+ // on_completed
+ [state]() {
+ if (--state->pendingCompletions == 0) {
+ if(!state->exception.empty()) {
+ state->out.on_error(
+ std::make_exception_ptr(std::move(state->exception)));
+ } else {
+ state->out.on_completed();
+ }
+ }
+ }
+ );
+ auto selectedSink = on_exception(
+ [&](){return state->coordinator.out(sink);},
+ state->out);
+ if (selectedSink.empty()) {
+ return;
+ }
+ source->subscribe(std::move(selectedSink.get()));
+ }
+};
+
+}
+
+/*! @copydoc rx-merge-delay-error.hpp
+*/
+template<class... AN>
+auto merge_delay_error(AN&&... an)
+ -> operator_factory<merge_delay_error_tag, AN...> {
+ return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
+}
+
+template<>
+struct member_overload<merge_delay_error_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Merge>
+ >
+ static Result member(Observable&& o) {
+ return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
+ }
+
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Merge>
+ >
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
+ }
+
+ template<class Observable, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type,
+ class Value = rxu::value_type_t<Merge>,
+ class Result = observable<Value, Merge>
+ >
+ static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
+ return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
+ }
+
+ template<class Observable, class Coordination, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>,
+ is_coordination<Coordination>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
+ class Value = rxu::value_type_t<Merge>,
+ class Result = observable<Value, Merge>
+ >
+ static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
+ return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::merge_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
+ }
+};
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-composite_exception.hpp b/Rx/v2/src/rxcpp/rx-composite_exception.hpp
new file mode 100644
index 0000000..333f291
--- /dev/null
+++ b/Rx/v2/src/rxcpp/rx-composite_exception.hpp
@@ -0,0 +1,34 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_SOURCES_RX_COMPOSITE_EXCEPTION_HPP)
+#define RXCPP_SOURCES_RX_COMPOSITE_EXCEPTION_HPP
+
+#include "rx-includes.hpp"
+
+namespace rxcpp {
+
+struct composite_exception : std::exception {
+
+ typedef std::vector<std::exception_ptr> exception_values;
+
+ virtual const char *what() const RXCPP_NOEXCEPT override {
+ return "rxcpp composite exception";
+ }
+
+ virtual bool empty() const {
+ return exceptions.empty();
+ }
+
+ virtual composite_exception add(std::exception_ptr exception_ptr) {
+ exceptions.push_back(exception_ptr);
+ return *this;
+ }
+
+ exception_values exceptions;
+};
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 3527f01..aefa190 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -125,6 +125,12 @@
#define _VARIADIC_MAX 10
#endif
+#if defined(_MSC_VER) && (_MSC_VER <= 1800)
+#define RXCPP_NOEXCEPT
+#else
+#define RXCPP_NOEXCEPT noexcept
+#endif
+
#pragma push_macro("min")
#pragma push_macro("max")
#undef min
@@ -203,6 +209,7 @@
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
+#include "operators/rx-merge_delay_error.hpp"
#include "operators/rx-observe_on.hpp"
#include "operators/rx-on_error_resume_next.hpp"
#include "operators/rx-pairwise.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index ef96d36..3a31240 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1023,6 +1023,17 @@ public:
return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
}
+ /*! @copydoc rx-merge_delay_error.hpp
+ */
+ template<class... AN>
+ auto merge_delay_error(AN... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
+ {
+ return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
+ }
+
/*! @copydoc rx-amb.hpp
*/
template<class... AN>
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 0fa5d58..15e8b54 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -258,6 +258,12 @@ struct merge_tag {
static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-merge.hpp>");
};
};
+struct merge_delay_error_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-merge_delay_error.hpp>");
+ };
+};
struct multicast_tag {
template<class Included>
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 322872c..1064c0f 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -54,6 +54,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/lift.cpp
${TEST_DIR}/operators/map.cpp
${TEST_DIR}/operators/merge.cpp
+ ${TEST_DIR}/operators/merge_delay_error.cpp
${TEST_DIR}/operators/observe_on.cpp
${TEST_DIR}/operators/on_error_resume_next.cpp
${TEST_DIR}/operators/pairwise.cpp
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
new file mode 100644
index 0000000..d560b45
--- /dev/null
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -0,0 +1,306 @@
+#include "../test.h"
+#include <rxcpp/operators/rx-reduce.hpp>
+#include <rxcpp/operators/rx-merge_delay_error.hpp>
+#include <rxcpp/operators/rx-observe_on.hpp>
+
+const int static_onnextcalls = 1000000;
+
+//merge_delay_error must work the very same way as `merge()` except the error handling
+
+SCENARIO("merge completes", "[merge][join][operators]"){
+ GIVEN("1 hot observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(20, 102),
+ on.next(110, 103),
+ on.next(120, 104),
+ on.next(210, 105),
+ on.next(220, 106),
+ on.completed(230)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(10, 201),
+ on.next(20, 202),
+ on.next(30, 203),
+ on.next(40, 204),
+ on.completed(50)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(10, 301),
+ on.next(20, 302),
+ on.next(30, 303),
+ on.next(40, 304),
+ on.next(120, 305),
+ on.completed(150)
+ });
+
+ auto xs = sc.make_hot_observable({
+ o_on.next(300, ys1),
+ o_on.next(400, ys2),
+ o_on.next(500, ys3),
+ o_on.completed(600)
+ });
+
+ WHEN("each int is merged"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ | rxo::merge_delay_error()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains merged ints"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(320, 102),
+ on.next(410, 103),
+ on.next(410, 201),
+ on.next(420, 104),
+ on.next(420, 202),
+ on.next(430, 203),
+ on.next(440, 204),
+ on.next(510, 105),
+ on.next(510, 301),
+ on.next(520, 106),
+ on.next(520, 302),
+ on.next(530, 303),
+ on.next(540, 304),
+ on.next(620, 305),
+ on.completed(650)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 600)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 530)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(400, 450)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(500, 650)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic merge completes with error", "[merge][join][operators]"){
+ GIVEN("1 hot observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(20, 102),
+ on.next(110, 103),
+ on.next(120, 104),
+ on.next(210, 105),
+ on.next(230, 107),
+ on.completed(240)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(10, 201),
+ on.next(20, 202),
+ on.next(30, 203),
+ on.error(40, std::runtime_error("merge_delay_error on_error from ys2")),
+ on.next(50, 205),
+ on.completed(60)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(10, 301),
+ on.next(20, 302),
+ on.next(30, 303),
+ on.next(40, 304),
+ on.next(120, 305),
+ on.completed(150)
+ });
+
+ WHEN("each int is merged"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .merge_delay_error(ys2, ys3);
+ }
+ );
+
+ rx::composite_exception ex;
+ THEN("the output contains merged ints"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(210, 201),
+ on.next(210, 301),
+ on.next(220, 102),
+ on.next(220, 202),
+ on.next(220, 302),
+ on.next(230, 203),
+ on.next(230, 303),
+ on.next(240, 304),
+ on.next(310, 103),
+ on.next(320, 104),
+ on.next(320, 305),
+ on.next(410, 105),
+ on.next(430, 107),
+ on.error(440, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 440)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 240)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 350)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){
+ GIVEN("1 hot observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(20, 102),
+ on.next(110, 103),
+ on.next(120, 104),
+ on.next(210, 105),
+ on.error(220, std::runtime_error("merge_delay_error on_error from ys1")),
+ on.next(230, 107),
+ on.completed(240)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(10, 201),
+ on.next(20, 202),
+ on.next(30, 203),
+ on.error(40, std::runtime_error("merge_delay_error on_error from ys2")),
+ on.next(50, 205),
+ on.completed(60)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(10, 301),
+ on.next(20, 302),
+ on.next(30, 303),
+ on.next(40, 304),
+ on.next(120, 305),
+ on.completed(150)
+ });
+
+ WHEN("each int is merged"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .merge_delay_error(ys2, ys3);
+ }
+ );
+
+ rx::composite_exception ex;
+ THEN("the output contains merged ints"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(210, 201),
+ on.next(210, 301),
+ on.next(220, 102),
+ on.next(220, 202),
+ on.next(220, 302),
+ on.next(230, 203),
+ on.next(230, 303),
+ on.next(240, 304),
+ on.next(310, 103),
+ on.next(320, 104),
+ on.next(320, 305),
+ on.next(410, 105),
+ on.error(420, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 420)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 240)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 350)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}