// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-filter.hpp \brief For each item from this observable use Predicate to select which items to emit from the new observable that is returned. \tparam Predicate the type of the filter function \param p the filter function \return Observable that emits only those items emitted by the source observable that the filter evaluates as true. \sample \snippet filter.cpp filter sample \snippet output.txt filter sample */ #if !defined(RXCPP_OPERATORS_RX_FILTER_HPP) #define RXCPP_OPERATORS_RX_FILTER_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct filter_invalid_arguments {}; template struct filter_invalid : public rxo::operator_base> { using type = observable, filter_invalid>; }; template using filter_invalid_t = typename filter_invalid::type; template struct filter { typedef rxu::decay_t source_value_type; typedef rxu::decay_t test_type; test_type test; filter(test_type t) : test(std::move(t)) { } template struct filter_observer { typedef filter_observer this_type; typedef source_value_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; mutable test_type test; filter_observer(dest_type d, test_type t) : dest(std::move(d)) , test(std::move(t)) { } template void on_next(Value&& v) const { auto filtered = on_exception([&](){ return !this->test(rxu::as_const(v)); }, dest); if (filtered.empty()) { return; } if (!filtered.get()) { dest.on_next(std::forward(v)); } } void on_error(std::exception_ptr e) const { dest.on_error(e); } void on_completed() const { dest.on_completed(); } static subscriber make(dest_type d, test_type t) { return make_subscriber(d, this_type(d, std::move(t))); } }; template auto operator()(Subscriber dest) const -> decltype(filter_observer::make(std::move(dest), test)) { return filter_observer::make(std::move(dest), test); } }; } /*! @copydoc rx-filter.hpp */ template auto filter(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, class Filter = rxo::detail::filter>> static auto member(Observable&& o, Predicate&& p) -> decltype(o.template lift(Filter(std::forward(p)))) { return o.template lift(Filter(std::forward(p))); } template static operators::detail::filter_invalid_t member(const AN&...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "filter takes (Predicate)"); } }; } #endif