summaryrefslogtreecommitdiff
path: root/Rx/v2/test/subjects
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-03-19 23:18:54 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-03-19 23:18:54 -0700
commit3ce422e73b636d17a9189b0096cbb15821a29f8e (patch)
tree5cb4e49b0f9a852df5cdc2956a911a644e1dbb55 /Rx/v2/test/subjects
parentcb42db9eaa8dc288e42584d1ae892b8b811a28f9 (diff)
downloadRxCpp-3ce422e73b636d17a9189b0096cbb15821a29f8e.tar.gz
fix a runtime crash
Diffstat (limited to 'Rx/v2/test/subjects')
-rw-r--r--Rx/v2/test/subjects/subject.cpp45
1 files changed, 24 insertions, 21 deletions
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
index efa84e8..364cd4e 100644
--- a/Rx/v2/test/subjects/subject.cpp
+++ b/Rx/v2/test/subjects/subject.cpp
@@ -74,12 +74,13 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){
std::vector<std::future<int>> f(n);
#endif
+ auto o = sub.get_subscriber();
+
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub]() {
+ f[i] = std::async([sub, o]() {
auto source = sub.get_observable();
- auto subscription = sub.get_observer();
- while(subscription.is_subscribed()) {
+ while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
source.subscribe(cs, [cs](int){
@@ -92,8 +93,6 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){
sub.get_observable().subscribe([c](int){++(*c);});
}
- auto o = sub.get_observer();
-
auto start = clock::now();
for (int i = 0; i < onnextcalls; i++) {
o.on_next(i);
@@ -114,10 +113,12 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){
std::vector<std::future<int>> f(n);
#endif
+ auto o = sub.get_subscriber();
+
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub]() {
- while(sub.get_observer().is_subscribed()) {
+ f[i] = std::async([sub, o]() {
+ while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
sub.get_observable().subscribe(cs, [cs](int){
@@ -133,8 +134,6 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){
.subscribe([c](int){++(*c);});
}
- auto o = sub.get_observer();
-
auto start = clock::now();
rxs::range<int>(0, onnextcalls).subscribe(o);
auto finish = clock::now();
@@ -180,13 +179,14 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){
WHEN("multicasting an infinite source"){
+ auto o = s.get_subscriber();
sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){
s = rxsub::subject<int>(); return rxsc::make_action_empty();});
- sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){
- xs.subscribe(s.get_observer()); return rxsc::make_action_empty();});
- sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){
- s.get_observer().unsubscribe(); return rxsc::make_action_empty();});
+ sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){
+ xs.subscribe(o); return rxsc::make_action_empty();});
+ sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){
+ o.unsubscribe(); return rxsc::make_action_empty();});
sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){
s.get_observable().subscribe(results1); return rxsc::make_action_empty();});
@@ -272,13 +272,14 @@ SCENARIO("subject - finite source", "[subject][subjects]"){
WHEN("multicasting an infinite source"){
+ auto o = s.get_subscriber();
sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){
s = rxsub::subject<int>(); return rxsc::make_action_empty();});
- sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){
- xs.subscribe(s.get_observer()); return rxsc::make_action_empty();});
- sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){
- s.get_observer().unsubscribe(); return rxsc::make_action_empty();});
+ sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){
+ xs.subscribe(o); return rxsc::make_action_empty();});
+ sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){
+ o.unsubscribe(); return rxsc::make_action_empty();});
sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){
s.get_observable().subscribe(results1); return rxsc::make_action_empty();});
@@ -368,12 +369,14 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){
WHEN("multicasting an infinite source"){
+ auto o = s.get_subscriber();
+
sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){
s = rxsub::subject<int>(); return rxsc::make_action_empty();});
- sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){
- xs.subscribe(s.get_observer()); return rxsc::make_action_empty();});
- sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){
- s.get_observer().unsubscribe(); return rxsc::make_action_empty();});
+ sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){
+ xs.subscribe(o); return rxsc::make_action_empty();});
+ sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){
+ o.unsubscribe(); return rxsc::make_action_empty();});
sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){
s.get_observable().subscribe(results1); return rxsc::make_action_empty();});