diff options
-rw-r--r-- | Rx/v2/examples/doxygen/all.cpp | 29 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-all.hpp | 109 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 22 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/all.cpp | 216 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 1 | ||||
-rw-r--r-- | projects/doxygen/CMakeLists.txt | 1 |
8 files changed, 380 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/all.cpp b/Rx/v2/examples/doxygen/all.cpp new file mode 100644 index 0000000..a5adb30 --- /dev/null +++ b/Rx/v2/examples/doxygen/all.cpp @@ -0,0 +1,29 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("all sample") { + printf("//! [all sample]\n"); + auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).all([](int n) { return n < 6; }); + values. + subscribe( + [](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); }, + []() { printf("OnCompleted\n"); }); + printf("//! [all sample]\n"); +} + +SCENARIO("all - operator syntax sample") { + using namespace rxcpp; + using namespace rxcpp::sources; + using namespace rxcpp::operators; + + printf("//! [all - operator syntax sample]\n"); + auto values = range(1, 10) + | all([](int n) { return n < 100; }); + values. + subscribe( + [](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); }, + []() { printf("OnCompleted\n"); }); + printf("//! [all - operator syntax sample]\n"); +}
\ No newline at end of file diff --git a/Rx/v2/src/rxcpp/operators/rx-all.hpp b/Rx/v2/src/rxcpp/operators/rx-all.hpp new file mode 100644 index 0000000..a9ce654 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-all.hpp @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_ALL_HPP) +#define RXCPP_OPERATORS_RX_ALL_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Predicate> +struct all +{ + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Predicate> test_type; + test_type test; + + all(test_type t) + : test(std::move(t)) + { + } + + template<class Subscriber> + struct all_observer + { + typedef all_observer<Subscriber> this_type; + typedef source_value_type value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + dest_type dest; + test_type test; + mutable bool done; + + all_observer(dest_type d, test_type t) + : dest(std::move(d)) + , test(std::move(t)), + done(false) + { + } + void on_next(source_value_type v) const { + auto filtered = on_exception([&]() { + return !this->test(v); }, + dest); + if (filtered.empty()) { + return; + } + if (filtered.get() && !done) { + done = true; + dest.on_next(false); + dest.on_completed(); + } + } + void on_error(std::exception_ptr e) const { + dest.on_error(e); + } + void on_completed() const { + if(!done) { + done = true; + dest.on_next(true); + dest.on_completed(); + } + } + + static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, test_type t) { + return make_subscriber<value_type>(d, this_type(d, std::move(t))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) { + return all_observer<Subscriber>::make(std::move(dest), test); + } +}; + +template <class Predicate> +class all_factory +{ + typedef rxu::decay_t<Predicate> test_type; + + test_type test; +public: + all_factory(test_type t) : test(t) { } + + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(all<rxu::value_type_t<rxu::decay_t<Observable>>, Predicate>(test)); + } +}; + +} + +template <class Predicate> +inline auto all(Predicate test) +-> detail::all_factory<Predicate> { +return detail::all_factory<Predicate>(test); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index e5d068c..cdeb1e0 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -723,6 +723,28 @@ public: return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...)); } + /*! Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. + Emits true if the source Observable terminates without emitting any item. + + \tparam Predicate the type of the test function. + + \param p the test function to test items emitted by the source Observable. + + \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false. + + \sample + \snippet all.cpp all sample + \snippet output.txt all sample + */ + template<class Predicate> + auto all(Predicate p) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<bool>(rxo::detail::all<T, Predicate>(std::move(p)))) + /// \endcond + { + return lift<bool>(rxo::detail::all<T, Predicate>(std::move(p))); + } + /*! Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 4cc5327..aebd930 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -34,6 +34,7 @@ namespace rxo=operators; } +#include "operators/rx-all.hpp" #include "operators/rx-amb.hpp" #include "operators/rx-any.hpp" #include "operators/rx-buffer_count.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index db83221..b900031 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -27,6 +27,7 @@ set(TEST_SOURCES ${TEST_DIR}/sources/interval.cpp ${TEST_DIR}/sources/scope.cpp ${TEST_DIR}/sources/timer.cpp + ${TEST_DIR}/operators/all.cpp ${TEST_DIR}/operators/amb.cpp ${TEST_DIR}/operators/amb_variadic.cpp ${TEST_DIR}/operators/buffer.cpp diff --git a/Rx/v2/test/operators/all.cpp b/Rx/v2/test/operators/all.cpp new file mode 100644 index 0000000..7730cb0 --- /dev/null +++ b/Rx/v2/test/operators/all.cpp @@ -0,0 +1,216 @@ +#include "../test.h" + +SCENARIO("all emits true if every item emitted by the source observable evaluated as true", "[all][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<bool> on_all; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 2), + on.completed(250) + }); + + WHEN("a predicate function is passed to the all operator") { + + auto res = w.start( + [xs]() { + return xs + .all([](int n) { return n == 2; }) + .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013 + } + ); + + THEN("the output only contains true") { + auto required = rxu::to_vector({ + on_all.next(250, true), + on_all.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("all emits false if any item emitted by the source observable evaluated as false", "[all][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<bool> on_all; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.completed(250) + }); + + WHEN("a predicate function is passed to the all operator") { + + auto res = w.start( + [xs]() { + return xs + .all([](int n) { return n == 2; }) + .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013 + + } + ); + + THEN("the output only contains false") { + auto required = rxu::to_vector({ + on_all.next(220, false), + on_all.completed(220) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 220) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("all emits true if the source observable is empty", "[all][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<bool> on_all; + + auto xs = sc.make_hot_observable({ + on.completed(250) + }); + + WHEN("a predicate function is passed to the all operator") { + + auto res = w.start( + [xs]() { + return xs + .all([](int n) { return n == 2; }) + .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013 + } + ); + + THEN("the output only contains true") { + auto required = rxu::to_vector({ + on_all.next(250, true), + on_all.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("all never emits if the source observable never emits any items", "[all][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<bool> on_all; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("a predicate function is passed to the all operator") { + + auto res = w.start( + [xs]() { + return xs + .all([](int n) { return n == 2; }) + .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013 + } + ); + + THEN("the output is empty") { + auto required = std::vector<rxsc::test::messages<bool>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("all emits an error if the source observable emit an error", "[all][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<bool> on_all; + + std::runtime_error ex("all on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("a predicate function is passed to the all operator") { + + auto res = w.start( + [xs]() { + return xs + .all([](int n) { return n == 2; }) + .as_dynamic(); // forget type to workaround lambda deduction bug on msvc 2013 + } + ); + + THEN("the output only contains an error") { + auto required = rxu::to_vector({ + on_all.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +}
\ No newline at end of file diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index dcec5ca..c593830 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -26,6 +26,7 @@ add_subdirectory(${EXAMPLES_DIR}/tests ${CMAKE_CURRENT_BINARY_DIR}/examples/test # The list of RxCpp source files. Please add every new file to this list set(RX_SOURCES + ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-all.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-amb.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-any.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index 8ab7a16..6681d8d 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -43,6 +43,7 @@ if(DOXYGEN_FOUND) # Target to build examples set(DOXY_EXAMPLE_SRC_LIST ${DOXY_EXAMPLES_SRC_DIR}/main.cpp + ${DOXY_EXAMPLES_SRC_DIR}/all.cpp ${DOXY_EXAMPLES_SRC_DIR}/amb.cpp ${DOXY_EXAMPLES_SRC_DIR}/as_dynamic.cpp ${DOXY_EXAMPLES_SRC_DIR}/blocking_observable.cpp |