summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-18 22:07:02 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-18 16:08:48 -0800
commit3ada27ecd97f762cb9b3465f1757fdda1b87b9f2 (patch)
treeb8733f4b6d864cc71fc017558e9d963356ac0a35 /Rx/v2/src
parentdda07ac5ef860f384463a3f6b27e4d5096a45532 (diff)
downloadRxCpp-3ada27ecd97f762cb9b3465f1757fdda1b87b9f2.tar.gz
decouple concat_map from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat_map.hpp158
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp57
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp10
4 files changed, 133 insertions, 93 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
index d7b32c8..91ccfa5 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
@@ -2,6 +2,32 @@
#pragma once
+/*! \file rx-concat_map.hpp
+
+ \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
+ \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param s a function that returns an observable for each item emitted by the source observable.
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
+ \param cn the scheduler to synchronize sources from different contexts. (optional).
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
+
+ \sample
+ \snippet concat_map.cpp concat_map sample
+ \snippet output.txt concat_map sample
+
+ \sample
+ \snippet concat_map.cpp threaded concat_map sample
+ \snippet output.txt threaded concat_map sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP)
#define RXCPP_OPERATORS_RX_CONCATMAP_HPP
@@ -13,6 +39,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct concat_map_invalid_arguments {};
+
+template<class... AN>
+struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
+ using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>;
+};
+template<class... AN>
+using concat_map_invalid_t = typename concat_map_invalid<AN...>::type;
+
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
struct concat_traits {
typedef rxu::decay_t<Observable> source_type;
@@ -233,54 +269,96 @@ private:
concat_map& operator=(const concat_map&) RXCPP_DELETE;
};
-template<class CollectionSelector, class ResultSelector, class Coordination>
-class concat_map_factory
-{
- typedef rxu::decay_t<CollectionSelector> collection_selector_type;
- typedef rxu::decay_t<ResultSelector> result_selector_type;
- typedef rxu::decay_t<Coordination> coordination_type;
-
- collection_selector_type selectorCollection;
- result_selector_type selectorResult;
- coordination_type coordination;
-public:
- concat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf)
- : selectorCollection(std::move(s))
- , selectorResult(std::move(rs))
- , coordination(std::move(sf))
- {
- }
-
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>> {
- return observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>(
- concat_map<Observable, CollectionSelector, ResultSelector, Coordination>(std::forward<Observable>(source), selectorCollection, selectorResult, coordination));
- }
-};
-
}
-template<class CollectionSelector, class ResultSelector, class Coordination>
-auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
- -> detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination> {
- return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
+/*! @copydoc rx-concat_map.hpp
+*/
+template<class... AN>
+auto concat_map(AN&&... an)
+-> operator_factory<concat_map_tag, AN...> {
+ return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
-auto concat_map(CollectionSelector&& s, Coordination&& sf)
- -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
- return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}
-template<class CollectionSelector>
-auto concat_map(CollectionSelector&& s)
- -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
- return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
-}
+template<>
+struct member_overload<concat_map_tag>
+{
+ template<class Observable, class CollectionSelector,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
+ }
+ template<class Observable, class CollectionSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
+ }
-}
+ template<class Observable, class CollectionSelector, class ResultSelector,
+ class IsCoordination = is_coordination<ResultSelector>,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ rxu::negation<IsCoordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
+ }
+
+ template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::concat_map_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 1d47997..ac70f5b 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -190,6 +190,7 @@
#include "operators/rx-buffer_time_count.hpp"
#include "operators/rx-combine_latest.hpp"
#include "operators/rx-concat.hpp"
+#include "operators/rx-concat_map.hpp"
#include "operators/rx-debounce.hpp"
#include "operators/rx-delay.hpp"
#include "operators/rx-distinct.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 9f90663..c035f91 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1139,60 +1139,15 @@ public:
return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
}
- /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
- For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
-
- \tparam CollectionSelector the type of the observable producing function
- \tparam ResultSelector the type of the aggregation function
-
- \param s a function that returns an observable for each item emitted by the source observable
- \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
-
- \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
-
- Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
-
- \sample
- \snippet concat_map.cpp concat_map sample
- \snippet output.txt concat_map sample
- */
- template<class CollectionSelector, class ResultSelector>
- auto concat_map(CollectionSelector&& s, ResultSelector&& rs) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>
- /// \endcond
- {
- return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>(
- rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
- }
-
- /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
- For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
-
- \tparam CollectionSelector the type of the observable producing function
- \tparam ResultSelector the type of the aggregation function
- \tparam Coordination the type of the scheduler
-
- \param s a function that returns an observable for each item emitted by the source observable
- \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
- \param cn the scheduler to synchronize sources from different contexts.
-
- \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
-
- Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
-
- \sample
- \snippet concat_map.cpp threaded concat_map sample
- \snippet output.txt threaded concat_map sample
- */
- template<class CollectionSelector, class ResultSelector, class Coordination>
- auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const
+ /*! @copydoc rx-concat_map.hpp
+ */
+ template<class... AN>
+ auto concat_map(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>
+ -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>(
- rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
+ return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-with_latest_from.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index e5a3eaf..5a70f26 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -95,7 +95,6 @@ public:
}
-#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-flat_map.hpp"
#include "operators/rx-lift.hpp"
@@ -169,7 +168,14 @@ struct concat_tag {
struct include_header{
static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat.hpp>");
};
-};
+};
+
+struct concat_map_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat_map.hpp>");
+ };
+};
struct debounce_tag {
template<class Included>