summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-05-13 17:44:03 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-05-13 17:44:03 +0300
commita50b50463da715ece2f2d6102575f6e49a974223 (patch)
treeefc02c69af79d8581aa744bd85c57a3f7317f625
parent2ea2aede1135c44f587eeea2e7c0afc1f0601c2a (diff)
downloadRxCpp-a50b50463da715ece2f2d6102575f6e49a974223.tar.gz
Add amb operator including tests
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-amb.hpp207
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp67
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/operators/amb.cpp915
-rw-r--r--Rx/v2/test/operators/amb_variadic.cpp559
-rw-r--r--projects/CMake/CMakeLists.txt2
6 files changed, 1751 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-amb.hpp b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
new file mode 100644
index 0000000..8045f95
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
@@ -0,0 +1,207 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
+#define RXCPP_OPERATORS_RX_AMB_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Observable, class Coordination>
+struct amb
+ : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
+{
+ //static_assert(is_observable<Observable>::value, "amb requires an observable");
+ //static_assert(is_observable<T>::value, "amb requires an observable that contains observables");
+
+ typedef amb<T, Observable, Coordination> this_type;
+
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Observable> source_type;
+
+ typedef typename source_type::source_operator_type source_operator_type;
+ typedef typename source_value_type::value_type value_type;
+
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+
+ struct values
+ {
+ values(source_operator_type o, coordination_type sf)
+ : source_operator(std::move(o))
+ , coordination(std::move(sf))
+ {
+ }
+ source_operator_type source_operator;
+ coordination_type coordination;
+ };
+ values initial;
+
+ amb(const source_type& o, coordination_type sf)
+ : initial(o.source_operator, std::move(sf))
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(Subscriber scbr) const {
+ static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
+
+ typedef Subscriber output_type;
+
+ struct amb_state_type
+ : public std::enable_shared_from_this<amb_state_type>
+ , public values
+ {
+ amb_state_type(values i, coordinator_type coor, output_type oarg)
+ : values(i)
+ , source(i.source_operator)
+ , coordinator(std::move(coor))
+ , out(std::move(oarg))
+ , pendingObservables(0)
+ , firstEmitted(false)
+ {
+ }
+ observable<source_value_type, source_operator_type> source;
+ coordinator_type coordinator;
+ output_type out;
+ int pendingObservables;
+ bool firstEmitted;
+ std::vector<composite_subscription> innerSubscriptions;
+ };
+
+ auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
+
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
+
+ composite_subscription outercs;
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ state->out.add(outercs);
+
+ auto source = on_exception(
+ [&](){return state->coordinator.in(state->source);},
+ state->out);
+ if (source.empty()) {
+ return;
+ }
+
+ // this subscribe does not share the observer subscription
+ // so that when it is unsubscribed the observer can be called
+ // until the inner subscriptions have finished
+ auto sink = make_subscriber<source_value_type>(
+ state->out,
+ outercs,
+ // on_next
+ [state](source_value_type st) {
+
+ if (state->firstEmitted)
+ return;
+
+ composite_subscription innercs;
+
+ state->innerSubscriptions.push_back(innercs);
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ auto innercstoken = state->out.add(innercs);
+
+ innercs.add(make_subscription([state, innercstoken](){
+ state->out.remove(innercstoken);
+ }));
+
+ auto selectedSource = state->coordinator.in(st);
+
+ auto current_id = state->pendingObservables++;
+
+ // this subscribe does not share the source subscription
+ // so that when it is unsubscribed the source will continue
+ auto sinkInner = make_subscriber<value_type>(
+ state->out,
+ innercs,
+ // on_next
+ [state, st, current_id](value_type ct) {
+ state->out.on_next(std::move(ct));
+ if (!state->firstEmitted) {
+ state->firstEmitted = true;
+ auto do_unsubscribe = [](composite_subscription cs) {
+ cs.unsubscribe();
+ };
+ std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
+ std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
+ }
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->out.on_error(e);
+ },
+ //on_completed
+ [state](){
+ state->out.on_completed();
+ }
+ );
+
+ auto selectedSinkInner = state->coordinator.out(sinkInner);
+ selectedSource.subscribe(std::move(selectedSinkInner));
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->out.on_error(e);
+ },
+ // on_completed
+ [state]() {
+ if (state->pendingObservables == 0) {
+ state->out.on_completed();
+ }
+ }
+ );
+ auto selectedSink = on_exception(
+ [&](){return state->coordinator.out(sink);},
+ state->out);
+ if (selectedSink.empty()) {
+ return;
+ }
+ source->subscribe(std::move(selectedSink.get()));
+ }
+};
+
+template<class Coordination>
+class amb_factory
+{
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ coordination_type coordination;
+public:
+ amb_factory(coordination_type sf)
+ : coordination(std::move(sf))
+ {
+ }
+
+ template<class Observable>
+ auto operator()(Observable source)
+ -> observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>> {
+ return observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>>(
+ amb<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination));
+ }
+};
+
+}
+
+template<class Coordination>
+auto amb(Coordination&& sf)
+ -> detail::amb_factory<Coordination> {
+ return detail::amb_factory<Coordination>(std::forward<Coordination>(sf));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 8ad4e35..302159f 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -764,6 +764,73 @@ public:
return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
}
+ template<class Coordination>
+ struct defer_amb : public defer_observable<
+ is_observable<value_type>,
+ this_type,
+ rxo::detail::amb, value_type, observable<value_type>, Coordination>
+ {
+ };
+
+ /// amb ->
+ /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
+ /// for each item from this observable subscribe.
+ /// for each item from only the first of the nested observables deliver from the new observable that is returned.
+ ///
+ auto amb() const
+ -> typename defer_amb<identity_one_worker>::observable_type {
+ return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread());
+ }
+
+ /// amb ->
+ /// The coordination is used to synchronize sources from different contexts.
+ /// for each item from this observable subscribe.
+ /// for each item from only the first of the nested observables deliver from the new observable that is returned.
+ ///
+ template<class Coordination>
+ auto amb(Coordination cn) const
+ -> typename std::enable_if<
+ defer_amb<Coordination>::value,
+ typename defer_amb<Coordination>::observable_type>::type {
+ return defer_amb<Coordination>::make(*this, *this, std::move(cn));
+ }
+
+ template<class Coordination, class Value0>
+ struct defer_amb_from : public defer_observable<
+ rxu::all_true<
+ is_coordination<Coordination>::value,
+ is_observable<Value0>::value>,
+ this_type,
+ rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination>
+ {
+ };
+
+ /// amb ->
+ /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
+ /// for each item from this observable subscribe.
+ /// for each item from only the first of the nested observables deliver from the new observable that is returned.
+ ///
+ template<class Value0, class... ValueN>
+ auto amb(Value0 v0, ValueN... vn) const
+ -> typename std::enable_if<
+ defer_amb_from<identity_one_worker, Value0>::value,
+ typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type {
+ return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
+ }
+
+ /// amb ->
+ /// The coordination is used to synchronize sources from different contexts.
+ /// for each item from this observable subscribe.
+ /// for each item from only the first of the nested observables deliver from the new observable that is returned.
+ ///
+ template<class Coordination, class Value0, class... ValueN>
+ auto amb(Coordination cn, Value0 v0, ValueN... vn) const
+ -> typename std::enable_if<
+ defer_amb_from<Coordination, Value0>::value,
+ typename defer_amb_from<Coordination, Value0>::observable_type>::type {
+ return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
+ }
+
/// flat_map (AKA SelectMany) ->
/// All sources must be synchronized! This means that calls across all the subscribers must be serial.
/// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 9d9aec8..894f091 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -34,6 +34,7 @@ namespace rxo=operators;
}
+#include "operators/rx-amb.hpp"
#include "operators/rx-buffer_count.hpp"
#include "operators/rx-buffer_time.hpp"
#include "operators/rx-buffer_time_count.hpp"
diff --git a/Rx/v2/test/operators/amb.cpp b/Rx/v2/test/operators/amb.cpp
new file mode 100644
index 0000000..79e1f7f
--- /dev/null
+++ b/Rx/v2/test/operators/amb.cpp
@@ -0,0 +1,915 @@
+#include "rxcpp/rx.hpp"
+namespace rx=rxcpp;
+namespace rxu=rxcpp::util;
+namespace rxs=rxcpp::sources;
+namespace rxsc=rxcpp::schedulers;
+namespace rxn=rx::notifications;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("amb never 3", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2)
+ });
+
+ auto ys3 = sc.make_hot_observable({
+ on.next(120, 3)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(200)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 1000)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 1000)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 1000)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb never empty", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 2 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2),
+ on.completed(400)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.completed(150)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only complete message"){
+ auto required = rxu::to_vector({
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 350)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 400)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 400)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb completes", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(100)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(410, 102),
+ on.next(510, 103),
+ on.completed(610)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 610)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb winner throws", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.error(310, ex)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(100)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(410, 102),
+ on.next(510, 103),
+ on.error(610, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 610)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb loser throws", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.error(20, ex)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(100)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(410, 102),
+ on.next(510, 103),
+ on.completed(610)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 610)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb throws before selection", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(110, 1),
+ on.completed(200)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.error(50, ex)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(130, 3),
+ on.completed(300)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(100)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only an error"){
+ auto required = rxu::to_vector({
+ on.error(350, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb throws before selection and emission end", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(110, 1),
+ on.completed(200)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.error(50, ex)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(130, 3),
+ on.completed(300)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.completed(500)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only an error"){
+ auto required = rxu::to_vector({
+ on.error(350, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 350)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 350)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb loser comes when winner has already emitted", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(200, ys3),
+ o_on.completed(200)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(410, 102),
+ on.next(510, 103),
+ on.completed(610)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 610)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were no subscriptions to the ys3"){
+ auto required = std::vector<rxn::subscription>();
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb empty list", "[amb][join][operators]"){
+ GIVEN("1 empty cold observable of observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto xs = sc.make_cold_observable({
+ o_on.completed(200)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only comlpete message"){
+ auto required = rxu::to_vector({
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb source throws before selection", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.error(100, ex)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only an error"){
+ auto required = rxu::to_vector({
+ on.error(300, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 300)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 300)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 300)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("amb source throws after selection", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.next(100, ys3),
+ o_on.error(300, ex)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(310, 101),
+ on.next(410, 102),
+ on.error(500, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 500)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 500)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(300, 310)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
diff --git a/Rx/v2/test/operators/amb_variadic.cpp b/Rx/v2/test/operators/amb_variadic.cpp
new file mode 100644
index 0000000..038cc5a
--- /dev/null
+++ b/Rx/v2/test/operators/amb_variadic.cpp
@@ -0,0 +1,559 @@
+#include "rxcpp/rx.hpp"
+namespace rx=rxcpp;
+namespace rxu=rxcpp::util;
+namespace rxs=rxcpp::sources;
+namespace rxsc=rxcpp::schedulers;
+namespace rxn=rx::notifications;
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("variadic amb never 3", "[amb][join][operators]"){
+ GIVEN("3 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2)
+ });
+
+ auto ys3 = sc.make_hot_observable({
+ on.next(120, 3)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb never empty", "[amb][join][operators]"){
+ GIVEN("2 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2),
+ on.completed(400)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only complete message"){
+ auto required = rxu::to_vector({
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb empty never", "[amb][join][operators]"){
+ GIVEN("2 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1),
+ on.completed(400)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only complete message"){
+ auto required = rxu::to_vector({
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb completes", "[amb][join][operators]"){
+ GIVEN("3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(310, 102),
+ on.next(410, 103),
+ on.completed(510)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 510)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb winner&owner throws", "[amb][join][operators]"){
+ GIVEN("3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.error(310, ex)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(310, 102),
+ on.next(410, 103),
+ on.error(510, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 510)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb winner&non-owner throws", "[amb][join][operators]"){
+ GIVEN("3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.error(310, ex)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.next(20, 201),
+ on.next(120, 202),
+ on.next(220, 203),
+ on.completed(320)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys2
+ .amb(ys1, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(310, 102),
+ on.next(410, 103),
+ on.error(510, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 510)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb loser&non-owner throws", "[amb][join][operators]"){
+ GIVEN("3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.error(20, ex)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys2
+ .amb(ys1, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(310, 102),
+ on.next(410, 103),
+ on.completed(510)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 510)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){
+ GIVEN("3 cold observables of ints."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("amb on_error from source");
+
+ auto ys1 = sc.make_cold_observable({
+ on.next(10, 101),
+ on.next(110, 102),
+ on.next(210, 103),
+ on.completed(310)
+ });
+
+ auto ys2 = sc.make_cold_observable({
+ on.error(20, ex)
+ });
+
+ auto ys3 = sc.make_cold_observable({
+ on.next(30, 301),
+ on.next(130, 302),
+ on.next(230, 303),
+ on.completed(330)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(ys2, ys3)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints from the first observable"){
+ auto required = rxu::to_vector({
+ on.next(210, 101),
+ on.next(310, 102),
+ on.next(410, 103),
+ on.completed(510)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys1"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 510)
+ });
+ auto actual = ys1.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys2"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys2.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ys3"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = ys3.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 2a0110f..0515b55 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -49,6 +49,8 @@ set(TEST_SOURCES
${TEST_DIR}/sources/interval.cpp
${TEST_DIR}/sources/scope.cpp
${TEST_DIR}/sources/timer.cpp
+ ${TEST_DIR}/operators/amb.cpp
+ ${TEST_DIR}/operators/amb_variadic.cpp
${TEST_DIR}/operators/buffer.cpp
${TEST_DIR}/operators/combine_latest.1.cpp
${TEST_DIR}/operators/combine_latest.2.cpp