// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-all.hpp \brief Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. Emits true if the source Observable terminates without emitting any item. \tparam Predicate the type of the test function. \param p the test function to test items emitted by the source Observable. \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false. \sample \snippet all.cpp all sample \snippet output.txt all sample */ #if !defined(RXCPP_OPERATORS_RX_ALL_HPP) #define RXCPP_OPERATORS_RX_ALL_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct all_invalid_arguments {}; template struct all_invalid : public rxo::operator_base> { using type = observable, all_invalid>; }; template using all_invalid_t = typename all_invalid::type; template struct all { typedef rxu::decay_t source_value_type; typedef rxu::decay_t test_type; test_type test; typedef bool value_type; all(test_type t) : test(std::move(t)) { } template struct all_observer { typedef all_observer this_type; typedef source_value_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; test_type test; mutable bool done; all_observer(dest_type d, test_type t) : dest(std::move(d)) , test(std::move(t)), done(false) { } void on_next(source_value_type v) const { auto filtered = on_exception([&]() { return !this->test(v); }, dest); if (filtered.empty()) { return; } if (filtered.get() && !done) { done = true; dest.on_next(false); dest.on_completed(); } } void on_error(std::exception_ptr e) const { dest.on_error(e); } void on_completed() const { if(!done) { done = true; dest.on_next(true); dest.on_completed(); } } static subscriber make(dest_type d, test_type t) { return make_subscriber(d, this_type(d, std::move(t))); } }; template auto operator()(Subscriber dest) const -> decltype(all_observer::make(std::move(dest), test)) { return all_observer::make(std::move(dest), test); } }; } /*! @copydoc rx-all.hpp */ template auto all(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } /*! \brief Returns an Observable that emits true if the source Observable is empty, otherwise false. \return An observable that emits a boolean value. \sample \snippet is_empty.cpp is_empty sample \snippet output.txt is_empty sample */ template auto is_empty(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, class Enabled = rxu::enable_if_all_true_type_t< is_observable>, class All = rxo::detail::all>, class Value = rxu::value_type_t> static auto member(Observable&& o, Predicate&& p) -> decltype(o.template lift(All(std::forward(p)))) { return o.template lift(All(std::forward(p))); } template static operators::detail::all_invalid_t member(const AN&...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "all takes (Predicate)"); } }; template<> struct member_overload { template, class Enabled = rxu::enable_if_all_true_type_t< is_observable>, class Predicate = std::function, class IsEmpty = rxo::detail::all>, class Value = rxu::value_type_t> static auto member(Observable&& o) -> decltype(o.template lift(IsEmpty(nullptr))) { return o.template lift(IsEmpty([](SourceValue) { return false; })); } template static operators::detail::all_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "is_empty takes no arguments"); } }; } #endif