summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-08-02 11:35:04 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-08-02 11:35:04 -0700
commit3bf429778a08eda9498c61ec1daa3664d5f1bbfd (patch)
tree7618ab45b97da371d4eaa95fa37719889de1077a /Rx/v2/src
parentfc4f3fa47e67e7b1d99dcec1a7092761426ab375 (diff)
downloadRxCpp-3bf429778a08eda9498c61ec1daa3664d5f1bbfd.tar.gz
added group_by
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp220
-rw-r--r--Rx/v2/src/rxcpp/rx-grouped_observable.hpp203
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp9
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-predef.hpp37
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp4
8 files changed, 474 insertions, 3 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index 84aeb66..f8fd1d7 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -32,9 +32,7 @@ struct flat_map_traits {
typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
-//#if _MSC_VER >= 1900
static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
-//#endif
typedef typename collection_type::value_type collection_value_type;
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
new file mode 100644
index 0000000..3982e8b
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -0,0 +1,220 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP)
+#define RXCPP_OPERATORS_RX_GROUP_BY_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Selector>
+struct is_group_by_selector_for {
+
+ typedef typename std::decay<Selector>::type selector_type;
+ typedef T source_value_type;
+
+ struct tag_not_valid {};
+ template<class CV, class CS>
+ static auto check(int) -> decltype((*(CS*)nullptr)(*(CV*)nullptr));
+ template<class CV, class CS>
+ static tag_not_valid check(...);
+
+ typedef decltype(check<source_value_type, selector_type>(0)) type;
+ static const bool value = !std::is_same<type, tag_not_valid>::value;
+};
+
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+struct group_by_traits
+{
+ typedef T source_value_type;
+ typedef typename std::decay<Observable>::type source_type;
+ typedef typename std::decay<KeySelector>::type key_selector_type;
+ typedef typename std::decay<MarbleSelector>::type marble_selector_type;
+ typedef typename std::decay<BinaryPredicate>::type predicate_type;
+
+ static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
+
+ typedef typename is_group_by_selector_for<source_value_type, key_selector_type>::type key_type;
+
+ static_assert(is_group_by_selector_for<source_value_type, marble_selector_type>::value, "group_by MarbleSelector must be a function with the signature marble_type(source_value_type)");
+
+ typedef typename is_group_by_selector_for<source_value_type, marble_selector_type>::type marble_type;
+
+ typedef rxsub::subject<marble_type> subject_type;
+
+ typedef std::map<key_type, typename subject_type::subscriber_type, predicate_type> key_subscriber_map_type;
+
+ typedef grouped_observable<key_type, source_value_type> grouped_observable_type;
+};
+
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+struct group_by
+{
+ typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+ typedef typename traits_type::key_selector_type key_selector_type;
+ typedef typename traits_type::marble_selector_type marble_selector_type;
+ typedef typename traits_type::predicate_type predicate_type;
+ typedef typename traits_type::subject_type subject_type;
+ typedef typename traits_type::key_type key_type;
+
+ struct group_by_values
+ {
+ group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ : keySelector(std::move(ks))
+ , marbleSelector(std::move(ms))
+ , predicate(std::move(p))
+ {
+ }
+ mutable key_selector_type keySelector;
+ mutable marble_selector_type marbleSelector;
+ mutable predicate_type predicate;
+ };
+
+ group_by_values initial;
+
+ group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ : initial(std::move(ks), std::move(ms), std::move(p))
+ {
+ }
+
+ struct group_by_observable
+ {
+ subject_type subject;
+ key_type key;
+
+ group_by_observable(subject_type s, key_type k)
+ : subject(std::move(s))
+ , key(k)
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(Subscriber&& o) const {
+ subject.get_observable().subscribe(std::forward<Subscriber>(o));
+ }
+
+ key_type on_get_key() {
+ return key;
+ }
+ };
+
+ template<class Subscriber>
+ struct group_by_observer : public group_by_values
+ {
+ typedef group_by_observer<Subscriber> this_type;
+ typedef typename traits_type::grouped_observable_type value_type;
+ typedef typename std::decay<Subscriber>::type dest_type;
+ typedef observer<T, this_type> observer_type;
+ dest_type dest;
+
+ mutable typename traits_type::key_subscriber_map_type groups;
+
+ group_by_observer(dest_type d, group_by_values v)
+ : group_by_values(v)
+ , dest(std::move(d))
+ , groups(group_by_values::predicate)
+ {
+ }
+ void on_next(T v) const {
+ auto selectedKey = on_exception(
+ [&](){
+ return this->keySelector(v);},
+ *this);
+ if (selectedKey.empty()) {
+ return;
+ }
+ auto g = groups.find(selectedKey.get());
+ if (g == groups.end()) {
+ auto sub = subject_type();
+ g = groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
+ dest.on_next(make_dynamic_grouped_observable<key_type, T>(group_by_observable(sub, selectedKey.get())));
+ }
+ auto selectedMarble = on_exception(
+ [&](){
+ return this->marbleSelector(v);},
+ *this);
+ if (selectedMarble.empty()) {
+ return;
+ }
+ g->second.on_next(std::move(selectedMarble.get()));
+ }
+ void on_error(std::exception_ptr e) const {
+ (*this)(e);
+ }
+ void operator()(std::exception_ptr e) const {
+ for(auto& g : groups) {
+ g.second.on_error(e);
+ }
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ for(auto& g : groups) {
+ g.second.on_completed();
+ }
+ dest.on_completed();
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, group_by_values v) {
+ auto cs = d.get_subscription();
+ return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(group_by_observer<Subscriber>::make(std::move(dest), initial)) {
+ return group_by_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+class group_by_factory
+{
+ typedef typename std::decay<KeySelector>::type key_selector_type;
+ typedef typename std::decay<MarbleSelector>::type marble_selector_type;
+ typedef typename std::decay<BinaryPredicate>::type predicate_type;
+ key_selector_type keySelector;
+ marble_selector_type marbleSelector;
+ predicate_type predicate;
+public:
+ group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ : keySelector(std::move(ks))
+ , marbleSelector(std::move(ms))
+ , predicate(std::move(p))
+ {
+ }
+ template<class Observable>
+ struct group_by_factory_traits
+ {
+ typedef typename Observable::value_type value_type;
+ typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+ typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
+ };
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
+ return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
+ }
+};
+
+}
+
+template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p)
+ -> detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate> {
+ return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p));
+}
+
+
+}
+
+}
+
+#endif
+
diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
new file mode 100644
index 0000000..8dca40b
--- /dev/null
+++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
@@ -0,0 +1,203 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_RX_GROUPED_OBSERVABLE_HPP)
+#define RXCPP_RX_GROUPED_OBSERVABLE_HPP
+
+#include "rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace detail {
+
+template<class K, class Source>
+struct has_on_get_key_for
+{
+ struct not_void {};
+ template<class CS>
+ static auto check(int) -> decltype((*(CS*)nullptr).on_get_key());
+ template<class CS>
+ static not_void check(...);
+
+ typedef decltype(check<Source>(0)) detail_result;
+ static const bool value = std::is_same<detail_result, typename std::decay<K>::type>::value;
+};
+
+}
+
+template<class K, class T>
+class dynamic_grouped_observable
+ : public rxs::source_base<T>
+{
+public:
+ typedef typename std::decay<K>::type key_type;
+ typedef tag_dynamic_observable dynamic_observable_tag;
+
+private:
+ struct state_type
+ : public std::enable_shared_from_this<state_type>
+ {
+ typedef std::function<void(subscriber<T>)> onsubscribe_type;
+ typedef std::function<key_type()> ongetkey_type;
+
+ onsubscribe_type on_subscribe;
+ ongetkey_type on_get_key;
+ };
+ std::shared_ptr<state_type> state;
+
+ template<class U>
+ void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
+ state = o.state;
+ }
+
+ template<class U>
+ void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
+ state = std::move(o.state);
+ }
+
+ template<class SO>
+ void construct(SO&& source, rxs::tag_source&&) {
+ auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
+ state->on_subscribe = [so](subscriber<T> o) mutable {
+ so->on_subscribe(std::move(o));
+ };
+ state->on_get_key = [so]() mutable {
+ return so->on_get_key();
+ };
+ }
+
+public:
+
+ dynamic_grouped_observable()
+ {
+ }
+
+ template<class SOF>
+ explicit dynamic_grouped_observable(SOF&& sof)
+ : state(std::make_shared<state_type>())
+ {
+ construct(std::forward<SOF>(sof),
+ typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
+ }
+
+ template<class SF, class CF>
+ dynamic_grouped_observable(SF&& sf, CF&& cf)
+ : state(std::make_shared<state_type>())
+ {
+ state->on_subscribe = std::forward<SF>(sf);
+ state->on_connect = std::forward<CF>(cf);
+ }
+
+ void on_subscribe(subscriber<T> o) const {
+ state->on_subscribe(std::move(o));
+ }
+
+ template<class Subscriber>
+ typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
+ on_subscribe(Subscriber&& o) const {
+ auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
+ state->on_subscribe(
+ make_subscriber<T>(
+ *so,
+ // on_next
+ [so](T t){
+ so->on_next(t);
+ },
+ // on_error
+ [so](std::exception_ptr e){
+ so->on_error(e);
+ },
+ // on_completed
+ [so](){
+ so->on_completed();
+ }).
+ as_dynamic());
+ }
+
+ key_type on_get_key() const {
+ return state->on_get_key();
+ }
+};
+
+template<class K, class T, class Source>
+grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s) {
+ return grouped_observable<K, T>(dynamic_grouped_observable<K, T>(std::forward<Source>(s)));
+}
+
+
+
+template<class K, class T, class SourceOperator>
+class grouped_observable
+ : public observable<T, SourceOperator>
+{
+ typedef grouped_observable<K, T, SourceOperator> this_type;
+ typedef observable<T, SourceOperator> base_type;
+ typedef typename std::decay<SourceOperator>::type source_operator_type;
+
+ static_assert(detail::has_on_get_key_for<K, source_operator_type>::value, "inner must have on_get_key method key_type()");
+
+public:
+ typedef typename std::decay<K>::type key_type;
+ typedef tag_grouped_observable observable_tag;
+
+ grouped_observable()
+ {
+ }
+
+ explicit grouped_observable(const SourceOperator& o)
+ : base_type(o)
+ {
+ }
+ explicit grouped_observable(SourceOperator&& o)
+ : base_type(std::move(o))
+ {
+ }
+
+ // implicit conversion between observables of the same value_type
+ template<class SO>
+ grouped_observable(const grouped_observable<K, T, SO>& o)
+ : base_type(o)
+ {}
+ // implicit conversion between observables of the same value_type
+ template<class SO>
+ grouped_observable(grouped_observable<K, T, SO>&& o)
+ : base_type(std::move(o))
+ {}
+
+ ///
+ /// performs type-forgetting conversion to a new grouped_observable
+ ///
+ grouped_observable<K, T> as_dynamic() const {
+ return *this;
+ }
+
+ key_type get_key() const {
+ return base_type::source_operator.on_get_key();
+ }
+};
+
+
+}
+
+//
+// support range() >> filter() >> subscribe() syntax
+// '>>' is spelled 'stream'
+//
+template<class T, class SourceOperator, class OperatorFactory>
+auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of)
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
+}
+
+//
+// support range() | filter() | subscribe() syntax
+// '|' is spelled 'pipe'
+//
+template<class T, class SourceOperator, class OperatorFactory>
+auto operator | (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of)
+ -> decltype(source.op(std::forward<OperatorFactory>(of))) {
+ return source.op(std::forward<OperatorFactory>(of));
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 86fd34a..5ca50e1 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -132,6 +132,7 @@
#include "rx-operators.hpp"
#include "rx-observable.hpp"
#include "rx-connectable_observable.hpp"
+#include "rx-grouped_observable.hpp"
#pragma pop_macro("min")
#pragma pop_macro("max")
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 8297014..c93d408 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -662,6 +662,15 @@ public:
return defer_combine_latest<Coordination, rxu::detail::pack, this_type, ObservableN...>::make(*this, std::move(cn), rxu::pack(), std::make_tuple(*this, on...));
}
+ /// group_by ->
+ ///
+ template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) const
+ -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)))) {
+ return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)));
+ }
+
+
/// multicast ->
/// allows connections to the source to be independent of subscriptions
///
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 1c43ff5..b62ddd6 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -43,6 +43,7 @@ namespace rxo=operators;
#include "operators/rx-filter.hpp"
#include "operators/rx-finally.hpp"
#include "operators/rx-flat_map.hpp"
+#include "operators/rx-group_by.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp
index 1b82f40..bd94151 100644
--- a/Rx/v2/src/rxcpp/rx-predef.hpp
+++ b/Rx/v2/src/rxcpp/rx-predef.hpp
@@ -194,6 +194,43 @@ public:
static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_connectable_observable>::value;
};
+struct tag_dynamic_grouped_observable : public tag_dynamic_observable {};
+
+template<class T>
+class is_dynamic_grouped_observable
+{
+ struct not_void {};
+ template<class C>
+ static typename C::dynamic_observable_tag* check(int);
+ template<class C>
+ static not_void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_dynamic_grouped_observable*>::value;
+};
+
+template<class K, class T>
+class dynamic_grouped_observable;
+
+template<class K, class T,
+ class SourceObservable = typename std::conditional<std::is_same<T, void>::value,
+ void, dynamic_grouped_observable<K, T>>::type>
+class grouped_observable;
+
+template<class K, class T, class Source>
+grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s);
+
+struct tag_grouped_observable : public tag_observable {};
+template<class T>
+class is_grouped_observable
+{
+ template<class C>
+ static typename C::observable_tag check(int);
+ template<class C>
+ static void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_grouped_observable>::value;
+};
+
//
// this type is the default used by operators that subscribe to
// multiple sources. It assumes that the sources are already synchronized
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index e07cc69..4ead228 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -214,6 +214,8 @@ class subject
detail::multicast_observer<T> s;
public:
+ typedef subscriber<T, observer<T, detail::multicast_observer<T>>> subscriber_type;
+ typedef observable<T> observable_type;
subject()
: s(composite_subscription())
{
@@ -227,7 +229,7 @@ public:
return s.has_observers();
}
- subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const {
+ subscriber_type get_subscriber() const {
return s.get_subscriber();
}