diff options
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-all.hpp | 109 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 22 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 |
3 files changed, 132 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-all.hpp b/Rx/v2/src/rxcpp/operators/rx-all.hpp new file mode 100644 index 0000000..a9ce654 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-all.hpp @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_ALL_HPP) +#define RXCPP_OPERATORS_RX_ALL_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Predicate> +struct all +{ + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Predicate> test_type; + test_type test; + + all(test_type t) + : test(std::move(t)) + { + } + + template<class Subscriber> + struct all_observer + { + typedef all_observer<Subscriber> this_type; + typedef source_value_type value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + dest_type dest; + test_type test; + mutable bool done; + + all_observer(dest_type d, test_type t) + : dest(std::move(d)) + , test(std::move(t)), + done(false) + { + } + void on_next(source_value_type v) const { + auto filtered = on_exception([&]() { + return !this->test(v); }, + dest); + if (filtered.empty()) { + return; + } + if (filtered.get() && !done) { + done = true; + dest.on_next(false); + dest.on_completed(); + } + } + void on_error(std::exception_ptr e) const { + dest.on_error(e); + } + void on_completed() const { + if(!done) { + done = true; + dest.on_next(true); + dest.on_completed(); + } + } + + static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, test_type t) { + return make_subscriber<value_type>(d, this_type(d, std::move(t))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) { + return all_observer<Subscriber>::make(std::move(dest), test); + } +}; + +template <class Predicate> +class all_factory +{ + typedef rxu::decay_t<Predicate> test_type; + + test_type test; +public: + all_factory(test_type t) : test(t) { } + + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test)); + } +}; + +} + +template <class Predicate> +inline auto all(Predicate test) +-> detail::all_factory<Predicate> { +return detail::all_factory<Predicate>(test); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index e5d068c..cdeb1e0 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -723,6 +723,28 @@ public: return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...)); } + /*! Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. + Emits true if the source Observable terminates without emitting any item. + + \tparam Predicate the type of the test function. + + \param p the test function to test items emitted by the source Observable. + + \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false. + + \sample + \snippet all.cpp all sample + \snippet output.txt all sample + */ + template<class Predicate> + auto all(Predicate p) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<bool>(rxo::detail::all<T, Predicate>(std::move(p)))) + /// \endcond + { + return lift<bool>(rxo::detail::all<T, Predicate>(std::move(p))); + } + /*! Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4cc5327..aebd930 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -34,6 +34,7 @@ namespace rxo=operators; } +#include "operators/rx-all.hpp" #include "operators/rx-amb.hpp" #include "operators/rx-any.hpp" #include "operators/rx-buffer_count.hpp" |