summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-take_last.hpp
blob: 12e28b61b740ea4aa59018b9f4236750d193d9e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-take_last.hpp

    \brief Emit only the final t items emitted by the source Observable.

    \tparam Count  the type of the items counter.

    \param t  the number of last items to take.

    \return  An observable that emits only the last t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.

    \sample
    \snippet take_last.cpp take_last sample
    \snippet output.txt take_last sample
*/

#if !defined(RXCPP_OPERATORS_RX_TAKE_LAST_HPP)
#define RXCPP_OPERATORS_RX_TAKE_LAST_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct take_last_invalid_arguments {};

template<class... AN>
struct take_last_invalid : public rxo::operator_base<take_last_invalid_arguments<AN...>> {
    using type = observable<take_last_invalid_arguments<AN...>, take_last_invalid<AN...>>;
};
template<class... AN>
using take_last_invalid_t = typename take_last_invalid<AN...>::type;

template<class T, class Observable, class Count>
struct take_last : public operator_base<T>
{
    typedef rxu::decay_t<Observable> source_type;
    typedef rxu::decay_t<Count> count_type;

    typedef std::queue<T> queue_type;
    typedef typename queue_type::size_type queue_size_type;

    struct values
    {
        values(source_type s, count_type t)
            : source(std::move(s))
            , count(static_cast<queue_size_type>(t))
        {
        }
        source_type source;
        queue_size_type count;
    };
    values initial;

    take_last(source_type s, count_type t)
        : initial(std::move(s), std::move(t))
    {
    }

    template<class Subscriber>
    void on_subscribe(const Subscriber& s) const {

        typedef Subscriber output_type;
        struct state_type
            : public std::enable_shared_from_this<state_type>
            , public values
        {
            state_type(const values& i, const output_type& oarg)
                : values(i)
                , out(oarg)
            {
            }
            queue_type items;
            output_type out;
        };
        // take a copy of the values for each subscription
        auto state = std::make_shared<state_type>(initial, s);

        composite_subscription source_lifetime;

        s.add(source_lifetime);

        state->source.subscribe(
        // split subscription lifetime
            source_lifetime,
        // on_next
            [state, source_lifetime](T t) {
                if(state->count > 0) {
                    if (state->items.size() == state->count) {
                        state->items.pop();
                    }
                    state->items.push(t);
                }
            },
        // on_error
            [state](rxu::error_ptr e) {
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                while(!state->items.empty()) {
                    state->out.on_next(std::move(state->items.front()));
                    state->items.pop();
                }
                state->out.on_completed();
            }
        );
    }
};

}

/*! @copydoc rx-take_last.hpp
*/
template<class... AN>
auto take_last(AN&&... an)
    ->     operator_factory<take_last_tag, AN...> {
    return operator_factory<take_last_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<take_last_tag>
{
    template<class Observable,
        class Count,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class TakeLast = rxo::detail::take_last<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
        class Value = rxu::value_type_t<TakeLast>,
        class Result = observable<Value, TakeLast>>
    static Result member(Observable&& o, Count&& c) {
        return Result(TakeLast(std::forward<Observable>(o), std::forward<Count>(c)));
    }

    template<class... AN>
    static operators::detail::take_last_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "take_last takes (Count)");
    }
};

}

#endif