diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-03-15 10:41:03 +0300 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-03-15 10:41:03 +0300 |
commit | 089984da8a7d102643f61d352c2415d79a6850e3 (patch) | |
tree | e341bfe0c27a5124aa7742e650d3f760bc6eb158 /Rx | |
parent | c2df04fc77596f500ea02c8e99d591ad9ec9f978 (diff) | |
download | RxCpp-089984da8a7d102643f61d352c2415d79a6850e3.tar.gz |
add debounce operator
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/examples/doxygen/debounce.cpp | 20 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-debounce.hpp | 216 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 44 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/debounce.cpp | 212 |
6 files changed, 494 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/debounce.cpp b/Rx/v2/examples/doxygen/debounce.cpp new file mode 100644 index 0000000..b90183d --- /dev/null +++ b/Rx/v2/examples/doxygen/debounce.cpp @@ -0,0 +1,20 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("debounce sample"){ + printf("//! [debounce sample]\n"); + using namespace std::chrono; + auto scheduler = rxcpp::identity_current_thread(); + auto start = scheduler.now(); + auto period = milliseconds(10); + auto values = rxcpp::observable<>::interval(start, period, scheduler). + take(4). + debounce(period); + values. + subscribe( + [](long v) { printf("OnNext: %ld\n", v); }, + []() { printf("OnCompleted\n"); }); + printf("//! [debounce sample]\n"); +}
\ No newline at end of file diff --git a/Rx/v2/src/rxcpp/operators/rx-debounce.hpp b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp new file mode 100644 index 0000000..bd8ec60 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp @@ -0,0 +1,216 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP) +#define RXCPP_OPERATORS_RX_DEBOUNCE_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Duration, class Coordination> +struct debounce +{ + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Coordination> coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + typedef rxu::decay_t<Duration> duration_type; + + struct debounce_values + { + debounce_values(duration_type p, coordination_type c) + : period(p) + , coordination(c) + { + } + + duration_type period; + coordination_type coordination; + }; + debounce_values initial; + + debounce(duration_type period, coordination_type coordination) + : initial(period, coordination) + { + } + + template<class Subscriber> + struct debounce_observer + { + typedef debounce_observer<Subscriber> this_type; + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<T, this_type> observer_type; + + struct debounce_subscriber_values : public debounce_values + { + debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c) + : debounce_values(v) + , cs(std::move(cs)) + , dest(std::move(d)) + , coordinator(std::move(c)) + , worker(coordinator.get_worker()) + , index(0) + { + } + + composite_subscription cs; + dest_type dest; + coordinator_type coordinator; + rxsc::worker worker; + mutable std::size_t index; + mutable rxu::maybe<value_type> value; + }; + typedef std::shared_ptr<debounce_subscriber_values> state_type; + state_type state; + + debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c) + : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) + { + auto localState = state; + + auto disposer = [=](const rxsc::schedulable&){ + localState->cs.unsubscribe(); + localState->dest.unsubscribe(); + localState->worker.unsubscribe(); + }; + auto selectedDisposer = on_exception( + [&](){ return localState->coordinator.act(disposer); }, + localState->dest); + if (selectedDisposer.empty()) { + return; + } + + localState->dest.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + localState->cs.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + } + + static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t id, state_type state) { + auto produce = [id, state](const rxsc::schedulable&) { + if(id != state->index) + return; + + state->dest.on_next(*state->value); + state->value.reset(); + }; + + auto selectedProduce = on_exception( + [&](){ return state->coordinator.act(produce); }, + state->dest); + if (selectedProduce.empty()) { + return std::function<void(const rxsc::schedulable&)>(); + } + + return std::function<void(const rxsc::schedulable&)>(selectedProduce.get()); + } + + void on_next(T v) const { + auto localState = state; + auto work = [v, localState](const rxsc::schedulable&) { + auto new_id = ++localState->index; + auto produce_time = localState->worker.now() + localState->period; + + localState->value.reset(v); + localState->worker.schedule(produce_time, produce_item(new_id, localState)); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_error(std::exception_ptr e) const { + auto localState = state; + auto work = [e, localState](const rxsc::schedulable&) { + localState->dest.on_error(e); + localState->value.reset(); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_completed() const { + auto localState = state; + auto work = [localState](const rxsc::schedulable&) { + if(!localState->value.empty()) { + localState->dest.on_next(*localState->value); + } + localState->dest.on_completed(); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + static subscriber<T, observer_type> make(dest_type d, debounce_values v) { + auto cs = composite_subscription(); + auto coordinator = v.coordination.create_coordinator(); + + return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) { + return debounce_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template<class Duration, class Coordination> +class debounce_factory +{ + typedef rxu::decay_t<Duration> duration_type; + typedef rxu::decay_t<Coordination> coordination_type; + + duration_type period; + coordination_type coordination; +public: + debounce_factory(duration_type p, coordination_type c) : period(p), coordination(c) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(debounce<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(debounce<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination)); + } +}; + +} + +template<class Duration, class Coordination> +inline auto debounce(Duration period, Coordination coordination) + -> detail::debounce_factory<Duration, Coordination> { + return detail::debounce_factory<Duration, Coordination>(period, coordination); +} + +template<class Duration> +inline auto debounce(Duration period) + -> detail::debounce_factory<Duration, identity_one_worker> { + return detail::debounce_factory<Duration, identity_one_worker>(period, identity_current_thread()); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index e2646ed..08071ef 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -838,6 +838,50 @@ public: return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s))); } + /*! Return an observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable. + + \tparam Duration the type of time interval + \tparam Coordination the type of the scheduler + + \param period the period of time to suppress any emitted items + \param coordination the scheduler to manage timeout for each event + + \return Observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable. + + \sample + \snippet debounce.cpp debounce sample + \snippet output.txt debounce sample + */ + template<class Duration, class Coordination> + auto debounce(Duration period, Coordination coordination) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::debounce<T, Duration, Coordination>(period, coordination))) + /// \endcond + { + return lift<T>(rxo::detail::debounce<T, Duration, Coordination>(period, coordination)); + } + + /*! Return an observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable. + + \tparam Duration the type of time interval + + \param period the period of time to suppress any emitted items + + \return Observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable. + + \sample + \snippet debounce.cpp debounce sample + \snippet output.txt debounce sample + */ + template<class Duration> + auto debounce(Duration period) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::debounce<T, Duration, identity_one_worker>(period, identity_current_thread()))) + /// \endcond + { + return lift<T>(rxo::detail::debounce<T, Duration, identity_one_worker>(period, identity_current_thread())); + } + /*! Return an observable that emits each item emitted by the source observable after the specified delay. \tparam Duration the type of time interval diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 81e5b1f..21c7bf8 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -42,6 +42,7 @@ namespace rxo=operators; #include "operators/rx-concat.hpp" #include "operators/rx-concat_map.hpp" #include "operators/rx-connect_forever.hpp" +#include "operators/rx-debounce.hpp" #include "operators/rx-delay.hpp" #include "operators/rx-distinct.hpp" #include "operators/rx-distinct_until_changed.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 43a942d..8b9b100 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -33,6 +33,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/combine_latest.cpp ${TEST_DIR}/operators/concat.cpp ${TEST_DIR}/operators/concat_map.cpp + ${TEST_DIR}/operators/debounce.cpp ${TEST_DIR}/operators/delay.cpp ${TEST_DIR}/operators/distinct.cpp ${TEST_DIR}/operators/distinct_until_changed.cpp diff --git a/Rx/v2/test/operators/debounce.cpp b/Rx/v2/test/operators/debounce.cpp new file mode 100644 index 0000000..8b542c7 --- /dev/null +++ b/Rx/v2/test/operators/debounce.cpp @@ -0,0 +1,212 @@ +#include "../test.h" + +using namespace std::chrono; + +SCENARIO("debounce - never", "[debounce][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("values are debounceed"){ + + auto res = w.start( + [so, xs]() { + return xs.debounce(milliseconds(10), so); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1001) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("debounce - empty", "[debounce][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("values are debounceed"){ + + auto res = w.start( + [so, xs]() { + return xs.debounce(milliseconds(10), so); + } + ); + + THEN("the output only contains complete message"){ + auto required = rxu::to_vector({ + on.completed(251) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("debounce - no overlap", "[debounce][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("values are debounceed"){ + + auto res = w.start( + [so, xs]() { + return xs.debounce(milliseconds(10), so); + } + ); + + THEN("the output only contains debounced items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(221, 2), + on.next(251, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("debounce - overlap", "[debounce][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(215, 2), + on.next(225, 3), + on.next(235, 4), + on.next(245, 5), + on.next(255, 6), + on.next(265, 7), + on.completed(300) + }); + + WHEN("values are debounceed"){ + + auto res = w.start( + [so, xs]() { + return xs.debounce(milliseconds(30), so); + } + ); + + THEN("the output only contains debounced items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(296, 7), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("debounce - throw", "[debounce][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("debounce on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("values are debounceed"){ + + auto res = w.start( + [so, xs]() { + return xs.debounce(milliseconds(10), so); + } + ); + + THEN("the output only contains only error"){ + auto required = rxu::to_vector({ + on.error(251, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |