diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-06-15 23:07:40 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-06-15 23:07:40 -0700 |
commit | 91150a059f5250238ec264a888f7f9782ff4cf01 (patch) | |
tree | a5a2f89cdffb82fe1318ef25a01567a809229c6a /Rx/v2/src/rxcpp/operators/rx-concat.hpp | |
parent | 71b97f68fa64006f91ea4133a9fadb76b1daf14a (diff) | |
download | RxCpp-91150a059f5250238ec264a888f7f9782ff4cf01.tar.gz |
coordinator refinements
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-concat.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat.hpp | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp index 57f7143..68179be 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp @@ -48,7 +48,7 @@ struct concat void on_subscribe(Subscriber scbr) const { static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); - typedef typename coordinator_type::template get<Subscriber>::type output_type; + typedef Subscriber output_type; struct concat_state_type : public std::enable_shared_from_this<concat_state_type> @@ -86,7 +86,7 @@ struct concat // this subscribe does not share the out subscription // so that when it is unsubscribed the out will continue - selectedSource->subscribe( + auto sinkInner = make_subscriber<value_type>( state->out, collectionLifetime, // on_next @@ -109,6 +109,13 @@ struct concat } } ); + auto selectedSinkInner = on_exception( + [&](){return state->coordinator.out(sinkInner);}, + state->out); + if (selectedSinkInner.empty()) { + return; + } + selectedSource->subscribe(std::move(selectedSinkInner.get())); } composite_subscription sourceLifetime; composite_subscription collectionLifetime; @@ -118,15 +125,9 @@ struct concat }; auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); - auto selectedDest = on_exception( - [&](){return coordinator.out(scbr);}, - scbr); - if (selectedDest.empty()) { - return; - } // take a copy of the values for each subscription - auto state = std::shared_ptr<concat_state_type>(new concat_state_type(initial, std::move(coordinator), std::move(selectedDest.get()))); + auto state = std::shared_ptr<concat_state_type>(new concat_state_type(initial, std::move(coordinator), std::move(scbr))); state->sourceLifetime = composite_subscription(); @@ -144,7 +145,7 @@ struct concat // this subscribe does not share the observer subscription // so that when it is unsubscribed the observer can be called // until the inner subscriptions have finished - source->subscribe( + auto sink = make_subscriber<collection_type>( state->out, state->sourceLifetime, // on_next @@ -166,6 +167,13 @@ struct concat } } ); + auto selectedSink = on_exception( + [&](){return state->coordinator.out(sink);}, + state->out); + if (selectedSink.empty()) { + return; + } + source->subscribe(std::move(selectedSink.get())); } }; |