summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-09 12:58:48 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-09 07:32:23 -0800
commit48aa4773a6ee85aee3849f49c15fde65047822d1 (patch)
treefaa0d7ae5b65817d02bcc876687c5408306ec97a /Rx/v2/src
parenta5a3f310d3358aa3628a694c44e45d746a193470 (diff)
downloadRxCpp-48aa4773a6ee85aee3849f49c15fde65047822d1.tar.gz
decouple skip_until from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_until.hpp128
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp51
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 117 insertions, 71 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
index c717fad..02a2424 100644
--- a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
@@ -2,6 +2,28 @@
#pragma once
+/*! \file rx-skip_until.hpp
+
+ \brief Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time.
+ skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)
+
+ \tparam TriggerSource the type of the trigger observable.
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable.
+ \param cn the scheduler to use for scheduling the items (optional).
+
+ \return An observable that skips items from the source observable until the second observable emits an item or the time runs out, then emits the remaining items.
+
+ \sample
+ \snippet skip_until.cpp skip_until sample
+ \snippet output.txt skip_until sample
+
+ \sample
+ \snippet skip_until.cpp threaded skip_until sample
+ \snippet output.txt threaded skip_until sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP)
#define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP
@@ -13,6 +35,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct skip_until_invalid_arguments {};
+
+template<class... AN>
+struct skip_until_invalid : public rxo::operator_base<skip_until_invalid_arguments<AN...>> {
+ using type = observable<skip_until_invalid_arguments<AN...>, skip_until_invalid<AN...>>;
+};
+template<class... AN>
+using skip_until_invalid_t = typename skip_until_invalid<AN...>::type;
+
template<class T, class Observable, class TriggerObservable, class Coordination>
struct skip_until : public operator_base<T>
{
@@ -161,37 +193,83 @@ struct skip_until : public operator_base<T>
}
};
-template<class TriggerObservable, class Coordination>
-class skip_until_factory
-{
- typedef rxu::decay_t<TriggerObservable> trigger_source_type;
- typedef rxu::decay_t<Coordination> coordination_type;
+}
- trigger_source_type trigger_source;
- coordination_type coordination;
-public:
- skip_until_factory(trigger_source_type t, coordination_type sf)
- : trigger_source(std::move(t))
- , coordination(std::move(sf))
- {
+/*! @copydoc rx-skip_until.hpp
+*/
+template<class... AN>
+auto skip_until(AN&&... an)
+ -> operator_factory<skip_until_tag, AN...> {
+ return operator_factory<skip_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
+}
+
+template<>
+struct member_overload<skip_until_tag>
+{
+ template<class Observable, class TimePoint,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Timer = typename rxu::defer_type<rxs::detail::timer, identity_one_worker>::type,
+ class TimerValue = rxu::value_type_t<Timer>,
+ class TriggerObservable = observable<TimerValue, Timer>,
+ class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>,
+ class Value = rxu::value_type_t<SkipUntil>,
+ class Result = observable<Value, SkipUntil>>
+ static Result member(Observable&& o, TimePoint&& when) {
+ auto cn = identity_current_thread();
+ return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
}
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>>(
- skip_until<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, trigger_source_type, Coordination>(std::forward<Observable>(source), trigger_source, coordination));
+
+ template<class Observable, class TimePoint, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>,
+ std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Timer = typename rxu::defer_type<rxs::detail::timer, rxu::decay_t<Coordination>>::type,
+ class TimerValue = rxu::value_type_t<Timer>,
+ class TriggerObservable = observable<TimerValue, Timer>,
+ class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SkipUntil>,
+ class Result = observable<Value, SkipUntil>>
+ static Result member(Observable&& o, TimePoint&& when, Coordination cn) {
+ return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
}
-};
-}
+ template<class Observable, class TriggerObservable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, TriggerObservable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>,
+ class Value = rxu::value_type_t<SkipUntil>,
+ class Result = observable<Value, SkipUntil>>
+ static Result member(Observable&& o, TriggerObservable&& t) {
+ return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread()));
+ }
-template<class TriggerObservable, class Coordination>
-auto skip_until(TriggerObservable&& t, Coordination&& sf)
- -> detail::skip_until_factory<TriggerObservable, Coordination> {
- return detail::skip_until_factory<TriggerObservable, Coordination>(std::forward<TriggerObservable>(t), std::forward<Coordination>(sf));
-}
+ template<class Observable, class TriggerObservable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, TriggerObservable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SkipUntil>,
+ class Result = observable<Value, SkipUntil>>
+ static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
+ return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
+ }
-}
+ template<class... AN>
+ static operators::detail::skip_until_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 7effb4b..35fa20e 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -209,6 +209,7 @@
#include "operators/rx-sequence_equal.hpp"
#include "operators/rx-skip.hpp"
#include "operators/rx-skip_last.hpp"
+#include "operators/rx-skip_until.hpp"
#include "operators/rx-take.hpp"
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 57a704e..01ceb88 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -2004,7 +2004,7 @@ public:
return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
}
- /*! @copydoc rx-skip_last.hpp
+ /*! @copydoc rx-skip_last.hpp
*/
template<class... AN>
auto skip_last(AN... an) const
@@ -2015,54 +2015,15 @@ public:
return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Make new observable with items skipped until on_next occurs on the trigger observable
-
- \tparam TriggerSource the type of the trigger observable
-
- \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable
-
- \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet skip_until.cpp skip_until sample
- \snippet output.txt skip_until sample
- */
- template<class TriggerSource>
- auto skip_until(TriggerSource&& t) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<is_observable<TriggerSource>::value,
- observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type
- /// \endcond
- {
- return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>(
- rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::forward<TriggerSource>(t), identity_one_worker(rxsc::make_current_thread())));
- }
-
- /*! Make new observable with items skipped until on_next occurs on the trigger observable
-
- \tparam TriggerSource the type of the trigger observable
- \tparam Coordination the type of the scheduler
-
- \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable
- \param cn the scheduler to use for scheduling the items
-
- \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items.
-
- \sample
- \snippet skip_until.cpp threaded skip_until sample
- \snippet output.txt threaded skip_until sample
+ /*! @copydoc rx-skip_until.hpp
*/
- template<class TriggerSource, class Coordination>
- auto skip_until(TriggerSource&& t, Coordination&& cn) const
+ template<class... AN>
+ auto skip_until(AN... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value,
- observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type
+ -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>(
- rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(cn)));
+ return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-take.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 2469a23..fdf49fd 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -106,7 +106,6 @@ public:
#include "operators/rx-ref_count.hpp"
#include "operators/rx-replay.hpp"
#include "operators/rx-scan.hpp"
-#include "operators/rx-skip_until.hpp"
#include "operators/rx-start_with.hpp"
#include "operators/rx-subscribe.hpp"
#include "operators/rx-subscribe_on.hpp"
@@ -324,6 +323,13 @@ struct skip_last_tag {
};
};
+struct skip_until_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-skip_until.hpp>");
+ };
+};
+
struct take_tag {
template<class Included>
struct include_header{