diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-04-21 00:53:05 +0300 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-04-21 00:53:05 +0300 |
commit | 8fe51e452f711ca4fbb592502640a04c04a29278 (patch) | |
tree | 89105bd07f275f2096e3b034580d32cd559e8ea2 /Rx/v2 | |
parent | c8bd50a50cc900889fa6bf6ae022c79ee5b5038e (diff) | |
download | RxCpp-8fe51e452f711ca4fbb592502640a04c04a29278.tar.gz |
add timestamp operator
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/examples/doxygen/timestamp.cpp | 51 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-timestamp.hpp | 112 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 39 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/timestamp.cpp | 181 |
6 files changed, 385 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/timestamp.cpp b/Rx/v2/examples/doxygen/timestamp.cpp new file mode 100644 index 0000000..f17e5c0 --- /dev/null +++ b/Rx/v2/examples/doxygen/timestamp.cpp @@ -0,0 +1,51 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("timestamp sample") { + printf("//! [timestamp sample]\n"); + + using namespace std::chrono; + auto values = rxcpp::observable<>::interval(milliseconds(100)) + .timestamp() + .take(3); + values. + subscribe( + [](std::pair<long, typename rxsc::scheduler::clock_type::time_point> v) { printf("OnNext: %ld %lld\n", v.first, static_cast<long long>(v.second.time_since_epoch().count())); }, + [](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + []() { printf("OnCompleted\n"); }); + printf("//! [timestamp sample]\n"); +} + +SCENARIO("timestamp operator syntax sample") { + using namespace rxcpp; + using namespace rxcpp::sources; + using namespace rxcpp::operators; + using namespace std::chrono; + + typedef typename rxcpp::schedulers::scheduler::clock_type::time_point time_point; + + printf("//! [timestamp operator syntax sample]\n"); + auto values = interval(milliseconds(100)) + | timestamp() + | take(3); + values. + subscribe( + [](std::pair<long, time_point> v) { printf("OnNext: %ld %lld\n", v.first, static_cast<long long>(v.second.time_since_epoch().count())); }, + [](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + []() { printf("OnCompleted\n"); }); + printf("//! [timestamp operator syntax sample]\n"); +}
\ No newline at end of file diff --git a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp new file mode 100644 index 0000000..17aff82 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP) +#define RXCPP_OPERATORS_RX_TIMESTAMP_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Coordination> +struct timestamp +{ + static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination"); + + typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<Coordination> coordination_type; + + struct timestamp_values { + timestamp_values(coordination_type c) + : coordination(c) + { + } + + coordination_type coordination; + }; + timestamp_values initial; + + timestamp(coordination_type coordination) + : initial(coordination) + { + } + + template<class Subscriber> + struct timestamp_observer + { + typedef timestamp_observer<Subscriber> this_type; + typedef source_value_type value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + dest_type dest; + coordination_type coord; + + timestamp_observer(dest_type d, coordination_type coordination) + : dest(std::move(d)), + coord(std::move(coordination)) + { + } + + void on_next(source_value_type v) const { + dest.on_next(std::make_pair(v, coord.now())); + } + void on_error(std::exception_ptr e) const { + dest.on_error(e); + } + void on_completed() const { + dest.on_completed(); + } + + static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, timestamp_values v) { + return make_subscriber<value_type>(d, this_type(d, v.coordination)); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(timestamp_observer<Subscriber>::make(std::move(dest), initial)) { + return timestamp_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template <class Coordination> +class timestamp_factory +{ + typedef rxu::decay_t<Coordination> coordination_type; + + coordination_type coordination; +public: + timestamp_factory(coordination_type ct) + : coordination(std::move(ct)) { } + + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))) { + return source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination)); + } + +}; + +} + +template <class Coordination> +inline auto timestamp(Coordination ct) +-> detail::timestamp_factory<Coordination> { + return detail::timestamp_factory<Coordination>(std::move(ct)); +} + +inline auto timestamp() +-> detail::timestamp_factory<identity_one_worker> { + return detail::timestamp_factory<identity_one_worker>(identity_current_thread()); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index cdeb1e0..678db71 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -878,6 +878,45 @@ public: return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread())); } + /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. + + \tparam Coordination the type of the scheduler + + \param coordination the scheduler to manage timeout for each event + + \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. + + \sample + \snippet timestamp.cpp timestamp sample + \snippet output.txt timestamp sample + */ + template<class Coordination> + auto timestamp(Coordination coordination) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination})) + /// \endcond + { + return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination}); + } + + /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. + + \tparam ClockType the type of the clock to return a time_point. + + \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. + + \sample + \snippet timestamp.cpp timestamp sample + \snippet output.txt timestamp sample + */ + auto timestamp() const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()})) + /// \endcond + { + return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()}); + } + /*! Add a new action at the end of the new observable that is returned. \tparam LastCall the type of the action function diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index aebd930..f23d710 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -79,6 +79,7 @@ namespace rxo=operators; #include "operators/rx-take_until.hpp" #include "operators/rx-tap.hpp" #include "operators/rx-timeout.hpp" +#include "operators/rx-timestamp.hpp" #include "operators/rx-with_latest_from.hpp" #include "operators/rx-window.hpp" #include "operators/rx-window_time.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index b900031..ee3f01a 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -66,6 +66,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/take_until.cpp ${TEST_DIR}/operators/tap.cpp ${TEST_DIR}/operators/timeout.cpp + ${TEST_DIR}/operators/timestamp.cpp ${TEST_DIR}/operators/with_latest_from.cpp ${TEST_DIR}/operators/window.cpp ${TEST_DIR}/operators/zip.cpp diff --git a/Rx/v2/test/operators/timestamp.cpp b/Rx/v2/test/operators/timestamp.cpp new file mode 100644 index 0000000..f6ea82b --- /dev/null +++ b/Rx/v2/test/operators/timestamp.cpp @@ -0,0 +1,181 @@ +#include "../test.h" + +using namespace std::chrono; + +SCENARIO("should not emit timestamped items if the source never emits any items", "[timestamp][operators]"){ + GIVEN("a source"){ + typedef typename rxsc::detail::test_type::clock_type::time_point time_point; + + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("timestamp operator is invoked"){ + + auto res = w.start( + [xs]() { + return xs.timestamp(); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<std::pair<int, time_point>>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("should not emit timestamped items if the source observable is empty", "[timestamp][operators]"){ + GIVEN("a source"){ + typedef typename rxsc::detail::test_type::clock_type::time_point time_point; + + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::pair<int, time_point>> on_timestamp; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("timestamp operator is invoked"){ + + auto res = w.start( + [so, xs]() { + return xs.timestamp(); + } + ); + + THEN("the output only contains complete message"){ + auto required = rxu::to_vector({ + on_timestamp.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("should emit timestamped items for every item in the source observable", "[timestamp][operators]"){ + GIVEN("a source"){ + typedef typename rxsc::detail::test_type::clock_type clock_type; + typedef typename clock_type::time_point time_point; + + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::pair<int, time_point>> on_timestamp; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(250) + }); + + WHEN("timestamp operator is invoked"){ + + auto res = w.start( + [so, xs]() { + return xs.timestamp(so); + } + ); + + THEN("the output contains the emitted items while subscribed"){ + auto required = rxu::to_vector({ + on_timestamp.next(210, std::make_pair(2, clock_type::time_point(milliseconds(210)))), + on_timestamp.next(240, std::make_pair(3, clock_type::time_point(milliseconds(240)))), + on_timestamp.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("should emit timestamped items and an error if there is an error", "[timestamp][operators]"){ + GIVEN("a source"){ + typedef typename rxsc::detail::test_type::clock_type clock_type; + typedef typename clock_type::time_point time_point; + + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::pair<int, time_point>> on_timestamp; + + std::runtime_error ex("on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.error(250, ex) + }); + + WHEN("timestamp operator is invoked"){ + + auto res = w.start( + [so, xs]() { + return xs.timestamp(so); + } + ); + + THEN("the output contains emitted items and an error"){ + auto required = rxu::to_vector({ + on_timestamp.next(210, std::make_pair(2, clock_type::time_point(milliseconds(210)))), + on_timestamp.next(240, std::make_pair(3, clock_type::time_point(milliseconds(240)))), + on_timestamp.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |