summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-01 18:53:32 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-01 09:12:35 -0800
commit224c011cd294f0bbdcc56db766715c63f479991b (patch)
tree10b7b6ac954ec7faeb529ee7460cf180fe47f4e5 /Rx/v2/src
parent465ccc260f69c2be4d73cf0432580265db7dbeb5 (diff)
downloadRxCpp-224c011cd294f0bbdcc56db766715c63f479991b.tar.gz
decouple window_with_time from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_time.hpp136
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp95
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 125 insertions, 115 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_time.hpp b/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
index cc6a398..083317b 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
@@ -2,6 +2,38 @@
#pragma once
+/*! \file rx-window_time.hpp
+
+ \brief Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+ If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+
+ \tparam Duration the type of time intervals.
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param period the period of time each window collects items before it is completed.
+ \param skip the period of time after which a new window will be created.
+ \param coordination the scheduler for the windows (optional).
+
+ \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable.
+ If the skip parameter is set, return an Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
+
+ \sample
+ \snippet window.cpp window period+skip+coordination sample
+ \snippet output.txt window period+skip+coordination sample
+
+ \sample
+ \snippet window.cpp window period+skip sample
+ \snippet output.txt window period+skip sample
+
+ \sample
+ \snippet window.cpp window period+coordination sample
+ \snippet output.txt window period+coordination sample
+
+ \sample
+ \snippet window.cpp window period sample
+ \snippet output.txt window period sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP)
#define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP
@@ -13,10 +45,21 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct window_with_time_invalid_arguments {};
+
+template<class... AN>
+struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
+ using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>;
+};
+template<class... AN>
+using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type;
+
template<class T, class Duration, class Coordination>
struct window_with_time
{
typedef rxu::decay_t<T> source_value_type;
+ typedef observable<source_value_type> value_type;
typedef rxu::decay_t<Coordination> coordination_type;
typedef typename coordination_type::coordinator_type coordinator_type;
typedef rxu::decay_t<Duration> duration_type;
@@ -201,40 +244,79 @@ struct window_with_time
}
};
-template<class Duration, class Coordination>
-class window_with_time_factory
-{
- typedef rxu::decay_t<Duration> duration_type;
- typedef rxu::decay_t<Coordination> coordination_type;
+}
- duration_type period;
- duration_type skip;
- coordination_type coordination;
-public:
- window_with_time_factory(duration_type p, duration_type s, coordination_type c) : period(p), skip(s), coordination(c) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination))) {
- return source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination));
- }
-};
+/*! @copydoc rx-window_with_time.hpp
+*/
+template<class... AN>
+auto window_with_time(AN&&... an)
+ -> operator_factory<window_with_time_tag, AN...> {
+ return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
}
-template<class Duration, class Coordination>
-inline auto window_with_time(Duration period, Coordination coordination)
- -> detail::window_with_time_factory<Duration, Coordination> {
- return detail::window_with_time_factory<Duration, Coordination>(period, period, coordination);
-}
+template<>
+struct member_overload<window_with_time_tag>
+{
+ template<class Observable, class Duration,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
+ class Value = rxu::value_type_t<WindowWithTime>>
+ static auto member(Observable&& o, Duration period)
+ -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) {
+ return o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()));
+ }
-template<class Duration, class Coordination>
-inline auto window_with_time(Duration period, Duration skip, Coordination coordination)
- -> detail::window_with_time_factory<Duration, Coordination> {
- return detail::window_with_time_factory<Duration, Coordination>(period, skip, coordination);
-}
+ template<class Observable, class Duration, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<WindowWithTime>>
+ static auto member(Observable&& o, Duration period, Coordination&& cn)
+ -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
+ return o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
+ }
-}
+ template<class Observable, class Duration,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
+ class Value = rxu::value_type_t<WindowWithTime>>
+ static auto member(Observable&& o, Duration&& period, Duration&& skip)
+ -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
+ return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
+ }
+
+ template<class Observable, class Duration, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<WindowWithTime>>
+ static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
+ -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
+ return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
+ }
+ template<class... AN>
+ static operators::detail::window_with_time_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 551c031..9fdbbfa 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -212,6 +212,7 @@
#include "operators/rx-timeout.hpp"
#include "operators/rx-timestamp.hpp"
#include "operators/rx-window.hpp"
+#include "operators/rx-window_time.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-zip.hpp"
#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index d4a97d9..65bab7d 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1008,94 +1008,15 @@ public:
return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
-
- \tparam Duration the type of time intervals
- \tparam Coordination the type of the scheduler
-
- \param period the period of time each window collects items before it is completed
- \param skip the period of time after which a new window will be created
- \param coordination the scheduler for the windows
-
- \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
-
- \sample
- \snippet window.cpp window period+skip+coordination sample
- \snippet output.txt window period+skip+coordination sample
- */
- template<class Duration, class Coordination>
- auto window_with_time(Duration period, Duration skip, Coordination coordination) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination)))
- /// \endcond
- {
- return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination));
- }
-
- /*! Return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
-
- \tparam Duration the type of time intervals
-
- \param period the period of time each window collects items before it is completed
- \param skip the period of time after which a new window will be created
-
- \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
-
- \sample
- \snippet window.cpp window period+skip sample
- \snippet output.txt window period+skip sample
+ /*! @copydoc rx-window_time.hpp
*/
- template<class Duration>
- auto window_with_time(Duration period, Duration skip) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread())))
- /// \endcond
- {
- return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()));
- }
-
- /*! Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
-
- \tparam Duration the type of time intervals
- \tparam Coordination the type of the scheduler
-
- \param period the period of time each window collects items before it is completed and replaced with a new window
- \param coordination the scheduler for the windows
-
- \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable.
-
- \sample
- \snippet window.cpp window period+coordination sample
- \snippet output.txt window period+coordination sample
- */
- template<class Duration, class Coordination, class Reqiures = typename rxu::types_checked_from<typename Coordination::coordination_tag>::type>
- auto window_with_time(Duration period, Coordination coordination) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination)))
- /// \endcond
- {
- return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination));
- }
-
- /*! Return an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
-
- \tparam Duration the type of time intervals
-
- \param period the period of time each window collects items before it is completed and replaced with a new window
-
- \return Observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
-
- \sample
- \snippet window.cpp window period sample
- \snippet output.txt window period sample
- */
- template<class Duration>
- auto window_with_time(Duration period) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread())))
- /// \endcond
+ template<class... AN>
+ auto window_with_time(AN&&... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
{
- return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()));
+ return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
}
/*! Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
@@ -3174,7 +3095,7 @@ public:
//
// support range() >> filter() >> subscribe() syntax
// '>>' is spelled 'stream'
-//
+//K
template<class T, class SourceOperator, class OperatorFactory>
auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index b290a1e..70d0682 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -118,7 +118,6 @@ public:
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
#include "operators/rx-tap.hpp"
-#include "operators/rx-window_time.hpp"
#include "operators/rx-window_time_count.hpp"
#include "operators/rx-window_toggle.hpp"
@@ -352,6 +351,13 @@ struct window_tag {
};
};
+struct window_with_time_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-window_time.hpp>");
+ };
+};
+
struct with_latest_from_tag {
template<class Included>
struct include_header{