diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-02-01 08:58:39 -0800 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-02-03 09:08:56 +0300 |
commit | 6245a18869836ef29f9116808435cefac0eebcb7 (patch) | |
tree | 7c42f28dfc2df14930bc11d97fcb5324f3c45ee3 | |
parent | 07d5e235baabc841d01bfeac6d4476deda0a1bb0 (diff) | |
download | RxCpp-6245a18869836ef29f9116808435cefac0eebcb7.tar.gz |
move sources docs out of observable
shift tests to include aliases
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-subscribe.hpp | 80 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 480 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-create.hpp | 37 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-defer.hpp | 17 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-empty.hpp | 24 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-error.hpp | 26 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-interval.hpp | 37 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-iterate.hpp | 147 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-never.hpp | 15 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-range.hpp | 31 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-scope.hpp | 21 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/sources/rx-timer.hpp | 34 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat_map.cpp | 116 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 134 | ||||
-rw-r--r-- | Rx/v2/test/operators/on_error_resume_next.cpp | 71 | ||||
-rw-r--r-- | Rx/v2/test/operators/reduce.cpp | 55 |
17 files changed, 839 insertions, 488 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp index cd49196..7e54eb8 100644 --- a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp @@ -250,7 +250,7 @@ struct buffer_with_time } -/*! @copydoc rx-buffer_with_time.hpp +/*! @copydoc rx-buffer_time.hpp */ template<class... AN> auto buffer_with_time(AN&&... an) diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp index 7488bde..c3e3917 100644 --- a/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp @@ -81,41 +81,13 @@ public: } -template<class T, class Arg0> -auto subscribe(Arg0&& a0) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0)))> - (make_subscriber<T>(std::forward<Arg0>(a0))); -} -template<class T, class Arg0, class Arg1> -auto subscribe(Arg0&& a0, Arg1&& a1) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1)))> - (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1))); -} -template<class T, class Arg0, class Arg1, class Arg2> -auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2)))> - (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2))); -} -template<class T, class Arg0, class Arg1, class Arg2, class Arg3> -auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3)))> - (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3))); -} -template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4> -auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4)))> - (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4))); -} -template<class T, class Arg0, class Arg1, class Arg2, class Arg3, class Arg4, class Arg5> -auto subscribe(Arg0&& a0, Arg1&& a1, Arg2&& a2, Arg3&& a3, Arg4&& a4, Arg5&& a5) - -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> { - return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5)))> - (make_subscriber<T>(std::forward<Arg0>(a0), std::forward<Arg1>(a1), std::forward<Arg2>(a2), std::forward<Arg3>(a3), std::forward<Arg4>(a4), std::forward<Arg5>(a5))); +/*! @copydoc rx-subscribe.hpp +*/ +template<class T, class... ArgN> +auto subscribe(ArgN&&... an) + -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))> { + return detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))> + (make_subscriber<T>(std::forward<ArgN>(an)...)); } namespace detail { @@ -132,11 +104,49 @@ public: } +/*! Return a new observable that performs type-forgetting conversion of this observable. + + \return The source observable converted to observable<T>. + + \note This operator could be useful to workaround lambda deduction bug on msvc 2013. + + \sample + \snippet as_dynamic.cpp as_dynamic sample + \snippet output.txt as_dynamic sample +*/ inline auto as_dynamic() -> detail::dynamic_factory { return detail::dynamic_factory(); } +namespace detail { + +class blocking_factory +{ +public: + template<class Observable> + auto operator()(Observable&& source) + -> decltype(std::forward<Observable>(source).as_blocking()) { + return std::forward<Observable>(source).as_blocking(); + } +}; + +} + +/*! Return a new observable that contains the blocking methods for this observable. + + \return An observable that contains the blocking methods for this observable. + + \sample + \snippet from.cpp threaded from sample + \snippet output.txt threaded from sample +*/ +inline auto as_blocking() + -> detail::blocking_factory { + return detail::blocking_factory(); +} + + } } diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 3db90ab..6ad8da7 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -598,30 +598,16 @@ public: } #endif - /*! Return a new observable that performs type-forgetting conversion of this observable. - - \return The source observable converted to observable<T>. - - \note This operator could be useful to workaround lambda deduction bug on msvc 2013. - - \sample - \snippet as_dynamic.cpp as_dynamic sample - \snippet output.txt as_dynamic sample - */ + /*! @copydoc rxcpp::operators::as_dynamic + */ template<class... AN> observable<T> as_dynamic(AN**...) const { return *this; static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments."); } - /*! Return a new observable that contains the blocking methods for this observable. - - \return An observable that contains the blocking methods for this observable. - - \sample - \snippet from.cpp threaded from sample - \snippet output.txt threaded from sample - */ + /*! @copydoc rxcpp::operators::as_blocking + */ template<class... AN> blocking_observable<T, this_type> as_blocking(AN**...) const { return blocking_observable<T, this_type>(*this); @@ -1571,535 +1557,223 @@ class observable<void, void> { ~observable(); public: - /*! Returns an observable that executes the specified function when a subscriber subscribes to it. - - \tparam T the type of the items that this observable emits - \tparam OnSubscribe the type of OnSubscribe handler function - - \param os OnSubscribe event handler - - \return Observable that executes the specified function when a Subscriber subscribes to it. - - \sample - \snippet create.cpp Create sample - \snippet output.txt Create sample - - \warning - It is good practice to check the observer's is_subscribed state from within the function you pass to create - so that your observable can stop emitting items or doing expensive calculations when there is no longer an interested observer. - - \badcode - \snippet create.cpp Create bad code - \snippet output.txt Create bad code - - \goodcode - \snippet create.cpp Create good code - \snippet output.txt Create good code - - \warning - It is good practice to use operators like observable::take to control lifetime rather than use the subscription explicitly. - - \goodcode - \snippet create.cpp Create great code - \snippet output.txt Create great code - */ + /*! @copydoc rx-create.hpp + */ template<class T, class OnSubscribe> static auto create(OnSubscribe os) -> decltype(rxs::create<T>(std::move(os))) { return rxs::create<T>(std::move(os)); } - /*! Returns an observable that sends values in the range first-last by adding step to the previous value. - - \tparam T the type of the values that this observable emits - \param first first value to send - \param last last value to send - \param step value to add to the previous value to get the next value - - \return Observable that sends values in the range first-last by adding step to the previous value. - - \sample - \snippet range.cpp range sample - \snippet output.txt range sample - */ + /*! @copydoc rx-range.hpp + */ template<class T> static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1) -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) { return rxs::range<T>(first, last, step, identity_current_thread()); } - /*! Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler. - - \tparam T the type of the values that this observable emits - \tparam Coordination the type of the scheduler - - \param first first value to send - \param last last value to send - \param step value to add to the previous value to get the next value - \param cn the scheduler to run the generator loop on - - \return Observable that sends values in the range first-last by adding step to the previous value using the specified scheduler. - - \note `step` or both `step` & `last` may be omitted. - - \sample - \snippet range.cpp threaded range sample - \snippet output.txt threaded range sample - - An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator - \snippet range.cpp subscribe_on range sample - \snippet output.txt subscribe_on range sample - */ + /*! @copydoc rx-range.hpp + */ template<class T, class Coordination> static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range<T>(first, last, step, std::move(cn))) { return rxs::range<T>(first, last, step, std::move(cn)); } - /// Returns an observable that sends values in the range ```first```-```last``` by adding 1 to the previous value. The values are sent on the specified scheduler. - /// - /// \see rxcpp::observable<void,void>#range(T first, T last, std::ptrdiff_t step, Coordination cn) + /*! @copydoc rx-range.hpp + */ template<class T, class Coordination> static auto range(T first, T last, Coordination cn) -> decltype(rxs::range<T>(first, last, std::move(cn))) { return rxs::range<T>(first, last, std::move(cn)); } - /// Returns an observable that infinitely (until overflow) sends values starting from ```first```. The values are sent on the specified scheduler. - /// - /// \see rxcpp::observable<void,void>#range(T first, T last, std::ptrdiff_t step, Coordination cn) + /*! @copydoc rx-range.hpp + */ template<class T, class Coordination> static auto range(T first, Coordination cn) -> decltype(rxs::range<T>(first, std::move(cn))) { return rxs::range<T>(first, std::move(cn)); } - /*! Returns an observable that never sends any items or notifications to observer. - - \tparam T the type of (not) emitted items - - \return Observable that never sends any items or notifications to observer. - \sample - \snippet never.cpp never sample - \snippet output.txt never sample - */ + /*! @copydoc rx-never.hpp + */ template<class T> static auto never() -> decltype(rxs::never<T>()) { return rxs::never<T>(); } - /*! Returns an observable that calls the specified observable factory to create an observable for each new observer that subscribes. - \tparam ObservableFactory the type of the observable factory - - \param of the observable factory function to invoke for each observer that subscribes to the resulting observable - - \return observable whose observers' subscriptions trigger an invocation of the given observable factory function - - \sample - \snippet defer.cpp defer sample - \snippet output.txt defer sample - */ + /*! @copydoc rx-defer.hpp + */ template<class ObservableFactory> static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of))) { return rxs::defer(std::move(of)); } - /*! Returns an observable that emits a sequential integer every specified time interval. - \param period period between emitted values - - \return Observable that sends a sequential integer each time interval - - \sample - \snippet interval.cpp immediate interval sample - \snippet output.txt immediate interval sample - */ + /*! @copydoc rx-interval.hpp + */ template<class... AN> static auto interval(rxsc::scheduler::clock_type::duration period, AN**...) -> decltype(rxs::interval(period)) { return rxs::interval(period); static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments."); } - /*! Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param period period between emitted values - \param cn the scheduler to use for scheduling the items - - \return Observable that sends a sequential integer each time interval - - \sample - \snippet interval.cpp threaded immediate interval sample - \snippet output.txt threaded immediate interval sample - */ + /*! @copydoc rx-interval.hpp + */ template<class Coordination> static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn))) { return rxs::interval(period, std::move(cn)); } - /*! Returns an observable that emits a sequential integer every specified time interval starting from the specified time point. - - \param initial time when the first value is sent - \param period period between emitted values - - \return Observable that sends a sequential integer each time interval - - \sample - \snippet interval.cpp interval sample - \snippet output.txt interval sample - */ + /*! @copydoc rx-interval.hpp + */ template<class... AN> static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...) -> decltype(rxs::interval(initial, period)) { return rxs::interval(initial, period); static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments."); } - /*! Returns an observable that emits a sequential integer every specified time interval starting from the specified time point, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param initial time when the first value is sent - \param period period between emitted values - \param cn the scheduler to use for scheduling the items - - \return Observable that sends a sequential integer each time interval - - \sample - \snippet interval.cpp threaded interval sample - \snippet output.txt threaded interval sample - */ + /*! @copydoc rx-interval.hpp + */ template<class Coordination> static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn))) { return rxs::interval(initial, period, std::move(cn)); } - /*! Returns an observable that emits an integer at the specified time point. - - \param when time point when the value is emitted - - \return Observable that emits an integer at the specified time point - \sample - \snippet timer.cpp timepoint timer sample - \snippet output.txt timepoint timer sample - */ + /*! @copydoc rx-timer.hpp + */ template<class... AN> static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...) -> decltype(rxs::timer(at)) { return rxs::timer(at); static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments."); } - /*! Returns an observable that emits an integer in the specified time interval. - - \param when interval when the value is emitted - - \return Observable that emits an integer in the specified time interval - - \sample - \snippet timer.cpp duration timer sample - \snippet output.txt duration timer sample - */ + /*! @copydoc rx-timer.hpp + */ template<class... AN> static auto timer(rxsc::scheduler::clock_type::duration after, AN**...) -> decltype(rxs::timer(after)) { return rxs::timer(after); static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments."); } - /*! Returns an observable that emits an integer at the specified time point, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param when time point when the value is emitted - \param cn the scheduler to use for scheduling the items - - \return Observable that emits an integer at the specified time point - - \sample - \snippet timer.cpp threaded timepoint timer sample - \snippet output.txt threaded timepoint timer sample - */ + /*! @copydoc rx-timer.hpp + */ template<class Coordination> static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn))) { return rxs::timer(when, std::move(cn)); } - /*! Returns an observable that emits an integer in the specified time interval, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param when interval when the value is emitted - \param cn the scheduler to use for scheduling the items - - \return Observable that emits an integer in the specified time interval - - \sample - \snippet timer.cpp threaded duration timer sample - \snippet output.txt threaded duration timer sample - */ + /*! @copydoc rx-timer.hpp + */ template<class Coordination> static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn))) { return rxs::timer(when, std::move(cn)); } - /*! Returns an observable that sends each value in the collection. - - \tparam Collection the type of the collection of values that this observable emits - - \param c collection containing values to send - - \return Observable that sends each value in the collection. - \sample - \snippet iterate.cpp iterate sample - \snippet output.txt iterate sample - */ + /*! @copydoc rx-iterate.hpp + */ template<class Collection> static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread())) { return rxs::iterate(std::move(c), identity_current_thread()); } - /*! Returns an observable that sends each value in the collection, on the specified scheduler. - - \tparam Collection the type of the collection of values that this observable emits - \tparam Coordination the type of the scheduler - - \param c collection containing values to send - \param cn the scheduler to use for scheduling the items - - \return Observable that sends each value in the collection. - - \sample - \snippet iterate.cpp threaded iterate sample - \snippet output.txt threaded iterate sample - */ + /*! @copydoc rx-iterate.hpp + */ template<class Collection, class Coordination> static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn))) { return rxs::iterate(std::move(c), std::move(cn)); } - /*! Returns an observable that sends an empty set of values and then completes. - - \tparam T the type of elements (not) to be sent - \return Observable that sends an empty set of values and then completes. - - This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator. - - \note This is a degenerate case of ```observable<void,void>::from(Value0 v0, ValueN... vn)``` operator. - */ + /*! @copydoc rxcpp::sources::from() + */ template<class T> static auto from() -> decltype( rxs::from<T>()) { return rxs::from<T>(); } - /*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler. - - \tparam T the type of elements (not) to be sent - \tparam Coordination the type of the scheduler - - \return Observable that sends an empty set of values and then completes. - - \note This is a degenerate case of ```observable<void,void>::from(Coordination cn, Value0 v0, ValueN... vn)``` operator. - */ + /*! @copydoc rxcpp::sources::from(Coordination cn) + */ template<class T, class Coordination> static auto from(Coordination cn) -> typename std::enable_if<is_coordination<Coordination>::value, decltype( rxs::from<T>(std::move(cn)))>::type { return rxs::from<T>(std::move(cn)); } - /*! Returns an observable that sends each value from its arguments list. - - \tparam Value0 ... - \tparam ValueN the type of sending values - - \param v0 ... - \param vn values to send - - \return Observable that sends each value from its arguments list. - - \sample - \snippet from.cpp from sample - \snippet output.txt from sample - - \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead. - */ + /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn) + */ template<class Value0, class... ValueN> static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination<Value0>::value, decltype( rxs::from(v0, vn...))>::type { return rxs::from(v0, vn...); } - /*! Returns an observable that sends each value from its arguments list, on the specified scheduler. - - \tparam Coordination the type of the scheduler - \tparam Value0 ... - \tparam ValueN the type of sending values - - \param cn the scheduler to use for scheduling the items - \param v0 ... - \param vn values to send - - \return Observable that sends each value from its arguments list. - - \sample - \snippet from.cpp threaded from sample - \snippet output.txt threaded from sample - - \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead. - */ + /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn) + */ template<class Coordination, class Value0, class... ValueN> static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if<is_coordination<Coordination>::value, decltype( rxs::from(std::move(cn), v0, vn...))>::type { return rxs::from(std::move(cn), v0, vn...); } - /*! Returns an observable that sends no items to observer and immediately completes. - \tparam T the type of (not) emitted items + /*! @copydoc rxcpp::sources::just(Value0 v0) + */ + template<class T> + static auto just(T v) + -> decltype(rxs::just(std::move(v))) { + return rxs::just(std::move(v)); + } + /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn) + */ + template<class T, class Coordination> + static auto just(T v, Coordination cn) + -> decltype(rxs::just(std::move(v), std::move(cn))) { + return rxs::just(std::move(v), std::move(cn)); + } - \return Observable that sends no items to observer and immediately completes. + /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn) + */ + template<class Observable, class Value0, class... ValueN> + static auto start_with(Observable o, Value0 v0, ValueN... vn) + -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) { + return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...); + } - \sample - \snippet empty.cpp empty sample - \snippet output.txt empty sample - */ + /*! @copydoc rx-empty.hpp + */ template<class T> static auto empty() -> decltype(from<T>()) { return from<T>(); } - /*! Returns an observable that sends no items to observer and immediately completes, on the specified scheduler. - - \tparam T the type of (not) emitted items - \tparam Coordination the type of the scheduler - - \param cn the scheduler to use for scheduling the items - - \return Observable that sends no items to observer and immediately completes. - - \sample - \snippet empty.cpp threaded empty sample - \snippet output.txt threaded empty sample - */ + /*! @copydoc rx-empty.hpp + */ template<class T, class Coordination> static auto empty(Coordination cn) -> decltype(from<T>(std::move(cn))) { return from<T>(std::move(cn)); } - /*! Returns an observable that sends the specified item to observer and then completes. - - \tparam T the type of the emitted item - - \param v the value to send - - \return Observable that sends the specified item to observer and then completes. - - \sample - \snippet just.cpp just sample - \snippet output.txt just sample - */ - template<class T> - static auto just(T v) - -> decltype(from(std::move(v))) { - return from(std::move(v)); - } - /*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler. - - \tparam T the type of the emitted item - \tparam Coordination the type of the scheduler - - \param v the value to send - \param cn the scheduler to use for scheduling the items - - \return Observable that sends the specified item to observer and then completes. - - \sample - \snippet just.cpp threaded just sample - \snippet output.txt threaded just sample - */ - template<class T, class Coordination> - static auto just(T v, Coordination cn) - -> decltype(from(std::move(cn), std::move(v))) { - return from(std::move(cn), std::move(v)); - } - /*! Returns an observable that sends no items to observer and immediately generates an error. - - \tparam T the type of (not) emitted items - \tparam Exception the type of the error - - \param e the error to be passed to observers - - \return Observable that sends no items to observer and immediately generates an error. - \sample - \snippet error.cpp error sample - \snippet output.txt error sample - */ + /*! @copydoc rx-error.hpp + */ template<class T, class Exception> static auto error(Exception&& e) -> decltype(rxs::error<T>(std::forward<Exception>(e))) { return rxs::error<T>(std::forward<Exception>(e)); } - /*! Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler. - - \tparam T the type of (not) emitted items - \tparam Exception the type of the error - \tparam Coordination the type of the scheduler - - \param e the error to be passed to observers - \param cn the scheduler to use for scheduling the items - - \return Observable that sends no items to observer and immediately generates an error. - - \sample - \snippet error.cpp threaded error sample - \snippet output.txt threaded error sample - */ + /*! @copydoc rx-error.hpp + */ template<class T, class Exception, class Coordination> static auto error(Exception&& e, Coordination cn) -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) { return rxs::error<T>(std::forward<Exception>(e), std::move(cn)); } - /*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable. - - \tparam Observable the type of the observable that emits values for resending - \tparam Value0 ... - \tparam ValueN the type of sending values - - \param o the observable that emits values for resending - \param v0 ... - \param vn values to send - - \return Observable that sends the specified values before it begins to send items emitted by the given observable. - - \sample - \snippet start_with.cpp full start_with sample - \snippet output.txt full start_with sample - - Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable: - \snippet start_with.cpp short start_with sample - \snippet output.txt short start_with sample - */ - template<class Observable, class Value0, class... ValueN> - static auto start_with(Observable o, Value0 v0, ValueN... vn) - -> decltype(rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) { - return rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o); - } - /*! Returns an observable that makes an observable by the specified observable factory - using the resource provided by the specified resource factory for each new observer that subscribes. - \tparam ResourceFactory the type of the resource factory - \tparam ObservableFactory the type of the observable factory - - \param rf the resource factory function that resturn the rxcpp::resource that is used as a resource by the observable factory - \param of the observable factory function to invoke for each observer that subscribes to the resulting observable - - \return observable that makes an observable by the specified observable factory - using the resource provided by the specified resource factory for each new observer that subscribes. - - \sample - \snippet scope.cpp scope sample - \snippet output.txt scope sample - */ + /*! @copydoc rx-scope.hpp + */ template<class ResourceFactory, class ObservableFactory> static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of))) { diff --git a/Rx/v2/src/rxcpp/sources/rx-create.hpp b/Rx/v2/src/rxcpp/sources/rx-create.hpp index ed27fea..95b9533 100644 --- a/Rx/v2/src/rxcpp/sources/rx-create.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-create.hpp @@ -7,6 +7,41 @@ #include "../rx-includes.hpp" +/*! \file rx-create.hpp + + \brief Returns an observable that executes the specified function when a subscriber subscribes to it. + + \tparam T the type of the items that this observable emits + \tparam OnSubscribe the type of OnSubscribe handler function + + \param os OnSubscribe event handler + + \return Observable that executes the specified function when a Subscriber subscribes to it. + + \sample + \snippet create.cpp Create sample + \snippet output.txt Create sample + + \warning + It is good practice to check the observer's is_subscribed state from within the function you pass to create + so that your observable can stop emitting items or doing expensive calculations when there is no longer an interested observer. + + \badcode + \snippet create.cpp Create bad code + \snippet output.txt Create bad code + + \goodcode + \snippet create.cpp Create good code + \snippet output.txt Create good code + + \warning + It is good practice to use operators like observable::take to control lifetime rather than use the subscription explicitly. + + \goodcode + \snippet create.cpp Create great code + \snippet output.txt Create great code +*/ + namespace rxcpp { namespace sources { @@ -41,6 +76,8 @@ struct create : public source_base<T> } +/*! @copydoc rx-create.hpp + */ template<class T, class OnSubscribe> auto create(OnSubscribe os) -> observable<T, detail::create<T, OnSubscribe>> { diff --git a/Rx/v2/src/rxcpp/sources/rx-defer.hpp b/Rx/v2/src/rxcpp/sources/rx-defer.hpp index 3afff64..306b4d0 100644 --- a/Rx/v2/src/rxcpp/sources/rx-defer.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-defer.hpp @@ -7,6 +7,21 @@ #include "../rx-includes.hpp" +/*! \file rx-defer.hpp + + \brief Returns an observable that calls the specified observable factory to create an observable for each new observer that subscribes. + + \tparam ObservableFactory the type of the observable factory + + \param of the observable factory function to invoke for each observer that subscribes to the resulting observable + + \return observable whose observers' subscriptions trigger an invocation of the given observable factory function + + \sample + \snippet defer.cpp defer sample + \snippet output.txt defer sample +*/ + namespace rxcpp { namespace sources { @@ -52,6 +67,8 @@ struct defer : public source_base<rxu::value_type_t<defer_traits<ObservableFacto } +/*! @copydoc rx-defer.hpp + */ template<class ObservableFactory> auto defer(ObservableFactory of) -> observable<rxu::value_type_t<detail::defer_traits<ObservableFactory>>, detail::defer<ObservableFactory>> { diff --git a/Rx/v2/src/rxcpp/sources/rx-empty.hpp b/Rx/v2/src/rxcpp/sources/rx-empty.hpp index 1a6f1d8..2e11a16 100644 --- a/Rx/v2/src/rxcpp/sources/rx-empty.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-empty.hpp @@ -7,15 +7,39 @@ #include "../rx-includes.hpp" +/*! \file rx-empty.hpp + + \brief Returns an observable that sends no items to observer and immediately completes, on the specified scheduler. + + \tparam T the type of (not) emitted items + \tparam Coordination the type of the scheduler (optional) + + \param cn the scheduler to use for scheduling the items (optional) + + \return Observable that sends no items to observer and immediately completes. + + \sample + \snippet empty.cpp empty sample + \snippet output.txt empty sample + + \sample + \snippet empty.cpp threaded empty sample + \snippet output.txt threaded empty sample +*/ + namespace rxcpp { namespace sources { +/*! @copydoc rx-empty.hpp + */ template<class T> auto empty() -> decltype(from<T>()) { return from<T>(); } +/*! @copydoc rx-empty.hpp + */ template<class T, class Coordination> auto empty(Coordination cn) -> decltype(from<T>(std::move(cn))) { diff --git a/Rx/v2/src/rxcpp/sources/rx-error.hpp b/Rx/v2/src/rxcpp/sources/rx-error.hpp index 3f3e07b..461c081 100644 --- a/Rx/v2/src/rxcpp/sources/rx-error.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-error.hpp @@ -7,6 +7,28 @@ #include "../rx-includes.hpp" +/*! \file rx-error.hpp + + \brief Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler. + + \tparam T the type of (not) emitted items + \tparam Exception the type of the error + \tparam Coordination the type of the scheduler (optional) + + \param e the error to be passed to observers + \param cn the scheduler to use for scheduling the items (optional) + + \return Observable that sends no items to observer and immediately generates an error. + + \sample + \snippet error.cpp error sample + \snippet output.txt error sample + + \sample + \snippet error.cpp threaded error sample + \snippet output.txt threaded error sample +*/ + namespace rxcpp { namespace sources { @@ -86,11 +108,15 @@ auto make_error(throw_instance_tag&&, E e, Coordination cn) } +/*! @copydoc rx-error.hpp + */ template<class T, class E> auto error(E e) -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) { return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate()); } +/*! @copydoc rx-error.hpp + */ template<class T, class E, class Coordination> auto error(E e, Coordination cn) -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) { diff --git a/Rx/v2/src/rxcpp/sources/rx-interval.hpp b/Rx/v2/src/rxcpp/sources/rx-interval.hpp index b151155..7d3d47d 100644 --- a/Rx/v2/src/rxcpp/sources/rx-interval.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-interval.hpp @@ -7,6 +7,34 @@ #include "../rx-includes.hpp" +/*! \file rx-interval.hpp + + \brief Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler. + + \tparam Coordination the type of the scheduler (optional) + + \param period period between emitted values + \param cn the scheduler to use for scheduling the items (optional) + + \return Observable that sends a sequential integer each time interval + + \sample + \snippet interval.cpp interval sample + \snippet output.txt interval sample + + \sample + \snippet interval.cpp immediate interval sample + \snippet output.txt immediate interval sample + + \sample + \snippet interval.cpp threaded interval sample + \snippet output.txt threaded interval sample + + \sample + \snippet interval.cpp threaded immediate interval sample + \snippet output.txt threaded immediate interval sample +*/ + namespace rxcpp { namespace sources { @@ -78,6 +106,9 @@ struct defer_interval : public defer_observable< } + +/*! @copydoc rx-interval.hpp + */ template<class Duration> auto interval(Duration period) -> typename std::enable_if< @@ -86,6 +117,8 @@ auto interval(Duration period) return detail::defer_interval<Duration, identity_one_worker>::make(identity_current_thread().now(), period, identity_current_thread()); } +/*! @copydoc rx-interval.hpp + */ template<class Coordination> auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> typename std::enable_if< @@ -94,6 +127,8 @@ auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn)); } +/*! @copydoc rx-interval.hpp + */ template<class Duration> auto interval(rxsc::scheduler::clock_type::time_point when, Duration period) -> typename std::enable_if< @@ -102,6 +137,8 @@ auto interval(rxsc::scheduler::clock_type::time_point when, Duration period) return detail::defer_interval<Duration, identity_one_worker>::make(when, period, identity_current_thread()); } +/*! @copydoc rx-interval.hpp + */ template<class Coordination> auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn) -> typename std::enable_if< diff --git a/Rx/v2/src/rxcpp/sources/rx-iterate.hpp b/Rx/v2/src/rxcpp/sources/rx-iterate.hpp index be98c0c..8832775 100644 --- a/Rx/v2/src/rxcpp/sources/rx-iterate.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-iterate.hpp @@ -7,6 +7,28 @@ #include "../rx-includes.hpp" +/*! \file rx-iterate.hpp + + \brief Returns an observable that sends each value in the collection, on the specified scheduler. + + \tparam Collection the type of the collection of values that this observable emits + \tparam Coordination the type of the scheduler (optional) + + \param c collection containing values to send + \param cn the scheduler to use for scheduling the items (optional) + + \return Observable that sends each value in the collection. + + \sample + \snippet iterate.cpp iterate sample + \snippet output.txt iterate sample + + \sample + \snippet iterate.cpp threaded iterate sample + \snippet output.txt threaded iterate sample + +*/ + namespace rxcpp { namespace sources { @@ -132,12 +154,16 @@ struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection> } +/*! @copydoc rx-iterate.hpp + */ template<class Collection> auto iterate(Collection c) -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> { return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>( detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate())); } +/*! @copydoc rx-iterate.hpp + */ template<class Collection, class Coordination> auto iterate(Collection c, Coordination cn) -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> { @@ -145,17 +171,52 @@ auto iterate(Collection c, Coordination cn) detail::iterate<Collection, Coordination>(std::move(c), std::move(cn))); } +/*! Returns an observable that sends an empty set of values and then completes. + + \tparam T the type of elements (not) to be sent + + \return Observable that sends an empty set of values and then completes. + + This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator. + + \note This is a degenerate case of ```from(Value0 v0, ValueN... vn)``` operator. +*/ template<class T> auto from() -> decltype(iterate(std::array<T, 0>(), identity_immediate())) { return iterate(std::array<T, 0>(), identity_immediate()); } +/*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler. + + \tparam T the type of elements (not) to be sent + \tparam Coordination the type of the scheduler + + \return Observable that sends an empty set of values and then completes. + + \note This is a degenerate case of ```from(Coordination cn, Value0 v0, ValueN... vn)``` operator. +*/ template<class T, class Coordination> auto from(Coordination cn) -> typename std::enable_if<is_coordination<Coordination>::value, decltype( iterate(std::array<T, 0>(), std::move(cn)))>::type { return iterate(std::array<T, 0>(), std::move(cn)); } +/*! Returns an observable that sends each value from its arguments list. + + \tparam Value0 ... + \tparam ValueN the type of sending values + + \param v0 ... + \param vn values to send + + \return Observable that sends each value from its arguments list. + + \sample + \snippet from.cpp from sample + \snippet output.txt from sample + + \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead. +*/ template<class Value0, class... ValueN> auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination<Value0>::value, @@ -163,6 +224,24 @@ auto from(Value0 v0, ValueN... vn) std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}}; return iterate(std::move(c), identity_immediate()); } +/*! Returns an observable that sends each value from its arguments list, on the specified scheduler. + + \tparam Coordination the type of the scheduler + \tparam Value0 ... + \tparam ValueN the type of sending values + + \param cn the scheduler to use for scheduling the items + \param v0 ... + \param vn values to send + + \return Observable that sends each value from its arguments list. + + \sample + \snippet from.cpp threaded from sample + \snippet output.txt threaded from sample + + \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead. +*/ template<class Coordination, class Value0, class... ValueN> auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if<is_coordination<Coordination>::value, @@ -171,6 +250,74 @@ auto from(Coordination cn, Value0 v0, ValueN... vn) return iterate(std::move(c), std::move(cn)); } + +/*! Returns an observable that sends the specified item to observer and then completes. + + \tparam T the type of the emitted item + + \param v the value to send + + \return Observable that sends the specified item to observer and then completes. + + \sample + \snippet just.cpp just sample + \snippet output.txt just sample +*/ +template<class Value0> +auto just(Value0 v0) + -> typename std::enable_if<!is_coordination<Value0>::value, + decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type { + std::array<Value0, 1> c{{v0}}; + return iterate(std::move(c), identity_immediate()); +} +/*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler. + + \tparam T the type of the emitted item + \tparam Coordination the type of the scheduler + + \param v the value to send + \param cn the scheduler to use for scheduling the items + + \return Observable that sends the specified item to observer and then completes. + + \sample + \snippet just.cpp threaded just sample + \snippet output.txt threaded just sample +*/ +template<class Value0, class Coordination> +auto just(Value0 v0, Coordination cn) + -> typename std::enable_if<is_coordination<Coordination>::value, + decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type { + std::array<Value0, 1> c{{v0}}; + return iterate(std::move(c), std::move(cn)); +} + +/*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable. + + \tparam Observable the type of the observable that emits values for resending + \tparam Value0 ... + \tparam ValueN the type of sending values + + \param o the observable that emits values for resending + \param v0 ... + \param vn values to send + + \return Observable that sends the specified values before it begins to send items emitted by the given observable. + + \sample + \snippet start_with.cpp full start_with sample + \snippet output.txt full start_with sample + + Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable: + \snippet start_with.cpp short start_with sample + \snippet output.txt short start_with sample +*/ +template<class Observable, class Value0, class... ValueN> +auto start_with(Observable o, Value0 v0, ValueN... vn) + -> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) { + return from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o); +} + } } diff --git a/Rx/v2/src/rxcpp/sources/rx-never.hpp b/Rx/v2/src/rxcpp/sources/rx-never.hpp index bcb991e..cd6f4d0 100644 --- a/Rx/v2/src/rxcpp/sources/rx-never.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-never.hpp @@ -7,6 +7,19 @@ #include "../rx-includes.hpp" +/*! \file rx-naver.hpp + + \brief Returns an observable that never sends any items or notifications to observer. + + \tparam T the type of (not) emitted items + + \return Observable that never sends any items or notifications to observer. + + \sample + \snippet never.cpp never sample + \snippet output.txt never sample +*/ + namespace rxcpp { namespace sources { @@ -23,6 +36,8 @@ struct never : public source_base<T> } +/*! @copydoc rx-never.hpp + */ template<class T> auto never() -> observable<T, detail::never<T>> { diff --git a/Rx/v2/src/rxcpp/sources/rx-range.hpp b/Rx/v2/src/rxcpp/sources/rx-range.hpp index 76c0101..d55ffff 100644 --- a/Rx/v2/src/rxcpp/sources/rx-range.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-range.hpp @@ -7,6 +7,29 @@ #include "../rx-includes.hpp" +/*! \file rx-range.hpp + + \brief Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler. + + \tparam T the type of the values that this observable emits + \tparam Coordination the type of the scheduler (optional) + + \param first first value to send (optional) + \param last last value to send (optional) + \param step value to add to the previous value to get the next value (optional) + \param cn the scheduler to run the generator loop on (optional) + + \return Observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value using the specified scheduler. + + \sample + \snippet range.cpp threaded range sample + \snippet output.txt threaded range sample + + An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator + \snippet range.cpp subscribe_on range sample + \snippet output.txt subscribe_on range sample +*/ + namespace rxcpp { namespace sources { @@ -90,18 +113,24 @@ struct range : public source_base<T> } +/*! @copydoc rx-create.hpp + */ template<class T> auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1) -> observable<T, detail::range<T, identity_one_worker>> { return observable<T, detail::range<T, identity_one_worker>>( detail::range<T, identity_one_worker>(first, last, step, identity_current_thread())); } +/*! @copydoc rx-create.hpp + */ template<class T, class Coordination> auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> observable<T, detail::range<T, Coordination>> { return observable<T, detail::range<T, Coordination>>( detail::range<T, Coordination>(first, last, step, std::move(cn))); } +/*! @copydoc rx-create.hpp + */ template<class T, class Coordination> auto range(T first, T last, Coordination cn) -> typename std::enable_if<is_coordination<Coordination>::value, @@ -109,6 +138,8 @@ auto range(T first, T last, Coordination cn) return observable<T, detail::range<T, Coordination>>( detail::range<T, Coordination>(first, last, 1, std::move(cn))); } +/*! @copydoc rx-create.hpp + */ template<class T, class Coordination> auto range(T first, Coordination cn) -> typename std::enable_if<is_coordination<Coordination>::value, diff --git a/Rx/v2/src/rxcpp/sources/rx-scope.hpp b/Rx/v2/src/rxcpp/sources/rx-scope.hpp index 8105ef4..d7151f2 100644 --- a/Rx/v2/src/rxcpp/sources/rx-scope.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-scope.hpp @@ -7,6 +7,23 @@ #include "../rx-includes.hpp" +/*! \file rx-scope.hpp + + \brief Returns an observable that makes an observable by the specified observable factory using the resource provided by the specified resource factory for each new observer that subscribes. + + \tparam ResourceFactory the type of the resource factory + \tparam ObservableFactory the type of the observable factory + + \param rf the resource factory function that resturn the rxcpp::resource that is used as a resource by the observable factory + \param of the observable factory function to invoke for each observer that subscribes to the resulting observable + + \return observable that makes an observable by the specified observable factory using the resource provided by the specified resource factory for each new observer that subscribes. + + \sample + \snippet scope.cpp scope sample + \snippet output.txt scope sample +*/ + namespace rxcpp { namespace sources { @@ -77,7 +94,7 @@ struct scope : public source_base<rxu::value_type_t<scope_traits<ResourceFactory return; } state->out.add(state->resource->get_subscription()); - + auto selectedCollection = on_exception( [state](){return state->observable_factory(state->resource.get()); }, state->out); @@ -91,6 +108,8 @@ struct scope : public source_base<rxu::value_type_t<scope_traits<ResourceFactory } +/*! @copydoc rx-scope.hpp + */ template<class ResourceFactory, class ObservableFactory> auto scope(ResourceFactory rf, ObservableFactory of) -> observable<rxu::value_type_t<detail::scope_traits<ResourceFactory, ObservableFactory>>, detail::scope<ResourceFactory, ObservableFactory>> { diff --git a/Rx/v2/src/rxcpp/sources/rx-timer.hpp b/Rx/v2/src/rxcpp/sources/rx-timer.hpp index 3ff7287..2b8e270 100644 --- a/Rx/v2/src/rxcpp/sources/rx-timer.hpp +++ b/Rx/v2/src/rxcpp/sources/rx-timer.hpp @@ -7,6 +7,34 @@ #include "../rx-includes.hpp" +/*! \file rx-timer.hpp + + \brief Returns an observable that emits an integer at the specified time point. + + \tparam Coordination the type of the scheduler (optional) + + \param when time point when the value is emitted + \param cn the scheduler to use for scheduling the items (optional) + + \return Observable that emits an integer at the specified time point + + \sample + \snippet timer.cpp timepoint timer sample + \snippet output.txt timepoint timer sample + + \sample + \snippet timer.cpp duration timer sample + \snippet output.txt duration timer sample + + \sample + \snippet timer.cpp threaded timepoint timer sample + \snippet output.txt threaded timepoint timer sample + + \sample + \snippet timer.cpp threaded duration timer sample + \snippet output.txt threaded duration timer sample +*/ + namespace rxcpp { namespace sources { @@ -70,7 +98,7 @@ struct timer : public source_base<long> template<class TimePointOrDuration, class Coordination> struct defer_timer : public defer_observable< rxu::all_true< - std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value || + std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value || std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value, is_coordination<Coordination>::value>, void, @@ -80,6 +108,8 @@ struct defer_timer : public defer_observable< } +/*! @copydoc rx-timer.hpp + */ template<class TimePointOrDuration> auto timer(TimePointOrDuration when) -> typename std::enable_if< @@ -88,6 +118,8 @@ auto timer(TimePointOrDuration when) return detail::defer_timer<TimePointOrDuration, identity_one_worker>::make(when, identity_current_thread()); } +/*! @copydoc rx-timer.hpp + */ template<class TimePointOrDuration, class Coordination> auto timer(TimePointOrDuration when, Coordination cn) -> typename std::enable_if< diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp index 0a3ee6b..e6551d6 100644 --- a/Rx/v2/test/operators/concat_map.cpp +++ b/Rx/v2/test/operators/concat_map.cpp @@ -8,7 +8,7 @@ static const int static_tripletCount = 100; -SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian][perf]"){ +SCENARIO("concat_transform pythagorian ranges", "[hide][range][concat_transform][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -25,14 +25,14 @@ SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian auto start = clock::now(); auto triples = rxs::range(1, so) - .concat_map( + .concat_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .concat_map( + .concat_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}) @@ -53,7 +53,7 @@ SCENARIO("concat_map pythagorian ranges", "[hide][range][concat_map][pythagorian } } -SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map][synchronize][pythagorian][perf]"){ +SCENARIO("synchronize concat_transform pythagorian ranges", "[hide][range][concat_transform][synchronize][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -67,10 +67,10 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map] auto start = clock::now(); auto triples = rxs::range(1, so) - .concat_map( + .concat_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .concat_map( + .concat_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -79,7 +79,7 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map] return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -101,7 +101,7 @@ SCENARIO("synchronize concat_map pythagorian ranges", "[hide][range][concat_map] } } -SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][observe_on][pythagorian][perf]"){ +SCENARIO("observe_on concat_transform pythagorian ranges", "[hide][range][concat_transform][observe_on][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -115,10 +115,10 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][ auto start = clock::now(); auto triples = rxs::range(1, so) - .concat_map( + .concat_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .concat_map( + .concat_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -127,7 +127,7 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][ return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -150,7 +150,7 @@ SCENARIO("observe_on concat_map pythagorian ranges", "[hide][range][concat_map][ } } -SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][serialize][pythagorian][perf]"){ +SCENARIO("serialize concat_transform pythagorian ranges", "[hide][range][concat_transform][serialize][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -164,10 +164,10 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s auto start = clock::now(); auto triples = rxs::range(1, so) - .concat_map( + .concat_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .concat_map( + .concat_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -176,7 +176,7 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -199,7 +199,7 @@ SCENARIO("serialize concat_map pythagorian ranges", "[hide][range][concat_map][s } } -SCENARIO("concat_map completes", "[concat_map][map][operators]"){ +SCENARIO("concat_map completes", "[concat_map][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -268,13 +268,85 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){ REQUIRE(required == actual); } } + } +} + +SCENARIO("concat_transform completes", "[concat_transform][transform][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> i_on; + const rxsc::test::messages<std::string> s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + | rxo::concat_transform( + [&](int){ + return ys;}, + [](int, std::string s){ + return s;}) + // forget type to workaround lambda deduction bug on msvc 2013 + | rxo::as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(500, "qux"), + s_on.next(600, "foo"), + s_on.next(650, "bar"), + s_on.next(700, "baz"), + s_on.next(750, "qux"), + s_on.completed(800) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were 2 subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(550, 800) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } WHEN("each int is mapped to the strings with coordinator"){ auto res = w.start( [&]() { return xs - .concat_map( + .concat_transform( [&](int){ return ys;}, [](int, std::string s){ @@ -321,7 +393,7 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){ } } -SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][operators]"){ +SCENARIO("concat_transform, no result selector, no coordination", "[concat_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -347,7 +419,7 @@ SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][o auto res = w.start( [&]() { return xs - .concat_map( + .concat_transform( [&](int){ return ys;}) // forget type to workaround lambda deduction bug on msvc 2013 @@ -391,7 +463,7 @@ SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][o } } -SCENARIO("concat_map, no result selector, with coordination", "[concat_map][map][operators]"){ +SCENARIO("concat_transform, no result selector, with coordination", "[concat_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -417,7 +489,7 @@ SCENARIO("concat_map, no result selector, with coordination", "[concat_map][map] auto res = w.start( [&]() { return xs - .concat_map( + .concat_transform( [&](int){ return ys;}, rx::identity_current_thread()) diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index e2e3cf8..2e01a30 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -45,7 +45,7 @@ SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){ } } -SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][perf]"){ +SCENARIO("merge_transform pythagorian ranges", "[hide][range][merge_transform][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -60,14 +60,14 @@ SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][pe auto start = clock::now(); auto triples = rxs::range(1, so) - .flat_map( + .merge_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) .flat_map( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}) @@ -89,7 +89,7 @@ SCENARIO("flat_map pythagorian ranges", "[hide][range][flat_map][pythagorian][pe } } -SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][synchronize][pythagorian][perf]"){ +SCENARIO("synchronize merge_transform pythagorian ranges", "[hide][range][merge_transform][synchronize][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -103,10 +103,10 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn auto start = clock::now(); auto triples = rxs::range(1, so) - .flat_map( + .merge_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .flat_map( + .merge_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -115,7 +115,7 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -137,7 +137,7 @@ SCENARIO("synchronize flat_map pythagorian ranges", "[hide][range][flat_map][syn } } -SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][observe_on][pythagorian][perf]"){ +SCENARIO("observe_on merge_transform pythagorian ranges", "[hide][range][merge_transform][observe_on][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -151,10 +151,10 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse auto start = clock::now(); auto triples = rxs::range(1, so) - .flat_map( + .merge_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .flat_map( + .merge_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -163,7 +163,7 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -185,7 +185,7 @@ SCENARIO("observe_on flat_map pythagorian ranges", "[hide][range][flat_map][obse } } -SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][serialize][pythagorian][perf]"){ +SCENARIO("serialize merge_transform pythagorian ranges", "[hide][range][merge_transform][serialize][pythagorian][perf]"){ const int& tripletCount = static_tripletCount; GIVEN("some ranges"){ WHEN("generating pythagorian triplets"){ @@ -199,10 +199,10 @@ SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][seria auto start = clock::now(); auto triples = rxs::range(1, so) - .flat_map( + .merge_transform( [&c, so](int z){ return rxs::range(1, z, 1, so) - .flat_map( + .merge_transform( [&c, so, z](int x){ return rxs::range(x, z, 1, so) .filter([&c, z, x](int y){ @@ -211,7 +211,7 @@ SCENARIO("serialize flat_map pythagorian ranges", "[hide][range][flat_map][seria return true;} else { return false;}}) - .map([z, x](int y){return std::make_tuple(x, y, z);}) + .transform([z, x](int y){return std::make_tuple(x, y, z);}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic();}, [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, @@ -314,13 +314,97 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ REQUIRE(required == actual); } } + } +} + +SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> i_on; + const rxsc::test::messages<std::string> s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.next(300, 3), + i_on.next(400, 1), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + | rxo::merge_transform( + [&](int){ + return ys;}, + [](int, std::string s){ + return s;}) + // forget type to workaround lambda deduction bug on msvc 2013 + | rxo::as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(450, "foo"), + s_on.next(500, "qux"), + s_on.next(500, "bar"), + s_on.next(550, "baz"), + s_on.next(550, "foo"), + s_on.next(600, "qux"), + s_on.next(600, "bar"), + s_on.next(650, "baz"), + s_on.next(650, "foo"), + s_on.next(700, "qux"), + s_on.next(700, "bar"), + s_on.next(750, "baz"), + s_on.next(800, "qux"), + s_on.completed(850) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were four subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(400, 650), + s_on.subscribe(500, 750), + s_on.subscribe(600, 850) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } WHEN("each int is mapped to the strings with coordinator"){ auto res = w.start( [&]() { return xs - .flat_map( + .merge_transform( [&](int){ return ys;}, [](int, std::string s){ @@ -377,7 +461,7 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ } } -SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ +SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -406,7 +490,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ auto res = w.start( [&]() { return xs - .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) + .merge_transform([&](int){return ys;}, [](int, std::string s){return s;}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic(); } @@ -464,7 +548,7 @@ SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ } } -SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ +SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -494,7 +578,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ auto res = w.start( [&]() { return xs - .flat_map([&](int){return ys;}, [](int, std::string s){return s;}) + .merge_transform([&](int){return ys;}, [](int, std::string s){return s;}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic(); } @@ -538,7 +622,7 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ } } -SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][operators]"){ +SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -566,7 +650,7 @@ SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][opera auto res = w.start( [&]() { return xs - .flat_map( + .merge_transform( [&](int){ return ys;}) // forget type to workaround lambda deduction bug on msvc 2013 @@ -620,7 +704,7 @@ SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][opera } } -SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][operators]"){ +SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -648,7 +732,7 @@ SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][ope auto res = w.start( [&]() { return xs - .flat_map( + .merge_transform( [&](int){ return ys;}, rx::identity_current_thread()) @@ -701,4 +785,4 @@ SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][ope } } } -}
\ No newline at end of file +} diff --git a/Rx/v2/test/operators/on_error_resume_next.cpp b/Rx/v2/test/operators/on_error_resume_next.cpp index 490bd92..3761094 100644 --- a/Rx/v2/test/operators/on_error_resume_next.cpp +++ b/Rx/v2/test/operators/on_error_resume_next.cpp @@ -1,6 +1,77 @@ #include "../test.h" #include <rxcpp/operators/rx-on_error_resume_next.hpp> +SCENARIO("switch_on_error stops on completion", "[switch_on_error][on_error_resume_next][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + long invoked = 0; + + auto xs = sc.make_hot_observable({ + on.next(180, 1), + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.next(350, 5), + on.completed(400), + on.next(410, -1), + on.completed(420), + on.error(430, std::runtime_error("error on unsubscribed stream")) + }); + + auto ys = sc.make_cold_observable({ + on.next(10, -1), + on.completed(20), + }); + + WHEN("passed through unchanged"){ + + auto res = w.start( + [xs, ys, &invoked]() { + return xs + .switch_on_error([ys, &invoked](std::exception_ptr) { + invoked++; + return ys; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output stops on completion"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.next(290, 4), + on.next(350, 5), + on.completed(400) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one xs subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 400) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there was no ys subscription"){ + auto required = std::vector<rxcpp::notifications::subscription>(); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + + THEN("switch_on_error selector was not called"){ + REQUIRE(0 == invoked); + } + } + } +} + SCENARIO("on_error_resume_next stops on completion", "[on_error_resume_next][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); diff --git a/Rx/v2/test/operators/reduce.cpp b/Rx/v2/test/operators/reduce.cpp index 819f86c..33a0644 100644 --- a/Rx/v2/test/operators/reduce.cpp +++ b/Rx/v2/test/operators/reduce.cpp @@ -56,6 +56,61 @@ SCENARIO("reduce some data with seed", "[reduce][operators]"){ } } +SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + int seed = 42; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 0), + on.next(220, 1), + on.next(230, 2), + on.next(240, 3), + on.next(250, 4), + on.completed(260) + }); + + WHEN("mapped to ints that are one larger"){ + + auto res = w.start( + [&]() { + return xs + .accumulate(seed, + [](int sum, int x) { + return sum + x; + }, + [](int sum) { + return sum * 5; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output stops on completion"){ + auto required = rxu::to_vector({ + on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5), + on.completed(260) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 260) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + SCENARIO("average some data", "[reduce][average][operators]"){ GIVEN("a test hot observable of ints"){ auto sc = rxsc::make_test(); |