From 3bf429778a08eda9498c61ec1daa3664d5f1bbfd Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Sat, 2 Aug 2014 11:35:04 -0700 Subject: added group_by --- Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 220 ++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 Rx/v2/src/rxcpp/operators/rx-group_by.hpp (limited to 'Rx/v2/src/rxcpp/operators/rx-group_by.hpp') diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp new file mode 100644 index 0000000..3982e8b --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -0,0 +1,220 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP) +#define RXCPP_OPERATORS_RX_GROUP_BY_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template +struct is_group_by_selector_for { + + typedef typename std::decay::type selector_type; + typedef T source_value_type; + + struct tag_not_valid {}; + template + static auto check(int) -> decltype((*(CS*)nullptr)(*(CV*)nullptr)); + template + static tag_not_valid check(...); + + typedef decltype(check(0)) type; + static const bool value = !std::is_same::value; +}; + +template +struct group_by_traits +{ + typedef T source_value_type; + typedef typename std::decay::type source_type; + typedef typename std::decay::type key_selector_type; + typedef typename std::decay::type marble_selector_type; + typedef typename std::decay::type predicate_type; + + static_assert(is_group_by_selector_for::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); + + typedef typename is_group_by_selector_for::type key_type; + + static_assert(is_group_by_selector_for::value, "group_by MarbleSelector must be a function with the signature marble_type(source_value_type)"); + + typedef typename is_group_by_selector_for::type marble_type; + + typedef rxsub::subject subject_type; + + typedef std::map key_subscriber_map_type; + + typedef grouped_observable grouped_observable_type; +}; + +template +struct group_by +{ + typedef group_by_traits traits_type; + typedef typename traits_type::key_selector_type key_selector_type; + typedef typename traits_type::marble_selector_type marble_selector_type; + typedef typename traits_type::predicate_type predicate_type; + typedef typename traits_type::subject_type subject_type; + typedef typename traits_type::key_type key_type; + + struct group_by_values + { + group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p) + : keySelector(std::move(ks)) + , marbleSelector(std::move(ms)) + , predicate(std::move(p)) + { + } + mutable key_selector_type keySelector; + mutable marble_selector_type marbleSelector; + mutable predicate_type predicate; + }; + + group_by_values initial; + + group_by(key_selector_type ks, marble_selector_type ms, predicate_type p) + : initial(std::move(ks), std::move(ms), std::move(p)) + { + } + + struct group_by_observable + { + subject_type subject; + key_type key; + + group_by_observable(subject_type s, key_type k) + : subject(std::move(s)) + , key(k) + { + } + + template + void on_subscribe(Subscriber&& o) const { + subject.get_observable().subscribe(std::forward(o)); + } + + key_type on_get_key() { + return key; + } + }; + + template + struct group_by_observer : public group_by_values + { + typedef group_by_observer this_type; + typedef typename traits_type::grouped_observable_type value_type; + typedef typename std::decay::type dest_type; + typedef observer observer_type; + dest_type dest; + + mutable typename traits_type::key_subscriber_map_type groups; + + group_by_observer(dest_type d, group_by_values v) + : group_by_values(v) + , dest(std::move(d)) + , groups(group_by_values::predicate) + { + } + void on_next(T v) const { + auto selectedKey = on_exception( + [&](){ + return this->keySelector(v);}, + *this); + if (selectedKey.empty()) { + return; + } + auto g = groups.find(selectedKey.get()); + if (g == groups.end()) { + auto sub = subject_type(); + g = groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; + dest.on_next(make_dynamic_grouped_observable(group_by_observable(sub, selectedKey.get()))); + } + auto selectedMarble = on_exception( + [&](){ + return this->marbleSelector(v);}, + *this); + if (selectedMarble.empty()) { + return; + } + g->second.on_next(std::move(selectedMarble.get())); + } + void on_error(std::exception_ptr e) const { + (*this)(e); + } + void operator()(std::exception_ptr e) const { + for(auto& g : groups) { + g.second.on_error(e); + } + dest.on_error(e); + } + void on_completed() const { + for(auto& g : groups) { + g.second.on_completed(); + } + dest.on_completed(); + } + + static subscriber make(dest_type d, group_by_values v) { + auto cs = d.get_subscription(); + return make_subscriber(std::move(cs), observer_type(this_type(std::move(d), std::move(v)))); + } + }; + + template + auto operator()(Subscriber dest) const + -> decltype(group_by_observer::make(std::move(dest), initial)) { + return group_by_observer::make(std::move(dest), initial); + } +}; + +template +class group_by_factory +{ + typedef typename std::decay::type key_selector_type; + typedef typename std::decay::type marble_selector_type; + typedef typename std::decay::type predicate_type; + key_selector_type keySelector; + marble_selector_type marbleSelector; + predicate_type predicate; +public: + group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p) + : keySelector(std::move(ks)) + , marbleSelector(std::move(ms)) + , predicate(std::move(p)) + { + } + template + struct group_by_factory_traits + { + typedef typename Observable::value_type value_type; + typedef detail::group_by_traits traits_type; + typedef detail::group_by group_by_type; + }; + template + auto operator()(Observable&& source) + -> decltype(source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) { + return source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate))); + } +}; + +} + +template +inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) + -> detail::group_by_factory { + return detail::group_by_factory(std::move(ks), std::move(ms), std::move(p)); +} + + +} + +} + +#endif + -- cgit v1.2.3