summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-29 00:30:56 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-28 14:12:12 -0800
commit2d10ccfb03439536bbe99cd750b30a4bdf7c31ce (patch)
tree8b9da758ece1ae46edc82741a6d6115bac3db116 /Rx/v2/src
parentac66697bfe72539e6c3ef2f633ae5dfeb589234d (diff)
downloadRxCpp-2d10ccfb03439536bbe99cd750b30a4bdf7c31ce.tar.gz
decouple time_interval from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-time_interval.hpp95
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp40
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 79 insertions, 65 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp b/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
index 11da8bb..181c066 100644
--- a/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
@@ -2,6 +2,22 @@
#pragma once
+/*! \file rx-time_interval.hpp
+
+ \brief Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
+ The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.
+
+ \tparam Coordination the type of the scheduler.
+
+ \param coordination the scheduler for time intervals.
+
+ \return Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
+
+ \sample
+ \snippet time_interval.cpp time_interval sample
+ \snippet output.txt time_interval sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP)
#define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP
@@ -13,11 +29,19 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct time_interval_invalid_arguments {};
+
+template<class... AN>
+struct time_interval_invalid : public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
+ using type = observable<time_interval_invalid_arguments<AN...>, time_interval_invalid<AN...>>;
+};
+template<class... AN>
+using time_interval_invalid_t = typename time_interval_invalid<AN...>::type;
+
template<class T, class Coordination>
struct time_interval
{
- static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");
-
typedef rxu::decay_t<T> source_value_type;
typedef rxu::decay_t<Coordination> coordination_type;
@@ -67,7 +91,7 @@ struct time_interval
dest.on_completed();
}
- static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, time_interval_values v) {
+ static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
return make_subscriber<value_type>(d, this_type(d, v.coordination));
}
};
@@ -79,38 +103,51 @@ struct time_interval
}
};
-template <class Coordination>
-class time_interval_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
-
- coordination_type coordination;
-public:
- time_interval_factory(coordination_type ct)
- : coordination(std::move(ct)) { }
-
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<typename rxsc::scheduler::clock_type::time_point::duration>(time_interval<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))) {
- return source.template lift<typename rxsc::scheduler::clock_type::time_point::duration>(time_interval<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination));
- }
-
-};
-
}
-template <class Coordination>
-inline auto time_interval(Coordination ct)
--> detail::time_interval_factory<Coordination> {
- return detail::time_interval_factory<Coordination>(std::move(ct));
+/*! @copydoc rx-time_interval.hpp
+*/
+template<class... AN>
+auto time_interval(AN&&... an)
+ -> operator_factory<time_interval_tag, AN...> {
+ return operator_factory<time_interval_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-inline auto time_interval()
--> detail::time_interval_factory<identity_one_worker> {
- return detail::time_interval_factory<identity_one_worker>(identity_current_thread());
}
-}
+template<>
+struct member_overload<time_interval_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
+ class Value = typename rxsc::scheduler::clock_type::time_point::duration>
+ static auto member(Observable&& o)
+ -> decltype(o.template lift<Value>(TimeInterval(identity_current_thread()))) {
+ return o.template lift<Value>(TimeInterval(identity_current_thread()));
+ }
+
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
+ class Value = typename rxsc::scheduler::clock_type::time_point::duration>
+ static auto member(Observable&& o, Coordination&& cn)
+ -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
+ return o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::time_interval_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "time_interval takes (optional Coordination)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index c88f7f5..6e7931d 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -203,6 +203,7 @@
#include "operators/rx-retry.hpp"
#include "operators/rx-sequence_equal.hpp"
#include "operators/rx-take_while.hpp"
+#include "operators/rx-time_interval.hpp"
#include "operators/rx-timestamp.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-zip.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index bfd3391..ce445c6 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -876,45 +876,15 @@ public:
return lift<T>(rxo::detail::tap<T, std::tuple<MakeObserverArgN...>>(std::make_tuple(std::forward<MakeObserverArgN>(an)...)));
}
- /*! Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
- The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.
-
- \tparam Coordination the type of the scheduler
-
- \param coordination the scheduler for itme intervals
-
- \return Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
-
- \sample
- \snippet time_interval.cpp time_interval sample
- \snippet output.txt time_interval sample
- */
- template<class Coordination>
- auto time_interval(Coordination coordination) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<rxsc::scheduler::clock_type::time_point::duration>(rxo::detail::time_interval<T, Coordination>{coordination}))
- /// \endcond
- {
- return lift<rxsc::scheduler::clock_type::time_point::duration>(rxo::detail::time_interval<T, Coordination>{coordination});
- }
-
- /*! Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
- The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.
-
- \return Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
-
- \sample
- \snippet time_interval.cpp time_interval sample
- \snippet output.txt time_interval sample
- */
+ /*! @copydoc rx-time_interval.hpp
+ */
template<class... AN>
- auto time_interval(AN**...) const
+ auto time_interval(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<rxsc::scheduler::clock_type::time_point::duration>(rxo::detail::time_interval<T, identity_one_worker>{identity_current_thread()}))
+ -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return lift<rxsc::scheduler::clock_type::time_point::duration>(rxo::detail::time_interval<T, identity_one_worker>{identity_current_thread()});
- static_assert(sizeof...(AN) == 0, "time_interval() was passed too many arguments.");
+ return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
}
/*! Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index c49c801..0866a27 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -123,7 +123,6 @@ public:
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
#include "operators/rx-tap.hpp"
-#include "operators/rx-time_interval.hpp"
#include "operators/rx-timeout.hpp"
#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
@@ -297,6 +296,13 @@ struct sequence_equal_tag {
};
};
+struct time_interval_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-time_interval.hpp>");
+ };
+};
+
struct timestamp_tag {
template<class Included>
struct include_header{