summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-19 23:00:10 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-19 15:56:31 -0800
commit1a5d65be7548069c4df12750f5d8f78bb852c6c2 (patch)
tree66fe94a4c36f2e6d1fcdb9b25c7dbf43244f03b7
parent3ada27ecd97f762cb9b3465f1757fdda1b87b9f2 (diff)
downloadRxCpp-1a5d65be7548069c4df12750f5d8f78bb852c6c2.tar.gz
decouple flat_map from observable
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp159
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp57
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/flat_map.cpp181
5 files changed, 306 insertions, 100 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index 7f5aae1..371d8fd 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -2,6 +2,32 @@
#pragma once
+/*! \file rx-flat_map.hpp
+
+ \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
+ \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param s a function that returns an observable for each item emitted by the source observable.
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
+ \param cn the scheduler to synchronize sources from different contexts (optional).
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but concatenates the observables.
+
+ \sample
+ \snippet flat_map.cpp flat_map sample
+ \snippet output.txt flat_map sample
+
+ \sample
+ \snippet flat_map.cpp threaded flat_map sample
+ \snippet output.txt threaded flat_map sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP)
#define RXCPP_OPERATORS_RX_FLATMAP_HPP
@@ -13,6 +39,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct flat_map_invalid_arguments {};
+
+template<class... AN>
+struct flat_map_invalid : public rxo::operator_base<flat_map_invalid_arguments<AN...>> {
+ using type = observable<flat_map_invalid_arguments<AN...>, flat_map_invalid<AN...>>;
+};
+template<class... AN>
+using flat_map_invalid_t = typename flat_map_invalid<AN...>::type;
+
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
struct flat_map_traits {
typedef rxu::decay_t<Observable> source_type;
@@ -200,53 +236,96 @@ struct flat_map
}
};
-template<class CollectionSelector, class ResultSelector, class Coordination>
-class flat_map_factory
-{
- typedef rxu::decay_t<CollectionSelector> collection_selector_type;
- typedef rxu::decay_t<ResultSelector> result_selector_type;
- typedef rxu::decay_t<Coordination> coordination_type;
-
- collection_selector_type selectorCollection;
- result_selector_type selectorResult;
- coordination_type coordination;
-public:
- flat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf)
- : selectorCollection(std::move(s))
- , selectorResult(std::move(rs))
- , coordination(std::move(sf))
- {
- }
-
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<flat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, flat_map<Observable, CollectionSelector, ResultSelector, Coordination>> {
- return observable<rxu::value_type_t<flat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, flat_map<Observable, CollectionSelector, ResultSelector, Coordination>>(
- flat_map<Observable, CollectionSelector, ResultSelector, Coordination>(std::forward<Observable>(source), selectorCollection, selectorResult, coordination));
- }
-};
-
}
-template<class CollectionSelector, class ResultSelector, class Coordination>
-auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
- -> detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination> {
- return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
+/*! @copydoc rx-flat_map.hpp
+*/
+template<class... AN>
+auto flat_map(AN&&... an)
+-> operator_factory<flat_map_tag, AN...> {
+ return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
-auto flat_map(CollectionSelector&& s, Coordination&& sf)
- -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
- return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}
-template<class CollectionSelector>
-auto flat_map(CollectionSelector&& s)
- -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
- return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
-}
+template<>
+struct member_overload<flat_map_tag>
+{
+ template<class Observable, class CollectionSelector,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>>,
+ class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, FlatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s) {
+ return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
+ }
-}
+ template<class Observable, class CollectionSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, FlatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
+ return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
+ }
+
+ template<class Observable, class CollectionSelector, class ResultSelector,
+ class IsCoordination = is_coordination<ResultSelector>,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ rxu::negation<IsCoordination>>,
+ class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, FlatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
+ return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
+ }
+
+ template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, FlatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
+ return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::flat_map_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index ac70f5b..bc2a435 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -198,6 +198,7 @@
#include "operators/rx-element_at.hpp"
#include "operators/rx-filter.hpp"
#include "operators/rx-finally.hpp"
+#include "operators/rx-flat_map.hpp"
#include "operators/rx-group_by.hpp"
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-map.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index c035f91..9afceae 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1072,60 +1072,15 @@ public:
return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
}
- /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
- For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
-
- \tparam CollectionSelector the type of the observable producing function
- \tparam ResultSelector the type of the aggregation function
-
- \param s a function that returns an observable for each item emitted by the source observable
- \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
-
- \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
-
- Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables.
-
- \sample
- \snippet flat_map.cpp flat_map sample
- \snippet output.txt flat_map sample
- */
- template<class CollectionSelector, class ResultSelector>
- auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>
- /// \endcond
- {
- return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>(
- rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
- }
-
- /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
- For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
-
- \tparam CollectionSelector the type of the observable producing function
- \tparam ResultSelector the type of the aggregation function
- \tparam Coordination the type of the scheduler
-
- \param s a function that returns an observable for each item emitted by the source observable
- \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
- \param cn the scheduler to synchronize sources from different contexts.
-
- \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
-
- Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables.
-
- \sample
- \snippet flat_map.cpp threaded flat_map sample
- \snippet output.txt threaded flat_map sample
- */
- template<class CollectionSelector, class ResultSelector, class Coordination>
- auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const
+ /*! @copydoc rx-flat_map.hpp
+ */
+ template<class... AN>
+ auto flat_map(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>
+ -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>(
- rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
+ return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-concat.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 5a70f26..9fb23de 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -96,7 +96,6 @@ public:
}
#include "operators/rx-connect_forever.hpp"
-#include "operators/rx-flat_map.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-multicast.hpp"
#include "operators/rx-observe_on.hpp"
@@ -226,6 +225,13 @@ struct finally_tag {
};
};
+struct flat_map_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-flat_map.hpp>");
+ };
+};
+
struct group_by_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 724df65..03cb36e 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -3,6 +3,7 @@
#include <rxcpp/operators/rx-filter.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-flat_map.hpp>
static const int static_tripletCount = 100;
@@ -259,13 +260,13 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
auto res = w.start(
[&]() {
return xs
- .flat_map(
+ | rxo::flat_map(
[&](int){
return ys;},
[](int, std::string s){
return s;})
// forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
+ | rxo::as_dynamic();
}
);
@@ -313,19 +314,19 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
}
}
- WHEN("streamed, each int is mapped to the strings"){
+ WHEN("each int is mapped to the strings with coordinator"){
auto res = w.start(
[&]() {
- return xs >>
- rxo::flat_map(
+ return xs
+ .flat_map(
[&](int){
return ys;},
[](int, std::string s){
return s;},
- rx::identity_current_thread()) >>
+ rx::identity_current_thread())
// forget type to workaround lambda deduction bug on msvc 2013
- rxo::as_dynamic();
+ .as_dynamic();
}
);
@@ -375,7 +376,6 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
}
}
-
SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
@@ -536,3 +536,168 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
}
}
}
+
+SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][operators]"){
+ GIVEN("two cold observables. one of ints. one of strings."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> i_on;
+ const rxsc::test::messages<std::string> s_on;
+
+ auto xs = sc.make_cold_observable({
+ i_on.next(100, 4),
+ i_on.next(200, 2),
+ i_on.next(300, 3),
+ i_on.next(400, 1),
+ i_on.completed(500)
+ });
+
+ auto ys = sc.make_cold_observable({
+ s_on.next(50, "foo"),
+ s_on.next(100, "bar"),
+ s_on.next(150, "baz"),
+ s_on.next(200, "qux"),
+ s_on.completed(250)
+ });
+
+ WHEN("each int is mapped to the strings"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .flat_map(
+ [&](int){
+ return ys;})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains strings repeated for each int"){
+ auto required = rxu::to_vector({
+ s_on.next(350, "foo"),
+ s_on.next(400, "bar"),
+ s_on.next(450, "baz"),
+ s_on.next(450, "foo"),
+ s_on.next(500, "qux"),
+ s_on.next(500, "bar"),
+ s_on.next(550, "baz"),
+ s_on.next(550, "foo"),
+ s_on.next(600, "qux"),
+ s_on.next(600, "bar"),
+ s_on.next(650, "baz"),
+ s_on.next(650, "foo"),
+ s_on.next(700, "qux"),
+ s_on.next(700, "bar"),
+ s_on.next(750, "baz"),
+ s_on.next(800, "qux"),
+ s_on.completed(850)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ints"){
+ auto required = rxu::to_vector({
+ i_on.subscribe(200, 700)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were four subscription and unsubscription to the strings"){
+ auto required = rxu::to_vector({
+ s_on.subscribe(300, 550),
+ s_on.subscribe(400, 650),
+ s_on.subscribe(500, 750),
+ s_on.subscribe(600, 850)
+ });
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][operators]"){
+ GIVEN("two cold observables. one of ints. one of strings."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> i_on;
+ const rxsc::test::messages<std::string> s_on;
+
+ auto xs = sc.make_cold_observable({
+ i_on.next(100, 4),
+ i_on.next(200, 2),
+ i_on.next(300, 3),
+ i_on.next(400, 1),
+ i_on.completed(500)
+ });
+
+ auto ys = sc.make_cold_observable({
+ s_on.next(50, "foo"),
+ s_on.next(100, "bar"),
+ s_on.next(150, "baz"),
+ s_on.next(200, "qux"),
+ s_on.completed(250)
+ });
+
+ WHEN("each int is mapped to the strings"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .flat_map(
+ [&](int){
+ return ys;},
+ rx::identity_current_thread())
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains strings repeated for each int"){
+ auto required = rxu::to_vector({
+ s_on.next(350, "foo"),
+ s_on.next(400, "bar"),
+ s_on.next(450, "baz"),
+ s_on.next(450, "foo"),
+ s_on.next(500, "qux"),
+ s_on.next(500, "bar"),
+ s_on.next(550, "baz"),
+ s_on.next(550, "foo"),
+ s_on.next(600, "qux"),
+ s_on.next(600, "bar"),
+ s_on.next(650, "baz"),
+ s_on.next(650, "foo"),
+ s_on.next(700, "qux"),
+ s_on.next(700, "bar"),
+ s_on.next(750, "baz"),
+ s_on.next(800, "qux"),
+ s_on.completed(850)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ints"){
+ auto required = rxu::to_vector({
+ i_on.subscribe(200, 700)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were four subscription and unsubscription to the strings"){
+ auto required = rxu::to_vector({
+ s_on.subscribe(300, 550),
+ s_on.subscribe(400, 650),
+ s_on.subscribe(500, 750),
+ s_on.subscribe(600, 850)
+ });
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+} \ No newline at end of file