summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-30 23:40:42 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-30 14:47:57 -0800
commit94dc773fbf1fa6df0e6df58a82de9aab63c26e8c (patch)
tree6e8bf9f7f57276d50493692c953d29648783db53 /Rx/v2
parent800030070b2daa0085bc9c0d34a975bc2555c5a1 (diff)
downloadRxCpp-94dc773fbf1fa6df0e6df58a82de9aab63c26e8c.tar.gz
decouple buffer_with_time_or_count from observable
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp102
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp48
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/buffer.cpp5
6 files changed, 96 insertions, 70 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
index 79196e9..cd49196 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
@@ -317,7 +317,7 @@ struct member_overload<buffer_with_time_tag>
static operators::detail::buffer_with_time_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "buffer takes (Duration, optional Duration, optional Coordination)");
+ static_assert(sizeof...(AN) == 10000, "buffer_with_time takes (Duration, optional Duration, optional Coordination)");
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
index 936315d..9b30570 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
@@ -2,6 +2,28 @@
#pragma once
+/*! \file rx-buffer_with_time_or_count.hpp
+
+ \brief Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
+
+ \tparam Duration the type of the time interval.
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer.
+ \param count the maximum size of each buffer before it is emitted and new buffer is created.
+ \param coordination the scheduler for the buffers (optional).
+
+ \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
+
+ \sample
+ \snippet buffer.cpp buffer period+count+coordination sample
+ \snippet output.txt buffer period+count+coordination sample
+
+ \sample
+ \snippet buffer.cpp buffer period+count sample
+ \snippet output.txt buffer period+count sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP)
#define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP
@@ -13,13 +35,21 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct buffer_with_time_or_count_tag_invalid_arguments {};
+
+template<class... AN>
+struct buffer_with_time_or_count_tag_invalid : public rxo::operator_base<buffer_with_time_or_count_tag_invalid_arguments<AN...>> {
+ using type = observable<buffer_with_time_or_count_tag_invalid_arguments<AN...>, buffer_with_time_or_count_tag_invalid<AN...>>;
+};
+template<class... AN>
+using buffer_with_time_or_count_tag_invalid_t = typename buffer_with_time_or_count_tag_invalid<AN...>::type;
+
template<class T, class Duration, class Coordination>
struct buffer_with_time_or_count
{
- static_assert(std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, "Duration parameter must convert to rxsc::scheduler::clock_type::duration");
- static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");
-
typedef rxu::decay_t<T> source_value_type;
+ typedef std::vector<source_value_type> value_type;
typedef rxu::decay_t<Coordination> coordination_type;
typedef typename coordination_type::coordinator_type coordinator_type;
typedef rxu::decay_t<Duration> duration_type;
@@ -192,34 +222,54 @@ struct buffer_with_time_or_count
}
};
-template<class Duration, class Coordination>
-class buffer_with_time_or_count_factory
-{
- typedef rxu::decay_t<Duration> duration_type;
- typedef rxu::decay_t<Coordination> coordination_type;
-
- duration_type period;
- duration_type skip;
- coordination_type coordination;
-public:
- buffer_with_time_or_count_factory(duration_type p, duration_type s, coordination_type c) : period(p), skip(s), coordination(c) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time_or_count<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination))) {
- return source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time_or_count<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination));
- }
-};
-
}
-template<class Duration, class Coordination>
-inline auto buffer_with_time_or_count(Duration period, int count, Coordination coordination)
- -> detail::buffer_with_time_or_count_factory<Duration, Coordination> {
- return detail::buffer_with_time_or_count_factory<Duration, Coordination>(period, count, coordination);
+/*! @copydoc rx-buffer_time_count.hpp
+*/
+template<class... AN>
+auto buffer_with_time_or_count(AN&&... an)
+ -> operator_factory<buffer_with_time_or_count_tag, AN...> {
+ return operator_factory<buffer_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
}
-}
+template<>
+struct member_overload<buffer_with_time_or_count_tag>
+{
+ template<class Observable, class Duration,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
+ class Value = rxu::value_type_t<BufferTimeCount>>
+ static auto member(Observable&& o, Duration&& period, int count)
+ -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
+ return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
+ }
+ template<class Observable, class Duration, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<BufferTimeCount>>
+ static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
+ -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
+ return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "buffer_with_time_or_count takes (Duration, Count, optional Coordination)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 50dce24..5e01036 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -187,6 +187,7 @@
#include "operators/rx-any.hpp"
#include "operators/rx-buffer_count.hpp"
#include "operators/rx-buffer_time.hpp"
+#include "operators/rx-buffer_time_count.hpp"
#include "operators/rx-combine_latest.hpp"
#include "operators/rx-debounce.hpp"
#include "operators/rx-delay.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index c534de4..83061ca 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1244,47 +1244,15 @@ public:
return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
- \param count the maximum size of each buffer before it is emitted and new buffer is created
- \param coordination the scheduler for the buffers
-
- \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
-
- \sample
- \snippet buffer.cpp buffer period+count+coordination sample
- \snippet output.txt buffer period+count+coordination sample
- */
- template<class Coordination>
- auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count, Coordination coordination) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination)))
- /// \endcond
- {
- return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination));
- }
-
- /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
-
- \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
- \param count the maximum size of each buffer before it is emitted and new buffer is created
-
- \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
-
- \sample
- \snippet buffer.cpp buffer period+count sample
- \snippet output.txt buffer period+count sample
- */
- template<class Duration>
- auto buffer_with_time_or_count(Duration period, int count) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread())))
- /// \endcond
+ /*! @copydoc rx-buffer_time.hpp
+ */
+ template<class... AN>
+ auto buffer_with_time_or_count(AN&&... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
{
- return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()));
+ return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
}
/// \cond SHOW_SERVICE_MEMBERS
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 7fe6b20..0cd3150 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -95,7 +95,6 @@ public:
}
-#include "operators/rx-buffer_time_count.hpp"
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
@@ -166,6 +165,13 @@ struct buffer_with_time_tag {
};
};
+struct buffer_with_time_or_count_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-buffer_time_count.hpp>");
+ };
+};
+
struct combine_latest_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/buffer.cpp b/Rx/v2/test/operators/buffer.cpp
index e01e12f..509d9f9 100644
--- a/Rx/v2/test/operators/buffer.cpp
+++ b/Rx/v2/test/operators/buffer.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-buffer_count.hpp>
#include <rxcpp/operators/rx-buffer_time.hpp>
+#include <rxcpp/operators/rx-buffer_time_count.hpp>
#include <rxcpp/operators/rx-take.hpp>
SCENARIO("buffer count partial window", "[buffer][operators]"){
@@ -979,9 +980,9 @@ SCENARIO("buffer with time or count, basic", "[buffer_with_time_or_count][operat
auto res = w.start(
[&]() {
return xs
- .buffer_with_time_or_count(milliseconds(70), 3, so)
+ | rxo::buffer_with_time_or_count(milliseconds(70), 3, so)
// forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
+ | rxo::as_dynamic();
}
);