summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-concat.hpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-06-15 23:07:40 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-06-15 23:07:40 -0700
commit91150a059f5250238ec264a888f7f9782ff4cf01 (patch)
treea5a2f89cdffb82fe1318ef25a01567a809229c6a /Rx/v2/src/rxcpp/operators/rx-concat.hpp
parent71b97f68fa64006f91ea4133a9fadb76b1daf14a (diff)
downloadRxCpp-91150a059f5250238ec264a888f7f9782ff4cf01.tar.gz
coordinator refinements
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-concat.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat.hpp28
1 files changed, 18 insertions, 10 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
index 57f7143..68179be 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
@@ -48,7 +48,7 @@ struct concat
void on_subscribe(Subscriber scbr) const {
static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
- typedef typename coordinator_type::template get<Subscriber>::type output_type;
+ typedef Subscriber output_type;
struct concat_state_type
: public std::enable_shared_from_this<concat_state_type>
@@ -86,7 +86,7 @@ struct concat
// this subscribe does not share the out subscription
// so that when it is unsubscribed the out will continue
- selectedSource->subscribe(
+ auto sinkInner = make_subscriber<value_type>(
state->out,
collectionLifetime,
// on_next
@@ -109,6 +109,13 @@ struct concat
}
}
);
+ auto selectedSinkInner = on_exception(
+ [&](){return state->coordinator.out(sinkInner);},
+ state->out);
+ if (selectedSinkInner.empty()) {
+ return;
+ }
+ selectedSource->subscribe(std::move(selectedSinkInner.get()));
}
composite_subscription sourceLifetime;
composite_subscription collectionLifetime;
@@ -118,15 +125,9 @@ struct concat
};
auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
- auto selectedDest = on_exception(
- [&](){return coordinator.out(scbr);},
- scbr);
- if (selectedDest.empty()) {
- return;
- }
// take a copy of the values for each subscription
- auto state = std::shared_ptr<concat_state_type>(new concat_state_type(initial, std::move(coordinator), std::move(selectedDest.get())));
+ auto state = std::shared_ptr<concat_state_type>(new concat_state_type(initial, std::move(coordinator), std::move(scbr)));
state->sourceLifetime = composite_subscription();
@@ -144,7 +145,7 @@ struct concat
// this subscribe does not share the observer subscription
// so that when it is unsubscribed the observer can be called
// until the inner subscriptions have finished
- source->subscribe(
+ auto sink = make_subscriber<collection_type>(
state->out,
state->sourceLifetime,
// on_next
@@ -166,6 +167,13 @@ struct concat
}
}
);
+ auto selectedSink = on_exception(
+ [&](){return state->coordinator.out(sink);},
+ state->out);
+ if (selectedSink.empty()) {
+ return;
+ }
+ source->subscribe(std::move(selectedSink.get()));
}
};