summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp129
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-predef.hpp33
-rw-r--r--Rx/v2/test/operators/publish.cpp145
4 files changed, 302 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 39590dd..bf63dd0 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -9,14 +9,132 @@
namespace rxcpp {
+namespace detail {
+
+template<class T>
+struct has_on_connect
+{
+ struct not_void {};
+ template<class CT>
+ static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
+ template<class CT>
+ static not_void check(...);
+
+ typedef decltype(check<T>(0)) detail_result;
+ static const bool value = std::is_same<detail_result, void>::value;
+};
+
+}
+
+template<class T>
+class dynamic_connectable_observable
+ : public rxs::source_base<T>
+{
+ struct state_type
+ : public std::enable_shared_from_this<state_type>
+ {
+ typedef std::function<void(subscriber<T>)> onsubscribe_type;
+ typedef std::function<void(composite_subscription)> onconnect_type;
+
+ onsubscribe_type on_subscribe;
+ onconnect_type on_connect;
+ };
+ std::shared_ptr<state_type> state;
+
+ template<class U>
+ void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
+ state = o.state;
+ }
+
+ template<class U>
+ void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
+ state = std::move(o.state);
+ }
+
+ template<class SO>
+ void construct(SO&& source, rxs::tag_source&&) {
+ auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
+ state->on_subscribe = [so](subscriber<T> o) mutable {
+ so->on_subscribe(std::move(o));
+ };
+ state->on_connect = [so](composite_subscription cs) mutable {
+ so->on_connect(std::move(cs));
+ };
+ }
+
+public:
+
+ typedef tag_dynamic_observable dynamic_observable_tag;
+
+ dynamic_connectable_observable()
+ {
+ }
+
+ template<class SOF>
+ explicit dynamic_connectable_observable(SOF&& sof)
+ : state(std::make_shared<state_type>())
+ {
+ construct(std::forward<SOF>(sof),
+ typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
+ }
+
+ template<class SF, class CF>
+ dynamic_connectable_observable(SF&& sf, CF&& cf)
+ : state(std::make_shared<state_type>())
+ {
+ state->on_subscribe = std::forward<SF>(sf);
+ state->on_connect = std::forward<CF>(cf);
+ }
+
+ void on_subscribe(subscriber<T> o) const {
+ state->on_subscribe(std::move(o));
+ }
+
+ template<class Subscriber>
+ typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
+ on_subscribe(Subscriber&& o) const {
+ auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
+ state->on_subscribe(make_subscriber<T>(
+ *so,
+ make_observer_dynamic<T>(
+ // on_next
+ [so](T t){
+ so->on_next(t);
+ },
+ // on_error
+ [so](std::exception_ptr e){
+ so->on_error(e);
+ },
+ // on_completed
+ [so](){
+ so->on_completed();
+ })));
+ }
+
+ void on_connect(composite_subscription cs) const {
+ state->on_connect(std::move(cs));
+ }
+};
+
+template<class T, class Source>
+connectable_observable<T> make_dynamic_connectable_observable(Source&& s) {
+ return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
+}
+
+
+
template<class T, class SourceOperator>
class connectable_observable
: public observable<T, SourceOperator>
{
-public:
typedef connectable_observable<T, SourceOperator> this_type;
- typedef tag_connectable_observable observable_tag;
typedef observable<T, SourceOperator> base_type;
+ typedef typename std::decay<SourceOperator>::type source_operator_type;
+
+ static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
+
+public:
+ typedef tag_connectable_observable observable_tag;
connectable_observable()
{
@@ -42,6 +160,13 @@ public:
: base_type(std::move(o))
{}
+ ///
+ /// performs type-forgetting conversion to a new composite_observable
+ ///
+ connectable_observable<T> as_dynamic() {
+ return *this;
+ }
+
composite_subscription connect(composite_subscription cs = composite_subscription()) {
base_type::source_operator.on_connect(cs);
return cs;
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 10cf825..dd471f7 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -273,13 +273,11 @@ public:
/// publish ->
/// turns a cold observable hot and allows connections to the source to be independent of subscriptions
///
-#if 1
auto publish() const
-> connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>> {
return connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>>(
rxo::detail::publish<T, this_type, rxsub::subject<T>>(*this));
}
-#endif
///
/// takes any function that will take this observable and produce a result value.
diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp
index 7e6f987..41cb223 100644
--- a/Rx/v2/src/rxcpp/rx-predef.hpp
+++ b/Rx/v2/src/rxcpp/rx-predef.hpp
@@ -135,11 +135,40 @@ public:
static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_observable>::value;
};
-struct tag_connectable_observable {};
+struct tag_dynamic_connectable_observable : public tag_dynamic_observable {};
-template<class T, class SourceOperator>
+template<class T>
+class is_dynamic_connectable_observable
+{
+ struct not_void {};
+ template<class C>
+ static typename C::dynamic_observable_tag* check(int);
+ template<class C>
+ static not_void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_dynamic_connectable_observable*>::value;
+};
+
+template<class T>
+class dynamic_connectable_observable;
+
+template<class T,
+ class SourceObservable = typename std::conditional<std::is_same<T, void>::value,
+ void, dynamic_connectable_observable<T>>::type>
class connectable_observable;
+struct tag_connectable_observable : public tag_observable {};
+template<class T>
+class is_connectable_observable
+{
+ template<class C>
+ static typename C::observable_tag check(int);
+ template<class C>
+ static void check(...);
+public:
+ static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_connectable_observable>::value;
+};
+
}
#endif
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 5a0fd2b..9739646 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -16,8 +16,28 @@ namespace rxt=rxcpp::test;
SCENARIO("publish range", "[range][subject][publish][operators]"){
GIVEN("a range"){
WHEN("published"){
- auto published = rxs::range<int>(0, 1000).publish().connect_now();
+ auto published = rxs::range<int>(0, 10).publish();
+ std::cout << "subscribe to published" << std::endl;
+ published.subscribe(
+ // on_next
+ [](int v){std::cout << v << ", ";},
+ // on_completed
+ [](){std::cout << " done." << std::endl;});
std::cout << "connect to published" << std::endl;
+ published.connect();
+ }
+ WHEN("ref_count is used"){
+ auto published = rxs::range<int>(0, 10).publish().ref_count();
+ std::cout << "subscribe to ref_count" << std::endl;
+ published.subscribe(
+ // on_next
+ [](int v){std::cout << v << ", ";},
+ // on_completed
+ [](){std::cout << " done." << std::endl;});
+ }
+ WHEN("connect_now is used"){
+ auto published = rxs::range<int>(0, 10).publish().connect_now();
+ std::cout << "subscribe to connect_now" << std::endl;
published.subscribe(
// on_next
[](int v){std::cout << v << ", ";},
@@ -26,3 +46,126 @@ SCENARIO("publish range", "[range][subject][publish][operators]"){
}
}
}
+
+SCENARIO("publish", "[publish][multicast][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto sc = rxsc::make_test();
+ typedef rxsc::test::messages<int> m;
+ typedef rxn::subscription life;
+ typedef m::recorded_type record;
+ auto on_next = m::on_next;
+ auto on_error = m::on_error;
+ auto on_completed = m::on_completed;
+ auto subscribe = m::subscribe;
+
+ long invoked = 0;
+
+ record messages[] = {
+ on_next(110, 7),
+ on_next(220, 3),
+ on_next(280, 4),
+ on_next(290, 1),
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(410, 13),
+ on_next(430, 2),
+ on_next(450, 9),
+ on_next(520, 11),
+ on_next(560, 20),
+ on_completed(600)
+ };
+ auto xs = sc.make_hot_observable(messages);
+
+ auto res = sc.make_subscriber<int>();
+
+ rx::connectable_observable<int> ys;
+
+ WHEN("subscribed and then connected"){
+
+ sc.schedule_absolute(rxsc::test::created_time,
+ [&invoked, &ys, &xs](const rxsc::schedulable& scbl){
+ ys = xs.publish().as_dynamic();
+ //ys = xs.publish_last().as_dynamic();
+ });
+
+ sc.schedule_absolute(rxsc::test::subscribed_time,
+ [&ys, &res](const rxsc::schedulable& scbl){
+ ys.subscribe(res);
+ });
+
+ sc.schedule_absolute(rxsc::test::unsubscribed_time,
+ [&res](const rxsc::schedulable& scbl){
+ res.unsubscribe();
+ });
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(300,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(400,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(500,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(550,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ {
+ rx::composite_subscription connection;
+
+ sc.schedule_absolute(650,
+ [connection, &ys](const rxsc::schedulable& scbl){
+ ys.connect(connection);
+ });
+ sc.schedule_absolute(800,
+ [connection](const rxsc::schedulable& scbl){
+ connection.unsubscribe();
+ });
+ }
+
+ sc.start();
+
+ THEN("the output only contains items sent while subscribed"){
+ record items[] = {
+ on_next(340, 8),
+ on_next(360, 5),
+ on_next(370, 6),
+ on_next(390, 7),
+ on_next(520, 11)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were 3 subscription/unsubscription"){
+ life items[] = {
+ subscribe(300, 400),
+ subscribe(500, 550),
+ subscribe(650, 800)
+ };
+ auto required = rxu::to_vector(items);
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+