summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-03-15 10:41:03 +0300
committerGrigoriy Chudnov <g.chudnov@gmail.com>2016-03-15 10:41:03 +0300
commit089984da8a7d102643f61d352c2415d79a6850e3 (patch)
treee341bfe0c27a5124aa7742e650d3f760bc6eb158 /Rx
parentc2df04fc77596f500ea02c8e99d591ad9ec9f978 (diff)
downloadRxCpp-089984da8a7d102643f61d352c2415d79a6850e3.tar.gz
add debounce operator
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/debounce.cpp20
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-debounce.hpp216
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp44
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/debounce.cpp212
6 files changed, 494 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/debounce.cpp b/Rx/v2/examples/doxygen/debounce.cpp
new file mode 100644
index 0000000..b90183d
--- /dev/null
+++ b/Rx/v2/examples/doxygen/debounce.cpp
@@ -0,0 +1,20 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("debounce sample"){
+ printf("//! [debounce sample]\n");
+ using namespace std::chrono;
+ auto scheduler = rxcpp::identity_current_thread();
+ auto start = scheduler.now();
+ auto period = milliseconds(10);
+ auto values = rxcpp::observable<>::interval(start, period, scheduler).
+ take(4).
+ debounce(period);
+ values.
+ subscribe(
+ [](long v) { printf("OnNext: %ld\n", v); },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [debounce sample]\n");
+} \ No newline at end of file
diff --git a/Rx/v2/src/rxcpp/operators/rx-debounce.hpp b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp
new file mode 100644
index 0000000..bd8ec60
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp
@@ -0,0 +1,216 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP)
+#define RXCPP_OPERATORS_RX_DEBOUNCE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Duration, class Coordination>
+struct debounce
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+ typedef rxu::decay_t<Duration> duration_type;
+
+ struct debounce_values
+ {
+ debounce_values(duration_type p, coordination_type c)
+ : period(p)
+ , coordination(c)
+ {
+ }
+
+ duration_type period;
+ coordination_type coordination;
+ };
+ debounce_values initial;
+
+ debounce(duration_type period, coordination_type coordination)
+ : initial(period, coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct debounce_observer
+ {
+ typedef debounce_observer<Subscriber> this_type;
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<T, this_type> observer_type;
+
+ struct debounce_subscriber_values : public debounce_values
+ {
+ debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
+ : debounce_values(v)
+ , cs(std::move(cs))
+ , dest(std::move(d))
+ , coordinator(std::move(c))
+ , worker(coordinator.get_worker())
+ , index(0)
+ {
+ }
+
+ composite_subscription cs;
+ dest_type dest;
+ coordinator_type coordinator;
+ rxsc::worker worker;
+ mutable std::size_t index;
+ mutable rxu::maybe<value_type> value;
+ };
+ typedef std::shared_ptr<debounce_subscriber_values> state_type;
+ state_type state;
+
+ debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
+ : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
+ {
+ auto localState = state;
+
+ auto disposer = [=](const rxsc::schedulable&){
+ localState->cs.unsubscribe();
+ localState->dest.unsubscribe();
+ localState->worker.unsubscribe();
+ };
+ auto selectedDisposer = on_exception(
+ [&](){ return localState->coordinator.act(disposer); },
+ localState->dest);
+ if (selectedDisposer.empty()) {
+ return;
+ }
+
+ localState->dest.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ localState->cs.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ }
+
+ static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t id, state_type state) {
+ auto produce = [id, state](const rxsc::schedulable&) {
+ if(id != state->index)
+ return;
+
+ state->dest.on_next(*state->value);
+ state->value.reset();
+ };
+
+ auto selectedProduce = on_exception(
+ [&](){ return state->coordinator.act(produce); },
+ state->dest);
+ if (selectedProduce.empty()) {
+ return std::function<void(const rxsc::schedulable&)>();
+ }
+
+ return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
+ }
+
+ void on_next(T v) const {
+ auto localState = state;
+ auto work = [v, localState](const rxsc::schedulable&) {
+ auto new_id = ++localState->index;
+ auto produce_time = localState->worker.now() + localState->period;
+
+ localState->value.reset(v);
+ localState->worker.schedule(produce_time, produce_item(new_id, localState));
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_error(std::exception_ptr e) const {
+ auto localState = state;
+ auto work = [e, localState](const rxsc::schedulable&) {
+ localState->dest.on_error(e);
+ localState->value.reset();
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_completed() const {
+ auto localState = state;
+ auto work = [localState](const rxsc::schedulable&) {
+ if(!localState->value.empty()) {
+ localState->dest.on_next(*localState->value);
+ }
+ localState->dest.on_completed();
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, debounce_values v) {
+ auto cs = composite_subscription();
+ auto coordinator = v.coordination.create_coordinator();
+
+ return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) {
+ return debounce_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class Duration, class Coordination>
+class debounce_factory
+{
+ typedef rxu::decay_t<Duration> duration_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ duration_type period;
+ coordination_type coordination;
+public:
+ debounce_factory(duration_type p, coordination_type c) : period(p), coordination(c) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(debounce<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(debounce<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination));
+ }
+};
+
+}
+
+template<class Duration, class Coordination>
+inline auto debounce(Duration period, Coordination coordination)
+ -> detail::debounce_factory<Duration, Coordination> {
+ return detail::debounce_factory<Duration, Coordination>(period, coordination);
+}
+
+template<class Duration>
+inline auto debounce(Duration period)
+ -> detail::debounce_factory<Duration, identity_one_worker> {
+ return detail::debounce_factory<Duration, identity_one_worker>(period, identity_current_thread());
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index e2646ed..08071ef 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -838,6 +838,50 @@ public:
return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)));
}
+ /*! Return an observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable.
+
+ \tparam Duration the type of time interval
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time to suppress any emitted items
+ \param coordination the scheduler to manage timeout for each event
+
+ \return Observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable.
+
+ \sample
+ \snippet debounce.cpp debounce sample
+ \snippet output.txt debounce sample
+ */
+ template<class Duration, class Coordination>
+ auto debounce(Duration period, Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::debounce<T, Duration, Coordination>(period, coordination)))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::debounce<T, Duration, Coordination>(period, coordination));
+ }
+
+ /*! Return an observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable.
+
+ \tparam Duration the type of time interval
+
+ \param period the period of time to suppress any emitted items
+
+ \return Observable that emits an item if a particular timespan has passed without emitting another item from the source ovservable.
+
+ \sample
+ \snippet debounce.cpp debounce sample
+ \snippet output.txt debounce sample
+ */
+ template<class Duration>
+ auto debounce(Duration period) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::debounce<T, Duration, identity_one_worker>(period, identity_current_thread())))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::debounce<T, Duration, identity_one_worker>(period, identity_current_thread()));
+ }
+
/*! Return an observable that emits each item emitted by the source observable after the specified delay.
\tparam Duration the type of time interval
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 81e5b1f..21c7bf8 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -42,6 +42,7 @@ namespace rxo=operators;
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
+#include "operators/rx-debounce.hpp"
#include "operators/rx-delay.hpp"
#include "operators/rx-distinct.hpp"
#include "operators/rx-distinct_until_changed.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 43a942d..8b9b100 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/combine_latest.cpp
${TEST_DIR}/operators/concat.cpp
${TEST_DIR}/operators/concat_map.cpp
+ ${TEST_DIR}/operators/debounce.cpp
${TEST_DIR}/operators/delay.cpp
${TEST_DIR}/operators/distinct.cpp
${TEST_DIR}/operators/distinct_until_changed.cpp
diff --git a/Rx/v2/test/operators/debounce.cpp b/Rx/v2/test/operators/debounce.cpp
new file mode 100644
index 0000000..8b542c7
--- /dev/null
+++ b/Rx/v2/test/operators/debounce.cpp
@@ -0,0 +1,212 @@
+#include "../test.h"
+
+using namespace std::chrono;
+
+SCENARIO("debounce - never", "[debounce][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("values are debounceed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.debounce(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1001)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("debounce - empty", "[debounce][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("values are debounceed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.debounce(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on.completed(251)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("debounce - no overlap", "[debounce][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("values are debounceed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.debounce(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains debounced items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(221, 2),
+ on.next(251, 3),
+ on.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("debounce - overlap", "[debounce][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(215, 2),
+ on.next(225, 3),
+ on.next(235, 4),
+ on.next(245, 5),
+ on.next(255, 6),
+ on.next(265, 7),
+ on.completed(300)
+ });
+
+ WHEN("values are debounceed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.debounce(milliseconds(30), so);
+ }
+ );
+
+ THEN("the output only contains debounced items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(296, 7),
+ on.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("debounce - throw", "[debounce][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("debounce on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("values are debounceed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.debounce(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains only error"){
+ auto required = rxu::to_vector({
+ on.error(251, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}