summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-02-26 14:53:20 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-03-01 07:25:31 -0800
commit0b3e8b7b47a11d03cf66ce8de57bf1da9108ce3b (patch)
tree428acf7ce9c8e02f6b153801f673e0f0ef968169 /Rx
parentec76ecc0a55c73d35c7cb0944f1563e039583499 (diff)
downloadRxCpp-0b3e8b7b47a11d03cf66ce8de57bf1da9108ce3b.tar.gz
add sample operator
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/sample.cpp18
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-sample_time.hpp201
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp39
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/sample.cpp158
6 files changed, 418 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/sample.cpp b/Rx/v2/examples/doxygen/sample.cpp
new file mode 100644
index 0000000..1782af2
--- /dev/null
+++ b/Rx/v2/examples/doxygen/sample.cpp
@@ -0,0 +1,18 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("sample period sample") {
+ printf("//! [sample period sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).
+ take(7).
+ sample_with_time(std::chrono::milliseconds(4));
+ values.
+ subscribe(
+ [](long v) {
+ printf("OnNext: %ld\n", v);
+ },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [sample period sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
new file mode 100644
index 0000000..c014fef
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
@@ -0,0 +1,201 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP)
+#define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Duration, class Coordination>
+struct sample_with_time
+{
+ static_assert(std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, "Duration parameter must convert to rxsc::scheduler::clock_type::duration");
+ static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");
+
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+ typedef rxu::decay_t<Duration> duration_type;
+
+ struct sample_with_time_value
+ {
+ sample_with_time_value(duration_type p, coordination_type c)
+ : period(p)
+ , coordination(c)
+ {
+ }
+ duration_type period;
+ coordination_type coordination;
+ };
+ sample_with_time_value initial;
+
+ sample_with_time(duration_type period, coordination_type coordination)
+ : initial(period, coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct sample_with_time_observer
+ {
+ typedef sample_with_time_observer<Subscriber> this_type;
+ typedef T value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+
+ struct sample_with_time_subscriber_value : public sample_with_time_value
+ {
+ sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
+ : sample_with_time_value(v)
+ , cs(std::move(cs))
+ , dest(std::move(d))
+ , coordinator(std::move(c))
+ , worker(coordinator.get_worker())
+ {
+ }
+ composite_subscription cs;
+ dest_type dest;
+ coordinator_type coordinator;
+ rxsc::worker worker;
+ mutable rxu::maybe<value_type> value;
+ };
+ std::shared_ptr<sample_with_time_subscriber_value> state;
+
+ sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
+ : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c))))
+ {
+ auto localState = state;
+
+ auto disposer = [=](const rxsc::schedulable&){
+ localState->cs.unsubscribe();
+ localState->dest.unsubscribe();
+ localState->worker.unsubscribe();
+ };
+ auto selectedDisposer = on_exception(
+ [&](){ return localState->coordinator.act(disposer); },
+ localState->dest);
+ if (selectedDisposer.empty()) {
+ return;
+ }
+
+ localState->dest.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ localState->cs.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+
+ auto produce_sample = [localState](const rxsc::schedulable&) {
+ if(!localState->value.empty()) {
+ localState->dest.on_next(*localState->value);
+ localState->value.reset();
+ }
+ };
+ auto selectedProduce = on_exception(
+ [&](){ return localState->coordinator.act(produce_sample); },
+ localState->dest);
+ if (selectedProduce.empty()) {
+ return;
+ }
+
+ state->worker.schedule_periodically(
+ localState->worker.now(),
+ localState->period,
+ [localState, selectedProduce](const rxsc::schedulable&) {
+ localState->worker.schedule(selectedProduce.get());
+ });
+ }
+
+ void on_next(T v) const {
+ auto localState = state;
+ auto work = [v, localState](const rxsc::schedulable&) {
+ localState->value.reset(v);
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_error(std::exception_ptr e) const {
+ auto localState = state;
+ auto work = [e, localState](const rxsc::schedulable&) {
+ localState->dest.on_error(e);
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_completed() const {
+ auto localState = state;
+ auto work = [localState](const rxsc::schedulable&) {
+ localState->dest.on_completed();
+ };
+ auto selectedWork = on_exception(
+ [&](){ return localState->coordinator.act(work); },
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) {
+ auto cs = composite_subscription();
+ auto coordinator = v.coordination.create_coordinator();
+
+ return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
+ return sample_with_time_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class Duration, class Coordination>
+class sample_with_time_factory
+{
+ typedef rxu::decay_t<Duration> duration_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ duration_type period;
+ coordination_type coordination;
+public:
+ sample_with_time_factory(duration_type p, coordination_type c) : period(p), coordination(c) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(sample_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(sample_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination));
+ }
+};
+
+}
+
+template<class Duration, class Coordination>
+inline auto sample_with_time(Duration period, Coordination coordination)
+ -> detail::sample_with_time_factory<Duration, Coordination> {
+ return detail::sample_with_time_factory<Duration, Coordination>(period, coordination);
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index fe5f2d8..fcbc377 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -2556,6 +2556,45 @@ public:
rxo::detail::scan<T, this_type, Accumulator, Seed>(*this, std::forward<Accumulator>(a), seed));
}
+ /*! Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.
+
+ \param period the period of time to sample the source observable.
+ \param coordination the scheduler for the items.
+
+ \return Observable that emits the most recently emitted item since the previous sampling.
+
+ \sample
+ \snippet sample.cpp sample period sample
+ \snippet output.txt sample period sample
+ */
+ template<class Coordination,
+ class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type>
+ auto sample_with_time(rxsc::scheduler::clock_type::duration period, Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, coordination)))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, coordination));
+ }
+
+ /*! Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.
+
+ \param period the period of time to sample the source observable.
+
+ \return Observable that emits the most recently emitted item since the previous sampling.
+
+ \sample
+ \snippet sample.cpp sample period sample
+ \snippet output.txt sample period sample
+ */
+ auto sample_with_time(rxsc::scheduler::clock_type::duration period) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, identity_current_thread())))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, identity_current_thread()));
+ }
+
/*! Make new observable with skipped first count items from this observable.
\tparam Count the type of the items counter
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index f8f112d..90b3e29 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -62,6 +62,7 @@ namespace rxo=operators;
#include "operators/rx-repeat.hpp"
#include "operators/rx-replay.hpp"
#include "operators/rx-retry.hpp"
+#include "operators/rx-sample_time.hpp"
#include "operators/rx-scan.hpp"
#include "operators/rx-skip.hpp"
#include "operators/rx-skip_until.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 9fd13fd..57fe3e0 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -50,6 +50,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/repeat.cpp
${TEST_DIR}/operators/replay.cpp
${TEST_DIR}/operators/retry.cpp
+ ${TEST_DIR}/operators/sample.cpp
${TEST_DIR}/operators/scan.cpp
${TEST_DIR}/operators/skip.cpp
${TEST_DIR}/operators/skip_until.cpp
diff --git a/Rx/v2/test/operators/sample.cpp b/Rx/v2/test/operators/sample.cpp
new file mode 100644
index 0000000..5dfc007
--- /dev/null
+++ b/Rx/v2/test/operators/sample.cpp
@@ -0,0 +1,158 @@
+#include "../test.h"
+
+SCENARIO("sample with time, error", "[sample_with_time][operators]"){
+ GIVEN("1 hot observable of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("sample_with_time on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(100, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(280, 4),
+ on.next(320, 5),
+ on.next(350, 6),
+ on.next(380, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.error(600, ex)
+ });
+ WHEN("group ints on intersecting intervals"){
+ using namespace std::chrono;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .sample_with_time(milliseconds(100), so)
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains groups of ints"){
+ auto required = rxu::to_vector({
+ on.next(301, 4),
+ on.next(401, 7),
+ on.next(501, 9),
+ on.error(601, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 600)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("sample with time, disposed", "[sample_with_time][operators]"){
+ GIVEN("1 hot observable of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(100, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(280, 4), //
+ on.next(320, 5),
+ on.next(350, 6),
+ on.next(380, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.completed(600)
+ });
+ WHEN("group ints on intersecting intervals"){
+ using namespace std::chrono;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .sample_with_time(milliseconds(100), so)
+ .as_dynamic();
+ },
+ 370
+ );
+
+ THEN("the output contains groups of ints"){
+ auto required = rxu::to_vector({
+ on.next(301, 4),
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 371)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("sample with time, same", "[sample_with_time][operators]"){
+ GIVEN("1 hot observable of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::vector<int>> v_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(100, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.next(280, 4),
+ on.next(320, 5),
+ on.next(350, 6),
+ on.next(380, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.completed(600)
+ });
+ WHEN("group ints on intervals"){
+ using namespace std::chrono;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .sample_with_time(milliseconds(100), so)
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains groups of ints"){
+ auto required = rxu::to_vector({
+ on.next(301, 4),
+ on.next(401, 7),
+ on.next(501, 9),
+ on.completed(601)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 600)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}