summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
blob: a4dea3b2dea8f6325b344581c79b022887f98ed1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-buffer_count.hpp

    \brief Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
           If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.

    \param count  the maximum size of each buffers before it should be emitted.
    \param skip   how many items need to be skipped before starting a new buffers (optional).

    \return  Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
             If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.

    \sample
    \snippet buffer.cpp buffer count sample
    \snippet output.txt buffer count sample

    \sample
    \snippet buffer.cpp buffer count+skip sample
    \snippet output.txt buffer count+skip sample
*/

#if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
#define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct buffer_count_invalid_arguments {};

template<class... AN>
struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
    using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>;
};
template<class... AN>
using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type;

template<class T>
struct buffer_count
{
    typedef rxu::decay_t<T> source_value_type;
    typedef std::vector<source_value_type> value_type;

    struct buffer_count_values
    {
        buffer_count_values(int c, int s)
            : count(c)
            , skip(s)
        {
        }
        int count;
        int skip;
    };

    buffer_count_values initial;

    buffer_count(int count, int skip)
        : initial(count, skip)
    {
    }

    template<class Subscriber>
    struct buffer_count_observer : public buffer_count_values
    {
        typedef buffer_count_observer<Subscriber> this_type;
        typedef std::vector<T> value_type;
        typedef rxu::decay_t<Subscriber> dest_type;
        typedef observer<value_type, this_type> observer_type;
        dest_type dest;
        mutable int cursor;
        mutable std::deque<value_type> chunks;

        buffer_count_observer(dest_type d, buffer_count_values v)
            : buffer_count_values(v)
            , dest(std::move(d))
            , cursor(0)
        {
        }
        void on_next(T v) const {
            if (cursor++ % this->skip == 0) {
                chunks.emplace_back();
            }
            for(auto& chunk : chunks) {
                chunk.push_back(v);
            }
            while (!chunks.empty() && int(chunks.front().size()) == this->count) {
                dest.on_next(std::move(chunks.front()));
                chunks.pop_front();
            }
        }
        void on_error(std::exception_ptr e) const {
            dest.on_error(e);
        }
        void on_completed() const {
            auto done = on_exception(
                [&](){
                    while (!chunks.empty()) {
                        dest.on_next(std::move(chunks.front()));
                        chunks.pop_front();
                    }
                    return true;
                },
                dest);
            if (done.empty()) {
                return;
            }
            dest.on_completed();
        }

        static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
            auto cs = d.get_subscription();
            return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
        }
    };

    template<class Subscriber>
    auto operator()(Subscriber dest) const
        -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
        return      buffer_count_observer<Subscriber>::make(std::move(dest), initial);
    }
};

}

/*! @copydoc rx-buffer_count.hpp
*/
template<class... AN>
auto buffer(AN&&... an)
    ->      operator_factory<buffer_count_tag, AN...> {
     return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<buffer_count_tag>
{
    template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class BufferCount = rxo::detail::buffer_count<SourceValue>,
        class Value = rxu::value_type_t<BufferCount>>
    static auto member(Observable&& o, int count, int skip)
        -> decltype(o.template lift<Value>(BufferCount(count, skip))) {
        return      o.template lift<Value>(BufferCount(count, skip));
    }

     template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class BufferCount = rxo::detail::buffer_count<SourceValue>,
        class Value = rxu::value_type_t<BufferCount>>
    static auto member(Observable&& o, int count)
        -> decltype(o.template lift<Value>(BufferCount(count, count))) {
        return      o.template lift<Value>(BufferCount(count, count));
    }

    template<class... AN>
    static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)");
    }
};

}

#endif