diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-09 12:58:48 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-09 07:32:23 -0800 |
commit | 48aa4773a6ee85aee3849f49c15fde65047822d1 (patch) | |
tree | faa0d7ae5b65817d02bcc876687c5408306ec97a /Rx/v2/src | |
parent | a5a3f310d3358aa3628a694c44e45d746a193470 (diff) | |
download | RxCpp-48aa4773a6ee85aee3849f49c15fde65047822d1.tar.gz |
decouple skip_until from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-skip_until.hpp | 128 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 51 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 |
4 files changed, 117 insertions, 71 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp index c717fad..02a2424 100644 --- a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp @@ -2,6 +2,28 @@ #pragma once +/*! \file rx-skip_until.hpp + + \brief Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time. + skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination) + + \tparam TriggerSource the type of the trigger observable. + \tparam Coordination the type of the scheduler (optional). + + \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable. + \param cn the scheduler to use for scheduling the items (optional). + + \return An observable that skips items from the source observable until the second observable emits an item or the time runs out, then emits the remaining items. + + \sample + \snippet skip_until.cpp skip_until sample + \snippet output.txt skip_until sample + + \sample + \snippet skip_until.cpp threaded skip_until sample + \snippet output.txt threaded skip_until sample +*/ + #if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP) #define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP @@ -13,6 +35,16 @@ namespace operators { namespace detail { +template<class... AN> +struct skip_until_invalid_arguments {}; + +template<class... AN> +struct skip_until_invalid : public rxo::operator_base<skip_until_invalid_arguments<AN...>> { + using type = observable<skip_until_invalid_arguments<AN...>, skip_until_invalid<AN...>>; +}; +template<class... AN> +using skip_until_invalid_t = typename skip_until_invalid<AN...>::type; + template<class T, class Observable, class TriggerObservable, class Coordination> struct skip_until : public operator_base<T> { @@ -161,37 +193,83 @@ struct skip_until : public operator_base<T> } }; -template<class TriggerObservable, class Coordination> -class skip_until_factory -{ - typedef rxu::decay_t<TriggerObservable> trigger_source_type; - typedef rxu::decay_t<Coordination> coordination_type; +} - trigger_source_type trigger_source; - coordination_type coordination; -public: - skip_until_factory(trigger_source_type t, coordination_type sf) - : trigger_source(std::move(t)) - , coordination(std::move(sf)) - { +/*! @copydoc rx-skip_until.hpp +*/ +template<class... AN> +auto skip_until(AN&&... an) + -> operator_factory<skip_until_tag, AN...> { + return operator_factory<skip_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + +} + +template<> +struct member_overload<skip_until_tag> +{ + template<class Observable, class TimePoint, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>, + class SourceValue = rxu::value_type_t<Observable>, + class Timer = typename rxu::defer_type<rxs::detail::timer, identity_one_worker>::type, + class TimerValue = rxu::value_type_t<Timer>, + class TriggerObservable = observable<TimerValue, Timer>, + class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>, + class Value = rxu::value_type_t<SkipUntil>, + class Result = observable<Value, SkipUntil>> + static Result member(Observable&& o, TimePoint&& when) { + auto cn = identity_current_thread(); + return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn)); } - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>> { - return observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>>( - skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>(std::forward<Observable>(source), trigger_source, coordination)); + + template<class Observable, class TimePoint, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>, + std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>, + class SourceValue = rxu::value_type_t<Observable>, + class Timer = typename rxu::defer_type<rxs::detail::timer, rxu::decay_t<Coordination>>::type, + class TimerValue = rxu::value_type_t<Timer>, + class TriggerObservable = observable<TimerValue, Timer>, + class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SkipUntil>, + class Result = observable<Value, SkipUntil>> + static Result member(Observable&& o, TimePoint&& when, Coordination cn) { + return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn)); } -}; -} + template<class Observable, class TriggerObservable, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, TriggerObservable>>, + class SourceValue = rxu::value_type_t<Observable>, + class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>, + class Value = rxu::value_type_t<SkipUntil>, + class Result = observable<Value, SkipUntil>> + static Result member(Observable&& o, TriggerObservable&& t) { + return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread())); + } -template<class TriggerObservable, class Coordination> -auto skip_until(TriggerObservable&& t, Coordination&& sf) - -> detail::skip_until_factory<TriggerObservable, Coordination> { - return detail::skip_until_factory<TriggerObservable, Coordination>(std::forward<TriggerObservable>(t), std::forward<Coordination>(sf)); -} + template<class Observable, class TriggerObservable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, TriggerObservable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SkipUntil>, + class Result = observable<Value, SkipUntil>> + static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) { + return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn))); + } -} + template<class... AN> + static operators::detail::skip_until_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 7effb4b..35fa20e 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -209,6 +209,7 @@ #include "operators/rx-sequence_equal.hpp" #include "operators/rx-skip.hpp" #include "operators/rx-skip_last.hpp" +#include "operators/rx-skip_until.hpp" #include "operators/rx-take.hpp" #include "operators/rx-take_last.hpp" #include "operators/rx-take_until.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 57a704e..01ceb88 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -2004,7 +2004,7 @@ public: return observable_member(skip_tag{}, *this, std::forward<AN>(an)...); } - /*! @copydoc rx-skip_last.hpp + /*! @copydoc rx-skip_last.hpp */ template<class... AN> auto skip_last(AN... an) const @@ -2015,54 +2015,15 @@ public: return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...); } - /*! Make new observable with items skipped until on_next occurs on the trigger observable - - \tparam TriggerSource the type of the trigger observable - - \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable - - \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet skip_until.cpp skip_until sample - \snippet output.txt skip_until sample - */ - template<class TriggerSource> - auto skip_until(TriggerSource&& t) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if<is_observable<TriggerSource>::value, - observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type - /// \endcond - { - return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>( - rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::forward<TriggerSource>(t), identity_one_worker(rxsc::make_current_thread()))); - } - - /*! Make new observable with items skipped until on_next occurs on the trigger observable - - \tparam TriggerSource the type of the trigger observable - \tparam Coordination the type of the scheduler - - \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable - \param cn the scheduler to use for scheduling the items - - \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items. - - \sample - \snippet skip_until.cpp threaded skip_until sample - \snippet output.txt threaded skip_until sample + /*! @copydoc rx-skip_until.hpp */ - template<class TriggerSource, class Coordination> - auto skip_until(TriggerSource&& t, Coordination&& cn) const + template<class... AN> + auto skip_until(AN... an) const /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value, - observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type + -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>( - rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(cn))); + return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-take.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 2469a23..fdf49fd 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -106,7 +106,6 @@ public: #include "operators/rx-ref_count.hpp" #include "operators/rx-replay.hpp" #include "operators/rx-scan.hpp" -#include "operators/rx-skip_until.hpp" #include "operators/rx-start_with.hpp" #include "operators/rx-subscribe.hpp" #include "operators/rx-subscribe_on.hpp" @@ -324,6 +323,13 @@ struct skip_last_tag { }; }; +struct skip_until_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-skip_until.hpp>"); + }; +}; + struct take_tag { template<class Included> struct include_header{ |