// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-take.hpp \brief For the first count items from this observable emit them from the new observable that is returned. \tparam Count the type of the items counter. \param t the number of items to take. \return An observable that emits only the first t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items. \sample \snippet take.cpp take sample \snippet output.txt take sample */ #if !defined(RXCPP_OPERATORS_RX_TAKE_HPP) #define RXCPP_OPERATORS_RX_TAKE_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct take_invalid_arguments {}; template struct take_invalid : public rxo::operator_base> { using type = observable, take_invalid>; }; template using take_invalid_t = typename take_invalid::type; template struct take : public operator_base { typedef rxu::decay_t source_type; typedef rxu::decay_t count_type; struct values { values(source_type s, count_type t) : source(std::move(s)) , count(std::move(t)) { } source_type source; count_type count; }; values initial; take(source_type s, count_type t) : initial(std::move(s), std::move(t)) { } struct mode { enum type { taking, // capture messages triggered, // ignore messages errored, // error occured stopped // observable completed }; }; template void on_subscribe(const Subscriber& s) const { typedef Subscriber output_type; struct state_type : public std::enable_shared_from_this , public values { state_type(const values& i, const output_type& oarg) : values(i) , mode_value(mode::taking) , out(oarg) { } typename mode::type mode_value; output_type out; }; // take a copy of the values for each subscription auto state = std::make_shared(initial, s); composite_subscription source_lifetime; s.add(source_lifetime); state->source.subscribe( // split subscription lifetime source_lifetime, // on_next [state, source_lifetime](T t) { if (state->mode_value < mode::triggered) { if (--state->count > 0) { state->out.on_next(t); } else { state->mode_value = mode::triggered; state->out.on_next(t); // must shutdown source before signaling completion source_lifetime.unsubscribe(); state->out.on_completed(); } } }, // on_error [state](std::exception_ptr e) { state->mode_value = mode::errored; state->out.on_error(e); }, // on_completed [state]() { state->mode_value = mode::stopped; state->out.on_completed(); } ); } }; } /*! @copydoc rx-take.hpp */ template auto take(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template>, class SourceValue = rxu::value_type_t, class Take = rxo::detail::take, rxu::decay_t>, class Value = rxu::value_type_t, class Result = observable> static Result member(Observable&& o, Count&& c) { return Result(Take(std::forward(o), std::forward(c))); } template static operators::detail::take_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "take takes (optional Count)"); } }; } #endif