summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-31 10:20:51 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-31 02:41:53 -0800
commitac2ac8d9a610b908568d09e0abae90047e1fa96c (patch)
tree1f1c842089285774859591884e1a9995dfa8ce25 /Rx/v2/src
parent21552b41774320a3f91c3477d37f1a41a916a741 (diff)
downloadRxCpp-ac2ac8d9a610b908568d09e0abae90047e1fa96c.tar.gz
decouple window from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window.hpp91
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp5
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp42
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 87 insertions, 59 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-window.hpp b/Rx/v2/src/rxcpp/operators/rx-window.hpp
index 3541a30..e5d2c6f 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window.hpp
@@ -2,6 +2,26 @@
#pragma once
+/*! \file rx-window.hpp
+
+ \brief Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
+ If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable.
+
+ \param count the maximum size of each window before it should be completed
+ \param skip how many items need to be skipped before starting a new window
+
+ \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
+ If the skip parameter is set, return an Observable that emits windows every skip items containing at most count items from the source observable.
+
+ \sample
+ \snippet window.cpp window count+skip sample
+ \snippet output.txt window count+skip sample
+
+ \sample
+ \snippet window.cpp window count sample
+ \snippet output.txt window count sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP)
#define RXCPP_OPERATORS_RX_WINDOW_HPP
@@ -13,10 +33,22 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct window_invalid_arguments {};
+
+template<class... AN>
+struct window_invalid : public rxo::operator_base<window_invalid_arguments<AN...>> {
+ using type = observable<window_invalid_arguments<AN...>, window_invalid<AN...>>;
+};
+template<class... AN>
+using window_invalid_t = typename window_invalid<AN...>::type;
+
template<class T>
struct window
{
typedef rxu::decay_t<T> source_value_type;
+ typedef observable<source_value_type> value_type;
+
struct window_values
{
window_values(int c, int s)
@@ -98,32 +130,51 @@ struct window
}
};
-class window_factory
-{
- int count;
- int skip;
-public:
- window_factory(int c, int s) : count(c), skip(s) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window<rxu::value_type_t<rxu::decay_t<Observable>>>(count, skip))) {
- return source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window<rxu::value_type_t<rxu::decay_t<Observable>>>(count, skip));
- }
-};
-
}
-inline auto window(int count)
- -> detail::window_factory {
- return detail::window_factory(count, count);
-}
-inline auto window(int count, int skip)
- -> detail::window_factory {
- return detail::window_factory(count, skip);
+/*! @copydoc rx-window.hpp
+*/
+template<class... AN>
+auto window(AN&&... an)
+ -> operator_factory<window_tag, AN...> {
+ return operator_factory<window_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<window_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Window = rxo::detail::window<SourceValue>,
+ class Value = rxu::value_type_t<Window>>
+ static auto member(Observable&& o, int count, int skip)
+ -> decltype(o.template lift<Value>(Window(count, skip))) {
+ return o.template lift<Value>(Window(count, skip));
+ }
+
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Window = rxo::detail::window<SourceValue>,
+ class Value = rxu::value_type_t<Window>>
+ static auto member(Observable&& o, int count)
+ -> decltype(o.template lift<Value>(Window(count, count))) {
+ return o.template lift<Value>(Window(count, count));
+ }
+
+ template<class... AN>
+ static operators::detail::window_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "window takes (Count, optional Skip)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 5e01036..551c031 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -182,8 +182,8 @@
#include "rx-grouped_observable.hpp"
#if !defined(RXCPP_LITE)
-#include "operators/rx-amb.hpp"
#include "operators/rx-all.hpp"
+#include "operators/rx-amb.hpp"
#include "operators/rx-any.hpp"
#include "operators/rx-buffer_count.hpp"
#include "operators/rx-buffer_time.hpp"
@@ -208,9 +208,10 @@
#include "operators/rx-skip.hpp"
#include "operators/rx-take.hpp"
#include "operators/rx-take_while.hpp"
-#include "operators/rx-timeout.hpp"
#include "operators/rx-time_interval.hpp"
+#include "operators/rx-timeout.hpp"
#include "operators/rx-timestamp.hpp"
+#include "operators/rx-window.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-zip.hpp"
#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 83061ca..d4a97d9 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -997,45 +997,15 @@ public:
return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
-
- \param count the maximum size of each window before it should be completed
-
- \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
-
- \sample
- \snippet window.cpp window count sample
- \snippet output.txt window count sample
+ /*! @copydoc rx-window.hpp
*/
template<class... AN>
- auto window(int count, AN**...) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count)))
- /// \endcond
- {
- return lift<observable<T>>(rxo::detail::window<T>(count, count));
- static_assert(sizeof...(AN) == 0, "window(count) was passed too many arguments.");
- }
-
- /*! Return an observable that emits windows every skip items containing at most count items from the source observable.
-
- \param count the maximum size of each window before it should be completed
- \param skip how many items need to be skipped before starting a new window
-
- \return Observable that emits windows every skip items containing at most count items from the source observable.
-
- \sample
- \snippet window.cpp window count+skip sample
- \snippet output.txt window count+skip sample
- */
- template<class... AN>
- auto window(int count, int skip, AN**...) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip)))
- /// \endcond
+ auto window(AN&&... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
{
- return lift<observable<T>>(rxo::detail::window<T>(count, skip));
- static_assert(sizeof...(AN) == 0, "window(count, skip) was passed too many arguments.");
+ return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
}
/*! Return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 0cd3150..b290a1e 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -118,7 +118,6 @@ public:
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
#include "operators/rx-tap.hpp"
-#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
#include "operators/rx-window_time_count.hpp"
#include "operators/rx-window_toggle.hpp"
@@ -346,6 +345,13 @@ struct timestamp_tag {
};
};
+struct window_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-window.hpp>");
+ };
+};
+
struct with_latest_from_tag {
template<class Included>
struct include_header{