summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2017-02-01 08:58:39 -0800
committerGrigoriy Chudnov <g.chudnov@gmail.com>2017-02-03 09:08:56 +0300
commit6245a18869836ef29f9116808435cefac0eebcb7 (patch)
tree7c42f28dfc2df14930bc11d97fcb5324f3c45ee3
parent07d5e235baabc841d01bfeac6d4476deda0a1bb0 (diff)
downloadRxCpp-6245a18869836ef29f9116808435cefac0eebcb7.tar.gz
move sources docs out of observable
shift tests to include aliases
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-subscribe.hpp80
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp480
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-create.hpp37
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-defer.hpp17
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-empty.hpp24
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-error.hpp26
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-interval.hpp37
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-iterate.hpp147
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-never.hpp15
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-range.hpp31
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-scope.hpp21
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-timer.hpp34
-rw-r--r--Rx/v2/test/operators/concat_map.cpp116
-rw-r--r--Rx/v2/test/operators/flat_map.cpp134
-rw-r--r--Rx/v2/test/operators/on_error_resume_next.cpp71
-rw-r--r--Rx/v2/test/operators/reduce.cpp55
17 files changed, 839 insertions, 488 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
index cd49196..7e54eb8 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
@@ -250,7 +250,7 @@ struct buffer_with_time
}
-/*! @copydoc rx-buffer_with_time.hpp
+/*! @copydoc rx-buffer_time.hpp
*/
template<class... AN>
auto buffer_with_time(AN&&... an)
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
index 7488bde..c3e3917 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
@@ -81,41 +81,13 @@ public:
}
-template<class T, class Arg0>
-auto subscribe(Arg0&& a0)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))>
- (make_subscriber<T>(std::forward<Arg0>(a0)));
-}
-template<class T, class Arg0, class Arg1>
-auto subscribe(Arg0&& a0, Arg1&& a1)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))>
- (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)));
-}
-template<class T, class Arg0, class Arg1, class Arg2>
-auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))>
- (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)));
-}
-template<class T, class Arg0, class Arg1, class Arg2, class Arg3>
-auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))>
- (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)));
-}
-template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4>
-auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))>
- (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)));
-}
-template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5>
-auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5)
- -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> {
- return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))>
- (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)));
+/*! @copydoc rx-subscribe.hpp
+*/
+template<class T, class... ArgN>
+auto subscribe(ArgN&&... an)
+ -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))> {
+ return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))>
+ (make_subscriber<T>(std::forward<ArgN>(an)...));
}
namespace detail {
@@ -132,11 +104,49 @@ public:
}
+/*! Return a new observable that performs type-forgetting conversion of this observable.
+
+ \return The source observable converted to observable<T>.
+
+ \note This operator could be useful to workaround lambda deduction bug on msvc 2013.
+
+ \sample
+ \snippet as_dynamic.cpp as_dynamic sample
+ \snippet output.txt as_dynamic sample
+*/
inline auto as_dynamic()
-> detail::dynamic_factory {
return detail::dynamic_factory();
}
+namespace detail {
+
+class blocking_factory
+{
+public:
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(std::forward<Observable>(source).as_blocking()) {
+ return std::forward<Observable>(source).as_blocking();
+ }
+};
+
+}
+
+/*! Return a new observable that contains the blocking methods for this observable.
+
+ \return An observable that contains the blocking methods for this observable.
+
+ \sample
+ \snippet from.cpp threaded from sample
+ \snippet output.txt threaded from sample
+*/
+inline auto as_blocking()
+ -> detail::blocking_factory {
+ return detail::blocking_factory();
+}
+
+
}
}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 3db90ab..6ad8da7 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -598,30 +598,16 @@ public:
}
#endif
- /*! Return a new observable that performs type-forgetting conversion of this observable.
-
- \return The source observable converted to observable<T>.
-
- \note This operator could be useful to workaround lambda deduction bug on msvc 2013.
-
- \sample
- \snippet as_dynamic.cpp as_dynamic sample
- \snippet output.txt as_dynamic sample
- */
+ /*! @copydoc rxcpp::operators::as_dynamic
+ */
template<class... AN>
observable<T> as_dynamic(AN**...) const {
return *this;
static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
}
- /*! Return a new observable that contains the blocking methods for this observable.
-
- \return An observable that contains the blocking methods for this observable.
-
- \sample
- \snippet from.cpp threaded from sample
- \snippet output.txt threaded from sample
- */
+ /*! @copydoc rxcpp::operators::as_blocking
+ */
template<class... AN>
blocking_observable<T, this_type> as_blocking(AN**...) const {
return blocking_observable<T, this_type>(*this);
@@ -1571,535 +1557,223 @@ class observable<void, void>
{
~observable();
public:
- /*! Returns an observable that executes the specified function when a subscriber subscribes to it.
-
- \tparam T the type of the items that this observable emits
- \tparam OnSubscribe the type of OnSubscribe handler function
-
- \param os OnSubscribe event handler
-
- \return Observable that executes the specified function when a Subscriber subscribes to it.
-
- \sample
- \snippet create.cpp Create sample
- \snippet output.txt Create sample
-
- \warning
- It is good practice to check the observer's is_subscribed state from within the function you pass to create
- so that your observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.
-
- \badcode
- \snippet create.cpp Create bad code
- \snippet output.txt Create bad code
-
- \goodcode
- \snippet create.cpp Create good code
- \snippet output.txt Create good code
-
- \warning
- It is good practice to use operators like observable::take to control lifetime rather than use the subscription explicitly.
-
- \goodcode
- \snippet create.cpp Create great code
- \snippet output.txt Create great code
- */
+ /*! @copydoc rx-create.hpp
+ */
template<class T, class OnSubscribe>
static auto create(OnSubscribe os)
-> decltype(rxs::create<T>(std::move(os))) {
return rxs::create<T>(std::move(os));
}
- /*! Returns an observable that sends values in the range first-last by adding step to the previous value.
-
- \tparam T the type of the values that this observable emits
- \param first first value to send
- \param last last value to send
- \param step value to add to the previous value to get the next value
-
- \return Observable that sends values in the range first-last by adding step to the previous value.
-
- \sample
- \snippet range.cpp range sample
- \snippet output.txt range sample
- */
+ /*! @copydoc rx-range.hpp
+ */
template<class T>
static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
-> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
return rxs::range<T>(first, last, step, identity_current_thread());
}
- /*! Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler.
-
- \tparam T the type of the values that this observable emits
- \tparam Coordination the type of the scheduler
-
- \param first first value to send
- \param last last value to send
- \param step value to add to the previous value to get the next value
- \param cn the scheduler to run the generator loop on
-
- \return Observable that sends values in the range first-last by adding step to the previous value using the specified scheduler.
-
- \note `step` or both `step` & `last` may be omitted.
-
- \sample
- \snippet range.cpp threaded range sample
- \snippet output.txt threaded range sample
-
- An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator
- \snippet range.cpp subscribe_on range sample
- \snippet output.txt subscribe_on range sample
- */
+ /*! @copydoc rx-range.hpp
+ */
template<class T, class Coordination>
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
-> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
return rxs::range<T>(first, last, step, std::move(cn));
}
- /// Returns an observable that sends values in the range ```first```-```last``` by adding 1 to the previous value. The values are sent on the specified scheduler.
- ///
- /// \see rxcpp::observable<void,void>#range(T first, T last, std::ptrdiff_t step, Coordination cn)
+ /*! @copydoc rx-range.hpp
+ */
template<class T, class Coordination>
static auto range(T first, T last, Coordination cn)
-> decltype(rxs::range<T>(first, last, std::move(cn))) {
return rxs::range<T>(first, last, std::move(cn));
}
- /// Returns an observable that infinitely (until overflow) sends values starting from ```first```. The values are sent on the specified scheduler.
- ///
- /// \see rxcpp::observable<void,void>#range(T first, T last, std::ptrdiff_t step, Coordination cn)
+ /*! @copydoc rx-range.hpp
+ */
template<class T, class Coordination>
static auto range(T first, Coordination cn)
-> decltype(rxs::range<T>(first, std::move(cn))) {
return rxs::range<T>(first, std::move(cn));
}
- /*! Returns an observable that never sends any items or notifications to observer.
-
- \tparam T the type of (not) emitted items
-
- \return Observable that never sends any items or notifications to observer.
- \sample
- \snippet never.cpp never sample
- \snippet output.txt never sample
- */
+ /*! @copydoc rx-never.hpp
+ */
template<class T>
static auto never()
-> decltype(rxs::never<T>()) {
return rxs::never<T>();
}
- /*! Returns an observable that calls the specified observable factory to create an observable for each new observer that subscribes.
- \tparam ObservableFactory the type of the observable factory
-
- \param of the observable factory function to invoke for each observer that subscribes to the resulting observable
-
- \return observable whose observers' subscriptions trigger an invocation of the given observable factory function
-
- \sample
- \snippet defer.cpp defer sample
- \snippet output.txt defer sample
- */
+ /*! @copydoc rx-defer.hpp
+ */
template<class ObservableFactory>
static auto defer(ObservableFactory of)
-> decltype(rxs::defer(std::move(of))) {
return rxs::defer(std::move(of));
}
- /*! Returns an observable that emits a sequential integer every specified time interval.
- \param period period between emitted values
-
- \return Observable that sends a sequential integer each time interval
-
- \sample
- \snippet interval.cpp immediate interval sample
- \snippet output.txt immediate interval sample
- */
+ /*! @copydoc rx-interval.hpp
+ */
template<class... AN>
static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
-> decltype(rxs::interval(period)) {
return rxs::interval(period);
static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
}
- /*! Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param period period between emitted values
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends a sequential integer each time interval
-
- \sample
- \snippet interval.cpp threaded immediate interval sample
- \snippet output.txt threaded immediate interval sample
- */
+ /*! @copydoc rx-interval.hpp
+ */
template<class Coordination>
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
-> decltype(rxs::interval(period, std::move(cn))) {
return rxs::interval(period, std::move(cn));
}
- /*! Returns an observable that emits a sequential integer every specified time interval starting from the specified time point.
-
- \param initial time when the first value is sent
- \param period period between emitted values
-
- \return Observable that sends a sequential integer each time interval
-
- \sample
- \snippet interval.cpp interval sample
- \snippet output.txt interval sample
- */
+ /*! @copydoc rx-interval.hpp
+ */
template<class... AN>
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
-> decltype(rxs::interval(initial, period)) {
return rxs::interval(initial, period);
static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
}
- /*! Returns an observable that emits a sequential integer every specified time interval starting from the specified time point, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param initial time when the first value is sent
- \param period period between emitted values
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends a sequential integer each time interval
-
- \sample
- \snippet interval.cpp threaded interval sample
- \snippet output.txt threaded interval sample
- */
+ /*! @copydoc rx-interval.hpp
+ */
template<class Coordination>
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
-> decltype(rxs::interval(initial, period, std::move(cn))) {
return rxs::interval(initial, period, std::move(cn));
}
- /*! Returns an observable that emits an integer at the specified time point.
-
- \param when time point when the value is emitted
-
- \return Observable that emits an integer at the specified time point
- \sample
- \snippet timer.cpp timepoint timer sample
- \snippet output.txt timepoint timer sample
- */
+ /*! @copydoc rx-timer.hpp
+ */
template<class... AN>
static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
-> decltype(rxs::timer(at)) {
return rxs::timer(at);
static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
}
- /*! Returns an observable that emits an integer in the specified time interval.
-
- \param when interval when the value is emitted
-
- \return Observable that emits an integer in the specified time interval
-
- \sample
- \snippet timer.cpp duration timer sample
- \snippet output.txt duration timer sample
- */
+ /*! @copydoc rx-timer.hpp
+ */
template<class... AN>
static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
-> decltype(rxs::timer(after)) {
return rxs::timer(after);
static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
}
- /*! Returns an observable that emits an integer at the specified time point, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param when time point when the value is emitted
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that emits an integer at the specified time point
-
- \sample
- \snippet timer.cpp threaded timepoint timer sample
- \snippet output.txt threaded timepoint timer sample
- */
+ /*! @copydoc rx-timer.hpp
+ */
template<class Coordination>
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
-> decltype(rxs::timer(when, std::move(cn))) {
return rxs::timer(when, std::move(cn));
}
- /*! Returns an observable that emits an integer in the specified time interval, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param when interval when the value is emitted
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that emits an integer in the specified time interval
-
- \sample
- \snippet timer.cpp threaded duration timer sample
- \snippet output.txt threaded duration timer sample
- */
+ /*! @copydoc rx-timer.hpp
+ */
template<class Coordination>
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
-> decltype(rxs::timer(when, std::move(cn))) {
return rxs::timer(when, std::move(cn));
}
- /*! Returns an observable that sends each value in the collection.
-
- \tparam Collection the type of the collection of values that this observable emits
-
- \param c collection containing values to send
-
- \return Observable that sends each value in the collection.
- \sample
- \snippet iterate.cpp iterate sample
- \snippet output.txt iterate sample
- */
+ /*! @copydoc rx-iterate.hpp
+ */
template<class Collection>
static auto iterate(Collection c)
-> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
return rxs::iterate(std::move(c), identity_current_thread());
}
- /*! Returns an observable that sends each value in the collection, on the specified scheduler.
-
- \tparam Collection the type of the collection of values that this observable emits
- \tparam Coordination the type of the scheduler
-
- \param c collection containing values to send
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends each value in the collection.
-
- \sample
- \snippet iterate.cpp threaded iterate sample
- \snippet output.txt threaded iterate sample
- */
+ /*! @copydoc rx-iterate.hpp
+ */
template<class Collection, class Coordination>
static auto iterate(Collection c, Coordination cn)
-> decltype(rxs::iterate(std::move(c), std::move(cn))) {
return rxs::iterate(std::move(c), std::move(cn));
}
- /*! Returns an observable that sends an empty set of values and then completes.
-
- \tparam T the type of elements (not) to be sent
- \return Observable that sends an empty set of values and then completes.
-
- This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator.
-
- \note This is a degenerate case of ```observable<void,void>::from(Value0 v0, ValueN... vn)``` operator.
- */
+ /*! @copydoc rxcpp::sources::from()
+ */
template<class T>
static auto from()
-> decltype( rxs::from<T>()) {
return rxs::from<T>();
}
- /*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler.
-
- \tparam T the type of elements (not) to be sent
- \tparam Coordination the type of the scheduler
-
- \return Observable that sends an empty set of values and then completes.
-
- \note This is a degenerate case of ```observable<void,void>::from(Coordination cn, Value0 v0, ValueN... vn)``` operator.
- */
+ /*! @copydoc rxcpp::sources::from(Coordination cn)
+ */
template<class T, class Coordination>
static auto from(Coordination cn)
-> typename std::enable_if<is_coordination<Coordination>::value,
decltype( rxs::from<T>(std::move(cn)))>::type {
return rxs::from<T>(std::move(cn));
}
- /*! Returns an observable that sends each value from its arguments list.
-
- \tparam Value0 ...
- \tparam ValueN the type of sending values
-
- \param v0 ...
- \param vn values to send
-
- \return Observable that sends each value from its arguments list.
-
- \sample
- \snippet from.cpp from sample
- \snippet output.txt from sample
-
- \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
- */
+ /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
+ */
template<class Value0, class... ValueN>
static auto from(Value0 v0, ValueN... vn)
-> typename std::enable_if<!is_coordination<Value0>::value,
decltype( rxs::from(v0, vn...))>::type {
return rxs::from(v0, vn...);
}
- /*! Returns an observable that sends each value from its arguments list, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
- \tparam Value0 ...
- \tparam ValueN the type of sending values
-
- \param cn the scheduler to use for scheduling the items
- \param v0 ...
- \param vn values to send
-
- \return Observable that sends each value from its arguments list.
-
- \sample
- \snippet from.cpp threaded from sample
- \snippet output.txt threaded from sample
-
- \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
- */
+ /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
+ */
template<class Coordination, class Value0, class... ValueN>
static auto from(Coordination cn, Value0 v0, ValueN... vn)
-> typename std::enable_if<is_coordination<Coordination>::value,
decltype( rxs::from(std::move(cn), v0, vn...))>::type {
return rxs::from(std::move(cn), v0, vn...);
}
- /*! Returns an observable that sends no items to observer and immediately completes.
- \tparam T the type of (not) emitted items
+ /*! @copydoc rxcpp::sources::just(Value0 v0)
+ */
+ template<class T>
+ static auto just(T v)
+ -> decltype(rxs::just(std::move(v))) {
+ return rxs::just(std::move(v));
+ }
+ /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
+ */
+ template<class T, class Coordination>
+ static auto just(T v, Coordination cn)
+ -> decltype(rxs::just(std::move(v), std::move(cn))) {
+ return rxs::just(std::move(v), std::move(cn));
+ }
- \return Observable that sends no items to observer and immediately completes.
+ /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
+ */
+ template<class Observable, class Value0, class... ValueN>
+ static auto start_with(Observable o, Value0 v0, ValueN... vn)
+ -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
+ return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
+ }
- \sample
- \snippet empty.cpp empty sample
- \snippet output.txt empty sample
- */
+ /*! @copydoc rx-empty.hpp
+ */
template<class T>
static auto empty()
-> decltype(from<T>()) {
return from<T>();
}
- /*! Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
-
- \tparam T the type of (not) emitted items
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends no items to observer and immediately completes.
-
- \sample
- \snippet empty.cpp threaded empty sample
- \snippet output.txt threaded empty sample
- */
+ /*! @copydoc rx-empty.hpp
+ */
template<class T, class Coordination>
static auto empty(Coordination cn)
-> decltype(from<T>(std::move(cn))) {
return from<T>(std::move(cn));
}
- /*! Returns an observable that sends the specified item to observer and then completes.
-
- \tparam T the type of the emitted item
-
- \param v the value to send
-
- \return Observable that sends the specified item to observer and then completes.
-
- \sample
- \snippet just.cpp just sample
- \snippet output.txt just sample
- */
- template<class T>
- static auto just(T v)
- -> decltype(from(std::move(v))) {
- return from(std::move(v));
- }
- /*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.
-
- \tparam T the type of the emitted item
- \tparam Coordination the type of the scheduler
-
- \param v the value to send
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends the specified item to observer and then completes.
-
- \sample
- \snippet just.cpp threaded just sample
- \snippet output.txt threaded just sample
- */
- template<class T, class Coordination>
- static auto just(T v, Coordination cn)
- -> decltype(from(std::move(cn), std::move(v))) {
- return from(std::move(cn), std::move(v));
- }
- /*! Returns an observable that sends no items to observer and immediately generates an error.
-
- \tparam T the type of (not) emitted items
- \tparam Exception the type of the error
-
- \param e the error to be passed to observers
-
- \return Observable that sends no items to observer and immediately generates an error.
- \sample
- \snippet error.cpp error sample
- \snippet output.txt error sample
- */
+ /*! @copydoc rx-error.hpp
+ */
template<class T, class Exception>
static auto error(Exception&& e)
-> decltype(rxs::error<T>(std::forward<Exception>(e))) {
return rxs::error<T>(std::forward<Exception>(e));
}
- /*! Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler.
-
- \tparam T the type of (not) emitted items
- \tparam Exception the type of the error
- \tparam Coordination the type of the scheduler
-
- \param e the error to be passed to observers
- \param cn the scheduler to use for scheduling the items
-
- \return Observable that sends no items to observer and immediately generates an error.
-
- \sample
- \snippet error.cpp threaded error sample
- \snippet output.txt threaded error sample
- */
+ /*! @copydoc rx-error.hpp
+ */
template<class T, class Exception, class Coordination>
static auto error(Exception&& e, Coordination cn)
-> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
}
- /*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable.
-
- \tparam Observable the type of the observable that emits values for resending
- \tparam Value0 ...
- \tparam ValueN the type of sending values
-
- \param o the observable that emits values for resending
- \param v0 ...
- \param vn values to send
-
- \return Observable that sends the specified values before it begins to send items emitted by the given observable.
-
- \sample
- \snippet start_with.cpp full start_with sample
- \snippet output.txt full start_with sample
-
- Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable:
- \snippet start_with.cpp short start_with sample
- \snippet output.txt short start_with sample
- */
- template<class Observable, class Value0, class... ValueN>
- static auto start_with(Observable o, Value0 v0, ValueN... vn)
- -> decltype(rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
- return rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
- }
- /*! Returns an observable that makes an observable by the specified observable factory
- using the resource provided by the specified resource factory for each new observer that subscribes.
- \tparam ResourceFactory the type of the resource factory
- \tparam ObservableFactory the type of the observable factory
-
- \param rf the resource factory function that resturn the rxcpp::resource that is used as a resource by the observable factory
- \param of the observable factory function to invoke for each observer that subscribes to the resulting observable
-
- \return observable that makes an observable by the specified observable factory
- using the resource provided by the specified resource factory for each new observer that subscribes.
-
- \sample
- \snippet scope.cpp scope sample
- \snippet output.txt scope sample
- */
+ /*! @copydoc rx-scope.hpp
+ */
template<class ResourceFactory, class ObservableFactory>
static auto scope(ResourceFactory rf, ObservableFactory of)
-> decltype(rxs::scope(std::move(rf), std::move(of))) {
diff --git a/Rx/v2/src/rxcpp/sources/rx-create.hpp b/Rx/v2/src/rxcpp/sources/rx-create.hpp
index ed27fea..95b9533 100644
--- a/Rx/v2/src/rxcpp/sources/rx-create.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-create.hpp
@@ -7,6 +7,41 @@
#include "../rx-includes.hpp"
+/*! \file rx-create.hpp
+
+ \brief Returns an observable that executes the specified function when a subscriber subscribes to it.
+
+ \tparam T the type of the items that this observable emits
+ \tparam OnSubscribe the type of OnSubscribe handler function
+
+ \param os OnSubscribe event handler
+
+ \return Observable that executes the specified function when a Subscriber subscribes to it.
+
+ \sample
+ \snippet create.cpp Create sample
+ \snippet output.txt Create sample
+
+ \warning
+ It is good practice to check the observer's is_subscribed state from within the function you pass to create
+ so that your observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.
+
+ \badcode
+ \snippet create.cpp Create bad code
+ \snippet output.txt Create bad code
+
+ \goodcode
+ \snippet create.cpp Create good code
+ \snippet output.txt Create good code
+
+ \warning
+ It is good practice to use operators like observable::take to control lifetime rather than use the subscription explicitly.
+
+ \goodcode
+ \snippet create.cpp Create great code
+ \snippet output.txt Create great code
+*/
+
namespace rxcpp {
namespace sources {
@@ -41,6 +76,8 @@ struct create : public source_base<T>
}
+/*! @copydoc rx-create.hpp
+ */
template<class T, class OnSubscribe>
auto create(OnSubscribe os)
-> observable<T, detail::create<T, OnSubscribe>> {
diff --git a/Rx/v2/src/rxcpp/sources/rx-defer.hpp b/Rx/v2/src/rxcpp/sources/rx-defer.hpp
index 3afff64..306b4d0 100644
--- a/Rx/v2/src/rxcpp/sources/rx-defer.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-defer.hpp
@@ -7,6 +7,21 @@
#include "../rx-includes.hpp"
+/*! \file rx-defer.hpp
+
+ \brief Returns an observable that calls the specified observable factory to create an observable for each new observer that subscribes.
+
+ \tparam ObservableFactory the type of the observable factory
+
+ \param of the observable factory function to invoke for each observer that subscribes to the resulting observable
+
+ \return observable whose observers' subscriptions trigger an invocation of the given observable factory function
+
+ \sample
+ \snippet defer.cpp defer sample
+ \snippet output.txt defer sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -52,6 +67,8 @@ struct defer : public source_base<rxu::value_type_t<defer_traits<ObservableFacto
}
+/*! @copydoc rx-defer.hpp
+ */
template<class ObservableFactory>
auto defer(ObservableFactory of)
-> observable<rxu::value_type_t<detail::defer_traits<ObservableFactory>>, detail::defer<ObservableFactory>> {
diff --git a/Rx/v2/src/rxcpp/sources/rx-empty.hpp b/Rx/v2/src/rxcpp/sources/rx-empty.hpp
index 1a6f1d8..2e11a16 100644
--- a/Rx/v2/src/rxcpp/sources/rx-empty.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-empty.hpp
@@ -7,15 +7,39 @@
#include "../rx-includes.hpp"
+/*! \file rx-empty.hpp
+
+ \brief Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
+
+ \tparam T the type of (not) emitted items
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param cn the scheduler to use for scheduling the items (optional)
+
+ \return Observable that sends no items to observer and immediately completes.
+
+ \sample
+ \snippet empty.cpp empty sample
+ \snippet output.txt empty sample
+
+ \sample
+ \snippet empty.cpp threaded empty sample
+ \snippet output.txt threaded empty sample
+*/
+
namespace rxcpp {
namespace sources {
+/*! @copydoc rx-empty.hpp
+ */
template<class T>
auto empty()
-> decltype(from<T>()) {
return from<T>();
}
+/*! @copydoc rx-empty.hpp
+ */
template<class T, class Coordination>
auto empty(Coordination cn)
-> decltype(from<T>(std::move(cn))) {
diff --git a/Rx/v2/src/rxcpp/sources/rx-error.hpp b/Rx/v2/src/rxcpp/sources/rx-error.hpp
index 3f3e07b..461c081 100644
--- a/Rx/v2/src/rxcpp/sources/rx-error.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-error.hpp
@@ -7,6 +7,28 @@
#include "../rx-includes.hpp"
+/*! \file rx-error.hpp
+
+ \brief Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler.
+
+ \tparam T the type of (not) emitted items
+ \tparam Exception the type of the error
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param e the error to be passed to observers
+ \param cn the scheduler to use for scheduling the items (optional)
+
+ \return Observable that sends no items to observer and immediately generates an error.
+
+ \sample
+ \snippet error.cpp error sample
+ \snippet output.txt error sample
+
+ \sample
+ \snippet error.cpp threaded error sample
+ \snippet output.txt threaded error sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -86,11 +108,15 @@ auto make_error(throw_instance_tag&&, E e, Coordination cn)
}
+/*! @copydoc rx-error.hpp
+ */
template<class T, class E>
auto error(E e)
-> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
}
+/*! @copydoc rx-error.hpp
+ */
template<class T, class E, class Coordination>
auto error(E e, Coordination cn)
-> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
diff --git a/Rx/v2/src/rxcpp/sources/rx-interval.hpp b/Rx/v2/src/rxcpp/sources/rx-interval.hpp
index b151155..7d3d47d 100644
--- a/Rx/v2/src/rxcpp/sources/rx-interval.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-interval.hpp
@@ -7,6 +7,34 @@
#include "../rx-includes.hpp"
+/*! \file rx-interval.hpp
+
+ \brief Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param period period between emitted values
+ \param cn the scheduler to use for scheduling the items (optional)
+
+ \return Observable that sends a sequential integer each time interval
+
+ \sample
+ \snippet interval.cpp interval sample
+ \snippet output.txt interval sample
+
+ \sample
+ \snippet interval.cpp immediate interval sample
+ \snippet output.txt immediate interval sample
+
+ \sample
+ \snippet interval.cpp threaded interval sample
+ \snippet output.txt threaded interval sample
+
+ \sample
+ \snippet interval.cpp threaded immediate interval sample
+ \snippet output.txt threaded immediate interval sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -78,6 +106,9 @@ struct defer_interval : public defer_observable<
}
+
+/*! @copydoc rx-interval.hpp
+ */
template<class Duration>
auto interval(Duration period)
-> typename std::enable_if<
@@ -86,6 +117,8 @@ auto interval(Duration period)
return detail::defer_interval<Duration, identity_one_worker>::make(identity_current_thread().now(), period, identity_current_thread());
}
+/*! @copydoc rx-interval.hpp
+ */
template<class Coordination>
auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
-> typename std::enable_if<
@@ -94,6 +127,8 @@ auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn));
}
+/*! @copydoc rx-interval.hpp
+ */
template<class Duration>
auto interval(rxsc::scheduler::clock_type::time_point when, Duration period)
-> typename std::enable_if<
@@ -102,6 +137,8 @@ auto interval(rxsc::scheduler::clock_type::time_point when, Duration period)
return detail::defer_interval<Duration, identity_one_worker>::make(when, period, identity_current_thread());
}
+/*! @copydoc rx-interval.hpp
+ */
template<class Coordination>
auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn)
-> typename std::enable_if<
diff --git a/Rx/v2/src/rxcpp/sources/rx-iterate.hpp b/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
index be98c0c..8832775 100644
--- a/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
@@ -7,6 +7,28 @@
#include "../rx-includes.hpp"
+/*! \file rx-iterate.hpp
+
+ \brief Returns an observable that sends each value in the collection, on the specified scheduler.
+
+ \tparam Collection the type of the collection of values that this observable emits
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param c collection containing values to send
+ \param cn the scheduler to use for scheduling the items (optional)
+
+ \return Observable that sends each value in the collection.
+
+ \sample
+ \snippet iterate.cpp iterate sample
+ \snippet output.txt iterate sample
+
+ \sample
+ \snippet iterate.cpp threaded iterate sample
+ \snippet output.txt threaded iterate sample
+
+*/
+
namespace rxcpp {
namespace sources {
@@ -132,12 +154,16 @@ struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>
}
+/*! @copydoc rx-iterate.hpp
+ */
template<class Collection>
auto iterate(Collection c)
-> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
}
+/*! @copydoc rx-iterate.hpp
+ */
template<class Collection, class Coordination>
auto iterate(Collection c, Coordination cn)
-> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
@@ -145,17 +171,52 @@ auto iterate(Collection c, Coordination cn)
detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
}
+/*! Returns an observable that sends an empty set of values and then completes.
+
+ \tparam T the type of elements (not) to be sent
+
+ \return Observable that sends an empty set of values and then completes.
+
+ This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator.
+
+ \note This is a degenerate case of ```from(Value0 v0, ValueN... vn)``` operator.
+*/
template<class T>
auto from()
-> decltype(iterate(std::array<T, 0>(), identity_immediate())) {
return iterate(std::array<T, 0>(), identity_immediate());
}
+/*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler.
+
+ \tparam T the type of elements (not) to be sent
+ \tparam Coordination the type of the scheduler
+
+ \return Observable that sends an empty set of values and then completes.
+
+ \note This is a degenerate case of ```from(Coordination cn, Value0 v0, ValueN... vn)``` operator.
+*/
template<class T, class Coordination>
auto from(Coordination cn)
-> typename std::enable_if<is_coordination<Coordination>::value,
decltype( iterate(std::array<T, 0>(), std::move(cn)))>::type {
return iterate(std::array<T, 0>(), std::move(cn));
}
+/*! Returns an observable that sends each value from its arguments list.
+
+ \tparam Value0 ...
+ \tparam ValueN the type of sending values
+
+ \param v0 ...
+ \param vn values to send
+
+ \return Observable that sends each value from its arguments list.
+
+ \sample
+ \snippet from.cpp from sample
+ \snippet output.txt from sample
+
+ \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
+*/
template<class Value0, class... ValueN>
auto from(Value0 v0, ValueN... vn)
-> typename std::enable_if<!is_coordination<Value0>::value,
@@ -163,6 +224,24 @@ auto from(Value0 v0, ValueN... vn)
std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
return iterate(std::move(c), identity_immediate());
}
+/*! Returns an observable that sends each value from its arguments list, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+ \tparam Value0 ...
+ \tparam ValueN the type of sending values
+
+ \param cn the scheduler to use for scheduling the items
+ \param v0 ...
+ \param vn values to send
+
+ \return Observable that sends each value from its arguments list.
+
+ \sample
+ \snippet from.cpp threaded from sample
+ \snippet output.txt threaded from sample
+
+ \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
+*/
template<class Coordination, class Value0, class... ValueN>
auto from(Coordination cn, Value0 v0, ValueN... vn)
-> typename std::enable_if<is_coordination<Coordination>::value,
@@ -171,6 +250,74 @@ auto from(Coordination cn, Value0 v0, ValueN... vn)
return iterate(std::move(c), std::move(cn));
}
+
+/*! Returns an observable that sends the specified item to observer and then completes.
+
+ \tparam T the type of the emitted item
+
+ \param v the value to send
+
+ \return Observable that sends the specified item to observer and then completes.
+
+ \sample
+ \snippet just.cpp just sample
+ \snippet output.txt just sample
+*/
+template<class Value0>
+auto just(Value0 v0)
+ -> typename std::enable_if<!is_coordination<Value0>::value,
+ decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
+ std::array<Value0, 1> c{{v0}};
+ return iterate(std::move(c), identity_immediate());
+}
+/*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.
+
+ \tparam T the type of the emitted item
+ \tparam Coordination the type of the scheduler
+
+ \param v the value to send
+ \param cn the scheduler to use for scheduling the items
+
+ \return Observable that sends the specified item to observer and then completes.
+
+ \sample
+ \snippet just.cpp threaded just sample
+ \snippet output.txt threaded just sample
+*/
+template<class Value0, class Coordination>
+auto just(Value0 v0, Coordination cn)
+ -> typename std::enable_if<is_coordination<Coordination>::value,
+ decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
+ std::array<Value0, 1> c{{v0}};
+ return iterate(std::move(c), std::move(cn));
+}
+
+/*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable.
+
+ \tparam Observable the type of the observable that emits values for resending
+ \tparam Value0 ...
+ \tparam ValueN the type of sending values
+
+ \param o the observable that emits values for resending
+ \param v0 ...
+ \param vn values to send
+
+ \return Observable that sends the specified values before it begins to send items emitted by the given observable.
+
+ \sample
+ \snippet start_with.cpp full start_with sample
+ \snippet output.txt full start_with sample
+
+ Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable:
+ \snippet start_with.cpp short start_with sample
+ \snippet output.txt short start_with sample
+*/
+template<class Observable, class Value0, class... ValueN>
+auto start_with(Observable o, Value0 v0, ValueN... vn)
+ -> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
+ return from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
+}
+
}
}
diff --git a/Rx/v2/src/rxcpp/sources/rx-never.hpp b/Rx/v2/src/rxcpp/sources/rx-never.hpp
index bcb991e..cd6f4d0 100644
--- a/Rx/v2/src/rxcpp/sources/rx-never.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-never.hpp
@@ -7,6 +7,19 @@
#include "../rx-includes.hpp"
+/*! \file rx-naver.hpp
+
+ \brief Returns an observable that never sends any items or notifications to observer.
+
+ \tparam T the type of (not) emitted items
+
+ \return Observable that never sends any items or notifications to observer.
+
+ \sample
+ \snippet never.cpp never sample
+ \snippet output.txt never sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -23,6 +36,8 @@ struct never : public source_base<T>
}
+/*! @copydoc rx-never.hpp
+ */
template<class T>
auto never()
-> observable<T, detail::never<T>> {
diff --git a/Rx/v2/src/rxcpp/sources/rx-range.hpp b/Rx/v2/src/rxcpp/sources/rx-range.hpp
index 76c0101..d55ffff 100644
--- a/Rx/v2/src/rxcpp/sources/rx-range.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-range.hpp
@@ -7,6 +7,29 @@
#include "../rx-includes.hpp"
+/*! \file rx-range.hpp
+
+ \brief Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler.
+
+ \tparam T the type of the values that this observable emits
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param first first value to send (optional)
+ \param last last value to send (optional)
+ \param step value to add to the previous value to get the next value (optional)
+ \param cn the scheduler to run the generator loop on (optional)
+
+ \return Observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value using the specified scheduler.
+
+ \sample
+ \snippet range.cpp threaded range sample
+ \snippet output.txt threaded range sample
+
+ An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator
+ \snippet range.cpp subscribe_on range sample
+ \snippet output.txt subscribe_on range sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -90,18 +113,24 @@ struct range : public source_base<T>
}
+/*! @copydoc rx-create.hpp
+ */
template<class T>
auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
-> observable<T, detail::range<T, identity_one_worker>> {
return observable<T, detail::range<T, identity_one_worker>>(
detail::range<T, identity_one_worker>(first, last, step, identity_current_thread()));
}
+/*! @copydoc rx-create.hpp
+ */
template<class T, class Coordination>
auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
-> observable<T, detail::range<T, Coordination>> {
return observable<T, detail::range<T, Coordination>>(
detail::range<T, Coordination>(first, last, step, std::move(cn)));
}
+/*! @copydoc rx-create.hpp
+ */
template<class T, class Coordination>
auto range(T first, T last, Coordination cn)
-> typename std::enable_if<is_coordination<Coordination>::value,
@@ -109,6 +138,8 @@ auto range(T first, T last, Coordination cn)
return observable<T, detail::range<T, Coordination>>(
detail::range<T, Coordination>(first, last, 1, std::move(cn)));
}
+/*! @copydoc rx-create.hpp
+ */
template<class T, class Coordination>
auto range(T first, Coordination cn)
-> typename std::enable_if<is_coordination<Coordination>::value,
diff --git a/Rx/v2/src/rxcpp/sources/rx-scope.hpp b/Rx/v2/src/rxcpp/sources/rx-scope.hpp
index 8105ef4..d7151f2 100644
--- a/Rx/v2/src/rxcpp/sources/rx-scope.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-scope.hpp
@@ -7,6 +7,23 @@
#include "../rx-includes.hpp"
+/*! \file rx-scope.hpp
+
+ \brief Returns an observable that makes an observable by the specified observable factory using the resource provided by the specified resource factory for each new observer that subscribes.
+
+ \tparam ResourceFactory the type of the resource factory
+ \tparam ObservableFactory the type of the observable factory
+
+ \param rf the resource factory function that resturn the rxcpp::resource that is used as a resource by the observable factory
+ \param of the observable factory function to invoke for each observer that subscribes to the resulting observable
+
+ \return observable that makes an observable by the specified observable factory using the resource provided by the specified resource factory for each new observer that subscribes.
+
+ \sample
+ \snippet scope.cpp scope sample
+ \snippet output.txt scope sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -77,7 +94,7 @@ struct scope : public source_base<rxu::value_type_t<scope_traits<ResourceFactory
return;
}
state->out.add(state->resource->get_subscription());
-
+
auto selectedCollection = on_exception(
[state](){return state->observable_factory(state->resource.get()); },
state->out);
@@ -91,6 +108,8 @@ struct scope : public source_base<rxu::value_type_t<scope_traits<ResourceFactory
}
+/*! @copydoc rx-scope.hpp
+ */
template<class ResourceFactory, class ObservableFactory>
auto scope(ResourceFactory rf, ObservableFactory of)
-> observable<rxu::value_type_t<detail::scope_traits<ResourceFactory, ObservableFactory>>, detail::scope<ResourceFactory, ObservableFactory>> {
diff --git a/Rx/v2/src/rxcpp/sources/rx-timer.hpp b/Rx/v2/src/rxcpp/sources/rx-timer.hpp
index 3ff7287..2b8e270 100644
--- a/Rx/v2/src/rxcpp/sources/rx-timer.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-timer.hpp
@@ -7,6 +7,34 @@
#include "../rx-includes.hpp"
+/*! \file rx-timer.hpp
+
+ \brief Returns an observable that emits an integer at the specified time point.
+
+ \tparam Coordination the type of the scheduler (optional)
+
+ \param when time point when the value is emitted
+ \param cn the scheduler to use for scheduling the items (optional)
+
+ \return Observable that emits an integer at the specified time point
+
+ \sample
+ \snippet timer.cpp timepoint timer sample
+ \snippet output.txt timepoint timer sample
+
+ \sample
+ \snippet timer.cpp duration timer sample
+ \snippet output.txt duration timer sample
+
+ \sample
+ \snippet timer.cpp threaded timepoint timer sample
+ \snippet output.txt threaded timepoint timer sample
+
+ \sample
+ \snippet timer.cpp threaded duration timer sample
+ \snippet output.txt threaded duration timer sample
+*/
+
namespace rxcpp {
namespace sources {
@@ -70,7 +98,7 @@ struct timer : public source_base<long>
template<class TimePointOrDuration, class Coordination>
struct defer_timer : public defer_observable<
rxu::all_true<
- std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
+ std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value,
is_coordination<Coordination>::value>,
void,
@@ -80,6 +108,8 @@ struct defer_timer : public defer_observable<
}
+/*! @copydoc rx-timer.hpp
+ */
template<class TimePointOrDuration>
auto timer(TimePointOrDuration when)
-> typename std::enable_if<
@@ -88,6 +118,8 @@ auto timer(TimePointOrDuration when)
return detail::defer_timer<TimePointOrDuration, identity_one_worker>::make(when, identity_current_thread());
}
+/*! @copydoc rx-timer.hpp
+ */
template<class TimePointOrDuration, class Coordination>
auto timer(TimePointOrDuration when, Coordination cn)
-> typename std::enable_if<
diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp
index 0a3ee6b..e6551d6 100644
--- a/Rx/v2/test/operators/concat_map.cpp
+++ b/Rx/v2/test/operators/concat_map.cpp
@@ -8,7 +8,7 @@
static const int static_tripletCount = 100;
-SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian][perf]"){
+SCENARIO("concat_transform pythagorian ranges", "[hide][range][concat_transform][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -25,14 +25,14 @@ SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .concat_map(
+ .concat_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .concat_map(
+ .concat_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
@@ -53,7 +53,7 @@ SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian
}
}
-SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map][synchronize][pythagorian][perf]"){
+SCENARIO("synchronize concat_transform pythagorian ranges", "[hide][range][concat_transform][synchronize][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -67,10 +67,10 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map]
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .concat_map(
+ .concat_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .concat_map(
+ .concat_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -79,7 +79,7 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map]
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -101,7 +101,7 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map]
}
}
-SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][observe_on][pythagorian][perf]"){
+SCENARIO("observe_on concat_transform pythagorian ranges", "[hide][range][concat_transform][observe_on][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -115,10 +115,10 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .concat_map(
+ .concat_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .concat_map(
+ .concat_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -127,7 +127,7 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -150,7 +150,7 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][
}
}
-SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][serialize][pythagorian][perf]"){
+SCENARIO("serialize concat_transform pythagorian ranges", "[hide][range][concat_transform][serialize][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -164,10 +164,10 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .concat_map(
+ .concat_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .concat_map(
+ .concat_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -176,7 +176,7 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -199,7 +199,7 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s
}
}
-SCENARIO("concat_map completes", "[concat_map][map][operators]"){
+SCENARIO("concat_map completes", "[concat_map][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -268,13 +268,85 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){
REQUIRE(required == actual);
}
}
+ }
+}
+
+SCENARIO("concat_transform completes", "[concat_transform][transform][map][operators]"){
+ GIVEN("two cold observables. one of ints. one of strings."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> i_on;
+ const rxsc::test::messages<std::string> s_on;
+
+ auto xs = sc.make_cold_observable({
+ i_on.next(100, 4),
+ i_on.next(200, 2),
+ i_on.completed(500)
+ });
+
+ auto ys = sc.make_cold_observable({
+ s_on.next(50, "foo"),
+ s_on.next(100, "bar"),
+ s_on.next(150, "baz"),
+ s_on.next(200, "qux"),
+ s_on.completed(250)
+ });
+
+ WHEN("each int is mapped to the strings"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ | rxo::concat_transform(
+ [&](int){
+ return ys;},
+ [](int, std::string s){
+ return s;})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains strings repeated for each int"){
+ auto required = rxu::to_vector({
+ s_on.next(350, "foo"),
+ s_on.next(400, "bar"),
+ s_on.next(450, "baz"),
+ s_on.next(500, "qux"),
+ s_on.next(600, "foo"),
+ s_on.next(650, "bar"),
+ s_on.next(700, "baz"),
+ s_on.next(750, "qux"),
+ s_on.completed(800)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ints"){
+ auto required = rxu::to_vector({
+ i_on.subscribe(200, 700)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were 2 subscription and unsubscription to the strings"){
+ auto required = rxu::to_vector({
+ s_on.subscribe(300, 550),
+ s_on.subscribe(550, 800)
+ });
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
WHEN("each int is mapped to the strings with coordinator"){
auto res = w.start(
[&]() {
return xs
- .concat_map(
+ .concat_transform(
[&](int){
return ys;},
[](int, std::string s){
@@ -321,7 +393,7 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){
}
}
-SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][operators]"){
+SCENARIO("concat_transform, no result selector, no coordination", "[concat_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -347,7 +419,7 @@ SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][o
auto res = w.start(
[&]() {
return xs
- .concat_map(
+ .concat_transform(
[&](int){
return ys;})
// forget type to workaround lambda deduction bug on msvc 2013
@@ -391,7 +463,7 @@ SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][o
}
}
-SCENARIO("concat_map, no result selector, with coordination", "[concat_map][map][operators]"){
+SCENARIO("concat_transform, no result selector, with coordination", "[concat_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -417,7 +489,7 @@ SCENARIO("concat_map, no result selector, with coordination", "[concat_map][map]
auto res = w.start(
[&]() {
return xs
- .concat_map(
+ .concat_transform(
[&](int){
return ys;},
rx::identity_current_thread())
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index e2e3cf8..2e01a30 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -45,7 +45,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
}
}
-SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][perf]"){
+SCENARIO("merge_transform pythagorian ranges", "[hide][range][merge_transform][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -60,14 +60,14 @@ SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][pe
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .flat_map(
+ .merge_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
.flat_map(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
@@ -89,7 +89,7 @@ SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][pe
}
}
-SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][synchronize][pythagorian][perf]"){
+SCENARIO("synchronize merge_transform pythagorian ranges", "[hide][range][merge_transform][synchronize][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -103,10 +103,10 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .flat_map(
+ .merge_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .flat_map(
+ .merge_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -115,7 +115,7 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -137,7 +137,7 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn
}
}
-SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][observe_on][pythagorian][perf]"){
+SCENARIO("observe_on merge_transform pythagorian ranges", "[hide][range][merge_transform][observe_on][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -151,10 +151,10 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .flat_map(
+ .merge_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .flat_map(
+ .merge_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -163,7 +163,7 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -185,7 +185,7 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse
}
}
-SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][serialize][pythagorian][perf]"){
+SCENARIO("serialize merge_transform pythagorian ranges", "[hide][range][merge_transform][serialize][pythagorian][perf]"){
const int& tripletCount = static_tripletCount;
GIVEN("some ranges"){
WHEN("generating pythagorian triplets"){
@@ -199,10 +199,10 @@ SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][seria
auto start = clock::now();
auto triples =
rxs::range(1, so)
- .flat_map(
+ .merge_transform(
[&c, so](int z){
return rxs::range(1, z, 1, so)
- .flat_map(
+ .merge_transform(
[&c, so, z](int x){
return rxs::range(x, z, 1, so)
.filter([&c, z, x](int y){
@@ -211,7 +211,7 @@ SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][seria
return true;}
else {
return false;}})
- .map([z, x](int y){return std::make_tuple(x, y, z);})
+ .transform([z, x](int y){return std::make_tuple(x, y, z);})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();},
[](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
@@ -314,13 +314,97 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
REQUIRE(required == actual);
}
}
+ }
+}
+
+SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){
+ GIVEN("two cold observables. one of ints. one of strings."){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> i_on;
+ const rxsc::test::messages<std::string> s_on;
+
+ auto xs = sc.make_cold_observable({
+ i_on.next(100, 4),
+ i_on.next(200, 2),
+ i_on.next(300, 3),
+ i_on.next(400, 1),
+ i_on.completed(500)
+ });
+
+ auto ys = sc.make_cold_observable({
+ s_on.next(50, "foo"),
+ s_on.next(100, "bar"),
+ s_on.next(150, "baz"),
+ s_on.next(200, "qux"),
+ s_on.completed(250)
+ });
+
+ WHEN("each int is mapped to the strings"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ | rxo::merge_transform(
+ [&](int){
+ return ys;},
+ [](int, std::string s){
+ return s;})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains strings repeated for each int"){
+ auto required = rxu::to_vector({
+ s_on.next(350, "foo"),
+ s_on.next(400, "bar"),
+ s_on.next(450, "baz"),
+ s_on.next(450, "foo"),
+ s_on.next(500, "qux"),
+ s_on.next(500, "bar"),
+ s_on.next(550, "baz"),
+ s_on.next(550, "foo"),
+ s_on.next(600, "qux"),
+ s_on.next(600, "bar"),
+ s_on.next(650, "baz"),
+ s_on.next(650, "foo"),
+ s_on.next(700, "qux"),
+ s_on.next(700, "bar"),
+ s_on.next(750, "baz"),
+ s_on.next(800, "qux"),
+ s_on.completed(850)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the ints"){
+ auto required = rxu::to_vector({
+ i_on.subscribe(200, 700)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were four subscription and unsubscription to the strings"){
+ auto required = rxu::to_vector({
+ s_on.subscribe(300, 550),
+ s_on.subscribe(400, 650),
+ s_on.subscribe(500, 750),
+ s_on.subscribe(600, 850)
+ });
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
WHEN("each int is mapped to the strings with coordinator"){
auto res = w.start(
[&]() {
return xs
- .flat_map(
+ .merge_transform(
[&](int){
return ys;},
[](int, std::string s){
@@ -377,7 +461,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){
}
}
-SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
+SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -406,7 +490,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
auto res = w.start(
[&]() {
return xs
- .flat_map([&](int){return ys;}, [](int, std::string s){return s;})
+ .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
@@ -464,7 +548,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){
}
}
-SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
+SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -494,7 +578,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
auto res = w.start(
[&]() {
return xs
- .flat_map([&](int){return ys;}, [](int, std::string s){return s;})
+ .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
@@ -538,7 +622,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){
}
}
-SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][operators]"){
+SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -566,7 +650,7 @@ SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][opera
auto res = w.start(
[&]() {
return xs
- .flat_map(
+ .merge_transform(
[&](int){
return ys;})
// forget type to workaround lambda deduction bug on msvc 2013
@@ -620,7 +704,7 @@ SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][opera
}
}
-SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][operators]"){
+SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){
GIVEN("two cold observables. one of ints. one of strings."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -648,7 +732,7 @@ SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][ope
auto res = w.start(
[&]() {
return xs
- .flat_map(
+ .merge_transform(
[&](int){
return ys;},
rx::identity_current_thread())
@@ -701,4 +785,4 @@ SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][ope
}
}
}
-} \ No newline at end of file
+}
diff --git a/Rx/v2/test/operators/on_error_resume_next.cpp b/Rx/v2/test/operators/on_error_resume_next.cpp
index 490bd92..3761094 100644
--- a/Rx/v2/test/operators/on_error_resume_next.cpp
+++ b/Rx/v2/test/operators/on_error_resume_next.cpp
@@ -1,6 +1,77 @@
#include "../test.h"
#include <rxcpp/operators/rx-on_error_resume_next.hpp>
+SCENARIO("switch_on_error stops on completion", "[switch_on_error][on_error_resume_next][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(180, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.next(350, 5),
+ on.completed(400),
+ on.next(410, -1),
+ on.completed(420),
+ on.error(430, std::runtime_error("error on unsubscribed stream"))
+ });
+
+ auto ys = sc.make_cold_observable({
+ on.next(10, -1),
+ on.completed(20),
+ });
+
+ WHEN("passed through unchanged"){
+
+ auto res = w.start(
+ [xs, ys, &invoked]() {
+ return xs
+ .switch_on_error([ys, &invoked](std::exception_ptr) {
+ invoked++;
+ return ys;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output stops on completion"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(290, 4),
+ on.next(350, 5),
+ on.completed(400)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one xs subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was no ys subscription"){
+ auto required = std::vector<rxcpp::notifications::subscription>();
+ auto actual = ys.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("switch_on_error selector was not called"){
+ REQUIRE(0 == invoked);
+ }
+ }
+ }
+}
+
SCENARIO("on_error_resume_next stops on completion", "[on_error_resume_next][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
diff --git a/Rx/v2/test/operators/reduce.cpp b/Rx/v2/test/operators/reduce.cpp
index 819f86c..33a0644 100644
--- a/Rx/v2/test/operators/reduce.cpp
+++ b/Rx/v2/test/operators/reduce.cpp
@@ -56,6 +56,61 @@ SCENARIO("reduce some data with seed", "[reduce][operators]"){
}
}
+SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ int seed = 42;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 0),
+ on.next(220, 1),
+ on.next(230, 2),
+ on.next(240, 3),
+ on.next(250, 4),
+ on.completed(260)
+ });
+
+ WHEN("mapped to ints that are one larger"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .accumulate(seed,
+ [](int sum, int x) {
+ return sum + x;
+ },
+ [](int sum) {
+ return sum * 5;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output stops on completion"){
+ auto required = rxu::to_vector({
+ on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
+ on.completed(260)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 260)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
SCENARIO("average some data", "[reduce][average][operators]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();