summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-04-21 00:53:05 +0300
committerGrigoriy Chudnov <g.chudnov@gmail.com>2016-04-21 00:53:05 +0300
commit8fe51e452f711ca4fbb592502640a04c04a29278 (patch)
tree89105bd07f275f2096e3b034580d32cd559e8ea2 /Rx/v2
parentc8bd50a50cc900889fa6bf6ae022c79ee5b5038e (diff)
downloadRxCpp-8fe51e452f711ca4fbb592502640a04c04a29278.tar.gz
add timestamp operator
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/doxygen/timestamp.cpp51
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timestamp.hpp112
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp39
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/timestamp.cpp181
6 files changed, 385 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/timestamp.cpp b/Rx/v2/examples/doxygen/timestamp.cpp
new file mode 100644
index 0000000..f17e5c0
--- /dev/null
+++ b/Rx/v2/examples/doxygen/timestamp.cpp
@@ -0,0 +1,51 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("timestamp sample") {
+ printf("//! [timestamp sample]\n");
+
+ using namespace std::chrono;
+ auto values = rxcpp::observable<>::interval(milliseconds(100))
+ .timestamp()
+ .take(3);
+ values.
+ subscribe(
+ [](std::pair<long, typename rxsc::scheduler::clock_type::time_point> v) { printf("OnNext: %ld %lld\n", v.first, static_cast<long long>(v.second.time_since_epoch().count())); },
+ [](std::exception_ptr ep) {
+ try {
+ std::rethrow_exception(ep);
+ } catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [timestamp sample]\n");
+}
+
+SCENARIO("timestamp operator syntax sample") {
+ using namespace rxcpp;
+ using namespace rxcpp::sources;
+ using namespace rxcpp::operators;
+ using namespace std::chrono;
+
+ typedef typename rxcpp::schedulers::scheduler::clock_type::time_point time_point;
+
+ printf("//! [timestamp operator syntax sample]\n");
+ auto values = interval(milliseconds(100))
+ | timestamp()
+ | take(3);
+ values.
+ subscribe(
+ [](std::pair<long, time_point> v) { printf("OnNext: %ld %lld\n", v.first, static_cast<long long>(v.second.time_since_epoch().count())); },
+ [](std::exception_ptr ep) {
+ try {
+ std::rethrow_exception(ep);
+ } catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [timestamp operator syntax sample]\n");
+} \ No newline at end of file
diff --git a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
new file mode 100644
index 0000000..17aff82
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
@@ -0,0 +1,112 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP)
+#define RXCPP_OPERATORS_RX_TIMESTAMP_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Coordination>
+struct timestamp
+{
+ static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");
+
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ struct timestamp_values {
+ timestamp_values(coordination_type c)
+ : coordination(c)
+ {
+ }
+
+ coordination_type coordination;
+ };
+ timestamp_values initial;
+
+ timestamp(coordination_type coordination)
+ : initial(coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct timestamp_observer
+ {
+ typedef timestamp_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ coordination_type coord;
+
+ timestamp_observer(dest_type d, coordination_type coordination)
+ : dest(std::move(d)),
+ coord(std::move(coordination))
+ {
+ }
+
+ void on_next(source_value_type v) const {
+ dest.on_next(std::make_pair(v, coord.now()));
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ dest.on_completed();
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, timestamp_values v) {
+ return make_subscriber<value_type>(d, this_type(d, v.coordination));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(timestamp_observer<Subscriber>::make(std::move(dest), initial)) {
+ return timestamp_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template <class Coordination>
+class timestamp_factory
+{
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ coordination_type coordination;
+public:
+ timestamp_factory(coordination_type ct)
+ : coordination(std::move(ct)) { }
+
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))) {
+ return source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination));
+ }
+
+};
+
+}
+
+template <class Coordination>
+inline auto timestamp(Coordination ct)
+-> detail::timestamp_factory<Coordination> {
+ return detail::timestamp_factory<Coordination>(std::move(ct));
+}
+
+inline auto timestamp()
+-> detail::timestamp_factory<identity_one_worker> {
+ return detail::timestamp_factory<identity_one_worker>(identity_current_thread());
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index cdeb1e0..678db71 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -878,6 +878,45 @@ public:
return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread()));
}
+ /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
+
+ \tparam Coordination the type of the scheduler
+
+ \param coordination the scheduler to manage timeout for each event
+
+ \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
+
+ \sample
+ \snippet timestamp.cpp timestamp sample
+ \snippet output.txt timestamp sample
+ */
+ template<class Coordination>
+ auto timestamp(Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination}))
+ /// \endcond
+ {
+ return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination});
+ }
+
+ /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
+
+ \tparam ClockType the type of the clock to return a time_point.
+
+ \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
+
+ \sample
+ \snippet timestamp.cpp timestamp sample
+ \snippet output.txt timestamp sample
+ */
+ auto timestamp() const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()}))
+ /// \endcond
+ {
+ return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()});
+ }
+
/*! Add a new action at the end of the new observable that is returned.
\tparam LastCall the type of the action function
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index aebd930..f23d710 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -79,6 +79,7 @@ namespace rxo=operators;
#include "operators/rx-take_until.hpp"
#include "operators/rx-tap.hpp"
#include "operators/rx-timeout.hpp"
+#include "operators/rx-timestamp.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index b900031..ee3f01a 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -66,6 +66,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/take_until.cpp
${TEST_DIR}/operators/tap.cpp
${TEST_DIR}/operators/timeout.cpp
+ ${TEST_DIR}/operators/timestamp.cpp
${TEST_DIR}/operators/with_latest_from.cpp
${TEST_DIR}/operators/window.cpp
${TEST_DIR}/operators/zip.cpp
diff --git a/Rx/v2/test/operators/timestamp.cpp b/Rx/v2/test/operators/timestamp.cpp
new file mode 100644
index 0000000..f6ea82b
--- /dev/null
+++ b/Rx/v2/test/operators/timestamp.cpp
@@ -0,0 +1,181 @@
+#include "../test.h"
+
+using namespace std::chrono;
+
+SCENARIO("should not emit timestamped items if the source never emits any items", "[timestamp][operators]"){
+ GIVEN("a source"){
+ typedef typename rxsc::detail::test_type::clock_type::time_point time_point;
+
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("timestamp operator is invoked"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.timestamp();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<std::pair<int, time_point>>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("should not emit timestamped items if the source observable is empty", "[timestamp][operators]"){
+ GIVEN("a source"){
+ typedef typename rxsc::detail::test_type::clock_type::time_point time_point;
+
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::pair<int, time_point>> on_timestamp;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("timestamp operator is invoked"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timestamp();
+ }
+ );
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on_timestamp.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("should emit timestamped items for every item in the source observable", "[timestamp][operators]"){
+ GIVEN("a source"){
+ typedef typename rxsc::detail::test_type::clock_type clock_type;
+ typedef typename clock_type::time_point time_point;
+
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::pair<int, time_point>> on_timestamp;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(250)
+ });
+
+ WHEN("timestamp operator is invoked"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timestamp(so);
+ }
+ );
+
+ THEN("the output contains the emitted items while subscribed"){
+ auto required = rxu::to_vector({
+ on_timestamp.next(210, std::make_pair(2, clock_type::time_point(milliseconds(210)))),
+ on_timestamp.next(240, std::make_pair(3, clock_type::time_point(milliseconds(240)))),
+ on_timestamp.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("should emit timestamped items and an error if there is an error", "[timestamp][operators]"){
+ GIVEN("a source"){
+ typedef typename rxsc::detail::test_type::clock_type clock_type;
+ typedef typename clock_type::time_point time_point;
+
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::pair<int, time_point>> on_timestamp;
+
+ std::runtime_error ex("on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.error(250, ex)
+ });
+
+ WHEN("timestamp operator is invoked"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.timestamp(so);
+ }
+ );
+
+ THEN("the output contains emitted items and an error"){
+ auto required = rxu::to_vector({
+ on_timestamp.next(210, std::make_pair(2, clock_type::time_point(milliseconds(210)))),
+ on_timestamp.next(240, std::make_pair(3, clock_type::time_point(milliseconds(240)))),
+ on_timestamp.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}