diff options
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-amb.hpp | 207 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 67 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/amb.cpp | 915 | ||||
-rw-r--r-- | Rx/v2/test/operators/amb_variadic.cpp | 559 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 2 |
6 files changed, 1751 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-amb.hpp b/Rx/v2/src/rxcpp/operators/rx-amb.hpp new file mode 100644 index 0000000..8045f95 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-amb.hpp @@ -0,0 +1,207 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_AMB_HPP) +#define RXCPP_OPERATORS_RX_AMB_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Observable, class Coordination> +struct amb + : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> +{ + //static_assert(is_observable<Observable>::value, "amb requires an observable"); + //static_assert(is_observable<T>::value, "amb requires an observable that contains observables"); + + typedef amb<T, Observable, Coordination> this_type; + + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Observable> source_type; + + typedef typename source_type::source_operator_type source_operator_type; + typedef typename source_value_type::value_type value_type; + + typedef rxu::decay_t<Coordination> coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + + struct values + { + values(source_operator_type o, coordination_type sf) + : source_operator(std::move(o)) + , coordination(std::move(sf)) + { + } + source_operator_type source_operator; + coordination_type coordination; + }; + values initial; + + amb(const source_type& o, coordination_type sf) + : initial(o.source_operator, std::move(sf)) + { + } + + template<class Subscriber> + void on_subscribe(Subscriber scbr) const { + static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); + + typedef Subscriber output_type; + + struct amb_state_type + : public std::enable_shared_from_this<amb_state_type> + , public values + { + amb_state_type(values i, coordinator_type coor, output_type oarg) + : values(i) + , source(i.source_operator) + , coordinator(std::move(coor)) + , out(std::move(oarg)) + , pendingObservables(0) + , firstEmitted(false) + { + } + observable<source_value_type, source_operator_type> source; + coordinator_type coordinator; + output_type out; + int pendingObservables; + bool firstEmitted; + std::vector<composite_subscription> innerSubscriptions; + }; + + auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); + + // take a copy of the values for each subscription + auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr)); + + composite_subscription outercs; + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + state->out.add(outercs); + + auto source = on_exception( + [&](){return state->coordinator.in(state->source);}, + state->out); + if (source.empty()) { + return; + } + + // this subscribe does not share the observer subscription + // so that when it is unsubscribed the observer can be called + // until the inner subscriptions have finished + auto sink = make_subscriber<source_value_type>( + state->out, + outercs, + // on_next + [state](source_value_type st) { + + if (state->firstEmitted) + return; + + composite_subscription innercs; + + state->innerSubscriptions.push_back(innercs); + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + auto innercstoken = state->out.add(innercs); + + innercs.add(make_subscription([state, innercstoken](){ + state->out.remove(innercstoken); + })); + + auto selectedSource = state->coordinator.in(st); + + auto current_id = state->pendingObservables++; + + // this subscribe does not share the source subscription + // so that when it is unsubscribed the source will continue + auto sinkInner = make_subscriber<value_type>( + state->out, + innercs, + // on_next + [state, st, current_id](value_type ct) { + state->out.on_next(std::move(ct)); + if (!state->firstEmitted) { + state->firstEmitted = true; + auto do_unsubscribe = [](composite_subscription cs) { + cs.unsubscribe(); + }; + std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe); + std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe); + } + }, + // on_error + [state](std::exception_ptr e) { + state->out.on_error(e); + }, + //on_completed + [state](){ + state->out.on_completed(); + } + ); + + auto selectedSinkInner = state->coordinator.out(sinkInner); + selectedSource.subscribe(std::move(selectedSinkInner)); + }, + // on_error + [state](std::exception_ptr e) { + state->out.on_error(e); + }, + // on_completed + [state]() { + if (state->pendingObservables == 0) { + state->out.on_completed(); + } + } + ); + auto selectedSink = on_exception( + [&](){return state->coordinator.out(sink);}, + state->out); + if (selectedSink.empty()) { + return; + } + source->subscribe(std::move(selectedSink.get())); + } +}; + +template<class Coordination> +class amb_factory +{ + typedef rxu::decay_t<Coordination> coordination_type; + + coordination_type coordination; +public: + amb_factory(coordination_type sf) + : coordination(std::move(sf)) + { + } + + template<class Observable> + auto operator()(Observable source) + -> observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>> { + return observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>>( + amb<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination)); + } +}; + +} + +template<class Coordination> +auto amb(Coordination&& sf) + -> detail::amb_factory<Coordination> { + return detail::amb_factory<Coordination>(std::forward<Coordination>(sf)); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 8ad4e35..302159f 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -764,6 +764,73 @@ public: return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); } + template<class Coordination> + struct defer_amb : public defer_observable< + is_observable<value_type>, + this_type, + rxo::detail::amb, value_type, observable<value_type>, Coordination> + { + }; + + /// amb -> + /// All sources must be synchronized! This means that calls across all the subscribers must be serial. + /// for each item from this observable subscribe. + /// for each item from only the first of the nested observables deliver from the new observable that is returned. + /// + auto amb() const + -> typename defer_amb<identity_one_worker>::observable_type { + return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread()); + } + + /// amb -> + /// The coordination is used to synchronize sources from different contexts. + /// for each item from this observable subscribe. + /// for each item from only the first of the nested observables deliver from the new observable that is returned. + /// + template<class Coordination> + auto amb(Coordination cn) const + -> typename std::enable_if< + defer_amb<Coordination>::value, + typename defer_amb<Coordination>::observable_type>::type { + return defer_amb<Coordination>::make(*this, *this, std::move(cn)); + } + + template<class Coordination, class Value0> + struct defer_amb_from : public defer_observable< + rxu::all_true< + is_coordination<Coordination>::value, + is_observable<Value0>::value>, + this_type, + rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination> + { + }; + + /// amb -> + /// All sources must be synchronized! This means that calls across all the subscribers must be serial. + /// for each item from this observable subscribe. + /// for each item from only the first of the nested observables deliver from the new observable that is returned. + /// + template<class Value0, class... ValueN> + auto amb(Value0 v0, ValueN... vn) const + -> typename std::enable_if< + defer_amb_from<identity_one_worker, Value0>::value, + typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type { + return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); + } + + /// amb -> + /// The coordination is used to synchronize sources from different contexts. + /// for each item from this observable subscribe. + /// for each item from only the first of the nested observables deliver from the new observable that is returned. + /// + template<class Coordination, class Value0, class... ValueN> + auto amb(Coordination cn, Value0 v0, ValueN... vn) const + -> typename std::enable_if< + defer_amb_from<Coordination, Value0>::value, + typename defer_amb_from<Coordination, Value0>::observable_type>::type { + return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); + } + /// flat_map (AKA SelectMany) -> /// All sources must be synchronized! This means that calls across all the subscribers must be serial. /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 9d9aec8..894f091 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -34,6 +34,7 @@ namespace rxo=operators; } +#include "operators/rx-amb.hpp" #include "operators/rx-buffer_count.hpp" #include "operators/rx-buffer_time.hpp" #include "operators/rx-buffer_time_count.hpp" diff --git a/Rx/v2/test/operators/amb.cpp b/Rx/v2/test/operators/amb.cpp new file mode 100644 index 0000000..79e1f7f --- /dev/null +++ b/Rx/v2/test/operators/amb.cpp @@ -0,0 +1,915 @@ +#include "rxcpp/rx.hpp" +namespace rx=rxcpp; +namespace rxu=rxcpp::util; +namespace rxs=rxcpp::sources; +namespace rxsc=rxcpp::schedulers; +namespace rxn=rx::notifications; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("amb never 3", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2) + }); + + auto ys3 = sc.make_hot_observable({ + on.next(120, 3) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(200) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 1000) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 1000) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 1000) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb never empty", "[amb][join][operators]"){ + GIVEN("1 cold observable with 2 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2), + on.completed(400) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.completed(150) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only complete message"){ + auto required = rxu::to_vector({ + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 350) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 400) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 400) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb completes", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(100) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(410, 102), + on.next(510, 103), + on.completed(610) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 610) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb winner throws", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.error(310, ex) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(100) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(410, 102), + on.next(510, 103), + on.error(610, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 610) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb loser throws", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.error(20, ex) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(100) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(410, 102), + on.next(510, 103), + on.completed(610) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 610) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb throws before selection", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(110, 1), + on.completed(200) + }); + + auto ys2 = sc.make_cold_observable({ + on.error(50, ex) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(130, 3), + on.completed(300) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(100) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only an error"){ + auto required = rxu::to_vector({ + on.error(350, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb throws before selection and emission end", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(110, 1), + on.completed(200) + }); + + auto ys2 = sc.make_cold_observable({ + on.error(50, ex) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(130, 3), + on.completed(300) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.completed(500) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only an error"){ + auto required = rxu::to_vector({ + on.error(350, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 350) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 350) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb loser comes when winner has already emitted", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(200, ys3), + o_on.completed(200) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(410, 102), + on.next(510, 103), + on.completed(610) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 610) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were no subscriptions to the ys3"){ + auto required = std::vector<rxn::subscription>(); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb empty list", "[amb][join][operators]"){ + GIVEN("1 empty cold observable of observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto xs = sc.make_cold_observable({ + o_on.completed(200) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only comlpete message"){ + auto required = rxu::to_vector({ + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb source throws before selection", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.error(100, ex) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only an error"){ + auto required = rxu::to_vector({ + on.error(300, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 300) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 300) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 300) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("amb source throws after selection", "[amb][join][operators]"){ + GIVEN("1 cold observable with 3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.next(100, ys3), + o_on.error(300, ex) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(310, 101), + on.next(410, 102), + on.error(500, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 500) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(300, 500) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(300, 310) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} diff --git a/Rx/v2/test/operators/amb_variadic.cpp b/Rx/v2/test/operators/amb_variadic.cpp new file mode 100644 index 0000000..038cc5a --- /dev/null +++ b/Rx/v2/test/operators/amb_variadic.cpp @@ -0,0 +1,559 @@ +#include "rxcpp/rx.hpp" +namespace rx=rxcpp; +namespace rxu=rxcpp::util; +namespace rxs=rxcpp::sources; +namespace rxsc=rxcpp::schedulers; +namespace rxn=rx::notifications; + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("variadic amb never 3", "[amb][join][operators]"){ + GIVEN("3 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2) + }); + + auto ys3 = sc.make_hot_observable({ + on.next(120, 3) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb never empty", "[amb][join][operators]"){ + GIVEN("2 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2), + on.completed(400) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only complete message"){ + auto required = rxu::to_vector({ + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb empty never", "[amb][join][operators]"){ + GIVEN("2 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1), + on.completed(400) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only complete message"){ + auto required = rxu::to_vector({ + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb completes", "[amb][join][operators]"){ + GIVEN("3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(310, 102), + on.next(410, 103), + on.completed(510) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 510) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb winner&owner throws", "[amb][join][operators]"){ + GIVEN("3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.error(310, ex) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(310, 102), + on.next(410, 103), + on.error(510, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 510) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb winner&non-owner throws", "[amb][join][operators]"){ + GIVEN("3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.error(310, ex) + }); + + auto ys2 = sc.make_cold_observable({ + on.next(20, 201), + on.next(120, 202), + on.next(220, 203), + on.completed(320) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys2 + .amb(ys1, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(310, 102), + on.next(410, 103), + on.error(510, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 510) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb loser&non-owner throws", "[amb][join][operators]"){ + GIVEN("3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.error(20, ex) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys2 + .amb(ys1, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(310, 102), + on.next(410, 103), + on.completed(510) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 510) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){ + GIVEN("3 cold observables of ints."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("amb on_error from source"); + + auto ys1 = sc.make_cold_observable({ + on.next(10, 101), + on.next(110, 102), + on.next(210, 103), + on.completed(310) + }); + + auto ys2 = sc.make_cold_observable({ + on.error(20, ex) + }); + + auto ys3 = sc.make_cold_observable({ + on.next(30, 301), + on.next(130, 302), + on.next(230, 303), + on.completed(330) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(ys2, ys3) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints from the first observable"){ + auto required = rxu::to_vector({ + on.next(210, 101), + on.next(310, 102), + on.next(410, 103), + on.completed(510) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys1"){ + auto required = rxu::to_vector({ + on.subscribe(200, 510) + }); + auto actual = ys1.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys2"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys2.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ys3"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = ys3.subscriptions(); + REQUIRE(required == actual); + } + } + } +} diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 2a0110f..0515b55 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -49,6 +49,8 @@ set(TEST_SOURCES ${TEST_DIR}/sources/interval.cpp ${TEST_DIR}/sources/scope.cpp ${TEST_DIR}/sources/timer.cpp + ${TEST_DIR}/operators/amb.cpp + ${TEST_DIR}/operators/amb_variadic.cpp ${TEST_DIR}/operators/buffer.cpp ${TEST_DIR}/operators/combine_latest.1.cpp ${TEST_DIR}/operators/combine_latest.2.cpp |