summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-03-05 21:22:56 -0800
committerKirk Shoop <kirk.shoop@microsoft.com>2016-03-05 21:22:56 -0800
commit375bed59885132334de28c1e947a7ba8c087adc9 (patch)
tree735afbb6bd750d529c6cd1ea2f945ebbe25cdeb6 /Rx
parentaa3c334ca0720a65fb481558047ae636dbbd7733 (diff)
downloadRxCpp-375bed59885132334de28c1e947a7ba8c087adc9.tar.gz
add delay operator
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/delay.cpp51
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-delay.hpp185
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp78
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/delay.cpp165
6 files changed, 464 insertions, 17 deletions
diff --git a/Rx/v2/examples/doxygen/delay.cpp b/Rx/v2/examples/doxygen/delay.cpp
new file mode 100644
index 0000000..b44b67d
--- /dev/null
+++ b/Rx/v2/examples/doxygen/delay.cpp
@@ -0,0 +1,51 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("delay period+coordination sample"){
+ printf("//! [delay period+coordination sample]\n");
+ using namespace std::chrono;
+ auto scheduler = rxcpp::identity_current_thread();
+ auto start = scheduler.now();
+ auto period = milliseconds(10);
+ const auto next = [=](const char* s) {
+ return [=](long v){
+ auto t = duration_cast<milliseconds>(scheduler.now() - start);
+ printf("[%s @ %lld] OnNext: %ld\n", s, t.count(), v);
+ };
+ };
+ auto values = rxcpp::observable<>::interval(start, period, scheduler).
+ take(4).
+ tap(next("interval")).
+ delay(period, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ next(" delayed"),
+ [](){printf("OnCompleted\n");});
+ printf("//! [delay period+coordination sample]\n");
+}
+
+SCENARIO("delay period sample"){
+ printf("//! [delay period sample]\n");
+ using namespace std::chrono;
+ auto scheduler = rxcpp::identity_current_thread();
+ auto start = scheduler.now();
+ auto period = milliseconds(10);
+ const auto next = [=](const char* s) {
+ return [=](long v){
+ auto t = duration_cast<milliseconds>(scheduler.now() - start);
+ printf("[%s @ %lld] OnNext: %ld\n", s, t.count(), v);
+ };
+ };
+ auto values = rxcpp::observable<>::interval(start, period, scheduler).
+ take(4).
+ tap(next("interval")).
+ delay(period);
+ values.
+ subscribe(
+ next(" delayed"),
+ [](){printf("OnCompleted\n");});
+ printf("//! [delay period sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-delay.hpp b/Rx/v2/src/rxcpp/operators/rx-delay.hpp
new file mode 100644
index 0000000..ec011ff
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-delay.hpp
@@ -0,0 +1,185 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_delay_HPP)
+#define RXCPP_OPERATORS_RX_delay_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Duration, class Coordination>
+struct delay
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+ typedef typename coordination_type::coordinator_type coordinator_type;
+ typedef rxu::decay_t<Duration> duration_type;
+
+ struct delay_values
+ {
+ delay_values(duration_type p, coordination_type c)
+ : period(p)
+ , coordination(c)
+ {
+ }
+ duration_type period;
+ coordination_type coordination;
+ };
+ delay_values initial;
+
+ delay(duration_type period, coordination_type coordination)
+ : initial(period, coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct delay_observer
+ {
+ typedef delay_observer<Subscriber> this_type;
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<T, this_type> observer_type;
+
+ struct delay_subscriber_values : public delay_values
+ {
+ delay_subscriber_values(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
+ : delay_values(v)
+ , cs(std::move(cs))
+ , dest(std::move(d))
+ , coordinator(std::move(c))
+ , worker(coordinator.get_worker())
+ , expected(worker.now())
+ {
+ }
+ composite_subscription cs;
+ dest_type dest;
+ coordinator_type coordinator;
+ rxsc::worker worker;
+ rxsc::scheduler::clock_type::time_point expected;
+ };
+ std::shared_ptr<delay_subscriber_values> state;
+
+ delay_observer(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
+ : state(std::make_shared<delay_subscriber_values>(delay_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
+ {
+ auto localState = state;
+
+ auto disposer = [=](const rxsc::schedulable&){
+ localState->cs.unsubscribe();
+ localState->dest.unsubscribe();
+ localState->worker.unsubscribe();
+ };
+ auto selectedDisposer = on_exception(
+ [&](){return localState->coordinator.act(disposer);},
+ localState->dest);
+ if (selectedDisposer.empty()) {
+ return;
+ }
+
+ localState->dest.add([=](){
+ localState->worker.schedule(selectedDisposer.get());
+ });
+ localState->cs.add([=](){
+ localState->worker.schedule(localState->worker.now() + localState->period, selectedDisposer.get());
+ });
+ }
+
+ void on_next(T v) const {
+ auto localState = state;
+ auto work = [v, localState](const rxsc::schedulable&){
+ localState->dest.on_next(v);
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
+ }
+
+ void on_error(std::exception_ptr e) const {
+ auto localState = state;
+ auto work = [e, localState](const rxsc::schedulable&){
+ localState->dest.on_error(e);
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_completed() const {
+ auto localState = state;
+ auto work = [localState](const rxsc::schedulable&){
+ localState->dest.on_completed();
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, delay_values v) {
+ auto cs = composite_subscription();
+ auto coordinator = v.coordination.create_coordinator();
+
+ return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(delay_observer<Subscriber>::make(std::move(dest), initial)) {
+ return delay_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class Duration, class Coordination>
+class delay_factory
+{
+ typedef rxu::decay_t<Duration> duration_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ duration_type period;
+ coordination_type coordination;
+public:
+ delay_factory(duration_type p, coordination_type c) : period(p), coordination(c) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(delay<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(delay<rxu::value_type_t<rxu::decay_t<Observable>>, Duration, Coordination>(period, coordination));
+ }
+};
+
+}
+
+template<class Duration, class Coordination>
+inline auto delay(Duration period, Coordination coordination)
+ -> detail::delay_factory<Duration, Coordination> {
+ return detail::delay_factory<Duration, Coordination>(period, coordination);
+}
+
+template<class Duration>
+inline auto delay(Duration period)
+ -> detail::delay_factory<Duration, identity_one_worker> {
+ return detail::delay_factory<Duration, identity_one_worker>(period, identity_current_thread());
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 68ff550..bbc0274 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -838,7 +838,51 @@ public:
return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)));
}
- /*! For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
+ /*! Return an observable that emits each item emitted by the source observable after the specified delay.
+
+ \tparam Duration the type of time interval
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each item is delayed
+ \param coordination the scheduler for the delays
+
+ \return Observable that emits each item emitted by the source observable after the specified delay.
+
+ \sample
+ \snippet delay.cpp delay period+coordination sample
+ \snippet output.txt delay period+coordination sample
+ */
+ template<class Duration, class Coordination>
+ auto delay(Duration period, Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::delay<T, Duration, Coordination>(period, coordination)))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::delay<T, Duration, Coordination>(period, coordination));
+ }
+
+ /*! Return an observable that emits each item emitted by the source observable after the specified delay.
+
+ \tparam Duration the type of time interval
+
+ \param period the period of time each item is delayed
+
+ \return Observable that emits each item emitted by the source observable after the specified delay.
+
+ \sample
+ \snippet delay.cpp delay period sample
+ \snippet output.txt delay period sample
+ */
+ template<class Duration>
+ auto delay(Duration period) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::delay<T, Duration, identity_one_worker>(period, identity_current_thread())))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::delay<T, Duration, identity_one_worker>(period, identity_current_thread()));
+ }
+
+ /*! For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
\return Observable that emits those items from the source observable that are distinct.
@@ -888,7 +932,7 @@ public:
return lift<T>(rxo::detail::element_at<T>(index));
}
- /*! Rerurn an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
+ /*! Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
\param count the maximum size of each window before it should be completed
@@ -906,7 +950,7 @@ public:
return lift<observable<T>>(rxo::detail::window<T>(count, count));
}
- /*! Rerurn an observable that emits windows every skip items containing at most count items from the source observable.
+ /*! Return an observable that emits windows every skip items containing at most count items from the source observable.
\param count the maximum size of each window before it should be completed
\param skip how many items need to be skipped before starting a new window
@@ -925,7 +969,7 @@ public:
return lift<observable<T>>(rxo::detail::window<T>(count, skip));
}
- /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+ /*! Return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
\tparam Duration the type of time intervals
\tparam Coordination the type of the scheduler
@@ -949,7 +993,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination));
}
- /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
+ /*! Return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
\tparam Duration the type of time intervals
@@ -971,7 +1015,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()));
}
- /*! Rerurn an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+ /*! Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
\tparam Duration the type of time intervals
\tparam Coordination the type of the scheduler
@@ -994,7 +1038,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination));
}
- /*! Rerurn an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
+ /*! Return an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
\tparam Duration the type of time intervals
@@ -1015,7 +1059,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()));
}
- /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
+ /*! Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
\tparam Duration the type of time intervals
\tparam Coordination the type of the scheduler
@@ -1039,7 +1083,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination));
}
- /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
+ /*! Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
\tparam Duration the type of time intervals
@@ -1061,7 +1105,7 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()));
}
- /*! Rerurn an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
+ /*! Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
\param count the maximum size of each buffer before it should be emitted
@@ -1079,7 +1123,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count));
}
- /*! Rerurn an observable that emits buffers every skip items containing at most count items from the source observable.
+ /*! Return an observable that emits buffers every skip items containing at most count items from the source observable.
\param count the maximum size of each buffers before it should be emitted
\param skip how many items need to be skipped before starting a new buffers
@@ -1098,7 +1142,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip));
}
- /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
+ /*! Return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
\tparam Coordination the type of the scheduler
@@ -1121,7 +1165,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination));
}
- /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer.
+ /*! Return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer.
\param period the period of time each buffer collects items before it is emitted
\param skip the period of time after which a new buffer will be created
@@ -1148,7 +1192,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread()));
}
- /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
+ /*! Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
\tparam Coordination the type of the scheduler
@@ -1171,7 +1215,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination));
}
- /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
+ /*! Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
\param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
@@ -1189,7 +1233,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread()));
}
- /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
+ /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
\tparam Coordination the type of the scheduler
@@ -1212,7 +1256,7 @@ public:
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination));
}
- /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
+ /*! Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
\param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
\param count the maximum size of each buffer before it is emitted and new buffer is created
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index b6b5518..81e5b1f 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -42,6 +42,7 @@ namespace rxo=operators;
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
+#include "operators/rx-delay.hpp"
#include "operators/rx-distinct.hpp"
#include "operators/rx-distinct_until_changed.hpp"
#include "operators/rx-element_at.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 39558e4..43a942d 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/combine_latest.cpp
${TEST_DIR}/operators/concat.cpp
${TEST_DIR}/operators/concat_map.cpp
+ ${TEST_DIR}/operators/delay.cpp
${TEST_DIR}/operators/distinct.cpp
${TEST_DIR}/operators/distinct_until_changed.cpp
${TEST_DIR}/operators/element_at.cpp
diff --git a/Rx/v2/test/operators/delay.cpp b/Rx/v2/test/operators/delay.cpp
new file mode 100644
index 0000000..8ec13bb
--- /dev/null
+++ b/Rx/v2/test/operators/delay.cpp
@@ -0,0 +1,165 @@
+#include "../test.h"
+
+using namespace std::chrono;
+
+SCENARIO("delay - never", "[delay][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("values are delayed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.delay(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1001)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("delay - empty", "[delay][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("values are delayed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.delay(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on.completed(260)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("delay - return", "[delay][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("values are delayed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.delay(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains delayed items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(220, 2),
+ on.next(250, 3),
+ on.completed(310)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("delay - throw", "[delay][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("delay on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("values are delayed"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs.delay(milliseconds(10), so);
+ }
+ );
+
+ THEN("the output only contains only error"){
+ auto required = rxu::to_vector({
+ on.error(251, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}