diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-02-26 14:53:20 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-03-01 07:25:31 -0800 |
commit | 0b3e8b7b47a11d03cf66ce8de57bf1da9108ce3b (patch) | |
tree | 428acf7ce9c8e02f6b153801f673e0f0ef968169 /Rx | |
parent | ec76ecc0a55c73d35c7cb0944f1563e039583499 (diff) | |
download | RxCpp-0b3e8b7b47a11d03cf66ce8de57bf1da9108ce3b.tar.gz |
add sample operator
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/examples/doxygen/sample.cpp | 18 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-sample_time.hpp | 201 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 39 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/sample.cpp | 158 |
6 files changed, 418 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/sample.cpp b/Rx/v2/examples/doxygen/sample.cpp new file mode 100644 index 0000000..1782af2 --- /dev/null +++ b/Rx/v2/examples/doxygen/sample.cpp @@ -0,0 +1,18 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("sample period sample") { + printf("//! [sample period sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(2)). + take(7). + sample_with_time(std::chrono::milliseconds(4)); + values. + subscribe( + [](long v) { + printf("OnNext: %ld\n", v); + }, + []() { printf("OnCompleted\n"); }); + printf("//! [sample period sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp new file mode 100644 index 0000000..c014fef --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp @@ -0,0 +1,201 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP) +#define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Duration, class Coordination> +struct sample_with_time +{ + static_assert(std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, "Duration parameter must convert to rxsc::scheduler::clock_type::duration"); + static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination"); + + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Coordination> coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + typedef rxu::decay_t<Duration> duration_type; + + struct sample_with_time_value + { + sample_with_time_value(duration_type p, coordination_type c) + : period(p) + , coordination(c) + { + } + duration_type period; + coordination_type coordination; + }; + sample_with_time_value initial; + + sample_with_time(duration_type period, coordination_type coordination) + : initial(period, coordination) + { + } + + template<class Subscriber> + struct sample_with_time_observer + { + typedef sample_with_time_observer<Subscriber> this_type; + typedef T value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + + struct sample_with_time_subscriber_value : public sample_with_time_value + { + sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c) + : sample_with_time_value(v) + , cs(std::move(cs)) + , dest(std::move(d)) + , coordinator(std::move(c)) + , worker(coordinator.get_worker()) + { + } + composite_subscription cs; + dest_type dest; + coordinator_type coordinator; + rxsc::worker worker; + mutable rxu::maybe<value_type> value; + }; + std::shared_ptr<sample_with_time_subscriber_value> state; + + sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c) + : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c)))) + { + auto localState = state; + + auto disposer = [=](const rxsc::schedulable&){ + localState->cs.unsubscribe(); + localState->dest.unsubscribe(); + localState->worker.unsubscribe(); + }; + auto selectedDisposer = on_exception( + [&](){ return localState->coordinator.act(disposer); }, + localState->dest); + if (selectedDisposer.empty()) { + return; + } + + localState->dest.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + localState->cs.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + + auto produce_sample = [localState](const rxsc::schedulable&) { + if(!localState->value.empty()) { + localState->dest.on_next(*localState->value); + localState->value.reset(); + } + }; + auto selectedProduce = on_exception( + [&](){ return localState->coordinator.act(produce_sample); }, + localState->dest); + if (selectedProduce.empty()) { + return; + } + + state->worker.schedule_periodically( + localState->worker.now(), + localState->period, + [localState, selectedProduce](const rxsc::schedulable&) { + localState->worker.schedule(selectedProduce.get()); + }); + } + + void on_next(T v) const { + auto localState = state; + auto work = [v, localState](const rxsc::schedulable&) { + localState->value.reset(v); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_error(std::exception_ptr e) const { + auto localState = state; + auto work = [e, localState](const rxsc::schedulable&) { + localState->dest.on_error(e); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_completed() const { + auto localState = state; + auto work = [localState](const rxsc::schedulable&) { + localState->dest.on_completed(); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) { + auto cs = composite_subscription(); + auto coordinator = v.coordination.create_coordinator(); + + return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) { + return sample_with_time_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template<class Duration, class Coordination> +class sample_with_time_factory +{ + typedef rxu::decay_t<Duration> duration_type; + typedef rxu::decay_t<Coordination> coordination_type; + + duration_type period; + coordination_type coordination; +public: + sample_with_time_factory(duration_type p, coordination_type c) : period(p), coordination(c) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(sample_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(sample_with_time<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination)); + } +}; + +} + +template<class Duration, class Coordination> +inline auto sample_with_time(Duration period, Coordination coordination) + -> detail::sample_with_time_factory<Duration, Coordination> { + return detail::sample_with_time_factory<Duration, Coordination>(period, coordination); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index fe5f2d8..fcbc377 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -2556,6 +2556,45 @@ public: rxo::detail::scan<T, this_type, Accumulator, Seed>(*this, std::forward<Accumulator>(a), seed)); } + /*! Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals. + + \param period the period of time to sample the source observable. + \param coordination the scheduler for the items. + + \return Observable that emits the most recently emitted item since the previous sampling. + + \sample + \snippet sample.cpp sample period sample + \snippet output.txt sample period sample + */ + template<class Coordination, + class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> + auto sample_with_time(rxsc::scheduler::clock_type::duration period, Coordination coordination) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, coordination))) + /// \endcond + { + return lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, coordination)); + } + + /*! Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals. + + \param period the period of time to sample the source observable. + + \return Observable that emits the most recently emitted item since the previous sampling. + + \sample + \snippet sample.cpp sample period sample + \snippet output.txt sample period sample + */ + auto sample_with_time(rxsc::scheduler::clock_type::duration period) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, identity_current_thread()))) + /// \endcond + { + return lift<T>(rxo::detail::sample_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, identity_current_thread())); + } + /*! Make new observable with skipped first count items from this observable. \tparam Count the type of the items counter diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index f8f112d..90b3e29 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -62,6 +62,7 @@ namespace rxo=operators; #include "operators/rx-repeat.hpp" #include "operators/rx-replay.hpp" #include "operators/rx-retry.hpp" +#include "operators/rx-sample_time.hpp" #include "operators/rx-scan.hpp" #include "operators/rx-skip.hpp" #include "operators/rx-skip_until.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 9fd13fd..57fe3e0 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -50,6 +50,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/repeat.cpp ${TEST_DIR}/operators/replay.cpp ${TEST_DIR}/operators/retry.cpp + ${TEST_DIR}/operators/sample.cpp ${TEST_DIR}/operators/scan.cpp ${TEST_DIR}/operators/skip.cpp ${TEST_DIR}/operators/skip_until.cpp diff --git a/Rx/v2/test/operators/sample.cpp b/Rx/v2/test/operators/sample.cpp new file mode 100644 index 0000000..5dfc007 --- /dev/null +++ b/Rx/v2/test/operators/sample.cpp @@ -0,0 +1,158 @@ +#include "../test.h" + +SCENARIO("sample with time, error", "[sample_with_time][operators]"){ + GIVEN("1 hot observable of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("sample_with_time on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(100, 1), + on.next(210, 2), + on.next(240, 3), + on.next(280, 4), + on.next(320, 5), + on.next(350, 6), + on.next(380, 7), + on.next(420, 8), + on.next(470, 9), + on.error(600, ex) + }); + WHEN("group ints on intersecting intervals"){ + using namespace std::chrono; + + auto res = w.start( + [&]() { + return xs + .sample_with_time(milliseconds(100), so) + .as_dynamic(); + } + ); + + THEN("the output contains groups of ints"){ + auto required = rxu::to_vector({ + on.next(301, 4), + on.next(401, 7), + on.next(501, 9), + on.error(601, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 600) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("sample with time, disposed", "[sample_with_time][operators]"){ + GIVEN("1 hot observable of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(100, 1), + on.next(210, 2), + on.next(240, 3), + on.next(280, 4), // + on.next(320, 5), + on.next(350, 6), + on.next(380, 7), + on.next(420, 8), + on.next(470, 9), + on.completed(600) + }); + WHEN("group ints on intersecting intervals"){ + using namespace std::chrono; + + auto res = w.start( + [&]() { + return xs + .sample_with_time(milliseconds(100), so) + .as_dynamic(); + }, + 370 + ); + + THEN("the output contains groups of ints"){ + auto required = rxu::to_vector({ + on.next(301, 4), + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 371) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("sample with time, same", "[sample_with_time][operators]"){ + GIVEN("1 hot observable of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::vector<int>> v_on; + + auto xs = sc.make_hot_observable({ + on.next(100, 1), + on.next(210, 2), + on.next(240, 3), + on.next(280, 4), + on.next(320, 5), + on.next(350, 6), + on.next(380, 7), + on.next(420, 8), + on.next(470, 9), + on.completed(600) + }); + WHEN("group ints on intervals"){ + using namespace std::chrono; + + auto res = w.start( + [&]() { + return xs + .sample_with_time(milliseconds(100), so) + .as_dynamic(); + } + ); + + THEN("the output contains groups of ints"){ + auto required = rxu::to_vector({ + on.next(301, 4), + on.next(401, 7), + on.next(501, 9), + on.completed(601) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 600) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} |