diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-19 23:18:54 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-03-19 23:18:54 -0700 |
commit | 3ce422e73b636d17a9189b0096cbb15821a29f8e (patch) | |
tree | 5cb4e49b0f9a852df5cdc2956a911a644e1dbb55 /Rx/v2/test/subjects | |
parent | cb42db9eaa8dc288e42584d1ae892b8b811a28f9 (diff) | |
download | RxCpp-3ce422e73b636d17a9189b0096cbb15821a29f8e.tar.gz |
fix a runtime crash
Diffstat (limited to 'Rx/v2/test/subjects')
-rw-r--r-- | Rx/v2/test/subjects/subject.cpp | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp index efa84e8..364cd4e 100644 --- a/Rx/v2/test/subjects/subject.cpp +++ b/Rx/v2/test/subjects/subject.cpp @@ -74,12 +74,13 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){ std::vector<std::future<int>> f(n); #endif + auto o = sub.get_subscriber(); + for (int i = 0; i < n; i++) { #if RXCPP_SUBJECT_TEST_ASYNC - f[i] = std::async([sub]() { + f[i] = std::async([sub, o]() { auto source = sub.get_observable(); - auto subscription = sub.get_observer(); - while(subscription.is_subscribed()) { + while(o.is_subscribed()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); rx::composite_subscription cs; source.subscribe(cs, [cs](int){ @@ -92,8 +93,6 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){ sub.get_observable().subscribe([c](int){++(*c);}); } - auto o = sub.get_observer(); - auto start = clock::now(); for (int i = 0; i < onnextcalls; i++) { o.on_next(i); @@ -114,10 +113,12 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){ std::vector<std::future<int>> f(n); #endif + auto o = sub.get_subscriber(); + for (int i = 0; i < n; i++) { #if RXCPP_SUBJECT_TEST_ASYNC - f[i] = std::async([sub]() { - while(sub.get_observer().is_subscribed()) { + f[i] = std::async([sub, o]() { + while(o.is_subscribed()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); rx::composite_subscription cs; sub.get_observable().subscribe(cs, [cs](int){ @@ -133,8 +134,6 @@ SCENARIO("subject test", "[hide][subject][subjects][perf]"){ .subscribe([c](int){++(*c);}); } - auto o = sub.get_observer(); - auto start = clock::now(); rxs::range<int>(0, onnextcalls).subscribe(o); auto finish = clock::now(); @@ -180,13 +179,14 @@ SCENARIO("subject - infinite source", "[subject][subjects]"){ WHEN("multicasting an infinite source"){ + auto o = s.get_subscriber(); sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){ s = rxsub::subject<int>(); return rxsc::make_action_empty();}); - sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){ - xs.subscribe(s.get_observer()); return rxsc::make_action_empty();}); - sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){ - s.get_observer().unsubscribe(); return rxsc::make_action_empty();}); + sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){ + xs.subscribe(o); return rxsc::make_action_empty();}); + sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){ + o.unsubscribe(); return rxsc::make_action_empty();}); sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){ s.get_observable().subscribe(results1); return rxsc::make_action_empty();}); @@ -272,13 +272,14 @@ SCENARIO("subject - finite source", "[subject][subjects]"){ WHEN("multicasting an infinite source"){ + auto o = s.get_subscriber(); sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){ s = rxsub::subject<int>(); return rxsc::make_action_empty();}); - sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){ - xs.subscribe(s.get_observer()); return rxsc::make_action_empty();}); - sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){ - s.get_observer().unsubscribe(); return rxsc::make_action_empty();}); + sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){ + xs.subscribe(o); return rxsc::make_action_empty();}); + sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){ + o.unsubscribe(); return rxsc::make_action_empty();}); sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){ s.get_observable().subscribe(results1); return rxsc::make_action_empty();}); @@ -368,12 +369,14 @@ SCENARIO("subject - on_error in source", "[subject][subjects]"){ WHEN("multicasting an infinite source"){ + auto o = s.get_subscriber(); + sc->schedule_absolute(100, [&s](rxsc::action, rxsc::scheduler){ s = rxsub::subject<int>(); return rxsc::make_action_empty();}); - sc->schedule_absolute(200, [&xs, &s](rxsc::action, rxsc::scheduler){ - xs.subscribe(s.get_observer()); return rxsc::make_action_empty();}); - sc->schedule_absolute(1000, [&s](rxsc::action, rxsc::scheduler){ - s.get_observer().unsubscribe(); return rxsc::make_action_empty();}); + sc->schedule_absolute(200, [&xs, &o](rxsc::action, rxsc::scheduler){ + xs.subscribe(o); return rxsc::make_action_empty();}); + sc->schedule_absolute(1000, [&o](rxsc::action, rxsc::scheduler){ + o.unsubscribe(); return rxsc::make_action_empty();}); sc->schedule_absolute(300, [&s, &results1](rxsc::action, rxsc::scheduler){ s.get_observable().subscribe(results1); return rxsc::make_action_empty();}); |