summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
diff options
context:
space:
mode:
authorDiorcet Yann <diorcety@users.noreply.github.com>2018-07-10 16:37:21 +0200
committerKirk Shoop <kirk.shoop@gmail.com>2018-07-10 07:37:21 -0700
commit546e91b8b07edea7816afcf4dc638622534e8234 (patch)
tree92cff64aa669b0cd8532a9c6ecc0f5be11c8b3ac /Rx/v2/src/rxcpp/operators/rx-group_by.hpp
parent0b93c186708460962d5f47414fe80b289a24bbe1 (diff)
downloadRxCpp-546e91b8b07edea7816afcf4dc638622534e8234.tar.gz
group_by support DurationSelector (#447)
* group_by support DurationSelector * remove unused names
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-group_by.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp116
1 files changed, 83 insertions, 33 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index 4eadbab..8b451e5 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -9,6 +9,7 @@
\tparam KeySelector the type of the key extracting function
\tparam MarbleSelector the type of the element extracting function
\tparam BinaryPredicate the type of the key comparing function
+ \tparam DurationSelector the type of the duration observable function
\param ks a function that extracts the key for each item (optional)
\param ms a function that extracts the return element for each item (optional)
@@ -63,7 +64,7 @@ struct is_group_by_selector_for {
static const bool value = !std::is_same<type, tag_not_valid>::value;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by_traits
{
typedef T source_value_type;
@@ -71,6 +72,7 @@ struct group_by_traits
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
@@ -87,14 +89,15 @@ struct group_by_traits
typedef grouped_observable<key_type, marble_type> grouped_observable_type;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by
{
- typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+ typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
typedef typename traits_type::key_selector_type key_selector_type;
typedef typename traits_type::marble_selector_type marble_selector_type;
typedef typename traits_type::marble_type marble_type;
typedef typename traits_type::predicate_type predicate_type;
+ typedef typename traits_type::duration_selector_type duration_selector_type;
typedef typename traits_type::subject_type subject_type;
typedef typename traits_type::key_type key_type;
@@ -130,21 +133,23 @@ struct group_by
struct group_by_values
{
- group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
mutable key_selector_type keySelector;
mutable marble_selector_type marbleSelector;
mutable predicate_type predicate;
+ mutable duration_selector_type durationSelector;
};
group_by_values initial;
- group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
- : initial(std::move(ks), std::move(ms), std::move(p))
+ group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
+ : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds))
{
}
@@ -206,7 +211,35 @@ struct group_by
}
auto sub = subject_type();
g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
- dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())));
+ auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()));
+ auto durationObs = on_exception(
+ [&](){
+ return this->durationSelector(obs);},
+ [this](std::exception_ptr e){on_error(e);});
+ if (durationObs.empty()) {
+ return;
+ }
+
+ dest.on_next(obs);
+ composite_subscription duration_sub;
+ auto ssub = state->source_lifetime.add(duration_sub);
+
+ auto expire_state = state;
+ auto expire_dest = g->second;
+ auto expire = [=]() {
+ auto g = expire_state->groups.find(selectedKey.get());
+ if (g != expire_state->groups.end()) {
+ expire_state->groups.erase(g);
+ expire_dest.on_completed();
+ }
+ expire_state->source_lifetime.remove(ssub);
+ };
+ auto robs = durationObs.get().take(1);
+ duration_sub.add(robs.subscribe(
+ [](const typename decltype(robs)::value_type &){},
+ [=](std::exception_ptr ) {expire();},
+ [=](){expire();}
+ ));
}
auto selectedMarble = on_exception(
[&](){
@@ -243,33 +276,36 @@ struct group_by
}
};
-template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
class group_by_factory
{
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
key_selector_type keySelector;
marble_selector_type marbleSelector;
predicate_type predicate;
+ duration_selector_type durationSelector;
public:
- group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
template<class Observable>
struct group_by_factory_traits
{
typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
- typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
- typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
+ typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
+ typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type;
};
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
- return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
+ -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) {
+ return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)));
}
};
@@ -288,61 +324,75 @@ auto group_by(AN&&... an)
template<>
struct member_overload<group_by_tag>
{
- template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
+ class Value = typename Traits::grouped_observable_type>
+ static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds)
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)));
+ }
+
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
- template<class Observable, class KeySelector, class MarbleSelector,
+ template<class Observable, class KeySelector, class MarbleSelector,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable, class KeySelector,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable,
class KeySelector=rxu::detail::take_at<0>,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class Enabled = rxu::enable_if_all_true_type_t<
all_observables<Observable>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o)
- -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class... AN>
static operators::detail::group_by_invalid_t<AN...> member(const AN&...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool");
- }
+ static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable");
+ }
};