diff options
author | Igor Murashkin <iam@google.com> | 2018-08-10 17:28:30 -0700 |
---|---|---|
committer | android-build-merger <android-build-merger@google.com> | 2018-08-10 17:28:30 -0700 |
commit | 7227f0fb330c32ead16df49853d47bf1942346c5 (patch) | |
tree | a0b81cf10c973ca175ece7c4d02bde5d5c908429 /Rx/v2/src/rxcpp/rx-connectable_observable.hpp | |
parent | ef3b4d50442673ca87176d9e7b02c046c0cbdd50 (diff) | |
parent | bc2bcecae5d4067d1754b38d61db29be069fa384 (diff) | |
download | RxCpp-7227f0fb330c32ead16df49853d47bf1942346c5.tar.gz |
android: Import RxCpp from upstream am: 1cbf73c952
am: bc2bcecae5
Change-Id: I477ee6ed52d0402e89746e56e27a9879c0670d8b
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-connectable_observable.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp new file mode 100644 index 0000000..7038e24 --- /dev/null +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -0,0 +1,211 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP) +#define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP + +#include "rx-includes.hpp" + +namespace rxcpp { + +namespace detail { + +template<class T> +struct has_on_connect +{ + struct not_void {}; + template<class CT> + static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription())); + template<class CT> + static not_void check(...); + + typedef decltype(check<T>(0)) detail_result; + static const bool value = std::is_same<detail_result, void>::value; +}; + +} + +template<class T> +class dynamic_connectable_observable + : public dynamic_observable<T> +{ + struct state_type + : public std::enable_shared_from_this<state_type> + { + typedef std::function<void(composite_subscription)> onconnect_type; + + onconnect_type on_connect; + }; + std::shared_ptr<state_type> state; + + template<class U> + void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) { + state = o.state; + } + + template<class U> + void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) { + state = std::move(o.state); + } + + template<class SO> + void construct(SO&& source, rxs::tag_source&&) { + auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source)); + state->on_connect = [so](composite_subscription cs) mutable { + so->on_connect(std::move(cs)); + }; + } + +public: + + typedef tag_dynamic_observable dynamic_observable_tag; + + dynamic_connectable_observable() + { + } + + template<class SOF> + explicit dynamic_connectable_observable(SOF sof) + : dynamic_observable<T>(sof) + , state(std::make_shared<state_type>()) + { + construct(std::move(sof), + typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); + } + + template<class SF, class CF> + dynamic_connectable_observable(SF&& sf, CF&& cf) + : dynamic_observable<T>(std::forward<SF>(sf)) + , state(std::make_shared<state_type>()) + { + state->on_connect = std::forward<CF>(cf); + } + + using dynamic_observable<T>::on_subscribe; + + void on_connect(composite_subscription cs) const { + state->on_connect(std::move(cs)); + } +}; + +template<class T, class Source> +connectable_observable<T> make_dynamic_connectable_observable(Source&& s) { + return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s))); +} + + +/*! + \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called. + + \ingroup group-observable + +*/ +template<class T, class SourceOperator> +class connectable_observable + : public observable<T, SourceOperator> +{ + typedef connectable_observable<T, SourceOperator> this_type; + typedef observable<T, SourceOperator> base_type; + typedef rxu::decay_t<SourceOperator> source_operator_type; + + static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)"); + +public: + typedef tag_connectable_observable observable_tag; + + connectable_observable() + { + } + + explicit connectable_observable(const SourceOperator& o) + : base_type(o) + { + } + explicit connectable_observable(SourceOperator&& o) + : base_type(std::move(o)) + { + } + + // implicit conversion between observables of the same value_type + template<class SO> + connectable_observable(const connectable_observable<T, SO>& o) + : base_type(o) + {} + // implicit conversion between observables of the same value_type + template<class SO> + connectable_observable(connectable_observable<T, SO>&& o) + : base_type(std::move(o)) + {} + + /// + /// takes any function that will take this observable and produce a result value. + /// this is intended to allow externally defined operators, that use subscribe, + /// to be connected into the expression. + /// + template<class OperatorFactory> + auto op(OperatorFactory&& of) const + -> decltype(of(*(const this_type*)nullptr)) { + return of(*this); + static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); + } + + /// + /// performs type-forgetting conversion to a new composite_observable + /// + connectable_observable<T> as_dynamic() { + return *this; + } + + composite_subscription connect(composite_subscription cs = composite_subscription()) { + base_type::source_operator.on_connect(cs); + return cs; + } + + /*! @copydoc rx-ref_count.hpp + */ + template<class... AN> + auto ref_count(AN... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond + { + return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); + } + + /*! @copydoc rx-connect_forever.hpp + */ + template<class... AN> + auto connect_forever(AN... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond + { + return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...); + } +}; + + +} + +// +// support range() >> filter() >> subscribe() syntax +// '>>' is spelled 'stream' +// +template<class T, class SourceOperator, class OperatorFactory> +auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); +} + +// +// support range() | filter() | subscribe() syntax +// '|' is spelled 'pipe' +// +template<class T, class SourceOperator, class OperatorFactory> +auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) + -> decltype(source.op(std::forward<OperatorFactory>(of))) { + return source.op(std::forward<OperatorFactory>(of)); +} + +#endif |