diff options
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/examples/doxygen/buffer.cpp | 6 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 116 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 45 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-scheduler.hpp | 7 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscription.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 9 | ||||
-rw-r--r-- | Rx/v2/test/operators/merge_delay_error.cpp | 6 | ||||
-rw-r--r-- | Rx/v2/test/operators/observe_on.cpp | 56 |
9 files changed, 168 insertions, 82 deletions
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp index e88c2ed..58f023a 100644 --- a/Rx/v2/examples/doxygen/buffer.cpp +++ b/Rx/v2/examples/doxygen/buffer.cpp @@ -162,7 +162,6 @@ SCENARIO("buffer period sample"){ SCENARIO("buffer period+count+coordination sample"){ printf("//! [buffer period+count+coordination sample]\n"); - auto start = std::chrono::steady_clock::now(); auto int1 = rxcpp::observable<>::range(1L, 3L); auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); auto values = int1. @@ -171,7 +170,7 @@ SCENARIO("buffer period+count+coordination sample"){ values. as_blocking(). subscribe( - [start](std::vector<long> v){ + [](std::vector<long> v){ printf("OnNext:"); std::for_each(v.begin(), v.end(), [](long a){ printf(" %ld", a); @@ -184,7 +183,6 @@ SCENARIO("buffer period+count+coordination sample"){ SCENARIO("buffer period+count sample"){ printf("//! [buffer period+count sample]\n"); - auto start = std::chrono::steady_clock::now(); auto int1 = rxcpp::observable<>::range(1L, 3L); auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); auto values = int1. @@ -192,7 +190,7 @@ SCENARIO("buffer period+count sample"){ buffer_with_time_or_count(std::chrono::milliseconds(20), 2); values. subscribe( - [start](std::vector<long> v){ + [](std::vector<long> v){ printf("OnNext:"); std::for_each(v.begin(), v.end(), [](long a){ printf(" %ld", a); diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index f702fba..d1c4ea4 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -9,6 +9,7 @@ \tparam KeySelector the type of the key extracting function \tparam MarbleSelector the type of the element extracting function \tparam BinaryPredicate the type of the key comparing function + \tparam DurationSelector the type of the duration observable function \param ks a function that extracts the key for each item (optional) \param ms a function that extracts the return element for each item (optional) @@ -63,7 +64,7 @@ struct is_group_by_selector_for { static const bool value = !std::is_same<type, tag_not_valid>::value; }; -template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> struct group_by_traits { typedef T source_value_type; @@ -71,6 +72,7 @@ struct group_by_traits typedef rxu::decay_t<KeySelector> key_selector_type; typedef rxu::decay_t<MarbleSelector> marble_selector_type; typedef rxu::decay_t<BinaryPredicate> predicate_type; + typedef rxu::decay_t<DurationSelector> duration_selector_type; static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); @@ -87,14 +89,15 @@ struct group_by_traits typedef grouped_observable<key_type, marble_type> grouped_observable_type; }; -template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> struct group_by { - typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; + typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type; typedef typename traits_type::key_selector_type key_selector_type; typedef typename traits_type::marble_selector_type marble_selector_type; typedef typename traits_type::marble_type marble_type; typedef typename traits_type::predicate_type predicate_type; + typedef typename traits_type::duration_selector_type duration_selector_type; typedef typename traits_type::subject_type subject_type; typedef typename traits_type::key_type key_type; @@ -130,21 +133,23 @@ struct group_by struct group_by_values { - group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } mutable key_selector_type keySelector; mutable marble_selector_type marbleSelector; mutable predicate_type predicate; + mutable duration_selector_type durationSelector; }; group_by_values initial; - group_by(key_selector_type ks, marble_selector_type ms, predicate_type p) - : initial(std::move(ks), std::move(ms), std::move(p)) + group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) + : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds)) { } @@ -206,7 +211,35 @@ struct group_by } auto sub = subject_type(); g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; - dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()))); + auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())); + auto durationObs = on_exception( + [&](){ + return this->durationSelector(obs);}, + [this](rxu::error_ptr e){on_error(e);}); + if (durationObs.empty()) { + return; + } + + dest.on_next(obs); + composite_subscription duration_sub; + auto ssub = state->source_lifetime.add(duration_sub); + + auto expire_state = state; + auto expire_dest = g->second; + auto expire = [=]() { + auto g = expire_state->groups.find(selectedKey.get()); + if (g != expire_state->groups.end()) { + expire_state->groups.erase(g); + expire_dest.on_completed(); + } + expire_state->source_lifetime.remove(ssub); + }; + auto robs = durationObs.get().take(1); + duration_sub.add(robs.subscribe( + [](const typename decltype(robs)::value_type &){}, + [=](rxu::error_ptr) {expire();}, + [=](){expire();} + )); } auto selectedMarble = on_exception( [&](){ @@ -243,33 +276,36 @@ struct group_by } }; -template<class KeySelector, class MarbleSelector, class BinaryPredicate> +template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector> class group_by_factory { typedef rxu::decay_t<KeySelector> key_selector_type; typedef rxu::decay_t<MarbleSelector> marble_selector_type; typedef rxu::decay_t<BinaryPredicate> predicate_type; + typedef rxu::decay_t<DurationSelector> duration_selector_type; key_selector_type keySelector; marble_selector_type marbleSelector; predicate_type predicate; + duration_selector_type durationSelector; public: - group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } template<class Observable> struct group_by_factory_traits { typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type; - typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; - typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type; + typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type; + typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type; }; template<class Observable> auto operator()(Observable&& source) - -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) { - return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate))); + -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) { + return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector))); } }; @@ -288,61 +324,75 @@ auto group_by(AN&&... an) template<> struct member_overload<group_by_tag> { - template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector, + class SourceValue = rxu::value_type_t<Observable>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, + class Value = typename Traits::grouped_observable_type> + static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds) + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds))); + } + + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p))); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>())); } - template<class Observable, class KeySelector, class MarbleSelector, + template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class Observable, class KeySelector, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks) - -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class Observable, class KeySelector=rxu::detail::take_at<0>, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>, class Enabled = rxu::enable_if_all_true_type_t< all_observables<Observable>>, class SourceValue = rxu::value_type_t<Observable>, - class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>, + class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o) - -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) { + return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>())); } template<class... AN> static operators::detail::group_by_invalid_t<AN...> member(const AN&...) { std::terminate(); return {}; - static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool"); - } + static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable"); + } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index c1d59a9..b50b773 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -317,8 +317,7 @@ public: }; inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { - static observe_on_one_worker r(rxsc::make_run_loop(rl)); - return r; + return observe_on_one_worker(rxsc::make_run_loop(rl)); } inline observe_on_one_worker observe_on_event_loop() { diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 3bbb448..97bcabd 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -173,28 +173,9 @@ class blocking_observable -> void { std::mutex lock; std::condition_variable wake; + bool disposed = false; rxu::error_ptr error; - struct tracking - { - ~tracking() - { - if (!disposed || !wakened) std::terminate(); - } - tracking() - { - disposed = false; - wakened = false; - false_wakes = 0; - true_wakes = 0; - } - std::atomic_bool disposed; - std::atomic_bool wakened; - std::atomic_int false_wakes; - std::atomic_int true_wakes; - }; - auto track = std::make_shared<tracking>(); - auto dest = make_subscriber<T>(std::forward<ArgN>(an)...); // keep any error to rethrow at the end. @@ -213,31 +194,19 @@ class blocking_observable auto cs = scbr.get_subscription(); cs.add( - [&, track](){ - // OSX geting invalid x86 op if notify_one is after the disposed = true - // presumably because the condition_variable may already have been awakened - // and is now sitting in a while loop on disposed + [&](){ + std::unique_lock<std::mutex> guard(lock); wake.notify_one(); - track->disposed = true; + disposed = true; }); - std::unique_lock<std::mutex> guard(lock); source.subscribe(std::move(scbr)); + std::unique_lock<std::mutex> guard(lock); wake.wait(guard, - [&, track](){ - // this is really not good. - // false wakeups were never followed by true wakeups so.. - - // anyways this gets triggered before disposed is set now so wait. - while (!track->disposed) { - ++track->false_wakes; - } - ++track->true_wakes; - return true; + [&](){ + return disposed; }); - track->wakened = true; - if (!track->disposed || !track->wakened) std::terminate(); if (error) {rxu::rethrow_exception(error);} } diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index 0f239be..fc68979 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -458,12 +458,19 @@ class schedulable : public schedulable_base public: ~exit_recursed_scope_type() { + if (that != nullptr) { that->requestor = nullptr; + } } exit_recursed_scope_type(const recursed_scope_type* that) : that(that) { } + exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT + : that(other.that) + { + other.that = nullptr; + } }; public: recursed_scope_type() diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index 9c00469..ee4e53e 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -379,7 +379,7 @@ public: composite_subscription() : inner_type() - , subscription(*static_cast<const inner_type* const>(this)) + , subscription(*static_cast<const inner_type*>(this)) { } diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index cd5f39b..9ce455f 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -428,6 +428,15 @@ struct less { return std::forward<LHS>(lhs) < std::forward<RHS>(rhs); } }; +template <class T> +struct ret +{ + template <class LHS> + auto operator()(LHS&& ) const + -> decltype(T()) + { return T(); } +}; + template<class T = void> struct equal_to { diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp index 7c7a58d..b53b884 100644 --- a/Rx/v2/test/operators/merge_delay_error.cpp +++ b/Rx/v2/test/operators/merge_delay_error.cpp @@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000; //merge_delay_error must work the very same way as `merge()` except the error handling -SCENARIO("merge delay error completes", "[merge][join][operators]"){ +SCENARIO("merge_delay_error completes", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -117,7 +117,7 @@ SCENARIO("merge delay error completes", "[merge][join][operators]"){ } } -SCENARIO("variadic merge delay error completes with error", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -211,7 +211,7 @@ SCENARIO("variadic merge delay error completes with error", "[merge][join][opera } } -SCENARIO("variadic merge delay error completes with 2 errors", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp index 644ab93..ffa85aa 100644 --- a/Rx/v2/test/operators/observe_on.cpp +++ b/Rx/v2/test/operators/observe_on.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include <rxcpp/operators/rx-take.hpp> +#include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-observe_on.hpp> const int static_onnextcalls = 100000; @@ -136,4 +137,57 @@ SCENARIO("stream observe_on", "[observe][observe_on]"){ } } -}
\ No newline at end of file +} + +class nocompare { +public: + int v; +}; + +SCENARIO("observe_on no-comparison", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::observe_on_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<nocompare> in; + const rxsc::test::messages<int> out; + + auto xs = sc.make_hot_observable({ + in.next(150, nocompare{1}), + in.next(210, nocompare{2}), + in.next(240, nocompare{3}), + in.completed(300) + }); + + WHEN("observe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + | rxo::observe_on(so) + | rxo::map([](nocompare v){ return v.v; }) + | rxo::as_dynamic(); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + out.next(211, 2), + out.next(241, 3), + out.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + out.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |