summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/doxygen/buffer.cpp6
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp116
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp3
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp45
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp7
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp9
-rw-r--r--Rx/v2/test/operators/merge_delay_error.cpp6
-rw-r--r--Rx/v2/test/operators/observe_on.cpp56
9 files changed, 168 insertions, 82 deletions
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
index e88c2ed..58f023a 100644
--- a/Rx/v2/examples/doxygen/buffer.cpp
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -162,7 +162,6 @@ SCENARIO("buffer period sample"){
SCENARIO("buffer period+count+coordination sample"){
printf("//! [buffer period+count+coordination sample]\n");
- auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
@@ -171,7 +170,7 @@ SCENARIO("buffer period+count+coordination sample"){
values.
as_blocking().
subscribe(
- [start](std::vector<long> v){
+ [](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
@@ -184,7 +183,6 @@ SCENARIO("buffer period+count+coordination sample"){
SCENARIO("buffer period+count sample"){
printf("//! [buffer period+count sample]\n");
- auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
@@ -192,7 +190,7 @@ SCENARIO("buffer period+count sample"){
buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
values.
subscribe(
- [start](std::vector<long> v){
+ [](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index f702fba..d1c4ea4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -9,6 +9,7 @@
\tparam KeySelector the type of the key extracting function
\tparam MarbleSelector the type of the element extracting function
\tparam BinaryPredicate the type of the key comparing function
+ \tparam DurationSelector the type of the duration observable function
\param ks a function that extracts the key for each item (optional)
\param ms a function that extracts the return element for each item (optional)
@@ -63,7 +64,7 @@ struct is_group_by_selector_for {
static const bool value = !std::is_same<type, tag_not_valid>::value;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by_traits
{
typedef T source_value_type;
@@ -71,6 +72,7 @@ struct group_by_traits
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
@@ -87,14 +89,15 @@ struct group_by_traits
typedef grouped_observable<key_type, marble_type> grouped_observable_type;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by
{
- typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+ typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
typedef typename traits_type::key_selector_type key_selector_type;
typedef typename traits_type::marble_selector_type marble_selector_type;
typedef typename traits_type::marble_type marble_type;
typedef typename traits_type::predicate_type predicate_type;
+ typedef typename traits_type::duration_selector_type duration_selector_type;
typedef typename traits_type::subject_type subject_type;
typedef typename traits_type::key_type key_type;
@@ -130,21 +133,23 @@ struct group_by
struct group_by_values
{
- group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
mutable key_selector_type keySelector;
mutable marble_selector_type marbleSelector;
mutable predicate_type predicate;
+ mutable duration_selector_type durationSelector;
};
group_by_values initial;
- group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
- : initial(std::move(ks), std::move(ms), std::move(p))
+ group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
+ : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds))
{
}
@@ -206,7 +211,35 @@ struct group_by
}
auto sub = subject_type();
g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
- dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())));
+ auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()));
+ auto durationObs = on_exception(
+ [&](){
+ return this->durationSelector(obs);},
+ [this](rxu::error_ptr e){on_error(e);});
+ if (durationObs.empty()) {
+ return;
+ }
+
+ dest.on_next(obs);
+ composite_subscription duration_sub;
+ auto ssub = state->source_lifetime.add(duration_sub);
+
+ auto expire_state = state;
+ auto expire_dest = g->second;
+ auto expire = [=]() {
+ auto g = expire_state->groups.find(selectedKey.get());
+ if (g != expire_state->groups.end()) {
+ expire_state->groups.erase(g);
+ expire_dest.on_completed();
+ }
+ expire_state->source_lifetime.remove(ssub);
+ };
+ auto robs = durationObs.get().take(1);
+ duration_sub.add(robs.subscribe(
+ [](const typename decltype(robs)::value_type &){},
+ [=](rxu::error_ptr) {expire();},
+ [=](){expire();}
+ ));
}
auto selectedMarble = on_exception(
[&](){
@@ -243,33 +276,36 @@ struct group_by
}
};
-template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
class group_by_factory
{
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
key_selector_type keySelector;
marble_selector_type marbleSelector;
predicate_type predicate;
+ duration_selector_type durationSelector;
public:
- group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
template<class Observable>
struct group_by_factory_traits
{
typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
- typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
- typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
+ typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
+ typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type;
};
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
- return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
+ -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) {
+ return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)));
}
};
@@ -288,61 +324,75 @@ auto group_by(AN&&... an)
template<>
struct member_overload<group_by_tag>
{
- template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
+ class Value = typename Traits::grouped_observable_type>
+ static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds)
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)));
+ }
+
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
- template<class Observable, class KeySelector, class MarbleSelector,
+ template<class Observable, class KeySelector, class MarbleSelector,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable, class KeySelector,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable,
class KeySelector=rxu::detail::take_at<0>,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class Enabled = rxu::enable_if_all_true_type_t<
all_observables<Observable>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o)
- -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class... AN>
static operators::detail::group_by_invalid_t<AN...> member(const AN&...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool");
- }
+ static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable");
+ }
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index c1d59a9..b50b773 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -317,8 +317,7 @@ public:
};
inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
- static observe_on_one_worker r(rxsc::make_run_loop(rl));
- return r;
+ return observe_on_one_worker(rxsc::make_run_loop(rl));
}
inline observe_on_one_worker observe_on_event_loop() {
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 3bbb448..97bcabd 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -173,28 +173,9 @@ class blocking_observable
-> void {
std::mutex lock;
std::condition_variable wake;
+ bool disposed = false;
rxu::error_ptr error;
- struct tracking
- {
- ~tracking()
- {
- if (!disposed || !wakened) std::terminate();
- }
- tracking()
- {
- disposed = false;
- wakened = false;
- false_wakes = 0;
- true_wakes = 0;
- }
- std::atomic_bool disposed;
- std::atomic_bool wakened;
- std::atomic_int false_wakes;
- std::atomic_int true_wakes;
- };
- auto track = std::make_shared<tracking>();
-
auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
// keep any error to rethrow at the end.
@@ -213,31 +194,19 @@ class blocking_observable
auto cs = scbr.get_subscription();
cs.add(
- [&, track](){
- // OSX geting invalid x86 op if notify_one is after the disposed = true
- // presumably because the condition_variable may already have been awakened
- // and is now sitting in a while loop on disposed
+ [&](){
+ std::unique_lock<std::mutex> guard(lock);
wake.notify_one();
- track->disposed = true;
+ disposed = true;
});
- std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
+ std::unique_lock<std::mutex> guard(lock);
wake.wait(guard,
- [&, track](){
- // this is really not good.
- // false wakeups were never followed by true wakeups so..
-
- // anyways this gets triggered before disposed is set now so wait.
- while (!track->disposed) {
- ++track->false_wakes;
- }
- ++track->true_wakes;
- return true;
+ [&](){
+ return disposed;
});
- track->wakened = true;
- if (!track->disposed || !track->wakened) std::terminate();
if (error) {rxu::rethrow_exception(error);}
}
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index 0f239be..fc68979 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -458,12 +458,19 @@ class schedulable : public schedulable_base
public:
~exit_recursed_scope_type()
{
+ if (that != nullptr) {
that->requestor = nullptr;
+ }
}
exit_recursed_scope_type(const recursed_scope_type* that)
: that(that)
{
}
+ exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
+ : that(other.that)
+ {
+ other.that = nullptr;
+ }
};
public:
recursed_scope_type()
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index 9c00469..ee4e53e 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -379,7 +379,7 @@ public:
composite_subscription()
: inner_type()
- , subscription(*static_cast<const inner_type* const>(this))
+ , subscription(*static_cast<const inner_type*>(this))
{
}
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index cd5f39b..9ce455f 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -428,6 +428,15 @@ struct less
{ return std::forward<LHS>(lhs) < std::forward<RHS>(rhs); }
};
+template <class T>
+struct ret
+{
+ template <class LHS>
+ auto operator()(LHS&& ) const
+ -> decltype(T())
+ { return T(); }
+};
+
template<class T = void>
struct equal_to
{
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index 7c7a58d..b53b884 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000;
//merge_delay_error must work the very same way as `merge()` except the error handling
-SCENARIO("merge delay error completes", "[merge][join][operators]"){
+SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -117,7 +117,7 @@ SCENARIO("merge delay error completes", "[merge][join][operators]"){
}
}
-SCENARIO("variadic merge delay error completes with error", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -211,7 +211,7 @@ SCENARIO("variadic merge delay error completes with error", "[merge][join][opera
}
}
-SCENARIO("variadic merge delay error completes with 2 errors", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index 644ab93..ffa85aa 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 100000;
@@ -136,4 +137,57 @@ SCENARIO("stream observe_on", "[observe][observe_on]"){
}
}
-} \ No newline at end of file
+}
+
+class nocompare {
+public:
+ int v;
+};
+
+SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::observe_on_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<nocompare> in;
+ const rxsc::test::messages<int> out;
+
+ auto xs = sc.make_hot_observable({
+ in.next(150, nocompare{1}),
+ in.next(210, nocompare{2}),
+ in.next(240, nocompare{3}),
+ in.completed(300)
+ });
+
+ WHEN("observe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::observe_on(so)
+ | rxo::map([](nocompare v){ return v.v; })
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ out.next(211, 2),
+ out.next(241, 3),
+ out.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ out.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}