summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
blob: 594834768ca16ee70c744e869ccef9ead82c31ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP)
#define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class T, class Duration, class Coordination>
struct buffer_with_time
{
    static_assert(std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, "Duration parameter must convert to rxsc::scheduler::clock_type::duration");
    static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");

    typedef rxu::decay_t<T> source_value_type;
    typedef rxu::decay_t<Coordination> coordination_type;
    typedef typename coordination_type::coordinator_type coordinator_type;
    typedef rxu::decay_t<Duration> duration_type;

    struct buffer_with_time_values
    {
        buffer_with_time_values(duration_type p, duration_type s, coordination_type c)
            : period(p)
            , skip(s)
            , coordination(c)
        {
        }
        duration_type period;
        duration_type skip;
        coordination_type coordination;
    };
    buffer_with_time_values initial;

    buffer_with_time(duration_type period, duration_type skip, coordination_type coordination)
        : initial(period, skip, coordination)
    {
    }

    template<class Subscriber>
    struct buffer_with_time_observer
    {
        typedef buffer_with_time_observer<Subscriber> this_type;
        typedef std::vector<T> value_type;
        typedef rxu::decay_t<Subscriber> dest_type;
        typedef observer<value_type, this_type> observer_type;

        struct buffer_with_time_subscriber_values : public buffer_with_time_values
        {
            buffer_with_time_subscriber_values(dest_type d, buffer_with_time_values v, coordinator_type c)
                : buffer_with_time_values(v)
                , dest(std::move(d))
                , coordinator(std::move(c))
                , worker(std::move(coordinator.get_worker()))
                , expected(worker.now())
            {
            }
            dest_type dest;
            coordinator_type coordinator;
            rxsc::worker worker;
            mutable std::deque<value_type> chunks;
            rxsc::scheduler::clock_type::time_point expected;
        };
        std::shared_ptr<buffer_with_time_subscriber_values> state;

        buffer_with_time_observer(dest_type d, buffer_with_time_values v, coordinator_type c)
            : state(std::make_shared<buffer_with_time_subscriber_values>(buffer_with_time_subscriber_values(std::move(d), v, std::move(c))))
        {
            auto localState = state;
            auto produce_buffer = [localState](const rxsc::schedulable&) {
                localState->dest.on_next(std::move(localState->chunks.front()));
                localState->chunks.pop_front();
            };
            auto create_buffer = [localState, produce_buffer](const rxsc::schedulable&) {
                localState->chunks.emplace_back();
                auto produce_at = localState->expected + localState->period;
                localState->expected += localState->skip;
                localState->worker.schedule(produce_at, produce_buffer);
            };

            state->worker.schedule_periodically(
                state->expected,
                state->skip,
                create_buffer);
        }
        void on_next(T v) const {
            for(auto& chunk : state->chunks) {
                chunk.push_back(v);
            }
        }
        void on_error(std::exception_ptr e) const {
            state->dest.on_error(e);
        }
        void on_completed() const {
            auto done = on_exception(
                [&](){
                    while (!state->chunks.empty()) {
                        state->dest.on_next(std::move(state->chunks.front()));
                        state->chunks.pop_front();
                    }
                    return true;
                },
                state->dest);
            if (done.empty()) {
                return;
            }
            state->dest.on_completed();
        }

        static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_values v) {
            auto cs = d.get_subscription();
            auto coordinator = v.coordination.create_coordinator(cs);

            return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v), std::move(coordinator)));
        }
    };

    template<class Subscriber>
    auto operator()(Subscriber dest) const
        -> decltype(buffer_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
        return      buffer_with_time_observer<Subscriber>::make(std::move(dest), initial);
    }
};

template<class Duration, class Coordination>
class buffer_with_time_factory
{
    typedef rxu::decay_t<Duration> duration_type;
    typedef rxu::decay_t<Coordination> coordination_type;

    duration_type period;
    duration_type skip;
    coordination_type coordination;
public:
    buffer_with_time_factory(duration_type p, duration_type s, coordination_type c) : period(p), skip(s), coordination(c) {}
    template<class Observable>
    auto operator()(Observable&& source)
        -> decltype(source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination))) {
        return      source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination));
    }
};

}

template<class Duration, class Coordination>
inline auto buffer_with_time(Duration period, Coordination coordination)
    ->      detail::buffer_with_time_factory<Duration, Coordination> {
    return  detail::buffer_with_time_factory<Duration, Coordination>(period, period, coordination);
}

template<class Duration, class Coordination>
inline auto buffer_with_time(Duration period, Duration skip, Coordination coordination)
    ->      detail::buffer_with_time_factory<Duration, Coordination> {
    return  detail::buffer_with_time_factory<Duration, Coordination>(period, skip, coordination);
}

}

}

#endif