summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-ref_count.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ref_count.hpp134
1 files changed, 119 insertions, 15 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 55dde05..b68315d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -4,10 +4,26 @@
/*! \file rx-ref_count.hpp
- \brief takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source.
- The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.
+ \brief Make some \c connectable_observable behave like an ordinary \c observable.
+ Uses a reference count of the subscribers to control the connection to the published observable.
- \return An observable that emitting the items from its source.
+ The first subscription will cause a call to \c connect(), and the last \c unsubscribe will unsubscribe the connection.
+
+ There are 2 variants of the operator:
+ \li \c ref_count(): calls \c connect on the \c source \c connectable_observable.
+ \li \c ref_count(other): calls \c connect on the \c other \c connectable_observable.
+
+ \tparam ConnectableObservable the type of the \c other \c connectable_observable (optional)
+ \param other \c connectable_observable to call \c connect on (optional)
+
+ If \c other is omitted, then \c source is used instead (which must be a \c connectable_observable).
+ Otherwise, \c source can be a regular \c observable.
+
+ \return An \c observable that emits the items from its \c source.
+
+ \sample
+ \snippet ref_count.cpp ref_count other diamond sample
+ \snippet output.txt ref_count other diamond sample
*/
#if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
@@ -30,29 +46,100 @@ struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments
};
template<class... AN>
using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
-
-template<class T, class ConnectableObservable>
+
+// ref_count(other) takes a regular observable source, not a connectable_observable.
+// use template specialization to avoid instantiating 'subscribe' for two different types
+// which would cause a compilation error.
+template <typename connectable_type, typename observable_type>
+struct ref_count_state_base {
+ ref_count_state_base(connectable_type other, observable_type source)
+ : connectable(std::move(other))
+ , subscribable(std::move(source)) {}
+
+ connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+ observable_type subscribable; // subscribes to this if non-empty.
+
+ template <typename Subscriber>
+ void subscribe(Subscriber&& o) {
+ subscribable.subscribe(std::forward<Subscriber>(o));
+ }
+};
+
+// Note: explicit specializations have to be at namespace scope prior to C++17.
+template <typename connectable_type>
+struct ref_count_state_base<connectable_type, void> {
+ explicit ref_count_state_base(connectable_type c)
+ : connectable(std::move(c)) {}
+
+ connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+
+ template <typename Subscriber>
+ void subscribe(Subscriber&& o) {
+ connectable.subscribe(std::forward<Subscriber>(o));
+ }
+};
+
+template<class T,
+ class ConnectableObservable,
+ class Observable = void> // note: type order flipped versus the operator.
struct ref_count : public operator_base<T>
{
- typedef rxu::decay_t<ConnectableObservable> source_type;
+ typedef rxu::decay_t<Observable> observable_type;
+ typedef rxu::decay_t<ConnectableObservable> connectable_type;
- struct ref_count_state : public std::enable_shared_from_this<ref_count_state>
+ // ref_count() == false
+ // ref_count(other) == true
+ using has_observable_t = rxu::negation<std::is_same<void, Observable>>;
+ static constexpr bool has_observable_v = has_observable_t::value;
+
+ struct ref_count_state : public std::enable_shared_from_this<ref_count_state>,
+ public ref_count_state_base<ConnectableObservable, Observable>
{
- explicit ref_count_state(source_type o)
- : source(std::move(o))
+ template <class HasObservable = has_observable_t,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ rxu::negation<HasObservable>>>
+ explicit ref_count_state(connectable_type source)
+ : ref_count_state_base<ConnectableObservable, Observable>(std::move(source))
+ , subscribers(0)
+ {
+ }
+
+ template <bool HasObservableV = has_observable_v>
+ ref_count_state(connectable_type other,
+ typename std::enable_if<HasObservableV, observable_type>::type source)
+ : ref_count_state_base<ConnectableObservable, Observable>(std::move(other),
+ std::move(source))
, subscribers(0)
{
}
- source_type source;
std::mutex lock;
long subscribers;
composite_subscription connection;
};
std::shared_ptr<ref_count_state> state;
- explicit ref_count(source_type o)
- : state(std::make_shared<ref_count_state>(std::move(o)))
+ // connectable_observable<T> source = ...;
+ // source.ref_count();
+ //
+ // calls connect on source after the subscribe on source.
+ template <class HasObservable = has_observable_t,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ rxu::negation<HasObservable>>>
+ explicit ref_count(connectable_type source)
+ : state(std::make_shared<ref_count_state>(std::move(source)))
+ {
+ }
+
+ // connectable_observable<?> other = ...;
+ // observable<T> source = ...;
+ // source.ref_count(other);
+ //
+ // calls connect on 'other' after the subscribe on 'source'.
+ template <bool HasObservableV = has_observable_v>
+ ref_count(connectable_type other,
+ typename std::enable_if<HasObservableV, observable_type>::type source)
+ : state(std::make_shared<ref_count_state>(std::move(other), std::move(source)))
{
}
@@ -70,9 +157,9 @@ struct ref_count : public operator_base<T>
keepAlive->connection = composite_subscription();
}
});
- keepAlive->source.subscribe(std::forward<Subscriber>(o));
+ keepAlive->subscribe(std::forward<Subscriber>(o));
if (needConnect) {
- keepAlive->source.connect(keepAlive->connection);
+ keepAlive->connectable.connect(keepAlive->connection);
}
}
};
@@ -104,11 +191,28 @@ struct member_overload<ref_count_tag>
return Result(RefCount(std::forward<ConnectableObservable>(o)));
}
+ template<class Observable,
+ class ConnectableObservable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_connectable_observable<ConnectableObservable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class RefCount = rxo::detail::ref_count<SourceValue,
+ rxu::decay_t<ConnectableObservable>,
+ rxu::decay_t<Observable>>,
+ class Value = rxu::value_type_t<RefCount>,
+ class Result = observable<Value, RefCount>
+ >
+ static Result member(Observable&& o, ConnectableObservable&& other) {
+ return Result(RefCount(std::forward<ConnectableObservable>(other),
+ std::forward<Observable>(o)));
+ }
+
template<class... AN>
static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
+ static_assert(sizeof...(AN) == 10000, "ref_count takes (optional ConnectableObservable)");
}
};