summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
blob: 4c1fbef0d9fee715e4f183adc2b29a37fbc329d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_SOURCES_RX_ITERATE_HPP)
#define RXCPP_SOURCES_RX_ITERATE_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace sources {

namespace detail {

// Compile-time probe: `value` is true when `std::begin` can be applied to an
// lvalue of the decayed `Collection` type, and false otherwise.
template<class Collection>
struct is_iterable
{
    using collection_type = typename std::decay<Collection>::type;

    // Marker type produced only by the fallback overload of check().
    struct not_void {};

    // Participates in overload resolution only when std::begin accepts an
    // lvalue Candidate; the int parameter makes it the better match for check(0).
    template<class Candidate>
    static auto check(int) -> decltype(std::begin(std::declval<Candidate&>()));

    // Catch-all used when the overload above is removed by SFINAE.
    template<class Candidate>
    static not_void check(...);

    static const bool value =
        !std::is_same<not_void, decltype(check<collection_type>(0))>::value;
};

// Type bundle for an iterable `Collection`:
//   collection_type - the decayed collection type
//   iterator_type   - what std::begin yields for an lvalue of that type
//   value_type      - std::iterator_traits<iterator_type>::value_type
template<class Collection>
struct iterate_traits
{
    using collection_type = typename std::decay<Collection>::type;
    using iterator_type = decltype(std::begin(std::declval<collection_type&>()));
    using value_type = typename std::iterator_traits<iterator_type>::value_type;
};

// Source that emits the elements of a stored collection to a subscriber.
//
// Derives from source_base<value_type>, where value_type is taken from
// iterate_traits<Collection>. The collection and the coordination are held by
// value in `initial`; each call to on_subscribe() copies them into fresh
// per-subscription state.
template<class Collection, class Coordination>
struct iterate : public source_base<typename iterate_traits<Collection>::value_type>
{
    typedef iterate<Collection, Coordination> this_type;
    typedef iterate_traits<Collection> traits;

    typedef typename std::decay<Coordination>::type coordination_type;
    typedef typename coordination_type::coordinator_type coordinator_type;

    typedef typename traits::collection_type collection_type;
    typedef typename traits::iterator_type iterator_type;

    // Values captured at construction time: the collection to emit and the
    // coordination used to create a coordinator for each subscription.
    struct iterate_initial_type
    {
        iterate_initial_type(collection_type c, coordination_type cn)
            : collection(std::move(c))
            , coordination(std::move(cn))
        {
        }
        collection_type collection;
        coordination_type coordination;
    };
    iterate_initial_type initial;

    // Takes ownership of the collection `c` and the coordination `cn`.
    iterate(collection_type c, coordination_type cn)
        : initial(std::move(c), std::move(cn))
    {
    }
    // Starts emitting to subscriber `o`.
    //
    // Builds per-subscription state (a copy of `initial` plus cursor/end
    // iterators and the output subscriber), wraps a producer action with
    // coordinator.act(), and schedules it on the coordinator's worker.
    // Each run of the producer emits at most one element and then either
    // completes the subscriber or re-invokes itself through `self()`.
    template<class Subscriber>
    void on_subscribe(Subscriber o) const {
        static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");

        typedef typename coordinator_type::template get<Subscriber>::type output_type;

        // Per-subscription state. It inherits the collection/coordination copy
        // and adds iterators into that copy together with the output subscriber.
        struct iterate_state_type
            : public iterate_initial_type
        {
            // The base class is initialized first, so cursor/end refer to this
            // object's own copy of the collection.
            iterate_state_type(const iterate_initial_type& i, output_type o)
                : iterate_initial_type(i)
                , cursor(std::begin(iterate_initial_type::collection))
                , end(std::end(iterate_initial_type::collection))
                , out(std::move(o))
            {
            }
            // Copy constructor with non-standard semantics:
            //  - the collection is copied and cursor/end are re-derived from the
            //    new copy, so the copy always starts at the first element
            //    regardless of how far `o.cursor` had advanced;
            //  - `out` is MOVED out of the source (possible because `out` is
            //    declared mutable), leaving `o.out` in a moved-from state.
            iterate_state_type(const iterate_state_type& o)
                : iterate_initial_type(o)
                , cursor(std::begin(iterate_initial_type::collection))
                , end(std::end(iterate_initial_type::collection))
                , out(std::move(o.out)) // since lambda capture does not yet support move
            {
            }
            // mutable: advanced from inside the producer lambda, which is not
            // declared `mutable` and therefore sees its captured state as const.
            mutable iterator_type cursor;
            iterator_type end;
            // mutable: moved from in the copy constructor and used from the
            // const producer lambda.
            mutable output_type out;
        };

        // creates a worker whose lifetime is the same as this subscription
        auto coordinator = initial.coordination.create_coordinator(o.get_subscription());

        iterate_state_type state(initial, o);

        auto controller = coordinator.get_worker();

        // Capturing `state` by value goes through the copy constructor above,
        // so the lambda owns its own collection copy and the moved-in `out`.
        auto producer = [state](const rxsc::schedulable& self){
            if (!state.out.is_subscribed()) {
                // terminate loop
                return;
            }

            if (state.cursor != state.end) {
                // send next value
                state.out.on_next(*state.cursor);
                ++state.cursor;
            }

            // Checked after the increment so completion is signalled in the same
            // run that emits the last element; an empty collection completes
            // here without emitting anything.
            if (state.cursor == state.end) {
                state.out.on_completed();
                // o is unsubscribed
                return;
            }

            // tail recurse this same action to continue loop
            self();
        };
        // NOTE(review): on_exception presumably reports an exception thrown by
        // act() to `o` and returns an empty result in that case - confirm
        // against its definition. An empty result means nothing is scheduled.
        auto selectedProducer = on_exception(
            [&](){return coordinator.act(producer);},
            o);
        if (selectedProducer.empty()) {
            return;
        }
        controller.schedule(selectedProducer.get());

    }
};

}

// Creates an observable over the elements of `c`, using the coordination
// returned by identity_immediate(). The collection is moved into the source.
template<class Collection>
auto iterate(Collection c)
    -> observable<typename detail::iterate_traits<Collection>::value_type, detail::iterate<Collection, identity_one_worker>> {
    typedef typename detail::iterate_traits<Collection>::value_type element_type;
    typedef detail::iterate<Collection, identity_one_worker> source_type;
    typedef observable<element_type, source_type> result_type;

    return result_type(source_type(std::move(c), identity_immediate()));
}
// Creates an observable over the elements of `c` that uses the supplied
// coordination `cn`. Both arguments are moved into the source.
template<class Collection, class Coordination>
auto iterate(Collection c, Coordination cn)
    -> observable<typename detail::iterate_traits<Collection>::value_type, detail::iterate<Collection, Coordination>> {
    typedef typename detail::iterate_traits<Collection>::value_type element_type;
    typedef detail::iterate<Collection, Coordination> source_type;
    typedef observable<element_type, source_type> result_type;

    return result_type(source_type(std::move(c), std::move(cn)));
}

// Zero-value overload: iterates an empty std::array<T, 0> with the
// coordination returned by identity_immediate().
template<class T>
auto from()
    -> decltype(iterate(std::array<T, 0>(), identity_immediate())) {
    typedef std::array<T, 0> no_values;
    return iterate(no_values(), identity_immediate());
}
// Zero-value overload with an explicit coordination: iterates an empty
// std::array<T, 0> using `cn`. Only participates in overload resolution when
// is_coordination<Coordination>::value is true.
template<class T, class Coordination>
auto from(Coordination cn)
    -> typename std::enable_if<is_coordination<Coordination>::value,
        decltype(iterate(std::array<T, 0>(), std::move(cn)))>::type {
    typedef std::array<T, 0> no_values;
    return iterate(no_values(), std::move(cn));
}
// Creates an observable from the given values, using the coordination
// returned by identity_immediate().
//
// The values are packed into a std::array<Value0, N> (so every ValueN must be
// implicitly, non-narrowingly convertible to Value0) and handed to iterate().
// Only participates in overload resolution when Value0 is not a coordination.
//
// The trailing return type is kept exactly as before so overload selection
// and SFINAE behaviour are unchanged.
template<class Value0, class... ValueN>
auto from(Value0 v0, ValueN... vn)
    -> typename std::enable_if<!is_coordination<Value0>::value,
        decltype(iterate(std::array<Value0, sizeof...(ValueN) + 1>{v0, vn...}, identity_immediate()))>::type {
    // v0/vn are by-value parameters owned by this call and are not used again,
    // so move them into the array instead of copying each one a second time.
    std::array<Value0, sizeof...(ValueN) + 1> c = {std::move(v0), std::move(vn)...};
    return iterate(std::move(c), identity_immediate());
}
// Creates an observable from the given values that uses the supplied
// coordination `cn`.
//
// The values are packed into a std::array<Value0, N> (so every ValueN must be
// implicitly, non-narrowingly convertible to Value0) and handed to iterate().
// Only participates in overload resolution when
// is_coordination<Coordination>::value is true.
//
// The trailing return type is kept exactly as before so overload selection
// and SFINAE behaviour are unchanged.
template<class Coordination, class Value0, class... ValueN>
auto from(Coordination cn, Value0 v0, ValueN... vn)
    -> typename std::enable_if<is_coordination<Coordination>::value,
        decltype(iterate(std::array<Value0, sizeof...(ValueN) + 1>{v0, vn...}, std::move(cn)))>::type {
    // v0/vn are by-value parameters owned by this call and are not used again,
    // so move them into the array instead of copying each one a second time.
    std::array<Value0, sizeof...(ValueN) + 1> c = {std::move(v0), std::move(vn)...};
    return iterate(std::move(c), std::move(cn));
}

}

}

#endif