diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-08-02 11:35:04 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-08-02 11:35:04 -0700 |
commit | 3bf429778a08eda9498c61ec1daa3664d5f1bbfd (patch) | |
tree | 7618ab45b97da371d4eaa95fa37719889de1077a /Rx/v2/src | |
parent | fc4f3fa47e67e7b1d99dcec1a7092761426ab375 (diff) | |
download | RxCpp-3bf429778a08eda9498c61ec1daa3664d5f1bbfd.tar.gz |
added group_by
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-flat_map.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 220 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-grouped_observable.hpp | 203 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 9 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-predef.hpp | 37 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-subject.hpp | 4 |
8 files changed, 474 insertions, 3 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index 84aeb66..f8fd1d7 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -32,9 +32,7 @@ struct flat_map_traits { typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type; -//#if _MSC_VER >= 1900 static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable"); -//#endif typedef typename collection_type::value_type collection_value_type; diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp new file mode 100644 index 0000000..3982e8b --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -0,0 +1,220 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP) +#define RXCPP_OPERATORS_RX_GROUP_BY_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Selector> +struct is_group_by_selector_for { + + typedef typename std::decay<Selector>::type selector_type; + typedef T source_value_type; + + struct tag_not_valid {}; + template<class CV, class CS> + static auto check(int) -> decltype((*(CS*)nullptr)(*(CV*)nullptr)); + template<class CV, class CS> + static tag_not_valid check(...); + + typedef decltype(check<source_value_type, selector_type>(0)) type; + static const bool value = !std::is_same<type, tag_not_valid>::value; +}; + +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +struct group_by_traits +{ + typedef T source_value_type; + typedef typename std::decay<Observable>::type source_type; + typedef typename std::decay<KeySelector>::type key_selector_type; + typedef typename std::decay<MarbleSelector>::type marble_selector_type; + typedef typename std::decay<BinaryPredicate>::type predicate_type; + + static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); + + typedef typename is_group_by_selector_for<source_value_type, key_selector_type>::type key_type; + + static_assert(is_group_by_selector_for<source_value_type, marble_selector_type>::value, "group_by MarbleSelector must be a function with the signature marble_type(source_value_type)"); + + typedef typename is_group_by_selector_for<source_value_type, marble_selector_type>::type marble_type; + + typedef rxsub::subject<marble_type> subject_type; + + typedef std::map<key_type, typename subject_type::subscriber_type, predicate_type> key_subscriber_map_type; + + typedef grouped_observable<key_type, source_value_type> grouped_observable_type; +}; + +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +struct group_by +{ + typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; + typedef typename traits_type::key_selector_type key_selector_type; + typedef typename traits_type::marble_selector_type marble_selector_type; + typedef typename traits_type::predicate_type predicate_type; + typedef typename traits_type::subject_type subject_type; + typedef typename traits_type::key_type key_type; + + struct group_by_values + { + group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p) + : keySelector(std::move(ks)) + , marbleSelector(std::move(ms)) + , predicate(std::move(p)) + { + } + mutable key_selector_type keySelector; + mutable marble_selector_type marbleSelector; + mutable predicate_type predicate; + }; + + group_by_values initial; + + group_by(key_selector_type ks, marble_selector_type ms, predicate_type p) + : initial(std::move(ks), std::move(ms), std::move(p)) + { + } + + struct group_by_observable + { + subject_type subject; + key_type key; + + group_by_observable(subject_type s, key_type k) + : subject(std::move(s)) + , key(k) + { + } + + template<class Subscriber> + void on_subscribe(Subscriber&& o) const { + subject.get_observable().subscribe(std::forward<Subscriber>(o)); + } + + key_type on_get_key() { + return key; + } + }; + + template<class Subscriber> + struct group_by_observer : public group_by_values + { + typedef group_by_observer<Subscriber> this_type; + typedef typename traits_type::grouped_observable_type value_type; + typedef typename std::decay<Subscriber>::type dest_type; + typedef observer<T, this_type> observer_type; + dest_type dest; + + mutable typename traits_type::key_subscriber_map_type groups; + + group_by_observer(dest_type d, group_by_values v) + : group_by_values(v) + , dest(std::move(d)) + , groups(group_by_values::predicate) + { + } + void on_next(T v) const { + auto selectedKey = on_exception( + [&](){ + return this->keySelector(v);}, + *this); + if (selectedKey.empty()) { + return; + } + auto g = groups.find(selectedKey.get()); + if (g == groups.end()) { + auto sub = subject_type(); + g = groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; + dest.on_next(make_dynamic_grouped_observable<key_type, T>(group_by_observable(sub, selectedKey.get()))); + } + auto selectedMarble = on_exception( + [&](){ + return this->marbleSelector(v);}, + *this); + if (selectedMarble.empty()) { + return; + } + g->second.on_next(std::move(selectedMarble.get())); + } + void on_error(std::exception_ptr e) const { + (*this)(e); + } + void operator()(std::exception_ptr e) const { + for(auto& g : groups) { + g.second.on_error(e); + } + dest.on_error(e); + } + void on_completed() const { + for(auto& g : groups) { + g.second.on_completed(); + } + dest.on_completed(); + } + + static subscriber<T, observer_type> make(dest_type d, group_by_values v) { + auto cs = d.get_subscription(); + return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(group_by_observer<Subscriber>::make(std::move(dest), initial)) { + return group_by_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template<class KeySelector, class MarbleSelector, class BinaryPredicate> +class group_by_factory +{ + typedef typename std::decay<KeySelector>::type key_selector_type; + typedef typename std::decay<MarbleSelector>::type marble_selector_type; + typedef typename std::decay<BinaryPredicate>::type predicate_type; + key_selector_type keySelector; + marble_selector_type marbleSelector; + predicate_type predicate; +public: + group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p) + : keySelector(std::move(ks)) + , marbleSelector(std::move(ms)) + , predicate(std::move(p)) + { + } + template<class Observable> + struct group_by_factory_traits + { + typedef typename Observable::value_type value_type; + typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; + typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type; + }; + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) { + return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate))); + } +}; + +} + +template<class KeySelector, class MarbleSelector, class BinaryPredicate> +inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) + -> detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate> { + return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)); +} + + +} + +} + +#endif + diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp new file mode 100644 index 0000000..8dca40b --- /dev/null +++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp @@ -0,0 +1,203 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_RX_GROUPED_OBSERVABLE_HPP) +#define RXCPP_RX_GROUPED_OBSERVABLE_HPP + +#include "rx-includes.hpp" + +namespace rxcpp { + +namespace detail { + +template<class K, class Source> +struct has_on_get_key_for +{ + struct not_void {}; + template<class CS> + static auto check(int) -> decltype((*(CS*)nullptr).on_get_key()); + template<class CS> + static not_void check(...); + + typedef decltype(check<Source>(0)) detail_result; + static const bool value = std::is_same<detail_result, typename std::decay<K>::type>::value; +}; + +} + +template<class K, class T> +class dynamic_grouped_observable + : public rxs::source_base<T> +{ +public: + typedef typename std::decay<K>::type key_type; + typedef tag_dynamic_observable dynamic_observable_tag; + +private: + struct state_type + : public std::enable_shared_from_this<state_type> + { + typedef std::function<void(subscriber<T>)> onsubscribe_type; + typedef std::function<key_type()> ongetkey_type; + + onsubscribe_type on_subscribe; + ongetkey_type on_get_key; + }; + std::shared_ptr<state_type> state; + + template<class U> + void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) { + state = o.state; + } + + template<class U> + void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) { + state = std::move(o.state); + } + + template<class SO> + void construct(SO&& source, rxs::tag_source&&) { + auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source)); + state->on_subscribe = [so](subscriber<T> o) mutable { + so->on_subscribe(std::move(o)); + }; + state->on_get_key = [so]() mutable { + return so->on_get_key(); + }; + } + +public: + + dynamic_grouped_observable() + { + } + + template<class SOF> + explicit dynamic_grouped_observable(SOF&& sof) + : state(std::make_shared<state_type>()) + { + construct(std::forward<SOF>(sof), + typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); + } + + template<class SF, class CF> + dynamic_grouped_observable(SF&& sf, CF&& cf) + : state(std::make_shared<state_type>()) + { + state->on_subscribe = std::forward<SF>(sf); + state->on_connect = std::forward<CF>(cf); + } + + void on_subscribe(subscriber<T> o) const { + state->on_subscribe(std::move(o)); + } + + template<class Subscriber> + typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type + on_subscribe(Subscriber&& o) const { + auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); + state->on_subscribe( + make_subscriber<T>( + *so, + // on_next + [so](T t){ + so->on_next(t); + }, + // on_error + [so](std::exception_ptr e){ + so->on_error(e); + }, + // on_completed + [so](){ + so->on_completed(); + }). + as_dynamic()); + } + + key_type on_get_key() const { + return state->on_get_key(); + } +}; + +template<class K, class T, class Source> +grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s) { + return grouped_observable<K, T>(dynamic_grouped_observable<K, T>(std::forward<Source>(s))); +} + + + +template<class K, class T, class SourceOperator> +class grouped_observable + : public observable<T, SourceOperator> +{ + typedef grouped_observable<K, T, SourceOperator> this_type; + typedef observable<T, SourceOperator> base_type; + typedef typename std::decay<SourceOperator>::type source_operator_type; + + static_assert(detail::has_on_get_key_for<K, source_operator_type>::value, "inner must have on_get_key method key_type()"); + +public: + typedef typename std::decay<K>::type key_type; + typedef tag_grouped_observable observable_tag; + + grouped_observable() + { + } + + explicit grouped_observable(const SourceOperator& o) + : base_type(o) + { + } + explicit grouped_observable(SourceOperator&& o) + : base_type(std::move(o)) + { + } + + // implicit conversion between observables of the same value_type + template<class SO> + grouped_observable(const grouped_observable<K, T, SO>& o) + : base_type(o) + {} + // implicit conversion between observables of the same value_type + template<class SO> + grouped_observable(grouped_observable<K, T, SO>&& o) + : base_type(std::move(o)) + {} + + /// + /// performs type-forgetting conversion to a new grouped_observable + /// + grouped_observable<K, T> as_dynamic() const { + return *this; + } + + key_type get_key() const { + return base_type::source_operator.on_get_key(); + } +}; + + +} + +// +// support range() >> filter() >> subscribe() syntax +// '>>' is spelled 'stream' +// +template<class T, class SourceOperator, class OperatorFactory> +auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of) + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); +} + +// +// support range() | filter() | subscribe() syntax +// '|' is spelled 'pipe' +// +template<class T, class SourceOperator, class OperatorFactory> +auto operator | (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of) + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 86fd34a..5ca50e1 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -132,6 +132,7 @@ #include "rx-operators.hpp" #include "rx-observable.hpp" #include "rx-connectable_observable.hpp" +#include "rx-grouped_observable.hpp" #pragma pop_macro("min") #pragma pop_macro("max") diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 8297014..c93d408 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -662,6 +662,15 @@ public: return defer_combine_latest<Coordination, rxu::detail::pack, this_type, ObservableN...>::make(*this, std::move(cn), rxu::pack(), std::make_tuple(*this, on...)); } + /// group_by -> + /// + template<class KeySelector, class MarbleSelector, class BinaryPredicate> + inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) const + -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)))) { + return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p))); + } + + /// multicast -> /// allows connections to the source to be independent of subscriptions /// diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 1c43ff5..b62ddd6 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -43,6 +43,7 @@ namespace rxo=operators; #include "operators/rx-filter.hpp" #include "operators/rx-finally.hpp" #include "operators/rx-flat_map.hpp" +#include "operators/rx-group_by.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp index 1b82f40..bd94151 100644 --- a/Rx/v2/src/rxcpp/rx-predef.hpp +++ b/Rx/v2/src/rxcpp/rx-predef.hpp @@ -194,6 +194,43 @@ public: static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_connectable_observable>::value; }; +struct tag_dynamic_grouped_observable : public tag_dynamic_observable {}; + +template<class T> +class is_dynamic_grouped_observable +{ + struct not_void {}; + template<class C> + static typename C::dynamic_observable_tag* check(int); + template<class C> + static not_void check(...); +public: + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_dynamic_grouped_observable*>::value; +}; + +template<class K, class T> +class dynamic_grouped_observable; + +template<class K, class T, + class SourceObservable = typename std::conditional<std::is_same<T, void>::value, + void, dynamic_grouped_observable<K, T>>::type> +class grouped_observable; + +template<class K, class T, class Source> +grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s); + +struct tag_grouped_observable : public tag_observable {}; +template<class T> +class is_grouped_observable +{ + template<class C> + static typename C::observable_tag check(int); + template<class C> + static void check(...); +public: + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_grouped_observable>::value; +}; + // // this type is the default used by operators that subscribe to // multiple sources. It assumes that the sources are already synchronized diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index e07cc69..4ead228 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -214,6 +214,8 @@ class subject detail::multicast_observer<T> s; public: + typedef subscriber<T, observer<T, detail::multicast_observer<T>>> subscriber_type; + typedef observable<T> observable_type; subject() : s(composite_subscription()) { @@ -227,7 +229,7 @@ public: return s.has_observers(); } - subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const { + subscriber_type get_subscriber() const { return s.get_subscriber(); } |