summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/examples/doxygen/all.cpp29
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-all.hpp109
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp22
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/all.cpp216
-rw-r--r--projects/CMake/CMakeLists.txt1
-rw-r--r--projects/doxygen/CMakeLists.txt1
8 files changed, 380 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/all.cpp b/Rx/v2/examples/doxygen/all.cpp
new file mode 100644
index 0000000..a5adb30
--- /dev/null
+++ b/Rx/v2/examples/doxygen/all.cpp
@@ -0,0 +1,29 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("all sample") {
+ printf("//! [all sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).all([](int n) { return n < 6; });
+ values.
+ subscribe(
+ [](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [all sample]\n");
+}
+
+SCENARIO("all - operator syntax sample") {
+ using namespace rxcpp;
+ using namespace rxcpp::sources;
+ using namespace rxcpp::operators;
+
+ printf("//! [all - operator syntax sample]\n");
+ auto values = range(1, 10)
+ | all([](int n) { return n < 100; });
+ values.
+ subscribe(
+ [](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [all - operator syntax sample]\n");
+} \ No newline at end of file
diff --git a/Rx/v2/src/rxcpp/operators/rx-all.hpp b/Rx/v2/src/rxcpp/operators/rx-all.hpp
new file mode 100644
index 0000000..a9ce654
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-all.hpp
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_ALL_HPP)
+#define RXCPP_OPERATORS_RX_ALL_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Predicate>
+struct all
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Predicate> test_type;
+ test_type test;
+
+ all(test_type t)
+ : test(std::move(t))
+ {
+ }
+
+ template<class Subscriber>
+ struct all_observer
+ {
+ typedef all_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ test_type test;
+ mutable bool done;
+
+ all_observer(dest_type d, test_type t)
+ : dest(std::move(d))
+ , test(std::move(t)),
+ done(false)
+ {
+ }
+ void on_next(source_value_type v) const {
+ auto filtered = on_exception([&]() {
+ return !this->test(v); },
+ dest);
+ if (filtered.empty()) {
+ return;
+ }
+ if (filtered.get() && !done) {
+ done = true;
+ dest.on_next(false);
+ dest.on_completed();
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ if(!done) {
+ done = true;
+ dest.on_next(true);
+ dest.on_completed();
+ }
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, test_type t) {
+ return make_subscriber<value_type>(d, this_type(d, std::move(t)));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) {
+ return all_observer<Subscriber>::make(std::move(dest), test);
+ }
+};
+
+template <class Predicate>
+class all_factory
+{
+ typedef rxu::decay_t<Predicate> test_type;
+
+ test_type test;
+public:
+ all_factory(test_type t) : test(t) { }
+
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test));
+ }
+};
+
+}
+
+template <class Predicate>
+inline auto all(Predicate test)
+-> detail::all_factory<Predicate> {
+return detail::all_factory<Predicate>(test);
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index e5d068c..cdeb1e0 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -723,6 +723,28 @@ public:
return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
}
+ /*! Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false.
+ Emits true if the source Observable terminates without emitting any item.
+
+ \tparam Predicate the type of the test function.
+
+ \param p the test function to test items emitted by the source Observable.
+
+ \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false.
+
+ \sample
+ \snippet all.cpp all sample
+ \snippet output.txt all sample
+ */
+ template<class Predicate>
+ auto all(Predicate p) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<bool>(rxo::detail::all<T, Predicate>(std::move(p))))
+ /// \endcond
+ {
+ return lift<bool>(rxo::detail::all<T, Predicate>(std::move(p)));
+ }
+
/*! Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false.
Emits false if the source Observable terminates without emitting any item.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 4cc5327..aebd930 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -34,6 +34,7 @@ namespace rxo=operators;
}
+#include "operators/rx-all.hpp"
#include "operators/rx-amb.hpp"
#include "operators/rx-any.hpp"
#include "operators/rx-buffer_count.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index db83221..b900031 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -27,6 +27,7 @@ set(TEST_SOURCES
${TEST_DIR}/sources/interval.cpp
${TEST_DIR}/sources/scope.cpp
${TEST_DIR}/sources/timer.cpp
+ ${TEST_DIR}/operators/all.cpp
${TEST_DIR}/operators/amb.cpp
${TEST_DIR}/operators/amb_variadic.cpp
${TEST_DIR}/operators/buffer.cpp
diff --git a/Rx/v2/test/operators/all.cpp b/Rx/v2/test/operators/all.cpp
new file mode 100644
index 0000000..7730cb0
--- /dev/null
+++ b/Rx/v2/test/operators/all.cpp
@@ -0,0 +1,216 @@
+#include "../test.h"
+
+SCENARIO("all emits true if every item emitted by the source observable evaluated as true", "[all][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<bool> on_all;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 2),
+ on.completed(250)
+ });
+
+ WHEN("a predicate function is passed to the all operator") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .all([](int n) { return n == 2; })
+ .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013
+ }
+ );
+
+ THEN("the output only contains true") {
+ auto required = rxu::to_vector({
+ on_all.next(250, true),
+ on_all.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("all emits false if any item emitted by the source observable evaluated as false", "[all][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<bool> on_all;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.completed(250)
+ });
+
+ WHEN("a predicate function is passed to the all operator") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .all([](int n) { return n == 2; })
+ .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013
+
+ }
+ );
+
+ THEN("the output only contains false") {
+ auto required = rxu::to_vector({
+ on_all.next(220, false),
+ on_all.completed(220)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 220)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("all emits true if the source observable is empty", "[all][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<bool> on_all;
+
+ auto xs = sc.make_hot_observable({
+ on.completed(250)
+ });
+
+ WHEN("a predicate function is passed to the all operator") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .all([](int n) { return n == 2; })
+ .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013
+ }
+ );
+
+ THEN("the output only contains true") {
+ auto required = rxu::to_vector({
+ on_all.next(250, true),
+ on_all.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("all never emits if the source observable never emits any items", "[all][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<bool> on_all;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("a predicate function is passed to the all operator") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .all([](int n) { return n == 2; })
+ .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013
+ }
+ );
+
+ THEN("the output is empty") {
+ auto required = std::vector<rxsc::test::messages<bool>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("all emits an error if the source observable emit an error", "[all][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<bool> on_all;
+
+ std::runtime_error ex("all on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("a predicate function is passed to the all operator") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .all([](int n) { return n == 2; })
+ .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013
+ }
+ );
+
+ THEN("the output only contains an error") {
+ auto required = rxu::to_vector({
+ on_all.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+} \ No newline at end of file
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index dcec5ca..c593830 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -26,6 +26,7 @@ add_subdirectory(${EXAMPLES_DIR}/tests ${CMAKE_CURRENT_BINARY_DIR}/examples/test
# The list of RxCpp source files. Please add every new file to this list
set(RX_SOURCES
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-all.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-amb.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-any.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 8ab7a16..6681d8d 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -43,6 +43,7 @@ if(DOXYGEN_FOUND)
# Target to build examples
set(DOXY_EXAMPLE_SRC_LIST
${DOXY_EXAMPLES_SRC_DIR}/main.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/all.cpp
${DOXY_EXAMPLES_SRC_DIR}/amb.cpp
${DOXY_EXAMPLES_SRC_DIR}/as_dynamic.cpp
${DOXY_EXAMPLES_SRC_DIR}/blocking_observable.cpp