diff options
author | Diorcet Yann <diorcety@users.noreply.github.com> | 2018-07-10 16:37:21 +0200 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@gmail.com> | 2018-07-10 07:37:21 -0700 |
commit | 546e91b8b07edea7816afcf4dc638622534e8234 (patch) | |
tree | 92cff64aa669b0cd8532a9c6ecc0f5be11c8b3ac /Rx | |
parent | 0b93c186708460962d5f47414fe80b289a24bbe1 (diff) | |
download | RxCpp-546e91b8b07edea7816afcf4dc638622534e8234.tar.gz |
group_by support DurationSelector (#447)
* group_by support DurationSelector
* remove unused names
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 116 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 9 |
2 files changed, 92 insertions, 33 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index 4eadbab..8b451e5 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -9,6 +9,7 @@ \tparam KeySelector the type of the key extracting function \tparam MarbleSelector the type of the element extracting function \tparam BinaryPredicate the type of the key comparing function + \tparam DurationSelector the type of the duration observable function \param ks a function that extracts the key for each item (optional) \param ms a function that extracts the return element for each item (optional) @@ -63,7 +64,7 @@ struct is_group_by_selector_for { static const bool value = !std::is_same<type, tag_not_valid>::value; }; -template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> struct group_by_traits { typedef T source_value_type; @@ -71,6 +72,7 @@ struct group_by_traits typedef rxu::decay_t<KeySelector> key_selector_type; typedef rxu::decay_t<MarbleSelector> marble_selector_type; typedef rxu::decay_t<BinaryPredicate> predicate_type; + typedef rxu::decay_t<DurationSelector> duration_selector_type; static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); @@ -87,14 +89,15 @@ struct group_by_traits typedef grouped_observable<key_type, marble_type> grouped_observable_type; }; -template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> struct group_by { - typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; + typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type; typedef typename traits_type::key_selector_type key_selector_type; typedef typename traits_type::marble_selector_type marble_selector_type; typedef typename traits_type::marble_type marble_type; typedef typename traits_type::predicate_type predicate_type; + typedef typename traits_type::duration_selector_type duration_selector_type; typedef typename traits_type::subject_type subject_type; typedef typename traits_type::key_type key_type; @@ -130,21 +133,23 @@ struct group_by struct group_by_values { - group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } mutable key_selector_type keySelector; mutable marble_selector_type marbleSelector; mutable predicate_type predicate; + mutable duration_selector_type durationSelector; }; group_by_values initial; - group_by(key_selector_type ks, marble_selector_type ms, predicate_type p) - : initial(std::move(ks), std::move(ms), std::move(p)) + group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) + : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds)) { } @@ -206,7 +211,35 @@ struct group_by } auto sub = subject_type(); g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; - dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()))); + auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())); + auto durationObs = on_exception( + [&](){ + return this->durationSelector(obs);}, + [this](std::exception_ptr e){on_error(e);}); + if (durationObs.empty()) { + return; + } + + dest.on_next(obs); + composite_subscription duration_sub; + auto ssub = state->source_lifetime.add(duration_sub); + + auto expire_state = state; + auto expire_dest = g->second; + auto expire = [=]() { + auto g = expire_state->groups.find(selectedKey.get()); + if (g != expire_state->groups.end()) { + expire_state->groups.erase(g); + expire_dest.on_completed(); + } + expire_state->source_lifetime.remove(ssub); + }; + auto robs = durationObs.get().take(1); + duration_sub.add(robs.subscribe( + [](const typename decltype(robs)::value_type &){}, + [=](std::exception_ptr ) {expire();}, + [=](){expire();} + )); } auto selectedMarble = on_exception( [&](){ @@ -243,33 +276,36 @@ struct group_by } }; -template<class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> class group_by_factory { typedef rxu::decay_t<KeySelector> key_selector_type; typedef rxu::decay_t<MarbleSelector> marble_selector_type; typedef rxu::decay_t<BinaryPredicate> predicate_type; + typedef rxu::decay_t<DurationSelector> duration_selector_type; key_selector_type keySelector; marble_selector_type marbleSelector; predicate_type predicate; + duration_selector_type durationSelector; public: - group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } template<class Observable> struct group_by_factory_traits { typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type; - typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; - typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type; + typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type; + typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type; }; template<class Observable> auto operator()(Observable&& source) - -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) { - return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate))); + -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) { + return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector))); } }; @@ -288,61 +324,75 @@ auto group_by(AN&&... an) template<> struct member_overload<group_by_tag> { - template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector, + class SourceValue = rxu::value_type_t<Observable>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, + class Value = typename Traits::grouped_observable_type> + static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds) + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds))); + } + + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p))); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>())); } - template<class Observable, class KeySelector, class MarbleSelector, + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class Observable, class KeySelector, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class Observable, class KeySelector=rxu::detail::take_at<0>, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class Enabled = rxu::enable_if_all_true_type_t< all_observables<Observable>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o) - -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class... AN> static operators::detail::group_by_invalid_t<AN...> member(const AN&...) { std::terminate(); return {}; - static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool"); - } + static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable"); + } }; diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 97ff73f..d4d5536 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -416,6 +416,15 @@ struct less { return std::forward<LHS>(lhs) < std::forward<RHS>(rhs); } }; +template <class T> +struct ret +{ + template <class LHS> + auto operator()(LHS&& ) const + -> decltype(T()) + { return T(); } +}; + template<class T = void> struct equal_to { |