summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-concat_map.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat_map.hpp158
1 files changed, 118 insertions, 40 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
index d7b32c8..91ccfa5 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
@@ -2,6 +2,32 @@
#pragma once
+/*! \file rx-concat_map.hpp
+
+ \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
+ \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param s a function that returns an observable for each item emitted by the source observable.
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
+ \param cn the scheduler to synchronize sources from different contexts. (optional).
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
+
+ \sample
+ \snippet concat_map.cpp concat_map sample
+ \snippet output.txt concat_map sample
+
+ \sample
+ \snippet concat_map.cpp threaded concat_map sample
+ \snippet output.txt threaded concat_map sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP)
#define RXCPP_OPERATORS_RX_CONCATMAP_HPP
@@ -13,6 +39,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct concat_map_invalid_arguments {};
+
+template<class... AN>
+struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
+ using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>;
+};
+template<class... AN>
+using concat_map_invalid_t = typename concat_map_invalid<AN...>::type;
+
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
struct concat_traits {
typedef rxu::decay_t<Observable> source_type;
@@ -233,54 +269,96 @@ private:
concat_map& operator=(const concat_map&) RXCPP_DELETE;
};
-template<class CollectionSelector, class ResultSelector, class Coordination>
-class concat_map_factory
-{
- typedef rxu::decay_t<CollectionSelector> collection_selector_type;
- typedef rxu::decay_t<ResultSelector> result_selector_type;
- typedef rxu::decay_t<Coordination> coordination_type;
-
- collection_selector_type selectorCollection;
- result_selector_type selectorResult;
- coordination_type coordination;
-public:
- concat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf)
- : selectorCollection(std::move(s))
- , selectorResult(std::move(rs))
- , coordination(std::move(sf))
- {
- }
-
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>> {
- return observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>(
- concat_map<Observable, CollectionSelector, ResultSelector, Coordination>(std::forward<Observable>(source), selectorCollection, selectorResult, coordination));
- }
-};
-
}
-template<class CollectionSelector, class ResultSelector, class Coordination>
-auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
- -> detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination> {
- return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
+/*! @copydoc rx-concat_map.hpp
+*/
+template<class... AN>
+auto concat_map(AN&&... an)
+-> operator_factory<concat_map_tag, AN...> {
+ return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
-auto concat_map(CollectionSelector&& s, Coordination&& sf)
- -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
- return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}
-template<class CollectionSelector>
-auto concat_map(CollectionSelector&& s)
- -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
- return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
-}
+template<>
+struct member_overload<concat_map_tag>
+{
+ template<class Observable, class CollectionSelector,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
+ }
+ template<class Observable, class CollectionSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class ResultSelectorType = rxu::detail::take_at<1>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
+ }
-}
+ template<class Observable, class CollectionSelector, class ResultSelector,
+ class IsCoordination = is_coordination<ResultSelector>,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ rxu::negation<IsCoordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
+ }
+
+ template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
+ class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, CollectionType>,
+ is_coordination<Coordination>>,
+ class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
+ class CollectionValueType = rxu::value_type_t<CollectionType>,
+ class ResultSelectorType = rxu::decay_t<ResultSelector>,
+ class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
+ class Result = observable<Value, ConcatMap>
+ >
+ static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
+ return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::concat_map_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
+ }
+};
}