summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@gmail.com>2018-08-05 15:15:24 -0700
committerKirk Shoop <kirk.shoop@gmail.com>2018-08-05 16:40:01 -0700
commitb3753b360072a32822c564e288782ed704c7494d (patch)
tree1937e4da27410eeccecce004529fdabf3343cf88
parent1e9312fc7eba8e78719f68d2d6743cf8ecce0e85 (diff)
downloadRxCpp-b3753b360072a32822c564e288782ed704c7494d.tar.gz
fix blocking_observable::subscribe
removes spinning from blocking submit. ran all perf tests on osx without issue. should fix #430 and help with #451
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp45
-rw-r--r--Rx/v2/test/operators/merge_delay_error.cpp6
2 files changed, 10 insertions, 41 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 496db0b..037f375 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -173,28 +173,9 @@ class blocking_observable
-> void {
std::mutex lock;
std::condition_variable wake;
+ bool disposed = false;
std::exception_ptr error;
- struct tracking
- {
- ~tracking()
- {
- if (!disposed || !wakened) std::terminate();
- }
- tracking()
- {
- disposed = false;
- wakened = false;
- false_wakes = 0;
- true_wakes = 0;
- }
- std::atomic_bool disposed;
- std::atomic_bool wakened;
- std::atomic_int false_wakes;
- std::atomic_int true_wakes;
- };
- auto track = std::make_shared<tracking>();
-
auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
// keep any error to rethrow at the end.
@@ -213,31 +194,19 @@ class blocking_observable
auto cs = scbr.get_subscription();
cs.add(
- [&, track](){
- // OSX geting invalid x86 op if notify_one is after the disposed = true
- // presumably because the condition_variable may already have been awakened
- // and is now sitting in a while loop on disposed
+ [&](){
+ std::unique_lock<std::mutex> guard(lock);
wake.notify_one();
- track->disposed = true;
+ disposed = true;
});
- std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
+ std::unique_lock<std::mutex> guard(lock);
wake.wait(guard,
- [&, track](){
- // this is really not good.
- // false wakeups were never followed by true wakeups so..
-
- // anyways this gets triggered before disposed is set now so wait.
- while (!track->disposed) {
- ++track->false_wakes;
- }
- ++track->true_wakes;
- return true;
+ [&](){
+ return disposed;
});
- track->wakened = true;
- if (!track->disposed || !track->wakened) std::terminate();
if (error) {std::rethrow_exception(error);}
}
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index d560b45..b53b884 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000;
//merge_delay_error must work the very same way as `merge()` except the error handling
-SCENARIO("merge completes", "[merge][join][operators]"){
+SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -117,7 +117,7 @@ SCENARIO("merge completes", "[merge][join][operators]"){
}
}
-SCENARIO("variadic merge completes with error", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -211,7 +211,7 @@ SCENARIO("variadic merge completes with error", "[merge][join][operators]"){
}
}
-SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();