diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-concat_map.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat_map.hpp | 158 |
1 files changed, 118 insertions, 40 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp index d7b32c8..91ccfa5 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp @@ -2,6 +2,32 @@ #pragma once +/*! \file rx-concat_map.hpp + + \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type) + \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type) + \tparam Coordination the type of the scheduler (optional). + + \param s a function that returns an observable for each item emitted by the source observable. + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional). + \param cn the scheduler to synchronize sources from different contexts. (optional). + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. + + \sample + \snippet concat_map.cpp concat_map sample + \snippet output.txt concat_map sample + + \sample + \snippet concat_map.cpp threaded concat_map sample + \snippet output.txt threaded concat_map sample +*/ + #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP) #define RXCPP_OPERATORS_RX_CONCATMAP_HPP @@ -13,6 +39,16 @@ namespace operators { namespace detail { +template<class... AN> +struct concat_map_invalid_arguments {}; + +template<class... AN> +struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> { + using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>; +}; +template<class... AN> +using concat_map_invalid_t = typename concat_map_invalid<AN...>::type; + template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> struct concat_traits { typedef rxu::decay_t<Observable> source_type; @@ -233,54 +269,96 @@ private: concat_map& operator=(const concat_map&) RXCPP_DELETE; }; -template<class CollectionSelector, class ResultSelector, class Coordination> -class concat_map_factory -{ - typedef rxu::decay_t<CollectionSelector> collection_selector_type; - typedef rxu::decay_t<ResultSelector> result_selector_type; - typedef rxu::decay_t<Coordination> coordination_type; - - collection_selector_type selectorCollection; - result_selector_type selectorResult; - coordination_type coordination; -public: - concat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf) - : selectorCollection(std::move(s)) - , selectorResult(std::move(rs)) - , coordination(std::move(sf)) - { - } - - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>> { - return observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>( - concat_map<Observable, CollectionSelector, ResultSelector, Coordination>(std::forward<Observable>(source), selectorCollection, selectorResult, coordination)); - } -}; - } -template<class CollectionSelector, class ResultSelector, class Coordination> -auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) - -> detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination> { - return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)); +/*! @copydoc rx-concat_map.hpp +*/ +template<class... AN> +auto concat_map(AN&&... an) +-> operator_factory<concat_map_tag, AN...> { + return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } -template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type> -auto concat_map(CollectionSelector&& s, Coordination&& sf) - -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> { - return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf)); } -template<class CollectionSelector> -auto concat_map(CollectionSelector&& s) - -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> { - return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread()); -} +template<> +struct member_overload<concat_map_tag> +{ + template<class Observable, class CollectionSelector, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread())); + } + template<class Observable, class CollectionSelector, class Coordination, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + is_coordination<Coordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn))); + } -} + template<class Observable, class CollectionSelector, class ResultSelector, + class IsCoordination = is_coordination<ResultSelector>, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + rxu::negation<IsCoordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class ResultSelectorType = rxu::decay_t<ResultSelector>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); + } + + template<class Observable, class CollectionSelector, class ResultSelector, class Coordination, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + is_coordination<Coordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class ResultSelectorType = rxu::decay_t<ResultSelector>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::concat_map_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)"); + } +}; } |