1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
/*! \file rx-repeat.hpp
\brief Repeat this observable for the given number of times or infinitely.
\tparam Count the type of the counter (optional).
\param t The number of times the source observable items are repeated (optional). If not specified, infinitely repeats the source observable. Specifying 0 returns an empty sequence immediately
\return An observable that repeats the sequence of items emitted by the source observable for t times.
\sample
\snippet repeat.cpp repeat count sample
\snippet output.txt repeat count sample
If the source observable calls on_error, repeat stops:
\snippet repeat.cpp repeat error sample
\snippet output.txt repeat error sample
*/
#if !defined(RXCPP_OPERATORS_RX_REPEAT_HPP)
#define RXCPP_OPERATORS_RX_REPEAT_HPP
#include "../rx-includes.hpp"
#include "rx-retry-repeat-common.hpp"
namespace rxcpp {
namespace operators {
namespace detail {
template<class... AN>
struct repeat_invalid_arguments {};
template<class... AN>
struct repeat_invalid : public rxo::operator_base<repeat_invalid_arguments<AN...>> {
using type = observable<repeat_invalid_arguments<AN...>, repeat_invalid<AN...>>;
};
template<class... AN>
using repeat_invalid_t = typename repeat_invalid<AN...>::type;
// Contain repeat variations in a namespace
namespace repeat {
struct event_handlers {
template <typename State>
static inline void on_error(State& state, std::exception_ptr& e) {
state->out.on_error(e);
}
template <typename State>
static inline void on_completed(State& state) {
// Functions update() and completed_predicate() vary between finite and infinte versions
state->update();
if (state->completed_predicate()) {
state->out.on_completed();
} else {
state->do_subscribe();
}
}
};
// Finite repeat case (explicitely limited with the number of times)
template <class T, class Observable, class Count>
using finite = ::rxcpp::operators::detail::retry_repeat_common::finite
<event_handlers, T, Observable, Count>;
// Infinite repeat case
template <class T, class Observable>
using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite
<event_handlers, T, Observable>;
}
} // detail
/*! @copydoc rx-repeat.hpp
*/
template<class... AN>
auto repeat(AN&&... an)
-> operator_factory<repeat_tag, AN...> {
return operator_factory<repeat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
template<>
struct member_overload<repeat_tag> {
template<class Observable,
class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
class SourceValue = rxu::value_type_t<Observable>,
class Repeat = rxo::detail::repeat::infinite<SourceValue, rxu::decay_t<Observable>>,
class Value = rxu::value_type_t<Repeat>,
class Result = observable<Value, Repeat>>
static Result member(Observable&& o) {
return Result(Repeat(std::forward<Observable>(o)));
}
template<class Observable,
class Count,
class Enabled = rxu::enable_if_all_true_type_t<is_observable<Observable>>,
class SourceValue = rxu::value_type_t<Observable>,
class Repeat = rxo::detail::repeat::finite<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
class Value = rxu::value_type_t<Repeat>,
class Result = observable<Value, Repeat>>
static Result member(Observable&& o, Count&& c) {
return Result(Repeat(std::forward<Observable>(o), std::forward<Count>(c)));
}
template<class... AN>
static operators::detail::repeat_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
static_assert(sizeof...(AN) == 10000, "repeat takes (optional Count)");
}
};
}
#endif
|