summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-all.hpp109
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp22
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
3 files changed, 132 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-all.hpp b/Rx/v2/src/rxcpp/operators/rx-all.hpp
new file mode 100644
index 0000000..a9ce654
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-all.hpp
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_ALL_HPP)
+#define RXCPP_OPERATORS_RX_ALL_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Predicate>
+struct all
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Predicate> test_type;
+ test_type test;
+
+ all(test_type t)
+ : test(std::move(t))
+ {
+ }
+
+ template<class Subscriber>
+ struct all_observer
+ {
+ typedef all_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ test_type test;
+ mutable bool done;
+
+ all_observer(dest_type d, test_type t)
+ : dest(std::move(d))
+ , test(std::move(t)),
+ done(false)
+ {
+ }
+ void on_next(source_value_type v) const {
+ auto filtered = on_exception([&]() {
+ return !this->test(v); },
+ dest);
+ if (filtered.empty()) {
+ return;
+ }
+ if (filtered.get() && !done) {
+ done = true;
+ dest.on_next(false);
+ dest.on_completed();
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ if(!done) {
+ done = true;
+ dest.on_next(true);
+ dest.on_completed();
+ }
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, test_type t) {
+ return make_subscriber<value_type>(d, this_type(d, std::move(t)));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) {
+ return all_observer<Subscriber>::make(std::move(dest), test);
+ }
+};
+
+template <class Predicate>
+class all_factory
+{
+ typedef rxu::decay_t<Predicate> test_type;
+
+ test_type test;
+public:
+ all_factory(test_type t) : test(t) { }
+
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test));
+ }
+};
+
+}
+
+template <class Predicate>
+inline auto all(Predicate test)
+-> detail::all_factory<Predicate> {
+return detail::all_factory<Predicate>(test);
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index e5d068c..cdeb1e0 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -723,6 +723,28 @@ public:
return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
}
+ /*! Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false.
+ Emits true if the source Observable terminates without emitting any item.
+
+ \tparam Predicate the type of the test function.
+
+ \param p the test function to test items emitted by the source Observable.
+
+ \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false.
+
+ \sample
+ \snippet all.cpp all sample
+ \snippet output.txt all sample
+ */
+ template<class Predicate>
+ auto all(Predicate p) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<bool>(rxo::detail::all<T, Predicate>(std::move(p))))
+ /// \endcond
+ {
+ return lift<bool>(rxo::detail::all<T, Predicate>(std::move(p)));
+ }
+
/*! Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false.
Emits false if the source Observable terminates without emitting any item.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 4cc5327..aebd930 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -34,6 +34,7 @@ namespace rxo=operators;
}
+#include "operators/rx-all.hpp"
#include "operators/rx-amb.hpp"
#include "operators/rx-any.hpp"
#include "operators/rx-buffer_count.hpp"