summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
blob: 88327758574cf792856b36b35f3d5bd189e7da0b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_SOURCES_RX_ITERATE_HPP)
#define RXCPP_SOURCES_RX_ITERATE_HPP

#include "../rx-includes.hpp"

/*! \file rx-iterate.hpp

    \brief Returns an observable that sends each value in the collection, on the specified scheduler.

    \tparam Collection    the type of the collection of values that this observable emits
    \tparam Coordination  the type of the scheduler (optional)

    \param  c   collection containing values to send
    \param  cn  the scheduler to use for scheduling the items (optional)

    \return  Observable that sends each value in the collection.

    \sample
    \snippet iterate.cpp iterate sample
    \snippet output.txt iterate sample

    \sample
    \snippet iterate.cpp threaded iterate sample
    \snippet output.txt threaded iterate sample

*/

namespace rxcpp {

namespace sources {

namespace detail {

template<class Collection>
struct is_iterable
{
    typedef rxu::decay_t<Collection> collection_type;

    struct not_void {};
    template<class CC>
    static auto check(int) -> decltype(std::begin(*(CC*)nullptr));
    template<class CC>
    static not_void check(...);

    static const bool value = !std::is_same<decltype(check<collection_type>(0)), not_void>::value;
};

template<class Collection>
struct iterate_traits
{
    typedef rxu::decay_t<Collection> collection_type;
    typedef decltype(std::begin(*(collection_type*)nullptr)) iterator_type;
    typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
};

template<class Collection, class Coordination>
struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>>>
{
    typedef iterate<Collection, Coordination> this_type;
    typedef iterate_traits<Collection> traits;

    typedef rxu::decay_t<Coordination> coordination_type;
    typedef typename coordination_type::coordinator_type coordinator_type;

    typedef typename traits::collection_type collection_type;
    typedef typename traits::iterator_type iterator_type;

    struct iterate_initial_type
    {
        iterate_initial_type(collection_type c, coordination_type cn)
            : collection(std::move(c))
            , coordination(std::move(cn))
        {
        }
        collection_type collection;
        coordination_type coordination;
    };
    iterate_initial_type initial;

    iterate(collection_type c, coordination_type cn)
        : initial(std::move(c), std::move(cn))
    {
    }
    template<class Subscriber>
    void on_subscribe(Subscriber o) const {
        static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");

        typedef typename coordinator_type::template get<Subscriber>::type output_type;

        struct iterate_state_type
            : public iterate_initial_type
        {
            iterate_state_type(const iterate_initial_type& i, output_type o)
                : iterate_initial_type(i)
                , cursor(std::begin(iterate_initial_type::collection))
                , end(std::end(iterate_initial_type::collection))
                , out(std::move(o))
            {
            }
            iterate_state_type(const iterate_state_type& o)
                : iterate_initial_type(o)
                , cursor(std::begin(iterate_initial_type::collection))
                , end(std::end(iterate_initial_type::collection))
                , out(std::move(o.out)) // since lambda capture does not yet support move
            {
            }
            mutable iterator_type cursor;
            iterator_type end;
            mutable output_type out;
        };

        // creates a worker whose lifetime is the same as this subscription
        auto coordinator = initial.coordination.create_coordinator(o.get_subscription());

        iterate_state_type state(initial, o);

        auto controller = coordinator.get_worker();

        auto producer = [state](const rxsc::schedulable& self){
            if (!state.out.is_subscribed()) {
                // terminate loop
                return;
            }

            if (state.cursor != state.end) {
                // send next value
                state.out.on_next(*state.cursor);
                ++state.cursor;
            }

            if (state.cursor == state.end) {
                state.out.on_completed();
                // o is unsubscribed
                return;
            }

            // tail recurse this same action to continue loop
            self();
        };
        auto selectedProducer = on_exception(
            [&](){return coordinator.act(producer);},
            o);
        if (selectedProducer.empty()) {
            return;
        }
        controller.schedule(selectedProducer.get());

    }
};

}

/*! @copydoc rx-iterate.hpp
 */
template<class Collection>
auto iterate(Collection c)
    ->      observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
    return  observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
                                                                              detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
}
/*! @copydoc rx-iterate.hpp
 */
template<class Collection, class Coordination>
auto iterate(Collection c, Coordination cn)
    ->      observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
    return  observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>>(
                                                                              detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
}

/*! Returns an observable that sends an empty set of values and then completes.

    \tparam T  the type of elements (not) to be sent

    \return  Observable that sends an empty set of values and then completes.

    This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator.

    \note This is a degenerate case of ```from(Value0 v0, ValueN... vn)``` operator.
*/
template<class T>
auto from()
    -> decltype(iterate(std::array<T, 0>(), identity_immediate())) {
    return      iterate(std::array<T, 0>(), identity_immediate());
}
/*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler.

    \tparam T  the type of elements (not) to be sent
    \tparam Coordination  the type of the scheduler

    \return  Observable that sends an empty set of values and then completes.

    \note This is a degenerate case of ```from(Coordination cn, Value0 v0, ValueN... vn)``` operator.
*/
template<class T, class Coordination>
auto from(Coordination cn)
    -> typename std::enable_if<is_coordination<Coordination>::value,
        decltype(   iterate(std::array<T, 0>(), std::move(cn)))>::type {
    return          iterate(std::array<T, 0>(), std::move(cn));
}
/*! Returns an observable that sends each value from its arguments list.

    \tparam Value0  ...
    \tparam ValueN  the type of sending values

    \param  v0  ...
    \param  vn  values to send

    \return  Observable that sends each value from its arguments list.

    \sample
    \snippet from.cpp from sample
    \snippet output.txt from sample

    \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
*/
template<class Value0, class... ValueN>
auto from(Value0 v0, ValueN... vn)
    -> typename std::enable_if<!is_coordination<Value0>::value,
        decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
    std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
    return iterate(std::move(c), identity_immediate());
}
/*! Returns an observable that sends each value from its arguments list, on the specified scheduler.

    \tparam Coordination  the type of the scheduler
    \tparam Value0  ...
    \tparam ValueN  the type of sending values

    \param  cn  the scheduler to use for scheduling the items
    \param  v0  ...
    \param  vn  values to send

    \return  Observable that sends each value from its arguments list.

    \sample
    \snippet from.cpp threaded from sample
    \snippet output.txt threaded from sample

    \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
*/
template<class Coordination, class Value0, class... ValueN>
auto from(Coordination cn, Value0 v0, ValueN... vn)
    -> typename std::enable_if<is_coordination<Coordination>::value,
        decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
    std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
    return iterate(std::move(c), std::move(cn));
}


/*! Returns an observable that sends the specified item to observer and then completes.

    \tparam T  the type of the emitted item

    \param v  the value to send

    \return  Observable that sends the specified item to observer and then completes.

    \sample
    \snippet just.cpp just sample
    \snippet output.txt just sample
*/
template<class Value0>
auto just(Value0 v0)
    -> typename std::enable_if<!is_coordination<Value0>::value,
        decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
    std::array<Value0, 1> c{{v0}};
    return iterate(std::move(c), identity_immediate());
}
/*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.

    \tparam T             the type of the emitted item
    \tparam Coordination  the type of the scheduler

    \param v   the value to send
    \param cn  the scheduler to use for scheduling the items

    \return  Observable that sends the specified item to observer and then completes.

    \sample
    \snippet just.cpp threaded just sample
    \snippet output.txt threaded just sample
*/
template<class Value0, class Coordination>
auto just(Value0 v0, Coordination cn)
    -> typename std::enable_if<is_coordination<Coordination>::value,
        decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
    std::array<Value0, 1> c{{v0}};
    return iterate(std::move(c), std::move(cn));
}

/*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable.

    \tparam Observable  the type of the observable that emits values for resending
    \tparam Value0      ...
    \tparam ValueN      the type of sending values

    \param  o   the observable that emits values for resending
    \param  v0  ...
    \param  vn  values to send

    \return  Observable that sends the specified values before it begins to send items emitted by the given observable.

    \sample
    \snippet start_with.cpp full start_with sample
    \snippet output.txt full start_with sample

    Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable:
    \snippet start_with.cpp short start_with sample
    \snippet output.txt short start_with sample
*/
template<class Observable, class Value0, class... ValueN>
auto start_with(Observable o, Value0 v0, ValueN... vn)
    -> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
    return      from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
}

}

}

#endif