diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-12-30 23:40:42 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-30 14:47:57 -0800 |
commit | 94dc773fbf1fa6df0e6df58a82de9aab63c26e8c (patch) | |
tree | 6e8bf9f7f57276d50493692c953d29648783db53 /Rx/v2 | |
parent | 800030070b2daa0085bc9c0d34a975bc2555c5a1 (diff) | |
download | RxCpp-94dc773fbf1fa6df0e6df58a82de9aab63c26e8c.tar.gz |
decouple buffer_with_time_or_count from observable
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp | 102 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 48 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/buffer.cpp | 5 |
6 files changed, 96 insertions, 70 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp index 79196e9..cd49196 100644 --- a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp @@ -317,7 +317,7 @@ struct member_overload<buffer_with_time_tag> static operators::detail::buffer_with_time_invalid_t<AN...> member(AN...) { std::terminate(); return {}; - static_assert(sizeof...(AN) == 10000, "buffer takes (Duration, optional Duration, optional Coordination)"); + static_assert(sizeof...(AN) == 10000, "buffer_with_time takes (Duration, optional Duration, optional Coordination)"); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp index 936315d..9b30570 100644 --- a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp @@ -2,6 +2,28 @@ #pragma once +/*! \file rx-buffer_with_time_or_count.hpp + + \brief Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler. + + \tparam Duration the type of the time interval. + \tparam Coordination the type of the scheduler (optional). + + \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer. + \param count the maximum size of each buffer before it is emitted and new buffer is created. + \param coordination the scheduler for the buffers (optional). + + \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). + + \sample + \snippet buffer.cpp buffer period+count+coordination sample + \snippet output.txt buffer period+count+coordination sample + + \sample + \snippet buffer.cpp buffer period+count sample + \snippet output.txt buffer period+count sample +*/ + #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP) #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP @@ -13,13 +35,21 @@ namespace operators { namespace detail { +template<class... AN> +struct buffer_with_time_or_count_tag_invalid_arguments {}; + +template<class... AN> +struct buffer_with_time_or_count_tag_invalid : public rxo::operator_base<buffer_with_time_or_count_tag_invalid_arguments<AN...>> { + using type = observable<buffer_with_time_or_count_tag_invalid_arguments<AN...>, buffer_with_time_or_count_tag_invalid<AN...>>; +}; +template<class... AN> +using buffer_with_time_or_count_tag_invalid_t = typename buffer_with_time_or_count_tag_invalid<AN...>::type; + template<class T, class Duration, class Coordination> struct buffer_with_time_or_count { - static_assert(std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, "Duration parameter must convert to rxsc::scheduler::clock_type::duration"); - static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination"); - typedef rxu::decay_t<T> source_value_type; + typedef std::vector<source_value_type> value_type; typedef rxu::decay_t<Coordination> coordination_type; typedef typename coordination_type::coordinator_type coordinator_type; typedef rxu::decay_t<Duration> duration_type; @@ -192,34 +222,54 @@ struct buffer_with_time_or_count } }; -template<class Duration, class Coordination> -class buffer_with_time_or_count_factory -{ - typedef rxu::decay_t<Duration> duration_type; - typedef rxu::decay_t<Coordination> coordination_type; - - duration_type period; - duration_type skip; - coordination_type coordination; -public: - buffer_with_time_or_count_factory(duration_type p, duration_type s, coordination_type c) : period(p), skip(s), coordination(c) {} - template<class Observable> - auto operator()(Observable&& source) - -> decltype(source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time_or_count<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination))) { - return source.template lift<std::vector<rxu::value_type_t<rxu::decay_t<Observable>>>>(buffer_with_time_or_count<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, skip, coordination)); - } -}; - } -template<class Duration, class Coordination> -inline auto buffer_with_time_or_count(Duration period, int count, Coordination coordination) - -> detail::buffer_with_time_or_count_factory<Duration, Coordination> { - return detail::buffer_with_time_or_count_factory<Duration, Coordination>(period, count, coordination); +/*! @copydoc rx-buffer_time_count.hpp +*/ +template<class... AN> +auto buffer_with_time_or_count(AN&&... an) + -> operator_factory<buffer_with_time_or_count_tag, AN...> { + return operator_factory<buffer_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + } -} +template<> +struct member_overload<buffer_with_time_or_count_tag> +{ + template<class Observable, class Duration, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>, + class SourceValue = rxu::value_type_t<Observable>, + class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>, + class Value = rxu::value_type_t<BufferTimeCount>> + static auto member(Observable&& o, Duration&& period, int count) + -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) { + return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread())); + } + template<class Observable, class Duration, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<BufferTimeCount>> + static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn) + -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) { + return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::buffer_count_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "buffer_with_time_or_count takes (Duration, Count, optional Coordination)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 50dce24..5e01036 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -187,6 +187,7 @@ #include "operators/rx-any.hpp" #include "operators/rx-buffer_count.hpp" #include "operators/rx-buffer_time.hpp" +#include "operators/rx-buffer_time_count.hpp" #include "operators/rx-combine_latest.hpp" #include "operators/rx-debounce.hpp" #include "operators/rx-delay.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index c534de4..83061ca 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1244,47 +1244,15 @@ public: return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...); } - /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer - \param count the maximum size of each buffer before it is emitted and new buffer is created - \param coordination the scheduler for the buffers - - \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). - - \sample - \snippet buffer.cpp buffer period+count+coordination sample - \snippet output.txt buffer period+count+coordination sample - */ - template<class Coordination> - auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count, Coordination coordination) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination))) - /// \endcond - { - return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination)); - } - - /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). - - \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer - \param count the maximum size of each buffer before it is emitted and new buffer is created - - \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). - - \sample - \snippet buffer.cpp buffer period+count sample - \snippet output.txt buffer period+count sample - */ - template<class Duration> - auto buffer_with_time_or_count(Duration period, int count) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()))) - /// \endcond + /*! @copydoc rx-buffer_time.hpp + */ + template<class... AN> + auto buffer_with_time_or_count(AN&&... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond { - return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread())); + return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); } /// \cond SHOW_SERVICE_MEMBERS diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 7fe6b20..0cd3150 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -95,7 +95,6 @@ public: } -#include "operators/rx-buffer_time_count.hpp" #include "operators/rx-concat.hpp" #include "operators/rx-concat_map.hpp" #include "operators/rx-connect_forever.hpp" @@ -166,6 +165,13 @@ struct buffer_with_time_tag { }; }; +struct buffer_with_time_or_count_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-buffer_time_count.hpp>"); + }; +}; + struct combine_latest_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/buffer.cpp b/Rx/v2/test/operators/buffer.cpp index e01e12f..509d9f9 100644 --- a/Rx/v2/test/operators/buffer.cpp +++ b/Rx/v2/test/operators/buffer.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-buffer_count.hpp> #include <rxcpp/operators/rx-buffer_time.hpp> +#include <rxcpp/operators/rx-buffer_time_count.hpp> #include <rxcpp/operators/rx-take.hpp> SCENARIO("buffer count partial window", "[buffer][operators]"){ @@ -979,9 +980,9 @@ SCENARIO("buffer with time or count, basic", "[buffer_with_time_or_count][operat auto res = w.start( [&]() { return xs - .buffer_with_time_or_count(milliseconds(70), 3, so) + | rxo::buffer_with_time_or_count(milliseconds(70), 3, so) // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); + | rxo::as_dynamic(); } ); |