diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-04 19:48:45 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-04 19:48:45 -0700 |
commit | 0a583f0cb92926aa916302f66c8e5a13f7e43ccd (patch) | |
tree | 737b348246448cb8887326da2cc3a357ed3c200d /Rx/v2 | |
parent | 0f12d12602aef386e1c3cdbea4f899144fc50129 (diff) | |
download | RxCpp-0a583f0cb92926aa916302f66c8e5a13f7e43ccd.tar.gz |
add dynamic_connectable_observable
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 129 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 2 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-predef.hpp | 33 | ||||
-rw-r--r-- | Rx/v2/test/operators/publish.cpp | 145 |
4 files changed, 302 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 39590dd..bf63dd0 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -9,14 +9,132 @@ namespace rxcpp { +namespace detail { + +template<class T> +struct has_on_connect +{ + struct not_void {}; + template<class CT> + static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription())); + template<class CT> + static not_void check(...); + + typedef decltype(check<T>(0)) detail_result; + static const bool value = std::is_same<detail_result, void>::value; +}; + +} + +template<class T> +class dynamic_connectable_observable + : public rxs::source_base<T> +{ + struct state_type + : public std::enable_shared_from_this<state_type> + { + typedef std::function<void(subscriber<T>)> onsubscribe_type; + typedef std::function<void(composite_subscription)> onconnect_type; + + onsubscribe_type on_subscribe; + onconnect_type on_connect; + }; + std::shared_ptr<state_type> state; + + template<class U> + void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) { + state = o.state; + } + + template<class U> + void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) { + state = std::move(o.state); + } + + template<class SO> + void construct(SO&& source, rxs::tag_source&&) { + auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source)); + state->on_subscribe = [so](subscriber<T> o) mutable { + so->on_subscribe(std::move(o)); + }; + state->on_connect = [so](composite_subscription cs) mutable { + so->on_connect(std::move(cs)); + }; + } + +public: + + typedef tag_dynamic_observable dynamic_observable_tag; + + dynamic_connectable_observable() + { + } + + template<class SOF> + explicit dynamic_connectable_observable(SOF&& sof) + : state(std::make_shared<state_type>()) + { + construct(std::forward<SOF>(sof), + typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); + } + + template<class SF, class CF> + dynamic_connectable_observable(SF&& sf, CF&& cf) + : state(std::make_shared<state_type>()) + { + state->on_subscribe = std::forward<SF>(sf); + state->on_connect = std::forward<CF>(cf); + } + + void on_subscribe(subscriber<T> o) const { + state->on_subscribe(std::move(o)); + } + + template<class Subscriber> + typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type + on_subscribe(Subscriber&& o) const { + auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o)); + state->on_subscribe(make_subscriber<T>( + *so, + make_observer_dynamic<T>( + // on_next + [so](T t){ + so->on_next(t); + }, + // on_error + [so](std::exception_ptr e){ + so->on_error(e); + }, + // on_completed + [so](){ + so->on_completed(); + }))); + } + + void on_connect(composite_subscription cs) const { + state->on_connect(std::move(cs)); + } +}; + +template<class T, class Source> +connectable_observable<T> make_dynamic_connectable_observable(Source&& s) { + return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s))); +} + + + template<class T, class SourceOperator> class connectable_observable : public observable<T, SourceOperator> { -public: typedef connectable_observable<T, SourceOperator> this_type; - typedef tag_connectable_observable observable_tag; typedef observable<T, SourceOperator> base_type; + typedef typename std::decay<SourceOperator>::type source_operator_type; + + static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)"); + +public: + typedef tag_connectable_observable observable_tag; connectable_observable() { @@ -42,6 +160,13 @@ public: : base_type(std::move(o)) {} + /// + /// performs type-forgetting conversion to a new composite_observable + /// + connectable_observable<T> as_dynamic() { + return *this; + } + composite_subscription connect(composite_subscription cs = composite_subscription()) { base_type::source_operator.on_connect(cs); return cs; diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 10cf825..dd471f7 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -273,13 +273,11 @@ public: /// publish -> /// turns a cold observable hot and allows connections to the source to be independent of subscriptions /// -#if 1 auto publish() const -> connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>> { return connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>>( rxo::detail::publish<T, this_type, rxsub::subject<T>>(*this)); } -#endif /// /// takes any function that will take this observable and produce a result value. diff --git a/Rx/v2/src/rxcpp/rx-predef.hpp b/Rx/v2/src/rxcpp/rx-predef.hpp index 7e6f987..41cb223 100644 --- a/Rx/v2/src/rxcpp/rx-predef.hpp +++ b/Rx/v2/src/rxcpp/rx-predef.hpp @@ -135,11 +135,40 @@ public: static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_observable>::value; }; -struct tag_connectable_observable {}; +struct tag_dynamic_connectable_observable : public tag_dynamic_observable {}; -template<class T, class SourceOperator> +template<class T> +class is_dynamic_connectable_observable +{ + struct not_void {}; + template<class C> + static typename C::dynamic_observable_tag* check(int); + template<class C> + static not_void check(...); +public: + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_dynamic_connectable_observable*>::value; +}; + +template<class T> +class dynamic_connectable_observable; + +template<class T, + class SourceObservable = typename std::conditional<std::is_same<T, void>::value, + void, dynamic_connectable_observable<T>>::type> class connectable_observable; +struct tag_connectable_observable : public tag_observable {}; +template<class T> +class is_connectable_observable +{ + template<class C> + static typename C::observable_tag check(int); + template<class C> + static void check(...); +public: + static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_connectable_observable>::value; +}; + } #endif diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp index 5a0fd2b..9739646 100644 --- a/Rx/v2/test/operators/publish.cpp +++ b/Rx/v2/test/operators/publish.cpp @@ -16,8 +16,28 @@ namespace rxt=rxcpp::test; SCENARIO("publish range", "[range][subject][publish][operators]"){ GIVEN("a range"){ WHEN("published"){ - auto published = rxs::range<int>(0, 1000).publish().connect_now(); + auto published = rxs::range<int>(0, 10).publish(); + std::cout << "subscribe to published" << std::endl; + published.subscribe( + // on_next + [](int v){std::cout << v << ", ";}, + // on_completed + [](){std::cout << " done." << std::endl;}); std::cout << "connect to published" << std::endl; + published.connect(); + } + WHEN("ref_count is used"){ + auto published = rxs::range<int>(0, 10).publish().ref_count(); + std::cout << "subscribe to ref_count" << std::endl; + published.subscribe( + // on_next + [](int v){std::cout << v << ", ";}, + // on_completed + [](){std::cout << " done." << std::endl;}); + } + WHEN("connect_now is used"){ + auto published = rxs::range<int>(0, 10).publish().connect_now(); + std::cout << "subscribe to connect_now" << std::endl; published.subscribe( // on_next [](int v){std::cout << v << ", ";}, @@ -26,3 +46,126 @@ SCENARIO("publish range", "[range][subject][publish][operators]"){ } } } + +SCENARIO("publish", "[publish][multicast][operators]"){ + GIVEN("a test hot observable of longs"){ + auto sc = rxsc::make_test(); + typedef rxsc::test::messages<int> m; + typedef rxn::subscription life; + typedef m::recorded_type record; + auto on_next = m::on_next; + auto on_error = m::on_error; + auto on_completed = m::on_completed; + auto subscribe = m::subscribe; + + long invoked = 0; + + record messages[] = { + on_next(110, 7), + on_next(220, 3), + on_next(280, 4), + on_next(290, 1), + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(410, 13), + on_next(430, 2), + on_next(450, 9), + on_next(520, 11), + on_next(560, 20), + on_completed(600) + }; + auto xs = sc.make_hot_observable(messages); + + auto res = sc.make_subscriber<int>(); + + rx::connectable_observable<int> ys; + + WHEN("subscribed and then connected"){ + + sc.schedule_absolute(rxsc::test::created_time, + [&invoked, &ys, &xs](const rxsc::schedulable& scbl){ + ys = xs.publish().as_dynamic(); + //ys = xs.publish_last().as_dynamic(); + }); + + sc.schedule_absolute(rxsc::test::subscribed_time, + [&ys, &res](const rxsc::schedulable& scbl){ + ys.subscribe(res); + }); + + sc.schedule_absolute(rxsc::test::unsubscribed_time, + [&res](const rxsc::schedulable& scbl){ + res.unsubscribe(); + }); + + { + rx::composite_subscription connection; + + sc.schedule_absolute(300, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(400, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + { + rx::composite_subscription connection; + + sc.schedule_absolute(500, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(550, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + { + rx::composite_subscription connection; + + sc.schedule_absolute(650, + [connection, &ys](const rxsc::schedulable& scbl){ + ys.connect(connection); + }); + sc.schedule_absolute(800, + [connection](const rxsc::schedulable& scbl){ + connection.unsubscribe(); + }); + } + + sc.start(); + + THEN("the output only contains items sent while subscribed"){ + record items[] = { + on_next(340, 8), + on_next(360, 5), + on_next(370, 6), + on_next(390, 7), + on_next(520, 11) + }; + auto required = rxu::to_vector(items); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there were 3 subscription/unsubscription"){ + life items[] = { + subscribe(300, 400), + subscribe(500, 550), + subscribe(650, 800) + }; + auto required = rxu::to_vector(items); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + |