summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
blob: 95947151c8ffdd3a30f02ec79056562db57620e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-skip_last.hpp

    \brief Make new observable with skipped last count items from this observable.

    \tparam Count the type of the items counter.

    \param  t the number of last items to skip.

    \return  An observable that is identical to the source observable except that it does not emit the last t items that the source observable emits.

    \sample
    \snippet skip_last.cpp skip_last sample
    \snippet output.txt skip_last sample
*/

#if !defined(RXCPP_OPERATORS_RX_SKIP_LAST_HPP)
#define RXCPP_OPERATORS_RX_SKIP_LAST_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct skip_last_invalid_arguments {};

template<class... AN>
struct skip_last_invalid : public rxo::operator_base<skip_last_invalid_arguments<AN...>> {
    using type = observable<skip_last_invalid_arguments<AN...>, skip_last_invalid<AN...>>;
};
template<class... AN>
using skip_last_invalid_t = typename skip_last_invalid<AN...>::type;

template<class T, class Observable, class Count>
struct skip_last : public operator_base<T>
{
    typedef rxu::decay_t<Observable> source_type;
    typedef rxu::decay_t<Count> count_type;

    typedef std::queue<T> queue_type;
    typedef typename queue_type::size_type queue_size_type;

    struct values
    {
        values(source_type s, count_type t)
            : source(std::move(s))
            , count(static_cast<queue_size_type>(t))
        {
        }
        source_type source;
        queue_size_type count;
    };
    values initial;

    skip_last(source_type s, count_type t)
        : initial(std::move(s), std::move(t))
    {
    }

    template<class Subscriber>
    void on_subscribe(const Subscriber& s) const {

        typedef Subscriber output_type;
        struct state_type
            : public std::enable_shared_from_this<state_type>
            , public values
        {
            state_type(const values& i, const output_type& oarg)
                : values(i)
                , out(oarg)
            {
            }
            queue_type items;
            output_type out;
        };
        // take a copy of the values for each subscription
        auto state = std::make_shared<state_type>(initial, s);

        composite_subscription source_lifetime;

        s.add(source_lifetime);

        state->source.subscribe(
        // split subscription lifetime
            source_lifetime,
        // on_next
            [state](T t) {
                if(state->count > 0) {
                    if (state->items.size() == state->count) {
                        state->out.on_next(std::move(state->items.front()));
                        state->items.pop();
                    }
                    state->items.push(t);
                } else {
                    state->out.on_next(t);
                }
            },
        // on_error
            [state](rxu::error_ptr e) {
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                state->out.on_completed();
            }
        );
    }
};

}

/*! @copydoc rx-skip_last.hpp
*/
template<class... AN>
auto skip_last(AN&&... an)
    ->     operator_factory<skip_last_tag, AN...> {
    return operator_factory<skip_last_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<skip_last_tag>
{
    template<class Observable, class Count,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class SkipLast = rxo::detail::skip_last<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
        class Value = rxu::value_type_t<SkipLast>,
        class Result = observable<Value, SkipLast>>
    static Result member(Observable&& o, Count&& c) {
        return Result(SkipLast(std::forward<Observable>(o), std::forward<Count>(c)));
    }

    template<class... AN>
    static operators::detail::skip_last_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "skip_last takes (Count)");
    }
};

}

#endif